ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

parca 对于frostdb 的使用简单说明

2022-09-12 21:04:44  阅读:187  来源: 互联网

标签:parca err nil level 简单 frostdb sl return logger


parca 使用badger 存储元数据,使用frostdb(列式存储)存储采样数据

简单集成使用说明

frostdb 官方文档提供了一些简单的集成demo,值得学习参考
参考代码:pkg/parca/parca.go、pkg/scrape/manager.go、pkg/scrape/scrape.go。如果需要开启持久化,就需要使用对象存储,frostdb 支持基于对象存储进行持久化
因为parca 很多地方参考了prometheus,所以对于frostdb 的集成是包装为了一个prometheus 服务

  • 初始化
    pkg/parca/parca.go 以及pkg/scrape/manager.go
    初始化
 
// Create the frostdb column store from the assembled options.
col, err := frostdb.New(frostdbOptions...)
    if err != nil {
        level.Error(logger).Log("msg", "failed to initialize storage", "err", err)
        return err
    }
 
    // Replay the WALs before any database is loaded from the store.
    // NOTE(review): this call uses context.Background() while col.DB below
    // uses ctx — confirm the difference is intentional.
    if err := col.ReplayWALs(context.Background()); err != nil {
        level.Error(logger).Log("msg", "failed to replay WAL", "err", err)
        return err
    }
 
    // Load the database named "parca" from the column store.
    colDB, err := col.DB(ctx, "parca")
    if err != nil {
        level.Error(logger).Log("msg", "failed to load database", "err", err)
        return err
    }
 
    // Obtain the schema that parcacol provides.
    schema, err := parcacol.Schema()
    if err != nil {
        level.Error(logger).Log("msg", "failed to get schema", "err", err)
        return err
    }
 
    // Get the "stacktraces" table, configured with that schema.
    table, err := colDB.Table("stacktraces", frostdb.NewTableConfig(schema))
    if err != nil {
        level.Error(logger).Log("msg", "create table", "err", err)
        return err
    }
 
    // Build the profile column store from the logger, a tracer, the
    // metastore, the table and its schema, and the debug value-log flag.
    s := profilestore.NewProfileColumnStore(
        logger,
        tracerProvider.Tracer("profilestore"),
        metastore,
        table,
        schema,
        flags.StorageDebugValueLog,
    )

在Manager 中使用

func (m *Manager) reload() {
    m.mtxScrape.Lock()
    defer m.mtxScrape.Unlock()
    var wg sync.WaitGroup
    level.Debug(m.logger).Log("msg", "Reloading scrape manager")
    for setName, groups := range m.targetSets {
        var sp *scrapePool
        existing, ok := m.scrapePools[setName]
        if !ok {
            scrapeConfig, ok := m.scrapeConfigs[setName]
            if !ok {
                level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
                return
            }
          // 此方法是核心,进行pull 模式需要依赖列存,具体数据写入参考数据写入部分
            sp = newScrapePool(scrapeConfig, m.store, log.With(m.logger, "scrape_pool", setName), m.externalLabels, &scrapePoolMetrics{
                targetIntervalLength:          m.targetIntervalLength,
                targetReloadIntervalLength:    m.targetReloadIntervalLength,
                targetSyncIntervalLength:      m.targetSyncIntervalLength,
                targetScrapePoolSyncsCounter:  m.targetScrapePoolSyncsCounter,
                targetScrapeSampleLimit:       m.targetScrapeSampleLimit,
                targetScrapeSampleDuplicate:   m.targetScrapeSampleDuplicate,
                targetScrapeSampleOutOfOrder:  m.targetScrapeSampleOutOfOrder,
                targetScrapeSampleOutOfBounds: m.targetScrapeSampleOutOfBounds,
            })
            m.scrapePools[setName] = sp
        } else {
            sp = existing
        }
 
        wg.Add(1)
        // Run the sync in parallel as these take a while and at high load can't catch up.
        go func(sp *scrapePool, groups []*targetgroup.Group) {
            sp.Sync(groups)
            wg.Done()
        }(sp, groups)
    }
    wg.Wait()
}
  • 数据写入操作
    pkg/scrape/scrape.go
 
// run is the scrape loop for a single target.
//
// It first waits for the scraper's offset for the given interval, then on
// every tick of interval it scrapes the target (bounded by timeout) and, on
// success, writes the raw profile to sl.store via WriteRaw.
//
// Scrape and write errors are sent on errc when errc is non-nil. When errc is
// nil, a WriteRaw failure is logged at error level; a scrape failure is
// logged at debug level in either case.
//
// The loop ends when sl.ctx or sl.scrapeCtx is done. sl.stopped is closed on
// every exit path.
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    // Delay the first scrape by the scraper-provided offset, unless the
    // scrape context is cancelled first.
    select {
    case <-time.After(sl.scraper.offset(interval)):
        // Continue after a scraping offset.
    case <-sl.scrapeCtx.Done():
        close(sl.stopped)
        return
    }
 
    // Start time of the previous iteration; zero until one has completed.
    var last time.Time
 
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
 
mainLoop:
    for {
        // Non-blocking cancellation check before starting a new scrape.
        select {
        case <-sl.ctx.Done():
            close(sl.stopped)
            return
        case <-sl.scrapeCtx.Done():
            break mainLoop
        default:
        }
 
        start := time.Now()
 
        // Only record after the first scrape.
        if !last.IsZero() {
            sl.intervalLength.WithLabelValues(interval.String()).Observe(
                time.Since(last).Seconds(),
            )
        }
 
        // Get a byte slice from sl.buffers, requesting the size of the last
        // non-empty scrape, and wrap it in a bytes.Buffer for the scraper.
        b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
        buf := bytes.NewBuffer(b)
 
        // The profile type is the value of the target label named ProfileName;
        // it stays empty if the target has no such label.
        var profileType string
        for _, l := range sl.target.labels {
            if l.Name == ProfileName {
                profileType = l.Value
                break
            }
        }
 
        // Bound the scrape by timeout; the context is cancelled as soon as
        // the scrape call returns.
        scrapeCtx, cancel := context.WithTimeout(sl.ctx, timeout)
        scrapeErr := sl.scraper.scrape(scrapeCtx, buf, profileType)
        cancel()
 
        if scrapeErr == nil {
            b = buf.Bytes()
            // NOTE: There were issues with misbehaving clients in the past
            // that occasionally returned empty results. We don't want those
            // to falsely reset our buffer size.
            if len(b) > 0 {
                sl.lastScrapeSize = len(b)
            }
 
            // Label set for the written series: the target's labels, a
            // "__name__" label carrying the profile type, then the external
            // labels.
            tl := sl.target.Labels()
            tl = append(tl, labels.Label{Name: "__name__", Value: profileType})
            for _, l := range sl.externalLabels {
                tl = append(tl, labels.Label{
                    Name:  l.Name,
                    Value: l.Value,
                })
            }
            level.Debug(sl.l).Log("msg", "appending new sample", "labels", tl.String())
 
            // Convert the labels into the protobuf LabelSet used by WriteRaw.
            protolbls := &profilepb.LabelSet{
                Labels: []*profilepb.Label{},
            }
            for _, l := range tl {
                protolbls.Labels = append(protolbls.Labels, &profilepb.Label{
                    Name:  l.Name,
                    Value: l.Value,
                })
            }
            // Write the scraped profile to the store as one series holding
            // one raw sample.
            _, err := sl.store.WriteRaw(sl.ctx, &profilepb.WriteRawRequest{
                Tenant: "",
                Series: []*profilepb.RawProfileSeries{
                    {
                        Labels: protolbls,
                        Samples: []*profilepb.RawSample{
                            {
                                RawProfile: buf.Bytes(),
                            },
                        },
                    },
                },
            })
            if err != nil {
                // Without an error channel the failure is only logged;
                // otherwise it is handed to the receiver of errc.
                switch errc {
                case nil:
                    level.Error(sl.l).Log("msg", "WriteRaw failed for scraped profile", "err", err)
                default:
                    errc <- err
                }
            }
 
            // NOTE(review): the target is marked HealthGood with a nil
            // lastError even when WriteRaw above failed — confirm that only
            // the scrape result is meant to drive target health.
            sl.target.health = HealthGood
            sl.target.lastScrapeDuration = time.Since(start)
            sl.target.lastError = nil
        } else {
            level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
            if errc != nil {
                errc <- scrapeErr
            }
 
            sl.target.health = HealthBad
            sl.target.lastScrapeDuration = time.Since(start)
            sl.target.lastError = scrapeErr
        }
 
        // Hand the slice back to sl.buffers. After a successful scrape b is
        // the slice returned by buf.Bytes(); after a failed one it is the
        // slice originally obtained from sl.buffers.
        sl.buffers.Put(b)
        last = start
 
        sl.target.lastScrape = last
 
        // Block until the next tick or until one of the contexts is done.
        select {
        case <-sl.ctx.Done():
            close(sl.stopped)
            return
        case <-sl.scrapeCtx.Done():
            break mainLoop
        case <-ticker.C:
        }
    }
 
    // Reached via "break mainLoop", i.e. when sl.scrapeCtx is done.
    close(sl.stopped)
}
  • 数据查询部分
    frostdb 的查询与存储是分离的,因此查询时需要单独创建查询引擎
    pkg/parca/parca.go
 
// Build the column query API. Its querier wraps a query engine created over
// colDB.TableProvider() and is given the table name "stacktraces" and the
// metastore.
q := queryservice.NewColumnQueryAPI(
        logger,
        tracerProvider.Tracer("query-service"),
        sharepb.NewShareClient(conn),
        parcacol.NewQuerier(
            tracerProvider.Tracer("querier"),
            // The engine takes an allocator, the database's table provider,
            // and a tracer option.
            query.NewEngine(
                memory.DefaultAllocator,
                colDB.TableProvider(),
                query.WithTracer(tracerProvider.Tracer("query-engine")),
            ),
            "stacktraces",
            metastore,
        ),
    )

查询接口服务

// Create the server and add it to gr as a pair of functions: the first runs
// the server, the second shuts it down.
// NOTE(review): gr is presumably a run group whose second function is the
// interrupt handler — confirm against where gr is declared.
parcaserver := server.NewServer(reg, version)
    gr.Add(
        func() error {
            return parcaserver.ListenAndServe(
                ctx,
                logger,
                flags.Port,
                flags.CORSAllowedOrigins,
                flags.PathPrefix,
                server.RegisterableFunc(func(ctx context.Context, srv *grpc.Server, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error {
                    // Register the four service implementations on the gRPC server.
                    debuginfopb.RegisterDebugInfoServiceServer(srv, dbgInfo)
                    profilestorepb.RegisterProfileStoreServiceServer(srv, s)
                    querypb.RegisterQueryServiceServer(srv, q)
                    scrapepb.RegisterScrapeServiceServer(srv, m)
 
                    // Register the matching handlers on mux for the same
                    // four services, stopping at the first error.
                    if err := debuginfopb.RegisterDebugInfoServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
                        return err
                    }
 
                    if err := profilestorepb.RegisterProfileStoreServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
                        return err
                    }
 
                    if err := querypb.RegisterQueryServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
                        return err
                    }
 
                    if err := scrapepb.RegisterScrapeServiceHandlerFromEndpoint(ctx, mux, endpoint, opts); err != nil {
                        return err
                    }
 
                    return nil
                }),
            )
        },
        func(_ error) {
            // Allow the shutdown at most 30 seconds.
            ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // TODO make this a graceful shutdown config setting
            defer cancel()
 
            level.Debug(logger).Log("msg", "server shutting down")
            err := parcaserver.Shutdown(ctx)
            // A context.Canceled error from Shutdown is not logged as a failure.
            if err != nil && !errors.Is(err, context.Canceled) {
                level.Error(logger).Log("msg", "error shutting down server", "err", err)
            }
 
            // Close the columnstore after the parcaserver has shutdown to ensure no more writes occur against it.
            if err := col.Close(); err != nil {
                level.Error(logger).Log("msg", "error closing columnstore", "err", err)
            }
        },
    )

说明

因为frostdb包装的比较方便,parca 对于frostdb 的使用没有太多复杂的东西,基本是直接使用,frostdb 内部机制还是值得学习的

参考资料

https://github.com/polarsignals/frostdb
https://www.parca.dev/docs/storage
https://github.com/polarsignals/frostdb/blob/main/examples/simple.go

标签:parca,err,nil,level,简单,frostdb,sl,return,logger
来源: https://www.cnblogs.com/rongfengliang/p/16687230.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有