ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

高性能存储队列:Disruptor

2021-12-17 16:03:18  阅读:195  来源: 互联网

标签:Disruptor disruptor 队列 高性能 new muyichen import com


文章目录


一、JUC包下队列的缺陷

  • 1、JUC包下队列大部分使用的都是ReentrantLock锁方式来保证线程安全的。在高并发的情况下为了防止OOM,只能选择有界队列,这样就会导致一部分请求的丢失;
  • 2、加锁方式的等待唤醒机制对内存的开销很大,而且存在死锁的隐患;
  • 3、有界队列通常采用数组实现,而数组结构又会导致另一个问题:伪共享,进而导致性能问题;

二、Disruptor为避免缺陷而设计的解决方案

1、存储结构:环形数组、

使用数组可以避免垃圾回收,同时由于空间局部性原理,数组对于处理器的缓存机制更加友好。

2、定位方式:位运算

Disruptor定义的数组长度都是2^n,所以使用的定位方式都是位运算。位运算都是使用二进制的形式实现的,而机器对于二进制的指令显然会更加友好,速度更快。

3、线程安全策略:CAS

Disruptor对数组中的元素进行操作都是通过CAS进行获取的,这样就能大大减少加锁对性能带来的影响。

4、存储方式:缓存填充

缓存填充是为了解决伪共享而设计出来的,它能让每一个缓存行只有一个元素,这样对元素的写入操作就不会影响其它元素的缓存了。

5、任务执行:事件监听机制

使用观察者模式,是为了防止消费者对任务池的不断重试,从而减少这个过程中对CPU性能的消耗。

三、Disruptor的针对数据覆盖的四种策略

1、BlockingWaitStrategy:

常见且默认的等待策略。当这个队列满了,不执行覆盖而是阻塞等待。使用ReentrantLock + Condition实现阻塞,最节省CPU,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景

2、SleepingWaitStrategy:

这是一个循环等待策略,会在循环中不断的等待数据。它会先进行自旋等待,如果等待不成功(没有CAS到数据的写入权限),就会使用Thread.yield()方法让出CPU,并最终使用LockSupport.partNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时,典型的应用场景就是异步日志

3、YieldingWaitStrategy:

这个策略用于低延时场合。消费者线程会不断的循环监测缓冲区的变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延迟有比较高的要求,可以考虑这种策略。

4、BusySpinWaitStrategy:

该策略采用死循环,消费者线程会尽最大的努力监控缓冲区的变化,对延时非常苛刻的场景使用。在这个策略下CPU核数必须大于消费者线程数,推荐在线程绑定到固定的CPU的场景下使用

四、Disruptor的简单使用

需要引入的依赖:

<!-- 引入Disruptor -->
<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
	<version>3.3.4</version>
</dependency>

代码实现:

1、构建消息载体(事件Event)

package com.muyichen.demo.disruptor.event;

import lombok.Data;

/**
 * 消息载体(事件)
 */
@Data
public class OrderEvent {

    private long value;

    private String name;

}

2、构建消息(事件)生产者

package com.muyichen.demo.disruptor.producer;

import com.lmax.disruptor.RingBuffer;
import com.muyichen.demo.disruptor.event.OrderEvent;

/**
 * 消息(事件)生产者
 */
public class OrderEventProducer {

    /**
     * 事件环形队列
     */
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(long value, String name) {
        // 获取事件队列的下一个槽
        long sequence = ringBuffer.next();

        try {
            // 获取消息(事件)
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 写入数据消息
            orderEvent.setValue(value);
            orderEvent.setName(name);
        } catch (Exception e) {
            // 异常处理
            e.printStackTrace();
        } finally {
            System.out.println("生产者" + Thread.currentThread().getName() +
                    "发送数据:value:" + value + ",name:" + name);
            // 发布事件
            ringBuffer.publish(sequence);
        }

    }

}

3、构建消息(事件)消费者

package com.muyichen.demo.disruptor.consumer;


import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;

/**
 * 消息(事件)消费者
 */
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {

    @Override
    public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() +
                "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
    }

    @Override
    public void onEvent(OrderEvent orderEvent) throws Exception {
        // TODO 消费逻辑
        System.out.println("消费者" + Thread.currentThread().getName() +
                "消费数据:value:" + orderEvent.getValue() + ",name:" + orderEvent.getName());
    }
}

测试用例

package com.muyichen.demo.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.muyichen.demo.disruptor.consumer.OrderEventHandler;
import com.muyichen.demo.disruptor.event.OrderEvent;
import com.muyichen.demo.disruptor.producer.OrderEventProducer;

import java.util.concurrent.Executors;

/**
 * 高性能队列测试
 */
public class DisruptorDemo {

    public static void main(String[] args) {
        //创建Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
                OrderEvent::new, // 等同new OrderEventFactory()
                1024 * 1024,    // 环形数组容量
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,    // 单生产者 (生产类型有两种:单生产者、多生产者)
                new YieldingWaitStrategy()  // 等待策略
        );

        // 设置消费者用于处理RingBuffer的事件
        disruptor.handleEventsWith(new OrderEventHandler());
        // 设置多消费者,消息会被重复消费
        // disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
        // 设置多消费者,消费者要实现WorkHandler接口,这样能保证,消息只会被一个消费者消费
        // disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

        // 启动Disruptor
        disruptor.start();

        // 构建环形队列
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        // 创建生产者并绑定环形队列
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        // 发送消息
        for (int i=0; i<100; i++) {
            producer.onData(i, "muyichen" + i);
        }
        disruptor.shutdown();

    }

}

标签:Disruptor,disruptor,队列,高性能,new,muyichen,import,com
来源: https://blog.csdn.net/qq_42697271/article/details/121990656

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有