博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列和发布订阅
阅读量:6620 次
发布时间:2019-06-25

本文共 2552 字,大约阅读时间需要 8 分钟。

编程语言集成了发布订阅

很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用EventListener实现订阅,使用ApplicationEventPublisher使用发布。这种系统集成的我们先叫它“集成组件”

与语言无关的消息队列

事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为rabbitmq为例子。

共同点

  1. 代码解耦,发布者与订阅者可以互不关心
  2. 异步处理,集成组件有的是同步的,需要加@Async注解
  3. 消息安全

不同点

  1. rabbitmq实现的是多服务之间的发布与订阅
  2. 集成组件实现的是一个服务内部的发布与订阅
  3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
  4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

实例

  • 集成组件的发布订阅

订阅

@Getter@Builder(toBuilder = true)@NoArgsConstructor@AllArgsConstructorpublic class CreateBookEvent {  private String address;  private String title;}@Componentpublic class EmailEventListener {  @EventListener  @Async  public void handleEvent(CreateBookEvent event) throws Exception {    System.out.println("email消息:建立图书:" + event.getTitle());  }}

发布

@Autowired  private ApplicationEventPublisher applicationEventPublisher;  public void publish(){      applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建图书").build());}
  • rabbitmq的发布订阅

订阅

@Slf4j@Componentpublic class DistributorSubscriber {  public static final String WORK_QUEUE = "fx.activity.total";  public static final String EXCHANGE = "fx.exchange";  @Autowired  DistributorActivityTotalRepository distributorActivityTotalRepository;  @Autowired  ObjectMapper objectMapper;  @Bean  public TopicExchange phoneTotalExchange() {    return new TopicExchange(EXCHANGE);  }  @Bean  public Queue phoneTotalQueue() {    return new Queue(WORK_QUEUE);  }    @Bean  public Binding bindSignQueue() {    return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);  }   @RabbitListener(queues = WORK_QUEUE)  public void phoneTotalQueueListener(String data) {    try {      logger.debug("fx.activity.total:{}", data);      DistributorActivityTotal entity =          objectMapper.readValue(data, DistributorActivityTotal.class);      distributorActivityTotalRepository.incUpdate(entity);    } catch (Exception ex) {      logger.error("fx.activity.total.error", ex);    }  }

发布

@Autowired  private RabbitTemplate rabbitTemplate;     public void modifySalesperson(SalesPersonDTO salesPersonDTO) {    try {      rabbitTemplate.convertAndSend(          "EXCHANGE",          "MQName",          objectMapper.writeValueAsString(salesPersonDTO)      );      logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());    } catch (Exception ex) {      logger.error("MQ.modifySalesperson.error", ex);    }  }

转载于:https://www.cnblogs.com/lori/p/10824990.html

你可能感兴趣的文章
New UWP Community Toolkit
查看>>
JDBC连接数据库(二)
查看>>
leetcode 674. Longest Continuous Increasing Subsequence
查看>>
Extensions in UWP Community Toolkit - SurfaceDialTextbox
查看>>
Golang 语言的单元测试和性能测试(也叫 压力测试)
查看>>
springboot数据库连接池使用策略
查看>>
Java中CAS详解
查看>>
Java线程的学习_线程池
查看>>
Android 虚拟导航挡住应用底部解决方案(屏幕底部的三个按键)
查看>>
工厂函数
查看>>
Java Spring MVC 错误 及 常见问题 总结
查看>>
Linux系统实战项目——sudo日志审计
查看>>
native.js是什么且如何使用
查看>>
Android Application Task Activities的关系
查看>>
浅谈CSS盒子模型
查看>>
实现iFrame自适应高度,原来很简单!
查看>>
get app id
查看>>
poj 3624 0/1背包暨0/1背包的学习
查看>>
Android 批量上传sd卡图片
查看>>
Robot Framework作者建议如何选择自动化测试框架
查看>>