ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ-Work Queues

2022-03-06 13:00:49  阅读:170  来源: 互联网

标签:false 队列 Queues Work RabbitMQ queue var Console channel


工作队列(Work Queues)

 

 

场景:假设生产者向队列中添加一条数据的时间为1秒,消费者从队列中消费一条数据执行完业务逻辑需要5秒,在这种情况下队列就会不断堆积最终导致服务瘫痪。

解决方案:运行多个消费者,同时消费队列中的任务

生产者

定义一个task_Queue队列;1秒向队列中发送一条消息

var factory = new ConnectionFactory
{
   Uri = new Uri("amqp://admin:admin@192.168.65.133:5672"),
   AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
   channel.QueueDeclare(queue: "task_queue",
                        durable: true,//可持久化
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
   int count = 0;
   while (true)
  {
       count++;
       var message = $"Task {count}";
       var body = Encoding.UTF8.GetBytes(message);

       var properties = channel.CreateBasicProperties();
       properties.Persistent = true;

       channel.BasicPublish(exchange: "",
                            routingKey: "task_queue",
                            basicProperties: properties,
                            body: body);
       Console.WriteLine("Send {0}", message);
       //暂停一秒
       Task.Delay(1000).Wait();
  }
}

 

消费者

定义一个task_Queue队列;5秒消费一条消息

var factory = new ConnectionFactory
{
   Uri = new Uri("amqp://admin:admin@192.168.65.133:5672"),
   AutomaticRecoveryEnabled = true
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
   channel.QueueDeclare(queue: "task_queue",
                        durable: true,//可持久化
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

   channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

   Console.WriteLine("Waiting for messages.");

   var consumer = new EventingBasicConsumer(channel);
   consumer.Received += (sender, ea) =>
  {
       var body = ea.Body.ToArray();
       var message = Encoding.UTF8.GetString(body);
       Console.WriteLine("Received {0}", message);
       Task.Delay(5000).Wait();  //等待5秒
       Console.WriteLine("Task Done");

       channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动确认
  };
   channel.BasicConsume(queue: "task_queue",
                        autoAck: false,//关闭自动确认
                        consumer: consumer);
   Console.WriteLine(" Press [enter] to exit.");
   Console.ReadLine();
}

测试

启动5个消费者

 

 

可视化界面中查看连接数为5个

 

 

启动1个生产者

 

 

可视化界面连接多了1个生产者

 

 

测试结果

5个消费者一起消费队列中的任务。

 

 

运行一段时间后 rabbitmq 队列中 也没有过多的堆积任务

 

 公众号同步更新

 

标签:false,队列,Queues,Work,RabbitMQ,queue,var,Console,channel
来源: https://www.cnblogs.com/xianchengzhang/p/15971370.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有