ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

深入理解Zookeeper(二)如何通过zookeeper实现分布式锁

2021-12-26 14:32:30  阅读:148  来源: 互联网

标签:orderId zookeeper String Zookeeper public new order id 分布式


二、如何通过zookeeper实现分布式锁

(1)发现的问题

在这里插入图片描述

(2)解决方法

在这里插入图片描述
在这里插入图片描述

(3)代码实现

(1)用户支付订单代码

package com.yyds.quartzstudy.zk;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OrderPay {

    private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";
    public void pay(Long orderId){
        try {
            String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
            // 只要执行一次countDown(),等待的线程就会继续执行
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {// 异步创建
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            System.out.println("orderId-" + orderId +  "  连接zk成功...");

            String status = startPay(zooKeeper, orderId);
            System.out.println("orderId-" + orderId+ ",支付状态为:" + status);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    private String startPay(ZooKeeper zooKeeper, Long orderId) {
        /**
         * 支付前,需要先加锁 /timeOutOrderLock/{orderId}
         *   如果创建成功,就进行付款操作
         *   如果创建失败,就稍后再次付款
         */
        String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + orderId;
        try {
            zooKeeper.create(timeOutOrderLockPath,
                    "order".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);
            // 模拟去支付订单
            System.out.println("订单-" + orderId + " 准备支付...");
            TimeUnit.SECONDS.sleep(3);

            zooKeeper.delete(timeOutOrderLockPath,-1);
            System.out.println("订单-" + orderId + " 支付成功...");
            return "success";
        }catch (KeeperException.NodeExistsException e){
            System.out.println("订单-" + orderId + " 创建失败...请稍后重试");
            return "failed";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

(2)超时处理逻辑

package com.yyds.quartzstudy.zk;

import org.apache.zookeeper.*;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OrderTimedOut {

    private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";

    public void timedOut(){
        try {
            String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
            // 只要执行一次countDown(),等待的线程就会继续执行
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {// 异步创建
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
            System.out.println("OrderTimedOut-" +   "  连接zk成功...");

            startUpdateOrderTimedOut(zooKeeper);

        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void startUpdateOrderTimedOut(ZooKeeper zooKeeper) {
        /**
         * 1、首先找出30min没有付款的订单
         * 2、处理每条订单,加上分布式锁,尝试创建/timeOutOrderLock/{orderId}
         *    如果创建成功,进行业务处理
         *    如果创建失败,不作任何处理
         * 3、修改需要修改状态的订单
         */
        ArrayList<Order> orders = new ArrayList<>();
        orders.add(new Order(1L,"NOT_PAY"));
        orders.add(new Order(2L,"NOT_PAY"));
        Iterator<Order> iterator = orders.iterator();
        while (iterator.hasNext()){
            Order order = iterator.next();
            String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + order.id;
            zooKeeper.create(timeOutOrderLockPath,
                    "".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
                        @Override
                        public void processResult(int i, String s, Object o, String s1) {
                            if(i == KeeperException.Code.OK.intValue()){
                                System.out.println("OrderTimedOut开始执行业务处理order.id-" + order.id);
                                // 模拟处理
                                try {
                                    TimeUnit.SECONDS.sleep(3);
                                    // 删除锁
                                    zooKeeper.delete(timeOutOrderLockPath,-1);
                                    System.out.println("OrderTimedOut执行业务处理完毕order.id-" + order.id);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }else if(i == KeeperException.Code.NODEEXISTS.intValue()){
                                System.out.println("OrderTimedOut不执行order.id-" + order.id);
                                iterator.remove();
                            }else {
                                System.out.println("OrderTimedOut异常,order.id-" + order.id);
                            }
                        }
                    },"call_back");
        }
    }

    private class Order{
       private Long id;
       private String status;

        public Order(Long id, String status) {
            this.id = id;
            this.status = status;
        }
        public Long getId() {
            return id;
        }
        public void setId(Long id) {
            this.id = id;
        }

        public String getStatus() {
            return status;
        }
        public void setStatus(String status) {
            this.status = status;
        }
    }
}

(3)测试

package com.yyds.quartzstudy.zk;

import java.util.concurrent.TimeUnit;

public class OrderTest {
    public static void main(String[] args) throws InterruptedException {
        Thread pay = new Thread(() -> new OrderPay().pay(1L));
        Thread timeOut = new Thread(() -> new OrderTimedOut().timedOut());

        //用户先拿到锁
        pay.start();
        TimeUnit.SECONDS.sleep(1);
        timeOut.start();
        TimeUnit.SECONDS.sleep(6);
    }
}

(4)测试结果

OrderTimedOut-  连接zk成功...
orderId-1  连接zk成功...
订单-1 准备支付...
OrderTimedOut不执行order.id-1
OrderTimedOut开始执行业务处理order.id-2
订单-1 支付成功...
orderId-1,支付状态为:success

(4)分布式读写锁的逻辑

在这里插入图片描述

在这里插入图片描述

标签:orderId,zookeeper,String,Zookeeper,public,new,order,id,分布式
来源: https://blog.csdn.net/qq_44665283/article/details/122155072

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有