ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021-03-03

2021-03-03 20:30:08  阅读:158  来源: 互联网

标签:03 purchaseOrderCreatedEvent 定义 事件处理 2021 事件 public 通道


 

事件模型

内部&外部事件模型

 

  • 所有的领域事件都是Spring Application Event
  • 内部事件:如果领域事件只是在本微服务内使用,则定义该类领域事件为内部事件
    • 内部事件被本微服务内其他模块(或者同一模块)消费,主要目的是解耦业务逻辑
  • 外部事件:如果领域事件需要被其他微服务使用,则发布该领域事件到消息中间件(RabbitMQ/Kafka),则定义该类领域事件为外部事件
    • 外部事件被其他微服务消费,主要目的是解耦不同微服务之间的同步调用

外部事件模型

 

领域事件

事件命名规则

  • 领域名称(名词)+ 表示事件的动词被动时态)+(宾语)【可选】 + Event,比如:UserAssignedToRoleEvent
  • 事件类包位置:event

事件执行规则

  • 不需要执行特定接口
  • 根据业务定义事件所包含的数据

BaseServiceFeeDocumentEvent 

事件实现方式

Spring Cloud Stream

 

发布订阅模型

RabbitMQ消息模型

 

RabbitMQ部署模型

  • 高可用集群 cluster
  • 镜像队列 mirrored queues

外部事件配置

外部事件发布通道配置

  • 定义事件发布配置类,即事件源类,命名规则:领域名称 + EventSource(事件源),比如:PurchaseOrderEventSource
  • 定义事件发布通道,通常一个事件定义为一个发布通道 Output,发布通道命名规则:领域事件名称 + Output,通常常量和方法配对定义,比如:purchaseOrderCreatedEventOutput,PURCHASE_ORDER_CREATED_EVENT_OUTPUT
  • 事件发布通道配置类包位置:event.config

PurchaseOrderEventSource 

public interface PurchaseOrderEventSource {
    String PURCHASE_ORDER_CREATED_EVENT_OUTPUT = "purchaseOrderCreatedEventOutput";
 
 
    @Output(PURCHASE_ORDER_CREATED_EVENT_OUTPUT)
    MessageChannel purchaseOrderCreatedEventOutput();
}

外部事件订阅通道配置

  • 定义事件订阅配置类,即事件池类,命名规则:领域名称 + EventSink(事件池),比如:PurchaseOrderEventSink
  • 定义事件订阅通道,通常一个事件定义为一个订阅通道 Input,订阅通道命名规则:领域事件名称 + Input,通常常量和方法配对定义,比如:PurchaseOrderCreatedEventInput,PURCHASE_ORDER_CREATED_EVENT_INPUT
  • 如果在一个服务内同一个外部事件被多个不同种类的消费方消费,可以为每一个消费方定义一个订阅通道Input,订阅通道命名规则:领域事件名称 + Input + For + 消费方业务名称,通常常量和方法配对定义,比如:OrderCreatedEventInputForInventoryDeduction,ORDER_CREATED_EVENT_INPUT_FOR_INVENTORY_DEDUCTION.
  • 事件订阅通道配置类包位置:event.config

PurchaseOrderEventSink 展开源码

外部事件配置文件设定

  • 在spring.cloud.stream.bindings中定义每一个发布通道
    • 发布通道的名称与事件发布通道配置类中的output保持一致
    • destination对应消息基础设置的Topic(exchange),Topic命名规则:领域事件名称 + .topic,比如 purchaseOrderCreatedEvent.topic
    • binder对应相应的消息基础设置
  • 在spring.cloud.stream.bindings中定义每一个订阅通道
    • 订阅通道的名称与事件订阅通道配置类中的input保持一致
    • destination对应消息基础设置的Topic(exchange)
    • binder对应相应的消息基础设置
    • input必须设定group,一组对应一个消息队列queue, group名称通常为服务名称 +.模块名称,group保证消息只被集群中的一个节点消费并且消息是持久化的。比如:purchase-service.purchaseRequirement

配置文件 

spring:
  cloud:
    stream:
      bindings:
        purchaseOrderCreatedEventOutput:
          destination: purchaseOrderCreatedEvent.topic
          binder: local_rabbit
        purchaseOrderCreatedEventInput:
          group: purchase-service.purchaseRequirement
          destination: purchaseOrderCreatedEvent.topic
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 188.188.188.56
                port: 5672
                username: admin
                password: leading2018
                virtual-host: /

 

事件发布

  • 基于 Spring 应用事件机制发布领域事件,在 service 中依据具体的业务场景构建领域事件对象,使用 ApplicationContext 发布领域事件。

在服务方法中发布事件 

//service实现类
 
 
@Autowired
private ApplicationContext applicationContext;
 
 
public void doSomethingThenPublishAnEvent() {
    ......
    PurchaseOrderCreatedEvent purchaseOrderCreatedEvent = new PurchaseOrderCreatedEvent (...);
    applicationContext.publishEvent(purchaseOrderCreatedEvent);
}..

 

  • 如果该领域事件需要成为外部事件,则每类事件定义一个事件发布者,其作用是发布领域事件到消息中间件。
  • 如果该领域事件只是内部事件,则不需要定义事件发布者。
  • 事件发布者包位置:event.publisher
  • 事件发布者命名规范:领域名称 + EventPublisher,比如:PurchaseOrderEventPublisher
  • 在事件发布者绑定事件发布通道配置,例如:@EnableBinding(PurchaseOrderEventSource.class)
  • 事件发布者方法签名规范:public void publishXXXEvent(事件名称)
  • 事件发布者监听Spring应用事件,发布相应的事件到消息基础设施(RabbitMQ/Kafka)

PurchaseOrderEventPublisher外部事件发布者 展开源码

@Component
@EnableBinding(PurchaseOrderEventSource.class)
public class PurchaseOrderEventPublisher {
 
    @Autowired
    PurchaseOrderEventSource purchaseOrderEventSource;
 
    @EventListener
    public void publishPurchaseOrderCreatedEvent(PurchaseOrderCreatedEvent purchaseOrderCreatedEvent){
        purchaseOrderEventSource.purchaseOrderCreatedEventOutput().send(MessageBuilder.withPayload(purchaseOrderCreatedEvent).build());
    }
 
    ......
}

 

事件消费

  • 通常每类事件定义一个事件处理者类。
  • 事件处理者包位置:event.handler
  • 事件处理者命名规范:领域名称 + EventHandler,比如 PurchaseOrderEventHandler
  • 如果是处理外部事件,在事件处理者绑定事件订阅通道配置,例如:@EnableBinding(PurchaseOrderEventSink.class)
  • 事件处理者方法签名规范:public void receiveXXXEvent(事件名称)
  • 如果是处理外部事件,在事件处理方法上加注解@StreamListener
  • 如果是处理内部事件,在事件处理方法上加注解@EventListener
  • 同一事件不应该即被当做内部事件消费,同时又被当做外部事件消费,逻辑上不通。
  • 事件处理者方法执行:做一些参数处理,日志记录,核心实现需要调用某个service方法来完成,和Controller的处理方式类似。

OrderEventHandler处理外部事件 展开源码

@Component
@EnableBinding(OrderEventSink.class)
public class OrderEventHandler {
 
    @StreamListener(OrderEventSink.ORDER_CREATED_EVENT_INPUT)
    public void receiveOrderCreatedEvent(OrderCreatedEvent orderCreatedEvent) {
        //Process the outer Event
    }
}

PurchaseOrderEventHandler处理内部事件 展开源码

@Component
@Slf4j
public class PurchaseOrderEventHandler {
 
    @EventListener
    public void receivePurchaseOrderCreatedEvent(PurchaseOrderCreatedEvent purchaseOrderCreatedEvent) {
        //Process the inner Event
        log.debug("Received the inner event: PurchaseOrderCreatedEvent - {}", purchaseOrderCreatedEvent.toString());
    }
}

内部事件消费事务同步处理

  • @EventListener

    • 该注解标注的方法与事件发布方服务方法在同一事务内被同步调用,即如果事件监听器方法执行失败,则事件发布方服务方法也同步回滚
  • @TransactionalEventListener

    • 默认只有当前事务成功提交之后,才会执行事件监听器的方法。

  • @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT
    • AFTER_COMMIT  事务被成功提交之后,才会执行事件监听器的方法
    • AFTER_ROLLBACK 事务回滚之后,才会执行事件监听器的方法
    • AFTER_COMPLETION 事务被完成(不考虑是否成功)之后,才会执行事件监听器的方法
    • BEFORE_COMMIT 事务被提交之前,才会执行事件监听器的方法
    • 应用场景:解决内部事件异步处理数据同步问题

  • 使用@TransactionalEventListener需要注意的问题:

    • 默认可以保证事件触发的时候,事务已经提交,但数据库连接并没有被释放,因此也没有被连接池回收。并且由于Spring的机制,在这个事件处理代码块里面,如果需要访问数据库,会继续复用之前事务内的数据库连接Connection。

    • 这就会带来以下几个问题:
      1、如果对应的事件处理器里面并不需要获取数据库连接,并且对应的代码块中一旦阻塞,在高并发下,会造成连接池瞬间打满,造成系统宕机。即使没有打满,也没有及时释放出资源,在连接有限的情况下,会对系统健壮性有一定影响。
      2、如果在对应的事件处理器里面需要切换数据源的话,由于事务内Spring会共用同一个数据源连接,同理,在这里面也会用同一个数据源连接,不会去重新获取连接,造成数据源切换失败!
      3、特别小心:此时如果在事件处理器里面执行添加、修改、删除等操作事务不会被提交,因为在事件响应时,事务已经被提交了。如果想执行事务操作,需要开启一个新的事务,需要在服务方法事务上使用

      @Transactional(propagation = Propagation.REQUIRES_NEW)

    • 解决方法:异步执行事件处理方法,这样就能保证连接及时释放掉。而且由于是一个新的线程,数据源切换也就迎刃而解了。

内部事件处理事务同步 


@Component
@Slf4j
public class PurchaseOrderEventHandler {
 
    @TransactionalEventListener
    public void receivePurchaseOrderCreatedEvent(PurchaseOrderCreatedEvent purchaseOrderCreatedEvent) {
        //Process the inner Event
        log.debug("Received the inner event after transaction commit has completed successfully: PurchaseOrderCreatedEvent - {}", purchaseOrderCreatedEvent.toString());
    }
}

 

标签:03,purchaseOrderCreatedEvent,定义,事件处理,2021,事件,public,通道
来源: https://blog.csdn.net/e12489/article/details/114330029

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有