ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

disruptor无锁队列实现流水记录

2022-01-28 13:06:20  阅读:209  来源: 互联网

标签:disruptor 无锁 log 队列 private LogEvent RingBuffer import public


目录

1 无锁机制简介

2 RingBuffer简介

2.1 工作原理简介

3 disruptor实现流水异步入库

3.1 定义事件实体类

3.2 定义事件服务类

3.3 定义消费者

3.3.1 单任务处理

3.3.2 批处理

3.4 运行

4 pom依赖

5 参考


1 无锁机制简介

普通队列写入时需要通过锁机制避免并发,disruptor不用锁,使用CAS(Compare And Swap/Set)操作确保线程安全,这是一个CPU级别的指令,工作方式类似乐观锁。

2 RingBuffer简介

Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首尾相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。

RingBuffer中分离了读指针和写指针,从而使生产者和消费者互不干扰,两者可以完全并发执行,从而使性能达到数倍于传统基于互斥锁方式实现的消息队列模型。

RingBuffer保持数组元素永远有效,入队列直接覆盖旧的数据,相比普通数组队列,无需GC。

2.1 工作原理简介

disruptor的是基于事件实现的,那么就有了生产者(provider)和消费者(consumer)存在,生产者生产元素放入数组中,消费者从数组中消费元素,这个数组就是RingBuffer。每一个生产者和消费者内部都会有一个私有指针pri-sequence,表示当前操作的元素序号,同时RingBuffer内部也会有一个全局指针global-sequence指向最后一个可以被消费的元素。这样当生产者需要放数据时,只需要获取global-sequence的下一个位置,下一个位置如果还未被消费,那么就会进入等待策略,如果下一个位置已经被消费,那么就会直接覆盖当前位置的属性值。

当生产者需要向容器中存放数据时,只需要使用sequence%(数组长度-1)就可以得到要添加的元素应该放在哪儿个位置上,这样就实现了数组的首尾相连。

disruptor初始化时需要指定容器大小,容器大小指定为2^n,计算时可以可以使用位运算:

如果容器大小是8,要放12号元素。12%8 = 12 &(8-1)=1100&0111=0100=4。

使用位运算可以提升效率。

3 disruptor实现流水异步入库

3.1 定义事件实体类

LogEvent作为队列RingBuffer中的元数据

import java.io.Serializable;
import java.util.Date;

public class LogEvent implements Serializable {

	private static final long serialVersionUID = 1L;

	private String userId;// char(32)
	private String rspCd;// char(2)
	private String rspMsg;// varchar(128)
	private Date transCrtTs;// timestamp(3)
	private Date transCfmTs;// timestamp(3)

	public LogEvent() {
		this.userId = "";
		this.rspCd = "";
		this.rspMsg = "";
		this.transCrtTs = new Date();
		this.transCfmTs = new Date();
	}

	public String getUserId() {
		return userId;
	}

	public void setUserId(String userId) {
		if (userId == null) {
			return;
		}
		this.userId = userId;
	}

	public String getRspCd() {
		return rspCd;
	}

	public void setRspCd(String rspCd) {
		if (rspCd == null) {
			return;
		}
		this.rspCd = rspCd;
	}

	public String getRspMsg() {
		return rspMsg;
	}

	public void setRspMsg(String rspMsg) {
		if (rspMsg == null) {
			return;
		}
		this.rspMsg = rspMsg;
	}

	public Date getTransCrtTs() {
		return transCrtTs;
	}

	public void setTransCrtTs(Date transCrtTs) {
		if (transCrtTs == null) {
			return;
		}
		this.transCrtTs = transCrtTs;
	}

	public Date getTransCfmTs() {
		return transCfmTs;
	}

	public void setTransCfmTs(Date transCfmTs) {
		if (transCfmTs == null) {
			return;
		}
		this.transCfmTs = transCfmTs;
	}

	@Override
	public String toString() {
		StringBuffer stringBuffer = new StringBuffer();
		stringBuffer.append("LogEvent{");
		stringBuffer.append("userId=");
		stringBuffer.append(userId);
		stringBuffer.append(", rspCd=");
		stringBuffer.append(rspCd);
		stringBuffer.append(", rspMsg=");
		stringBuffer.append(rspMsg);
		stringBuffer.append(", transCrtTs=");
		stringBuffer.append(transCrtTs);
		stringBuffer.append(", transCfmTs=");
		stringBuffer.append(transCfmTs);
		return stringBuffer.toString();
	}
}

3.2 定义事件服务类

LogEventService中,初始化队列RingBuffer,为生产者提供接口。

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.beanutils.BeanUtils;

import java.util.concurrent.Executors;

public class LogEventService {
    private static LogEventService instance;
    private static int RING_BUFFER_SIZE = 1024 * 1024;
    private RingBuffer<LogEvent> ringBuffer;
    private LogEventHandler logEventHandler;

    /**
     * 构造函数,调用初始化队列
     */
    public LogEventService() {
        initRingBuffer();
    }

    /**
     * 创建service实例
     * @return LogEventService
     */
    public static LogEventService getInstance() {
        if (instance == null) {
            instance = new LogEventService();
        }
        return instance;
    }

    /**
     * 初始化队列
     */
    private void initRingBuffer() {
        try {
            logEventHandler = new LogEventHandler();
            Disruptor<LogEvent> disruptor = new Disruptor<>(EVENT_FACTORY, RING_BUFFER_SIZE, Executors.defaultThreadFactory());
            disruptor.handleEventsWith(logEventHandler);
            ringBuffer = disruptor.start();
        } catch (Exception e) {

        }
    }

    /**
     * 生产者入队列接口
     * @param log LogEvent
     */
    public void publish(LogEvent log) {
        long sequence = ringBuffer.next();
        try {
            LogEvent ringValue = ringBuffer.get(sequence);
            BeanUtils.copyProperties(ringValue, log);//复制对象中的所有属性
        } catch (Exception e) {

        } finally {
            ringBuffer.publish(sequence);
        }
    }

    /**
     * 初始化填充队列,提前分配内存,降低GC
     */
    public final EventFactory<LogEvent> EVENT_FACTORY =
            new EventFactory<LogEvent>() {
                @Override
                public LogEvent newInstance() {
                    return new LogEvent();
                }
            };

}

3.3 定义消费者

3.3.1 单任务处理

import com.lmax.disruptor.EventHandler;

import com.dto.LogEvent;
import com.task.LogTask;

public class LogEventHandler implements EventHandler<LogEvent> {
    
    /**
     * 消费队列
     * @param log 队列中的任务
     * @param sequence 当前消费到的队列位置
     * @param endOfBatch 是否为RingBuffer内存片中的最后一块
     */
    @Override
    public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
        LogTask logTask = new LogTask();        
        logTask.process(log);//调用相关服务
    }
}

3.3.2 批处理

使用批处理方式,消费队列中的对象,调用相关服务

import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import java.util.List;

import com.dto.LogEvent;
import com.task.LogTask;

public class LogEventHandler implements EventHandler<LogEvent> {

    private final static int DB_BATCH_SIZE = 100;
    private final static int RING_BATCH_SIZE = 1024;
    private List<Object> cache = Lists.newArrayList();

    /**
     * 消费队列,批处理
     * @param log 队列中的任务
     * @param sequence 当前消费到的队列位置
     * @param endOfBatch 是否为RingBuffer内存片中的最后一块
     */
    @Override
    public void onEvent(LogEvent log, long sequence, boolean endOfBatch) {
        cache.add(log);
        LogTask logTask = new LogTask();
        if ((sequence + 1) % DB_BATCH_SIZE == 0) {
            logTask.process(cache);//调用相关服务
            cache.clear();
        }
        if (endOfBatch) {
            if ((sequence + 1) % RING_BATCH_SIZE != 0) {
                logTask.process(cache);//调用相关服务
                cache.clear();
            }
        }
    }
}

3.4 运行

创建2个生产线程,测试生产和消费过程。

import java.util.Date;

import com.dto.LogEvent;

public class DisruptorTest {

    private final static int THREAD_NUM = 2;//生产者线程数
    private final static int TASK_NUM = 10000;//每个生产者生产任务的数量

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_NUM; i++) {
            new DisruptorThread().start();//创建并启动生产者线程
        }
    }

    /**
     * 生产者单线程执行任务
     */
    private static class DisruptorThread extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < TASK_NUM; i++) {
                LogEvent log = initLogEvent();
                LogEventService.getInstance().publish(log);
            }
        }
    }

    /**
     * 创建log对象
     *
     * @return LogEvent
     */
    private static LogEvent initLogEvent() {
        LogEvent log = new LogEvent();
        log.setUserId("123456789");
        log.setRspCd("00");
        log.setRspMsg("成功");
        log.setTransCrtTs(new Date());
        log.setTransCfmTs(new Date());
        return log;
    }
}

4 pom依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.1</version>
</dependency>

5 参考

Disruptorhttp://www.manongjc.com/detail/22-eslcjjgowuksoks.htmlJava多线程之Disruptor入门https://www.jb51.net/article/211039.htm利用disruptor DB批量存储https://blog.csdn.net/hanbaoqi99/article/details/78954915

标签:disruptor,无锁,log,队列,private,LogEvent,RingBuffer,import,public
来源: https://blog.csdn.net/lizehao1973/article/details/122727789

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有