ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

event-export源码解析

2020-06-08 23:37:17  阅读:258  来源: 互联网

标签:Run 函数 err eventExporter 源码 export sink func event


背景:在kubernetes里event记录了集群运行所遇到的各种大事件,有助于排错,但大量的事件如果都存储在etcd中,会带来较大的性能与容量压力,所以etcd中默认只保存最近1小时的,如果我们将该时间改大 会大大增加集群etcd的压力,所以我们需要将该数据存储到其他地方。经过一段时间的寻找 觉的k8s-stackdriver这个项目比较符合目前我的需求,但是它存储到的是promethues,但是目前我司给与promethues的资源并不是很足,并且保存的时间默认只有15天本,地存储也意味着Prometheus无法持久化数据,无法存储大量历史数据,同时也无法灵活扩展。,所以考虑使用了ES作为存储。但是es存储方案目前官方并不支持,只有通过fluentd去收集日志达到效果,所以我这边通过修改源码将存储直接改为es,这样就不需要借助fluentd去收集日志了。

 

本文主要是先分析event-export源码,之后再讲修改过程写出来。

 

程序入口 main.go

sink, err := stackdriver.NewSdSinkFactory().CreateNew(strings.Split(*sinkOpts, " "))
if err != nil {
   glog.Fatalf("Failed to initialize sink: %v", err)
}

解释:初始化sink对象 并根据sink-opts传递进来的 参数设置对应的参数

参数列表(NewSdSinkFactory):

flagSet

flushDelay

maxBufferSize

maxConcurrency

resourceModelVersion

endpoint

解析sinkOpts的参数(CreateNew)

client, err := newKubernetesClient()
if err != nil {
   glog.Fatalf("Failed to initialize kubernetes client: %v", err)
}
eventExporter := newEventExporter(client, sink, *resyncPeriod)

newKubernetesClient 初始化go-client 接口

通过集群内部配置创建 k8s 配置信息,通过 KUBERNETES_SERVICE_HOST 和 KUBERNETES_SERVICE_PORT 环境变量方式获取

默认tokenfile rootCAFile 在/var/run/secrets/kubernetes.io/serviceaccount/

 

newEventExporter函数的作用 我们可以看他具体实现方法:

func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration) *eventExporter {
   return &eventExporter{
      sink:    sink,
      watcher: createWatcher(client, sink, resyncPeriod),
   }
}

func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration) watchers.Watcher {
   return events.NewEventWatcher(client, &events.EventWatcherConfig{
      OnList:       sink.OnList,
      ResyncPeriod: resyncPeriod,
      Handler:      sink,
   })
}

newEventExporter 返回了eventExporter这个结构体 该结构体包含sink 和watcher两个字段。

sink: 包含所有的参数配置等

watcher: 通过createwatcher函数 监听events resource

分解:createWatcher函数

NewEventWatcher配置了需要watche的目标对象为event,以及间隔多久watch一次

events.EventWatcherConfig 结构体包含三个字段

OnList OnListFunc event列表

ResyncPeriod watcher的时间间隔 默认为1分钟watch一次 。 通过resyncPeriod参数设置

Handler 包含三个字段OnAdd OnUpdate OnDelete

OnAdd: 主要作用就是将watch到的数据通过通道存入sdSink下的logEntryChannel字段,并且在promethues插入数据

OnUpdate: 更新logEntryChannel字段,并更新promethues里的数据

 

go func() {
   http.Handle("/metrics", promhttp.Handler())
   glog.Fatalf("Prometheus monitoring failed: %v", http.ListenAndServe(*prometheusEndpoint, nil))
}()

stopCh := newSystemStopChannel()
eventExporter.Run(stopCh)

 首先 通过gorouteing执行了匿名函数 该匿名函数 主要设置了promethues的/metrics路由并且监听localhost的本地端口 默认为80 通过启动参数prometheusEndpoint设置

func newSystemStopChannel() chan struct{} {
   ch := make(chan struct{})
   go func() {
      c := make(chan os.Signal)
      signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
      sig := <-c
      glog.Infof("Received signal %s, terminating", sig.String())

      // Close stop channel to make sure every goroutine will receive stop signal.
      close(ch)
   }()

   return ch
}

newSystemStopChannel函数,通过gorouteing 执行匿名函数,当接收到SIGINT(2),SIGTERM(9)的时候 打印Received signal sig.String(), terminating的信号

 

eventExporter.Run(stopCh) 函数:

func (e *eventExporter) Run(stopCh <-chan struct{}) {
   utils.RunConcurrentlyUntil(stopCh, e.sink.Run, e.watcher.Run)
}

run函数:

stopCh: 接收newSystemStopChannel传递过来的信号值通道

utils.RunConcurrentlyUntil 方法

func RunConcurrentlyUntil(stopCh <-chan struct{}, funcs ...StoppableFunc) {
   var wg sync.WaitGroup
   for i := range funcs {
      wg.Add(1)
      f := funcs[i]
      go func() {
         defer wg.Done()
         f(stopCh)
      }()
   }

   <-stopCh

   wg.Wait()
}

并发运行e.sink.Run, e.watcher.Run

通过sync.WaitGroup解决同步阻塞等待的问题。

wg.Add 将计数器设置为1

执行e.sink.Run, e.watcher.Run两个函数

当执行完成的时候 将计数器设置减去1

在计数器不为0的时候Wait() 一直阻塞

只有当计数器为0的时候 或者接收到退出信号的时候才会退出。

 

e.sink.Run:

在eventExporter的sink字段 之后跳转到

interface.go下的Sink接口下的run方法

因为在newEventExporter函数里sink的值已经被赋值为sink, err := stackdriver.NewSdSinkFactory().CreateNew(strings.Split(*sinkOpts, " "))

返回的值 createNew

并且CreateNew 函数return newSdSink(writer, clk, config, resourceModelFactory), nil

通过查看newSdSink 该函数返回的是sdSink结构体

所以e.sink.Run 就相当于函数拥有者为sdSink下的Run方法

func (s *sdSink) Run(stopCh <-chan struct{}) {
   glog.Info("Starting Stackdriver sink")
   for {
      select {
      case entry := <-s.logEntryChannel:
         s.currentBuffer = append(s.currentBuffer, entry)
         if len(s.currentBuffer) >= s.config.MaxBufferSize {
            s.flushBuffer()
         } else if len(s.currentBuffer) == 1 {
            s.setTimer()
         }
         break
      case <-s.getTimerChannel():
         s.flushBuffer()
         break
      case <-stopCh:
         glog.Info("Stackdriver sink received stop signal, waiting for all requests to finish")
         for i := 0; i < s.config.MaxConcurrency; i++ {
            s.concurrencyChannel <- struct{}{}
         }
         glog.Info("All requests to Stackdriver finished, exiting Stackdriver sink")
         return
      }
   }
}

 run方法是一个for死循环 只有当收到newSystemStopChannel传递过来的信号时才会退出,通过s.logEntryChannel传递过来的日志,将该日志存储到s.currentBuffer里 当存储里的个数大于s.config.MaxBufferSize的值时 执行flushBuffer函数

 

func (s *sdSink) flushBuffer() {
   entries := s.currentBuffer
   s.currentBuffer = nil
   s.concurrencyChannel <- struct{}{}
   go s.sendEntries(entries)
}

func (s *sdSink) sendEntries(entries []*sd.LogEntry) {
   glog.V(4).Infof("Sending %d entries to Stackdriver", len(entries))

   written := s.writer.Write(entries, s.logName, s.sdResourceFactory.defaultMonitoredResource())
   successfullySentEntryCount.Add(float64(written))

   <-s.concurrencyChannel

   glog.V(4).Infof("Successfully sent %d entries to Stackdriver", len(entries))
}

flushBuffer首先会清空s.concurrencyChannel里的值

之后将entries的值传递给sendEntries

sendEntries函数下通过successfullySentEntryCount将日志写入promethues

之后接收s.concurrencyChannel传递过来的值,这里主要解决接收到SIGINT,SIGTERM的时候等待所有请求完成之后再停止

 

当s.currentBuffer等于1时 充值计时器,默认为5s,如果5s内没有数据代表超时 直接执行flushBuffer函数

 

e.watcher.Run:

func (w *watcher) Run(stopCh <-chan struct{}) {
   w.reflector.Run()
   <-stopCh
}

w.reflector.Run()会跳转到所属者为reflector下的Run函数

func (r *Reflector) Run() {
   glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
   go wait.Until(func() {
      if err := r.ListAndWatch(wait.NeverStop); err != nil {
         utilruntime.HandleError(err)
      }
   }, r.period, wait.NeverStop)
}

 声明一个匿名函数 每隔一段时间执行这个匿名函数

该函数下的主要方法就是ListAndWatch 也就是由它收集event的日志

 

 

 

 

标签:Run,函数,err,eventExporter,源码,export,sink,func,event
来源: https://blog.csdn.net/qq_22543991/article/details/106607144

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有