ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ--Work Queues

2022-01-24 02:02:05  阅读:190  来源: 互联网

标签:-- Queues Work rabbitmq client import com public Channel


Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

image-20220124002633222

image-20220124002857138

轮训发送消息

启动两个线程,一个消息发送线程,来看看这两个工作线程是如何工作的。

抽取工具类

package com.uin;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: 工具类
 * @date 2022/1/24/12:29 AM
 */
public class RabbitMQUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        //引入连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

启动两个工作线程

package com.uin.work_queues;


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/12:40 AM
 */
public class Consumer_work01 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //接受消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接受到的消息:" + new String(message.getBody()));
        };
        //取消消息的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
        };
        /**
         * 消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
         * 3.未成功消费的一个回调
         * 4.消费者取消消费的回调
         */
        System.out.println("第一个工作线程!等待接受消息。。。。");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20220124010642181

package com.uin.work_queues;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/1:03 AM
 */
public class Consumer_work02 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //接受消息的回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接受到的消息:" + new String(message.getBody()));
        };
        //取消消息的回调
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消息被取消消费者接口的回调逻辑!");
        };
        /**
         * 消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答
         * 3.未成功消费的一个回调
         * 4.消费者取消消费的回调
         */
        System.out.println("第二个工作线程!等待接受消息。。。。");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

image-20220124010658569

生产者

package com.uin.work_queues;

import com.rabbitmq.client.Channel;
import com.uin.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @author wanglufei
 * @description: TODO
 * @date 2022/1/24/1:09 AM
 */
public class Producer_task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            /**
             * 发送一个消息
             *  1.发送到那个交换机
             *  2.路由的 key 是哪个
             *  3.其他的参数信息
             *  4.发送消息的消息体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//以二进制传输
            System.out.println("发送消息完成:" + message);
        }
    }
}

image-20220124012930108

image-20220124012952307

image-20220124013011771

标签:--,Queues,Work,rabbitmq,client,import,com,public,Channel
来源: https://www.cnblogs.com/bearbrick0/p/15838028.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有