ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Rabbitmq之死信队列

2020-06-20 13:54:23  阅读:271  来源: 互联网

标签:false string exchange 队列 Rabbitmq 死信 arguments test model


在使用rabbitmq过程中可能会遇到的情况:

    1、消费者挂了,导致队列积压,超出最大长度,超时若有新的消息过来,这消息会被丢弃或发送不成功;
    2、对于特殊场景的消息,可能有需要处理消息超时的需求;

以上两种情况,应用rabbitmq的死信队列可以比较好的解决。死信队列也是一个正常的exchange,其下绑定了一个或多个queue。这个exchange有dead-letter-exchange的标志。
而消息进入死信队列的情况主要有
1. 消息设置了ttl,并且超时;
2. 消息被拒绝;
3. 队列未被消费的消息数量达到了队列的最大长度。

废话不多说,下面做个简单的demo,直接上代码(.net core3.1 + Rabbitmq.Client 6.1):

1、DMRabbitmqClient.cs, 简单封装的rabbitmqclient。

    public class DMRabbitmqClient
    {
        private ConnectionFactory Factory { get; }

        public DMRabbitmqClient(string host, string username = "guest", string password = "guest", int port = 5672)
        {
            Factory = new ConnectionFactory();
            Factory.UserName = username;
            Factory.HostName = host;
            Factory.Password = password;
            Factory.Port = port;
        }

        public IConnection CreateConnection()
        {
            return Factory.CreateConnection();
        }

        public IModel GetModel(IConnection connection)
        {
            return connection.CreateModel();
        }

        public void ExchangeDeclare(IModel model, string exchangename, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments)
        {
            if (arguments == null)
                arguments = new Dictionary<string, object>();
            model.ExchangeDeclare(exchangename, type, durable, autoDelete, arguments);
        }

        public void QueueDeclare(IModel model, string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments)
        {
            if (arguments == null)
                arguments = new Dictionary<string, object>();
            model.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
        }

        public void QueueBind(IModel model, string queue, string exchange, string routeKey, IDictionary<string, object> arguments)
        {
            if (arguments == null)
                arguments = new Dictionary<string, object>();
            model.QueueBind(queue, exchange, routeKey, arguments);
        }

        public void Publish(IModel model, string dto, string exchange, string routekey)
        {
            IBasicProperties properties = model.CreateBasicProperties();
            properties.DeliveryMode = 2;
            properties.Expiration = "10000";
            properties.ContentEncoding = "UTF-8";

            var content = Encoding.UTF8.GetBytes(dto);

            model.BasicPublish(exchange, routekey, false, properties, content);
        }

        public EventingBasicConsumer AddConsumerEvent(IModel model,EventHandler<BasicDeliverEventArgs> action)
        {
            var eventConsumer = new EventingBasicConsumer(model);
            eventConsumer.Received += action;
            return eventConsumer;
        }

    }

2、生产者代码

```
class Program
{
    static void Main(string[] args)
    {
        var client = new DMRabbitmqClient("127.0.0.1", "guest", "guest");
        IConnection connection = client.CreateConnection();
        var model = client.GetModel(connection);

        //定义死信队列
        client.ExchangeDeclare(model, "test_exchange_dl", "topic", false, false, null);
        client.QueueDeclare(model, "test_queue_dl", false, false, false, null);
        //exchange接收到消息都往这个队列里面发
        client.QueueBind(model, "test_queue_dl", "test_exchange_dl", "#", null);
        //定义死信队列消费者
        var dlconsumer = client.AddConsumerEvent(model,DLReceiver);
        model.BasicConsume("test_queue_dl", true, "", false, true, null, dlconsumer);


        //定义正常的队列
        IModel common_model = client.GetModel(connection);
        client.ExchangeDeclare(common_model, "test_exchange", "topic", false, false, null);
        var arguments = new Dictionary<string, object>();
        arguments.Add("x-dead-letter-exchange", "test_exchange_dl");//定义reject或者ttl超时后,发送到死信队列的exchange
        arguments.Add("x-max-length", 3);//定义队列最大长度
        client.QueueDeclare(common_model, "test_queue", false, false, false, arguments);
        client.QueueBind(common_model, "test_queue", "test_exchange", "test_#", null);

        Console.WriteLine("输入1、发送消息;2、退出");
        string key = Console.ReadLine();
        bool isExists = false;
        while (!string.IsNullOrWhiteSpace(key))
        {
            switch (key)
            {
                case "1":
                    client.Publish(common_model, $"{DateTime.Now:yyyy-MM-dd HH:mm:ss},您好。", "test_exchange", "test_#");
                    break;
                case "2":
                    isExists = true;
                    break;
            }
            if (isExists)
                break;
            Console.WriteLine("输入1、发送消息;2、退出");
            key = Console.ReadLine();
        }

        Console.WriteLine("please pass any key to exists");
        Console.ReadKey();
    }

    private static void DLReceiver(object sender, BasicDeliverEventArgs args)
    {
        string msg = Encoding.UTF8.GetString(args.Body.ToArray());

        Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss}死信队列接收到消息:{msg}");

    }
}

2、消费者代码

    ```
    class Program
    {
        static void Main(string[] args)
        {
            var client = new DMRabbitmqClient("127.0.0.1", "guest", "guest");
            IConnection connection = client.CreateConnection();
            var model = client.GetModel(connection);
            //model.ExchangeDeclare("test_exchange", "topic", false, false, null);
            //model.QueueDeclare("test_queue", true, true, false);

            Console.WriteLine("请输入:1、拉取消息并自动ack;2、拉取消息但是不ack;3、退出。");
            string key = Console.ReadLine();
            bool isExists = false;
            while (!string.IsNullOrWhiteSpace(key))
            {
                switch (key)
                {
                    case "1":
                        PullMessage(model, true);
                        break;
                    case "2":
                        PullMessage(model, false);
                        break;
                    case "3":
                        isExists = true;
                        break;
                }
                if (isExists)
                    break;
                Console.WriteLine("请输入:1、拉取消息并自动ack;2、拉取消息但是不ack;3、退出。");
                key = Console.ReadLine();
            }



            Console.WriteLine("please pass any keys to exists");
            Console.ReadKey();
        }

        private static void PullMessage(IModel model ,bool isAck)
        {
            var result = model.BasicGet("test_queue", isAck);
            if (result != null && !result.Body.IsEmpty)
            {
                string message = Encoding.UTF8.GetString(result.Body.ToArray());
                Console.WriteLine($"{DateTime.Now:yyyy-MM-dd HH:mm:ss}接收到消息:{message},ack:{isAck}");
                if (!isAck)
                {
                    Console.WriteLine("ack为false,拒绝...");
                    model.BasicReject(1, false);//拒绝ack,并且deliverytag设置为1,requeue设置为false
                }
            }
        }
    }
  实践
  1、生产者发送消息,但是不主动拉取消息,此时消息进入私信队列。如下图

  2、生产者发送消息,消费者端接收消息,但是调用basereject方法拒绝。如下图

  3、此处生产者代码已经设置了最大队列长度为3,我们尝试连续发送消息,让队列积压的消息达到最大长度,如下图

标签:false,string,exchange,队列,Rabbitmq,死信,arguments,test,model
来源: https://www.cnblogs.com/wind-ye/p/13168300.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有