ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

Ingress nginx Controller源码分析

2022-02-20 22:34:11  阅读:291  来源: 互联网

标签:Ingress nil err nginx 源码 conf klog config store


主要结构图

image

入口函数

cmd/nginx/main.go

// main is the entry point of the ingress-nginx controller binary. It parses
// the command-line flags, builds the Kubernetes API client, validates the
// configured services and namespace, sets up the Prometheus registry and
// metric collector, creates the NGINX controller, starts the health/metrics
// HTTP server and the controller in goroutines, and finally hands control to
// handleSigterm.
func main() {
   klog.InitFlags(nil)

   rand.Seed(time.Now().UnixNano())

   fmt.Println(version.String())

   // showVersion is checked before err, so a version request exits with
   // status 0 regardless of the error returned by parseFlags.
   showVersion, conf, err := parseFlags()
   if showVersion {
      os.Exit(0)
   }

   if err != nil {
      klog.Fatal(err)
   }

   err = file.CreateRequiredDirectories()
   if err != nil {
      klog.Fatal(err)
   }

   // Build the API server client from the host, root CA and kubeconfig flags.
   kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
   if err != nil {
      handleFatalInitError(err)
   }

   // The default backend service is only validated when one is configured.
   if len(conf.DefaultService) > 0 {
      err := checkService(conf.DefaultService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }

      klog.InfoS("Valid default backend", "service", conf.DefaultService)
   }

   // Same validation for the publish service, when configured.
   if len(conf.PublishService) > 0 {
      err := checkService(conf.PublishService, kubeClient)
      if err != nil {
         klog.Fatal(err)
      }
   }

   // When a namespace is configured, it must be retrievable from the API server.
   if conf.Namespace != "" {
      _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})
      if err != nil {
         klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)
      }
   }

   conf.FakeCertificate = ssl.GetFakeSSLCert()
   klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName)

   if !k8s.NetworkingIngressAvailable(kubeClient) {
      klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher")
   }

   // Probe whether IngressClasses can be listed. Only a Forbidden error has
   // an effect: it disables the IngressClass feature.
   // NOTE(review): NotFound and every other error are silently ignored here
   // (no log, no state change) - confirm that this is intentional.
   _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{})
   if err != nil {
      if !errors.IsNotFound(err) {
         if errors.IsForbidden(err) {
            klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err)
            conf.IngressClassConfiguration.IgnoreIngressClass = true
         }
      }
   }
   conf.Client = kubeClient

   err = k8s.GetIngressPod(kubeClient)
   if err != nil {
      klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err)
   }

   // Dedicated Prometheus registry with the Go runtime and process collectors.
   reg := prometheus.NewRegistry()

   reg.MustRegister(prometheus.NewGoCollector())
   reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{
      PidFn:        func() (int, error) { return os.Getpid(), nil },
      ReportErrors: true,
   }))

   // Start from the dummy collector and replace it with a real one only when
   // metrics are enabled.
   mc := metric.NewDummyCollector()
   if conf.EnableMetrics {
      mc, err = metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller)
      if err != nil {
         klog.Fatalf("Error creating prometheus collector:  %v", err)
      }
   }
   // Pass the ValidationWebhook status to determine if we need to start the collector
   // for the admissionWebhook
   mc.Start(conf.ValidationWebhook)

   if conf.EnableProfiling {
      go registerProfiler()
   }

   ngx := controller.NewNGINXController(conf, mc)

   // Health check and metrics endpoints share a single ServeMux.
   mux := http.NewServeMux()
   registerHealthz(nginx.HealthPath, ngx, mux)
   registerMetrics(reg, mux)

   // The HTTP server and the controller each run in their own goroutine.
   go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux)
   go ngx.Start()

   // The callback terminates the process with the code chosen by handleSigterm.
   handleSigterm(ngx, func(code int) {
      os.Exit(code)
   })
}

主要逻辑

  1. Step1 初始化配置,获取kubeClient

    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)

  2. Step2 检查DefaultService、PublishService、Namespace等是否存在

  3. Step3 检查是否有权限list IngressClasses(无权限时禁用IngressClass特性)

  4. Step4 检查ingress controller pod是否存在

  5. Step5 创建NGINXController, 开启健康检查,metrics

    ngx := controller.NewNGINXController(conf, mc)

方法NewNGINXController

//internal/ingress/controller/nginx.go
// NewNGINXController creates a new NGINX Ingress controller.
//
// It wires an event broadcaster to the Kubernetes event sink, reads the
// system nameservers, builds the NGINXController value, optionally creates
// the validation webhook HTTPS server, creates the object store and the sync
// queue (whose callback is n.syncIngress), optionally creates the status
// syncer, loads the NGINX configuration template and installs file watchers
// for the template and for every regular file below /etc/nginx/geoip/.
//
// config is the controller configuration; config.Client must already be set
// because it is used for the event sink, the store and the status syncer.
// mc is the metric collector stored on the controller.
//
// The process is terminated through klog.Fatalf when the template is
// invalid, when walking /etc/nginx/geoip/ fails, or when a file watcher
// cannot be created.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartLogging(klog.Infof)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
      Interface: config.Client.CoreV1().Events(config.Namespace),
   })

   // A failure to read the nameservers is only logged; the controller is
   // still created with whatever value h holds.
   h, err := dns.GetSystemNameServers()
   if err != nil {
      klog.Warningf("Error reading system nameservers: %v", err)
   }

   n := &NGINXController{
      isIPV6Enabled: ing_net.IsIPv6Enabled(),

      resolver:        h,
      cfg:             config,
      syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

      recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
         Component: "nginx-ingress-controller",
      }),

      stopCh:   make(chan struct{}),
      updateCh: channels.NewRingChannel(1024),

      ngxErrCh: make(chan error),

      stopLock: &sync.Mutex{},

      runningConfig: new(ingress.Configuration),

      Proxy: &TCPProxy{},

      metricCollector: mc,

      command: NewNginxCommand(),
   }

   // The admission webhook server is only created when an address is configured.
   if n.cfg.ValidationWebhook != "" {
      n.validationWebhookServer = &http.Server{
         Addr:      config.ValidationWebhook,
         Handler:   adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),
         TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),
         // disable http/2
         // https://github.com/kubernetes/kubernetes/issues/80313
         // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159
         TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
      }
   }

   // The store receives n.updateCh so that it can publish events consumed by
   // the controller.
   n.store = store.New(
      config.Namespace,
      config.WatchNamespaceSelector,
      config.ConfigMapName,
      config.TCPConfigMapName,
      config.UDPConfigMapName,
      config.DefaultSSLCertificate,
      config.ResyncPeriod,
      config.Client,
      n.updateCh,
      config.DisableCatchAll,
      config.IngressClassConfiguration)

   n.syncQueue = task.NewTaskQueue(n.syncIngress)

   if config.UpdateStatus {
      n.syncStatus = status.NewStatusSyncer(status.Config{
         Client:                 config.Client,
         PublishService:         config.PublishService,
         PublishStatusAddress:   config.PublishStatusAddress,
         IngressLister:          n.store,
         UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
         UseNodeInternalIP:      config.UseNodeInternalIP,
      })
   } else {
      klog.Warning("Update of Ingress status is disabled (flag --update-status)")
   }

   // onTemplateChange reloads the template and schedules a sync. A broken
   // template is logged and the previously loaded one stays in use.
   onTemplateChange := func() {
      template, err := ngx_template.NewTemplate(nginx.TemplatePath)
      if err != nil {
         // this error is different from the rest because it must be clear why nginx is not working
         klog.ErrorS(err, "Error loading new template")
         return
      }

      n.t = template
      klog.InfoS("New NGINX configuration template loaded")
      n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
   }

   // Unlike a later reload, an invalid template at startup is fatal.
   ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
   if err != nil {
      klog.Fatalf("Invalid NGINX configuration template: %v", err)
   }

   n.t = ngxTpl

   _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
   if err != nil {
      klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
   }

   // Collect every non-directory entry below the geoip directory.
   var filesToWatch []string
   err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
      if err != nil {
         return err
      }

      if info.IsDir() {
         return nil
      }

      filesToWatch = append(filesToWatch, path)
      return nil
   })

   if err != nil {
      klog.Fatalf("Error creating file watchers: %v", err)
   }

   for _, f := range filesToWatch {
      // Before Go 1.22 the range variable is shared by all iterations, so the
      // callback below would report the last file for every watcher. Capture
      // a per-iteration copy so that each watcher logs its own path.
      f := f
      _, err = watch.NewFileWatcher(f, func() {
         klog.InfoS("File changed detected. Reloading NGINX", "path", f)
         n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
      })
      if err != nil {
         klog.Fatalf("Error creating file watcher for %v: %v", f, err)
      }
   }

   return n
}

主要逻辑:

  1. 实例化NGINXController

  2. 创建validationWebhookServer

  3. 创建store缓存

    n.store = store.New
    
  4. 监听模板文件以及/etc/nginx/geoip下的文件变化,如有变化,发送对应的文件变化事件

  5. 创建syncQueue任务队列,队列中有任务时回调syncIngress方法

    n.syncQueue = task.NewTaskQueue(n.syncIngress)

万能的死循环

Start方法,开启万能的死循环,处理各种Event

//internal/ingress/controller/nginx.go
// Main event loop (excerpt from the Start method): it multiplexes NGINX
// process errors, store update events and the stop signal until one of the
// return statements ends the enclosing method.
for {
   select {
   case err := <-n.ngxErrCh:
      // Errors reported while shutting down end the loop.
      if n.isShuttingDown {
         return
      }

      // if the nginx master process dies, the workers continue to process requests
      // until the failure of the configured livenessProbe and restart of the pod.
      if process.IsRespawnIfRequired(err) {
         return
      }

   case event := <-n.updateCh.Out():
      // This break only leaves the select statement, not the for loop: the
      // event is dropped and the loop keeps running.
      if n.isShuttingDown {
         break
      }

      if evt, ok := event.(store.Event); ok {
         klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
         // Configuration events enqueue a fixed dummy task through
         // EnqueueTask; the continue skips the skippable enqueue below.
         if evt.Type == store.ConfigurationEvent {
            // TODO: is this necessary? Consider removing this special case
            n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
            continue
         }

         // Every other event is enqueued with its object as a skippable task.
         n.syncQueue.EnqueueSkippableTask(evt.Obj)
      } else {
         klog.Warningf("Unexpected event type received %T", event)
      }
   case <-n.stopCh:
      return
   }
}

当updateCh中有数据时,放到syncQueue中

//internal/task/queue.go
// worker drains the queue until it is shut down, passing each element to the
// sync callback. Elements carrying a non-zero timestamp older than the last
// successful sync are skipped; elements whose sync fails are re-added with
// rate limiting and a zero timestamp.
func (t *Queue) worker() {
   for {
      key, shutdown := t.queue.Get()
      if shutdown {
         // Signal completion once; the channel may already be closed.
         if !isClosed(t.workerDone) {
            close(t.workerDone)
         }
         return
      }

      // Taken before syncing so that lastSync marks the start of the sync.
      startedAt := time.Now().UnixNano()

      elem := key.(Element)
      stale := elem.Timestamp != 0 && t.lastSync > elem.Timestamp

      switch {
      case stale:
         klog.V(3).InfoS("skipping sync", "key", elem.Key, "last", t.lastSync, "now", elem.Timestamp)
         t.queue.Forget(key)

      default:
         klog.V(3).InfoS("syncing", "key", elem.Key)
         err := t.sync(key)
         if err == nil {
            t.queue.Forget(key)
            t.lastSync = startedAt
         } else {
            klog.ErrorS(err, "requeuing", "key", elem.Key)
            retry := Element{
               Key:       elem.Key,
               Timestamp: 0,
            }
            t.queue.AddRateLimited(retry)
         }
      }

      // Every dequeued key is marked as done, whichever path was taken.
      t.queue.Done(key)
   }
}

当syncQueue中有数据,回调syncIngress方法 t.sync(key)

//internal/ingress/controller/store/store.go
// ingEventHandler reacts to Ingress add, update and delete notifications.
// Accepted adds and updates are synced into the store and then published on
// updateCh; deletes are delegated to ingDeleteHandler.
ingEventHandler := cache.ResourceEventHandlerFuncs{
   AddFunc: func(obj interface{}) {
      // NOTE(review): the second result of toIngress is discarded; if the
      // conversion can fail and yield a nil ing, the field access below would
      // panic - confirm against toIngress.
      ing, _ := toIngress(obj)

      // Ignore ingresses outside the watched namespaces.
      if !watchedNamespace(ing.Namespace) {
         return
      }

      // An ingress whose class cannot be validated is ignored.
      ic, err := store.GetIngressClass(ing, icConfig)
      if err != nil {
         klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
         return
      }

      klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

      // Catch-all ingresses are dropped when --disable-catch-all is set.
      if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
         klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
         return
      }

      recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

      store.syncIngress(ing)
      store.updateSecretIngressMap(ing)
      store.syncSecrets(ing)

      // Publish the original object as a create event.
      updateCh.In() <- Event{
         Type: CreateEvent,
         Obj:  obj,
      }
   },
   DeleteFunc: ingDeleteHandler,
   UpdateFunc: func(old, cur interface{}) {
      // NOTE(review): conversion results are not checked here either.
      oldIng, _ := toIngress(old)
      curIng, _ := toIngress(cur)

      if !watchedNamespace(oldIng.Namespace) {
         return
      }

      // When IgnoreIngressClass is set both errors stay nil, so only the
      // "changed" and "no changes" branches below can be taken.
      var errOld, errCur error
      var classCur string
      if !icConfig.IgnoreIngressClass {
         _, errOld = store.GetIngressClass(oldIng, icConfig)
         classCur, errCur = store.GetIngressClass(curIng, icConfig)
      }
      if errOld != nil && errCur == nil {
         // The class of the old object was rejected and the current one is
         // accepted: handle the update like a newly created ingress.
         if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
            klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
            return
         }

         klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
         recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
      } else if errOld == nil && errCur != nil {
         // The class was accepted before and is rejected now: remove the old object.
         klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
         ingDeleteHandler(old)
         return
      } else if errCur == nil && !reflect.DeepEqual(old, cur) {
         // The current class is accepted and the object changed.
         if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
            klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
            ingDeleteHandler(old)
            return
         }

         recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
      } else {
         klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
         return
      }

      // Reached only by the "create" and "changed" branches above.
      store.syncIngress(curIng)
      store.updateSecretIngressMap(curIng)
      store.syncSecrets(curIng)

      updateCh.In() <- Event{
         Type: UpdateEvent,
         Obj:  cur,
      }
   },
}

internal/ingress/controller/store/store.go

New方法构造各种资源的处理EventHandler

Ingress举例:ResourceEventHandlerFuncs

处理Ingress的Add、Delete、Update

// Register one event handler per informer.
store.informers.Ingress.AddEventHandler(ingEventHandler)
// The IngressClass handler is only registered when ingress classes are not ignored.
if !icConfig.IgnoreIngressClass {
   store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)

将Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service各自的EventHandler注册到k8s sdk(client-go)对应的Informer中

标签:Ingress,nil,err,nginx,源码,conf,klog,config,store
来源: https://www.cnblogs.com/devilfeng08/p/ingresscontroller.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有