ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

源码解析-Abp vNext丨分布式事件总线DistributedEventBus

2021-10-31 23:04:51  阅读:286  来源: 互联网

标签:vNext DistributedEventBus RabbitMQ properties 源码 var new configuration public


前言

上一节咱们讲了LocalEventBus,本节来讲本地事件总线(DistributedEventBus),采用的RabbitMQ进行实现。

Volo.Abp.EventBus.RabbitMQ模块内部代码并不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ这个包中。

正文

我们从模块定义开始看,项目启动的时候分别读取了appsetting.json的配置参数和调用了RabbitMqDistributedEventBusInitialize函数。

    public class AbpEventBusRabbitMqModule : AbpModule
    {
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();

            Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
        }

        public override void OnApplicationInitialization(ApplicationInitializationContext context)
        {
            context
                .ServiceProvider
                .GetRequiredService<RabbitMqDistributedEventBus>()
                .Initialize();
        }
    }

Initialize函数中我们根据 MessageConsumerFactory.Create向内部进行查阅可以看到最终调用方法为RabbitMqMessageConsumer.TryCreateChannelAsync并且在其内部我们可以看到下面代码,这里定义了消费的回调函数。反推Initialize方法其实是在启动一个消费者。

      public void Initialize()
        {
            Consumer = MessageConsumerFactory.Create(
                new ExchangeDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ExchangeName,
                    type: "direct",
                    durable: true
                ),
                new QueueDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ClientName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false
                ),
                AbpRabbitMqEventBusOptions.ConnectionName
            );

            Consumer.OnMessageReceived(ProcessEventAsync);

            SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
        }

 var consumer = new AsyncEventingBasicConsumer(Channel);
                consumer.Received += HandleIncomingMessageAsync;

继续向下看Consumer.OnMessageReceived(ProcessEventAsync);该方法向一个并发安全集合输入一个委托事件,并该事件会在上面的HandleIncomingMessageAsync会调中触发故确定为消费者的执行逻辑,而ProcessEventAsync其实还是走了我们在讲LocalEventBus哪一套,寻找Handler执行函数。

SubscribeHandlers还是上节讲的基类的函数,这里要注意内部调用的Subscribe该方法中的 Consumer.BindAsync会根据为消费者Bind路由,这样才能触发事件处理函数。


       public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
        {
            var handlerFactories = GetOrCreateHandlerFactories(eventType);

            if (factory.IsInFactories(handlerFactories))
            {
                return NullDisposable.Instance;
            }

            handlerFactories.Add(factory);

            if (handlerFactories.Count == 1) //TODO: Multi-threading!
            {
                Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
            }

            return new EventHandlerFactoryUnregistrar(this, eventType, factory);
        }

看完了事件消费者我们来看看事件发布,直接看PublishAsync函数就完事了,整个函数非常简单,都是RabbitMQ的操作语法,这里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);函数中通过读取ETO上指定注解Name来指定的。

protected Task PublishAsync(
            string eventName,
            byte[] body,
            IBasicProperties properties,
            Dictionary<string, object> headersArguments = null,
            Guid? eventId = null)
        {
            using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
            {
                channel.ExchangeDeclare(
                    AbpRabbitMqEventBusOptions.ExchangeName,
                    "direct",
                    durable: true
                );

                if (properties == null)
                {
                    properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
                }

                if (properties.MessageId.IsNullOrEmpty())
                {
                    properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
                }

                SetEventMessageHeaders(properties, headersArguments);

                channel.BasicPublish(
                    exchange: AbpRabbitMqEventBusOptions.ExchangeName,
                    routingKey: eventName,
                    mandatory: true,
                    basicProperties: properties,
                    body: body
                );
            }

            return Task.CompletedTask;
        }

解析

整个分布式事件的实现其实非常简单,在事件发生时发布者只需要定义好路由名称和消息内容发送RabbitMQ中,而消费者则是在项目运行的时候的通过调用Initialize就启动起来了。

这里我们也同样根据整个原理自己实现一下这个流程。

Dppt.EventBus分别定义IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler分别用于采用分布式事件总线调用、配置选项用于存储处理程序Handler、定义分布式处理程序抽象。

新建Dppt.EventBus.RabbitMQ类库先简单对RabbitMQ进行一个简单的封装

public class RabbitMqConnections : IRabbitMqConnections
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<RabbitMqConnections> _logger;
        IConnection _connection;
        bool _disposed;
        public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger)
        {
            _connectionFactory = connectionFactory;
            _logger = logger;
        }


        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public void TryConnect() {

            _connection = _connectionFactory.CreateConnection();

        }


        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }


        public void Dispose()
        {
            if (_disposed) return;

            _disposed = true;

            try
            {
                _connection.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }

    }

然后我们分别定义ExchangeDeclareConfiguration、QueueDeclareConfiguration用于记录配置信息。

开始处理RabbitMqEventBus处理程序首先是发布事件,大体代码如下就是往RabbitMQ里面丢消息。

        /// <summary>
        /// rabbmitmq 连接服务
        /// </summary>
        public readonly IRabbitMqConnections _rabbitMqConnections;


public Task PublishAsync<TEvent>(TEvent eventData)
        {
            var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent));
            var body = JsonSerializer.Serialize(eventData);
            return PublishAsync(eventName, body, null, null);
        }

        public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null)
        {

            if (!_rabbitMqConnections.IsConnected)
            {
                _rabbitMqConnections.TryConnect();
            }
            using (var channel = _rabbitMqConnections.CreateModel())
            {
                // durable 设置队列持久化  
                channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true);

                if (properties == null)
                {
                    properties = channel.CreateBasicProperties();
                    // 设置消息持久化
                    properties.DeliveryMode = 2;
                }

                if (properties.MessageId.IsNullOrEmpty())
                {
                    // 消息的唯一性标识
                    properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N");
                }

                SetEventMessageHeaders(properties, headersArguments);

                channel.BasicPublish(
                   exchange: RabbitMqEventBusOptions.ExchangeName,
                   routingKey: eventName,
                   mandatory: true,
                   basicProperties: properties,
                   body: Encoding.UTF8.GetBytes(body)
               );

            }

            return Task.CompletedTask;
        }

      private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
        {
            if (headersArguments == null)
            {
                return;
            }

            properties.Headers ??= new Dictionary<string, object>();

            foreach (var header in headersArguments)
            {
                properties.Headers[header.Key] = header.Value;
            }
        }


然后就是消费者的处理,我们同样定义Initialize函数,并简化部分封装代码,完成消费者启动。

 public void Initialize()
        {

            Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true);
            Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false);

            // 启动一个消费者
            if (!_rabbitMqConnections.IsConnected)
            {
                _rabbitMqConnections.TryConnect();
            }

            try
            {

                Channel = _rabbitMqConnections.CreateModel();



                Channel.ExchangeDeclare(
                  exchange: Exchange.ExchangeName,
                  type: Exchange.Type,
                  durable: Exchange.Durable,
                  autoDelete: Exchange.AutoDelete,
                  arguments: Exchange.Arguments
              );


                Channel.QueueDeclare(
                   queue: Queue.QueueName,
                   durable: Queue.Durable,
                   exclusive: Queue.Exclusive,
                   autoDelete: Queue.AutoDelete,
                   arguments: Queue.Arguments
               );

                var consumer = new AsyncEventingBasicConsumer(Channel);
                consumer.Received += HandleIncomingMessageAsync;

                Channel.BasicConsume(
                    queue: Queue.QueueName,
                    autoAck: false,
                    consumer: consumer
                );

                SubscribeHandlers(DistributedEventBusOptions.Handlers);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error:" + ex.Message);
            }
        }

参数配置这边主要是读取AppSetting信息和索要Handler

 public static class DpptEventBusRabbitMqRegistrar
    {
        public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types)
        {
     
            services.AddSingleton<IRabbitMqConnections>(sp =>
            {
                var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>();

                var factory = new ConnectionFactory()
                {
                    HostName = configuration["RabbitMQ:EventBusConnection"],
                    VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"],
                    DispatchConsumersAsync = true,
                    AutomaticRecoveryEnabled = true
            };

                if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"]))
                {
                    factory.UserName = configuration["RabbitMQ:EventBusUserName"];
                }

                if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"]))
                {
                    factory.Password = configuration["RabbitMQ:EventBusPassword"];
                }

                return new RabbitMqConnections(factory, logger);
            });

            var distributedHandlers = types;
            foreach (var item in distributedHandlers)
            {
                services.AddSingleton(item);
            }

            services.Configure<DistributedEventBusOptions>(options =>
            {
                options.Handlers.AddIfNotContains(distributedHandlers);
            });

            services.Configure<DpptRabbitMqEventBusOptions>(options => {

                options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"];
                options.ClientName = configuration["RabbitMQ:EventBus:ClientName"];
            });

            services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>();

          
        }
    }

测试

新建一个空项目,进行插件注册,然后创建ETO和Handler进行测试。

64

测试结果放在下面了。

62

63

结语

本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式事务。

最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star

另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长

标签:vNext,DistributedEventBus,RabbitMQ,properties,源码,var,new,configuration,public
来源: https://www.cnblogs.com/MrChuJiu/p/15491549.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有