ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

etcd 笔记(08)— 基于 etcd 实现分布式锁

2021-05-10 22:58:55  阅读:148  来源: 互联网

标签:err 实现 08 数据库 etcd 节点 分布式


1. 为什么需要分布式锁?

在分布式环境下,数据一致性问题一直是个难点。分布式与单机环境最大的不同在于它不是多线程而是多进程。由于多线程可以共享堆内存,因此可以简单地采取内存作为标记存储位置。而多进程可能都不在同一台物理机上,就需要将标记存储在一个所有进程都能看到的地方。

例如秒杀场景就是一个常见的多进程场景。订单服务部署了多个服务实例,如秒杀商品有 4 个,第一个用户购买 3 个,第二个用户购买 2 个,理想状态下第一个用户能购买成功,第二个用户提示购买失败,反之亦可。而实际可能出现的情况是,两个用户都得到库存为 4,第一个用户买到了 3 个,更新库存之前,第二个用户下了 2 个商品的订单,更新库存为 2,导致业务逻辑出错。

在上面的场景中,商品的库存是共享变量,面对高并发情况,需要保证对资源的访问互斥。

在单机环境中,比如 Java 语言中其实提供了很多并发处理相关的 API ,但是这些 API 在分布式场景中就无能为力了。

由于分布式系统具备多线程和多进程的特点,且分布在不同机器中,synchronizedlock 关键字将失去原有锁的效果,仅依赖这些语言自身提供的 API 并不能实现分布式锁的功能,因此需要我们找到其他方法实现分布式锁。

常见的锁方案如下:

  • 基于数据库实现分布式锁;
  • 基于 ZooKeeper 实现分布式锁;
  • 基于缓存实现分布式锁,如 redisetcd 等;

2. 基于数据库实现分布式锁

基于数据库实现分布式锁有两种方式:

  • 基于数据库表;
  • 基于数据库的排他锁;

2.1 基于数据库表的增删

基于数据库表的增删是最简单的实现方式,首先创建一张锁的表,主要包含方法名、时间戳等字段。

具体使用的方法为:当需要锁住某个方法时,往该表中插入一条相关的记录。需要注意的是,方法名有唯一性约束。如果有多个请求同时提交到数据库,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行业务逻辑。执行完毕,需要删除该记录。

对于上述方案我们可以进行优化,如应用主从数据库,数据之间双向同步。一旦主库挂掉,将应用服务快速切换到从库上。除此之外还可以记录当前获得锁的机器的主机信息和线程信息,下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到,直接把锁分配给该线程,实现可重入锁。

2.2 基于数据库排他锁

还可以通过数据库的排他锁来实现分布式锁。基于 MySQLInnoDB 引擎,可以使用以下方法来实现加锁操作:

public void lock(){
    connection.setAutoCommit(false)
    int count = 0;
    while(count < 4){
        try{
            select * from lock where lock_name=xxx for update;
            if(结果不为空){
                // 代表获取到锁
                return;
            }
        }catch(Exception e){
        }
        // 为空或者抛异常都表示没有获取到锁
        sleep(1000);
        count++;
    }
    throw new LockException();
}

在查询语句后面增加 for update ,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程就无法再在该行记录上增加排他锁。其他没有获取到锁的线程就会阻塞在上述 select 语句上,可能出现两种结果:在超时之前获取到了锁,在超时之前仍未获取到锁。

获得排他锁的线程即可获得分布式锁,获取到锁之后,可以执行业务逻辑,执行业务后释放锁即可。

2.3 数据库分布式锁总结

上面两种方式的实现都是依赖数据库的一张表,一种是通过表中记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。

  • 优点:直接借助现有的关系型数据库,简单且容易理解;
  • 缺点:操作数据库需要一定的开销,性能问题以及 SQL 执行超时的异常需要考虑;

3. 基于 ZooKeeper 实现分布式锁

基于 ZooKeeper 的临时节点和顺序特性可以实现分布式锁。

申请对某个方法加锁时,在 ZooKeeper 上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。当需要获取锁时,只需要判断有序节点中该节点是否为序号最小的一个。业务逻辑执行完成释放锁,只需将这个临时节点删除。这种方式也可以避免由于服务宕机导致的锁无法释放,产生的死锁问题。

Netflix 开源了一套 ZooKeeper 客户端框架 Curator,Curator 提供的 InterProcessMutex 是分布式锁的一种实现。acquire 方法获取锁,release 方法释放锁。另外,锁释放、阻塞锁、可重入锁等问题都可以有效解决。

关于阻塞锁的实现,客户端可以通过在 ZooKeeper 中创建顺序节点,并且在节点上绑定监听器 Watch。一旦节点发生变化,ZooKeeper 会通知客户端,客户端可以检查自己创建的节点是否是当前所有节点中序号最小的,如果是就获取到锁,执行业务逻辑。

ZooKeeper 实现的分布式锁也存在一些缺陷,比如在性能上可能不如基于缓存实现的分布式锁。因为每次创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点,实现锁功能。

此外,ZooKeeper 中创建和删除节点只能通过 Leader 节点来执行,然后将数据同步到集群中的其他节点。分布式环境中难免存在网络抖动,导致客户端和 ZooKeeper 集群之间的 session 连接中断,此时 ZooKeeper 服务端以为客户端挂了,就会删除临时节点。这时其他客户端就可以获取到分布式锁了,会出现多个请求获取到了同一把锁的问题,导致业务数据不一致。

4. 基于缓存实现分布式锁

相对于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现得更好一点,存取速度会快很多,而且很多缓存是可以集群部署的,可以解决单点问题。

基于缓存的锁有如下几种: memcachedredisetcd。下面我们主要讲解基于 etcd 实现的分布式锁。

通过 etcd 实现分布式锁,同样需要满足一致性、互斥性和可靠性等要求。etcd 中的事务 txnlease 租约以及 watch 监听特性,能够实现上述要求的分布式锁。

4.1 整体思路

通过 etcd 的事务特性可以帮助我们实现一致性和互斥性。etcd 的事务特性,使用 IF-Then-Else 语句,IF 语言判断 etcd 服务端是否存在指定的 key,通过该 key 创建的版本号 create_revision 是否为 0 来检查 key 是否已存在,如果该 key 存在,版本号不为 0。满足 IF 条件的情况下则使用 Then 执行 put 操作,否则 Else 语句将返回抢锁失败的结果。

当然,除了使用 key 是否创建成功作为 IF 的判断依据,还可以创建前缀相同的 key,通过比较这些 keyrevision 来判断分布式锁应该属于哪个请求。

客户端请求在获取到分布式锁后,如果发生异常,需要及时将锁释放掉,因此需要租约。我们申请分布式锁时也需要指定租约时间,超过 lease 租期时间将会自动释放锁,保证业务的可用性。

但是在执行业务逻辑时,如果客户端发起的是一个耗时的操作,在操作未完成的情况下,租约时间过期,就会导致其他请求获取到分布式锁,造成不一致。这种情况下就需要续租,即刷新租约,使得客户端和 etcd 服务端持续保持心跳。

流程图

4.2 代码实现

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	config := clientv3.Config{
		Endpoints:   []string{"192.168.0.113:2379"}, // 集群列表
		DialTimeout: 5 * time.Second,
	}

	// 建立一个客户端
	client, err := clientv3.New(config)
	if err != nil {
		fmt.Println(err)
		return
	}

	// lease实现锁自动过期:
	// op操作
	// txn事务: if else then

	// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
	lease := clientv3.NewLease(client)

	// 申请一个5秒的租约
	leaseGrantResp, err := lease.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 拿到租约的ID
	leaseId := leaseGrantResp.ID

	// 准备一个用于取消自动续租的context
	ctx, cancelFunc := context.WithCancel(context.TODO())

	// 确保函数退出后, 自动续租会停止
	defer cancelFunc()
	defer lease.Revoke(context.TODO(), leaseId)

	// 5秒后会取消自动续租
	keepRespChan, err := lease.KeepAlive(ctx, leaseId)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 处理续约应答的协程
	go func() {
		for {
			select {
			case keepResp := <-keepRespChan:
				if keepResp == nil {
					fmt.Println("租约已经失效了")
					goto END
				} else { // 每秒会续租一次, 所以就会受到一次应答
					fmt.Println("收到自动续租应答:", keepResp.ID)
				}
			}
		}
	END:
	}()

	//  if 不存在key, then 设置它, else 抢锁失败
	kv := clientv3.NewKV(client)

	// 创建事务
	txn := kv.Txn(context.TODO())

	// 定义事务

	// 如果key不存在
	txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).
		Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("/demo/A/B1")) // 否则抢锁失败

	// 提交事务
	txnResp, err := txn.Commit()
	if err != nil {
		fmt.Println(err)
		return // 没有问题
	}

	// 判断是否抢到了锁
	if !txnResp.Succeeded {
		fmt.Println("锁被占用:", string(
			txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
		return
	}

	// 2, 处理业务

	fmt.Println("处理任务")
	time.Sleep(5 * time.Second)

	// 3, 释放锁(取消自动续租, 释放租约)
	// defer 会把租约释放掉, 关联的KV就被删除了
}

标签:err,实现,08,数据库,etcd,节点,分布式
来源: https://blog.csdn.net/wohu1104/article/details/116273801

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有