ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

mqtt模式--Work 模式--公平分发

2022-07-01 16:34:30  阅读:186  来源: 互联网

标签:-- Work factory 模式 try connection import null channel


rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。
package com.tszr.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Productor {
    public static void main(String[] args){
        // 1、创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2、获取连接、通道
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 3、向 Queue1 发布20个消息
            for (int i = 0; i < 20; i++) {
                String msg = "晴天: " + i;
                channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("消息发送成功!");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            System.out.println("消息发送异常");
        } finally {
            // 关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tszr.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Worker1 {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 获取连接、通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            Channel finalChannel = channel;
            // Channel 使用 Qos 机制
            finalChannel.basicQos(16);
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    System.out.println("Worker1" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                }
            });
            System.out.println("Worker1 开始接收消息");
            System.in.read();
        } catch (IOException |
                TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.tszr.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker2 {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 获取连接、通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            Channel finalChannel = channel;
            // Channel 使用 Qos 机制
            finalChannel.basicQos(5);
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    System.out.println("Worker2" + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                }
            });
            System.out.println("Worker2 开始接收消息");
            System.in.read();
        } catch (IOException |
                TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

 

 

相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息

 

标签:--,Work,factory,模式,try,connection,import,null,channel
来源: https://www.cnblogs.com/tszr/p/16435116.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有