ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

go-zero使用Etcd进行服务注册代码分析

2021-12-18 19:33:53  阅读:453  来源: 互联网

标签:key Publisher return func err zero Etcd go lease


代码分析

github.com/tal-tech/go-zero@v1.2.3/core/discov/publisher.go

package discov

import (
    "github.com/tal-tech/go-zero/core/discov/internal"
    "github.com/tal-tech/go-zero/core/lang"
    "github.com/tal-tech/go-zero/core/logx"
    "github.com/tal-tech/go-zero/core/proc"
    "github.com/tal-tech/go-zero/core/syncx"
    "github.com/tal-tech/go-zero/core/threading"
    clientv3 "go.etcd.io/etcd/client/v3"
)

type (
    // PubOption defines the method to customize a Publisher.
    PubOption func(client *Publisher)

    // A Publisher can be used to publish the value to an etcd cluster on the given key.
    Publisher struct {
        endpoints  []string
        key        string
        fullKey    string
        id         int64
        value      string
        lease      clientv3.LeaseID
        quit       *syncx.DoneChan
        pauseChan  chan lang.PlaceholderType
        resumeChan chan lang.PlaceholderType
    }
)

// NewPublisher returns a Publisher.
// endpoints is the hosts of the etcd cluster.
// key:value are a pair to be published.
// opts are used to customize the Publisher.
func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher {
    publisher := &Publisher{
        endpoints:  endpoints,
        key:        key,
        value:      value,
        quit:       syncx.NewDoneChan(),
        pauseChan:  make(chan lang.PlaceholderType),
        resumeChan: make(chan lang.PlaceholderType),
    }

    for _, opt := range opts {
        opt(publisher)
    }

    return publisher
}

// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
    cli, err := internal.GetRegistry().GetConn(p.endpoints)
    if err != nil {
        return err
    }

    p.lease, err = p.register(cli)
    if err != nil {
        return err
    }

    proc.AddWrapUpListener(func() {
        p.Stop()
    })

    return p.keepAliveAsync(cli)
}

// Pause pauses the renewing of key:value.
func (p *Publisher) Pause() {
    p.pauseChan <- lang.Placeholder
}

// Resume resumes the renewing of key:value.
func (p *Publisher) Resume() {
    p.resumeChan <- lang.Placeholder
}

// Stop stops the renewing and revokes the registration.
func (p *Publisher) Stop() {
    p.quit.Close()
}

func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
    ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
    if err != nil {
        return err
    }

    threading.GoSafe(func() {
        for {
            select {
            case _, ok := <-ch:
                if !ok {
                    p.revoke(cli)
                    if err := p.KeepAlive(); err != nil {
                        logx.Errorf("KeepAlive: %s", err.Error())
                    }
                    return
                }
            case <-p.pauseChan:
                logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
                p.revoke(cli)
                select {
                case <-p.resumeChan:
                    if err := p.KeepAlive(); err != nil {
                        logx.Errorf("KeepAlive: %s", err.Error())
                    }
                    return
                case <-p.quit.Done():
                    return
                }
            case <-p.quit.Done():
                p.revoke(cli)
                return
            }
        }
    })

    return nil
}

func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
    resp, err := client.Grant(client.Ctx(), TimeToLive)
    if err != nil {
        return clientv3.NoLease, err
    }

    lease := resp.ID
    if p.id > 0 {
        p.fullKey = makeEtcdKey(p.key, p.id)
    } else {
        p.fullKey = makeEtcdKey(p.key, int64(lease))
    }
    _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

    return lease, err
}

func (p *Publisher) revoke(cli internal.EtcdClient) {
    if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
        logx.Error(err)
    }
}

// WithPubEtcdAccount provides the etcd username/password.
func WithPubEtcdAccount(user, pass string) PubOption {
    return func(pub *Publisher) {
        internal.AddAccount(pub.endpoints, user, pass)
    }
}

// WithId customizes a Publisher with the id.
func WithId(id int64) PubOption {
    return func(publisher *Publisher) {
        publisher.id = id
    }
}

这个文件的功能就是做 Etcd的服务注册,在文件里定义了一个struct: Publisher

  Publisher struct {
        endpoints  []string
        key        string
        fullKey    string
        id         int64
        value      string
        lease      clientv3.LeaseID
        quit       *syncx.DoneChan
        pauseChan  chan lang.PlaceholderType
       resumeChan chan lang.PlaceholderType
    }
  • endpoints: edct的地址
  • key: 该服务的在etcd中的key前缀, 从配置文件中取
  • fullkey: 在etcd中的key 如果设置了id 则是key和id的组合;如果没有设置id,则是key和lease的组合
  • id: 如果设置了则用于生成fullKey
  • value: etcd的值,是RPC服务的地址
  • lease: etcd的lease id
  • quit: 用于退出操作
  • pauseChan:用于服务注册暂停,如果暂停则会注销etcd的lease,暂停后有两个后续操作 重启或者关闭
  • resumeChan:用于服务暂停后重启

Publisher 提供以下公共方法

func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher 
func (p *Publisher) KeepAlive() error
func (p *Publisher) Pause()
func (p *Publisher) Resume() 
func (p *Publisher) Stop()
  • NewPublisher: Publish的构造方法
  • KeepAlive(): 服务发布以及连接
  • Pause(): 服务暂停
  • Resume(): 服务重启
  • Stop(): 服务停止
    最核心的方法就是KeepAlive()方法,他的代码如下
func (p *Publisher) KeepAlive() error {
    cli, err := internal.GetRegistry().GetConn(p.endpoints)
    if err != nil {
        return err
    }

    p.lease, err = p.register(cli)
    if err != nil {
        return err
    }

    proc.AddWrapUpListener(func() {
        p.Stop()
    })

    return p.keepAliveAsync(cli)
}

line2-5: etcd的连接操作
line7-10: 创建lease
line12-14: 这个我没弄太清楚,应该是添加到监听队列中 如果Rpc服务停掉了 这个服务也需要停掉
line 16: 异步监听主要是处理与etcd间的连接监听,如果连接断掉需要重新启动;还有就是监听 暂停 重启 停止命令

先看下line7-10 register方法

func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
    resp, err := client.Grant(client.Ctx(), TimeToLive)
    if err != nil {
        return clientv3.NoLease, err
    }

    lease := resp.ID
    if p.id > 0 {
        p.fullKey = makeEtcdKey(p.key, p.id)
    } else {
        p.fullKey = makeEtcdKey(p.key, int64(lease))
    }
    _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

    return lease, err
}

line2-7: 创建etcd lease 过期时间 TimeToLive为10秒
line8-12: 创建fullKey,如果id有设置过则以key和id生成fullKey;如果id没有设置则以key和lease id生成
line13: 将value存储到etcd中,绑定到刚创建的lease中

再看 line 16 keepAliveAsync方法

func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
    ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
    if err != nil {
        return err
    }

    threading.GoSafe(func() {
        for {
            select {
            case _, ok := <-ch:
                if !ok {
                    p.revoke(cli)
                    if err := p.KeepAlive(); err != nil {
                        logx.Errorf("KeepAlive: %s", err.Error())
                    }
                    return
                }
            case <-p.pauseChan:
                logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
                p.revoke(cli)
                select {
                case <-p.resumeChan:
                    if err := p.KeepAlive(); err != nil {
                        logx.Errorf("KeepAlive: %s", err.Error())
                    }
                    return
                case <-p.quit.Done():
                    return
                }
            case <-p.quit.Done():
                p.revoke(cli)
                return
            }
        }
    })

    return nil
}

line2-5: lease 定时续租
line7: 开启一个携程监听
line10-17: 监听etcd keeplive状态 如果返回的不是true,则先注销当前租约然后再重新KeepLive()
line18-29: 通过Publisher的pauseChan监听是否暂停操作,如果需要暂停就先注销当前租约,然后再通过 resumeChan和quit监听后续是重启还是停止,如果是重启则重新keepLive()
line30-32: 监听是否有停止操作,如果需要停止就直接注销当前租约

demo展示

使用go-zero创建一个user的rpc服务,具体创建步骤掠过,最后在etc/user.yaml做如下配置

Name: user.rpc
ListenOn: 127.0.0.1:8081
Etcd:
  Hosts:
    - 172.16.1.36:12379
  Key: user.rpc

然后启动user服务,启动成功后去etcd 中查看

./etcdctl --endpoints=127.0.0.1:12379 get user.rpc --prefix

查看到结果为
在这里插入图片描述

标签:key,Publisher,return,func,err,zero,Etcd,go,lease
来源: https://blog.csdn.net/pyf511765/article/details/122015861

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有