ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

EasyNetQ在.Net5中的应用

2022-05-01 10:04:11  阅读:178  来源: 互联网

标签:EasyNetQ service app message cs 应用 services Net5 public


这篇文章参考网上的教程,主要解决了EasyNetQ的依赖注入服务问题。

主要涉及四个文件:

ApplicationExtenssion.cs    .Net中间件的实现

OrderConsumer.cs         消息消费者的实现

WindsorMessageDispatcher.cs   通过容器解析获得消费者服务

Startup.cs             相关服务的注册及中间件引用

主要参考代码:

ApplicationExtenssion.cs

    public static class ApplicationExtenssion
    {
        /// <summary>
        /// 中间件实现
        /// </summary>
        /// <param name="appBuilder"></param>
        /// <param name="subscriptionIdPrefix"></param>
        /// <param name="assembly"></param>
        /// <returns></returns>
        public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
        {
            var services = appBuilder.ApplicationServices.CreateScope().ServiceProvider;
            var lifeTime = services.GetService<IHostApplicationLifetime>();
            var bus = services.GetRequiredService<IBus>();
            lifeTime.ApplicationStarted.Register(() =>
            {
                var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
                //需要指定AutoSubscriberMessageDispatcher对应的实例
                //并可以通过构造函数传参,如:IServicesProvider,即:services
                subscriber.AutoSubscriberMessageDispatcher = new WindsorMessageDispatcher(services);
                subscriber.Subscribe(new Assembly[] { assembly });
                subscriber.SubscribeAsync(new Assembly[] { assembly });
            }); 
            lifeTime.ApplicationStopped.Register(() => { bus.Dispose(); }); 
            return appBuilder;
        }
    }

OrderConsumer.cs

    /// <summary>
    /// IConsume接口的实现类
    /// </summary>
    public class OrderConsumer : IConsume<MessageDto>
    {
        private readonly ILogger<OrderConsumer> logger;

        /// <summary>
        /// 可以注入相关服务
        /// </summary>
        /// <param name="logger"></param>
        public OrderConsumer(ILogger<OrderConsumer> logger)
        {
            this.logger = logger;
        }

        /// <summary>
        /// 消息的消费方法
        /// </summary>
        /// <param name="message"></param>
        /// <param name="cancellationToken"></param>
        public void Consume(MessageDto message, CancellationToken cancellationToken = default)
        {
            logger.LogInformation(JsonConvert.SerializeObject(message));
        }
    }

WindsorMessageDispatcher.cs

    /// <summary>
    /// 结合官方教程实现自动订阅消息转发
    /// </summary>
    public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
    {
        private readonly IServiceProvider service;

        /// <summary>
        /// 参数由中间件的AutoSubscriberMessageDispatcher进行传递
        /// </summary>
        /// <param name="service"></param>
        public WindsorMessageDispatcher(IServiceProvider service)
        {
            this.service = service;
        }

        /// <summary>
        /// 同步方法,可以解析TConsumer对应的服务
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <typeparam name="TConsumer"></typeparam>
        /// <param name="message"></param>
        /// <param name="cancellationToken"></param>
        void IAutoSubscriberMessageDispatcher.Dispatch<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken)
        {
            //通过容器获取对应的消费者服务,也可以通过Autofac实现
            var consumer = service.GetService<IConsume<MessageDto>>();
            //通过解析到的服务调用对应的消费者实现的方法
            consumer.Consume(message as MessageDto, cancellationToken);
        }

        /// <summary>
        /// 异步方法
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <typeparam name="TConsumer"></typeparam>
        /// <param name="message"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        async Task IAutoSubscriberMessageDispatcher.DispatchAsync<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken)
        {
            var consumer = service.GetService<TConsumer>();
            await consumer.ConsumeAsync(message, cancellationToken);
        }
    }

Startup.cs

        public void ConfigureServices(IServiceCollection services)
        {
            string rabbitMqConnection = Configuration["RabbitMqConnection"];
            //注册IBus为单例模式
            services.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
            //注册IConsume的实现类,EasyNetQ会自动扫描实现该接口的类
            services.AddSingleton<IConsume<MessageDto>,OrderConsumer>();

            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "EasyNetQ.WebApi", Version = "v1" });
            });
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "EasyNetQ.WebApi v1"));
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            //添加相应的中间件
            app.UseSubscribe("OrderService", Assembly.GetExecutingAssembly());

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

 

标签:EasyNetQ,service,app,message,cs,应用,services,Net5,public
来源: https://www.cnblogs.com/superfeeling/p/16212236.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有