RabbitMQ官网

RabbitMQ的结构和概念

image-20230110095913001

  • channel:操作MQ的工具
  • exchange:交换机,负责路由消息至消息队列
  • queue:消息缓存
  • VirtualHost:虚拟主机,是queue、exchange等资源的逻辑分组。通常情况下, 每个用户应该有自己的虚拟主机。虚拟主机之间是相互隔离的,其内的消息并不互通

Spring AMQP

  • AMQP:Advanced Message Queuing Protocol,一种应用程序之间传递业务消息的协议
  • Spring AMQP:基于AMQP协议定义的一套API规范,提供了模版来发送和接受消息。其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

功能

  • 监听器容器,用于异步处理入站消息
  • 用于发送和接受消息的RabbitTemplate
  • RabbitAdmin用于声明队列,交换和绑定

引入依赖

 <!-- AMQP -->
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
   <version>3.0.1</version>
 </dependency>
 

applicaiton.yml

 spring:
   rabbitmq:
      host: IP地址 
      port: 5672
      virtual-host: / #虚拟主机名称
      username: root
      password: 123456

发送消息

@Resource
private RabbitTemplate rabbitTemplate;

/*
* 发送消息
* */

public void testSendMessageToSimpleQueue() {
  String queueName = "simple.queue";		//队列名称
  String message = "Hello Spring AMQP";	//消息
  rabbitTemplate.convertAndSend(queueName, message);
}

消息模型

  • publisher:消息发布者
  • queue:消息队列
  • consumer:消费者,处理和消费消息

基本消息队列(BasicQueue)

image-20230110100429592

发送流程

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

接收流程

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定,队列中有消息后会自动消费

使用SpringAMQP实现

监听/接收消息

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Projectname: Cloud
 * @Filename: SpringRabbitListener
 * @Author: SpringForest
 * @Data:2023/1/10 12:57
 * @Description:
 * 自动监听队列中的消息
 */

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("接收到的消息:" + msg);
    }
}

消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能

工作消息队列(WorkQueue)

image-20230110100530530

多个消费者同时消费一个消息队列,可以提高速度。

消息预取:消息队列会根据消费者的数量预先分配给消费者数据,该行为忽略了不同消费者的处理能力,不利于提高资源利用率。

取消消息预取:

spring:
   rabbitmq:
      host: IP地址 
      port: 5672
      virtual-host: / #虚拟主机名称
      username: root
      password: 123456
      listener:
      	simple:
      	 prefetch: 1 #取消预取

发布、订阅模式(Publish、Subscribe)

发布订阅模式与上述模式的区别就是允许将同一个消息发布给多个消费者。实现方式是加入了交换机(exchange)。

image-20230110100727388

image-20230110190111639

广播模式(Fanout Exchange)

模式图如上

实现过程:

  1. 在consumer服务声明Exchange、Queue、Binding,此处使用@Configuration配置类
@Configuration
public class FanoutConfig {
    //声明Exchange交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("simple.exchange");
    }

    //声明第一个队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    //声明第二个队列
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    //绑定队列1和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //如需绑定其他队列,同理
    //绑定队列2和交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
  1. 在consumer服务中编写消费者方法
@RabbitListener(queues = "fanout.queue1")
public void listenWorkQueue1(String msg) {
  System.out.println("消费者1接收到的消息:" + msg);

}
  1. 在publisher服务中编写发布者方法
public void sendFanoutExchange() {
  //交换机名称
  String exchangeName = "simple.exchange";
  //消息
  String message = "Hello MQ Exchange";
  //发送消息
  rabbitTemplate.convertAndSend(exchangeName, "", message);
}

路由模式(Direct Exchange)

image-20230110100740280

image-20230111094125169

Direct Exchange会将接收到的消息根据路由规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到与消息有一致的Key的队列

实现过程

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey。不用广播模式中的方法是因为会使用过多的Bean导致臃肿

使用该声明方式,若交换机和队列不存在会自动创建

@RabbitListener(bindings = @QueueBinding(
  value = @Queue(name = "direct.queue1"),
  exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
  key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
  System.err.println("消费者接收到direct.queue1的消息:" + msg);
}
  1. 发布者
public void sendDirectExchange() {
    //交换机名称
    String exchangeName = "direct.exchange";
    //消息
    String message = "Hello blue";
    //RoutingKey
    String routingKey = "blue";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}

主题模式(Topic Exchange)

image-20230110100755315

image-20230111102110325

  • TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,使用.分割

例如:

China.news

Japan.wether

  • Queue和Exchange指定BindingKey时可以使用通配符:

#代指0个或多个单词, *代指一个单词

例如

#.wether

China.#

实现

  1. 声明队列和交换机,主要区别为交换机的type变为了topic
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
        key = "china.#"
))
public void listenTopicQueue1(String msg) {
    System.err.println("消费者接收到topic.queue1的消息:" + msg);
}
  1. 发布者
public void testTopicExchange() {
    //交换机名称
    String exchangeName = "topic.exchange";
    //消息
    String message = "Hello China news";
    //RoutingKey
    String routingKey = "china.news";
    //发送消息
    rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}

消息转换器

消息发送

在SpringAMQP的发送方法中,接收消息的类型是Object,即可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

//发送对象类型的消息
public void sendObject() {
    Map<String, Object> msg = new HashMap<>();
    msg.put("name", "test name");
    msg.put("age", 21);

    rabbitTemplate.convertAndSend("object.queue", msg);
}

默认发送的消息类型为x-java-serialized-object为jdk自带的序列化

image-20230111111032923

该方式存在注入攻击等安全隐患,且序列化后的信息过长,会降低收发效率和占用内存空间

Spring的对消息对象的处理是由org.springframwork.amqp.support.converter.MessageConverter来处理的。

默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果需要自定义类型的话,仅需定一个MessageConverter类型的Bean即可。

使用自定义的声明会将Spring的自动覆盖掉(自动装配)

例如使用JSON方式完成序列化

  1. 引入依赖
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.14.1</version>
</dependency>
  1. 在发布者的启动类中声明MessageConvert(注意导入的包)
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

结果如下

image-20230111112345685

消息接收

  1. 引入依赖
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.14.1</version>
</dependency>
  1. 在消费者定义MessageConverter
@Bean
public MessageConverter jsonMessageConverter() {
  return new Jackson2JsonMessageConverter();
}
  1. 定义一个消费者监听队列并获取消息,注意接收方法参数需要与发送参数类型一致
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> map){
    System.out.println(map);
}

SpringAMQP中消息的序列化和反序列化是怎么实现的

  • 利用MessageConverter实现,默认是JDK的序列化
  • 发送方与接收方必须使用相同的MessageConverter