1.基础篇

1.1消息队列简介

为什么要使用消息队列

  • 消息队列相当于菜鸟驿站,可以暂时存放消息,等消费者自己去取
    code
    code

同步与异步的区别

  • 线性操作,要所有操作完成后在返回响应
    code
  • 异步操作,把消息传入消息队列后直接返回响应
    code

异步的好处

  1. 耦合度低 —-不会因为后续操作失败,导致整个功能的失败
  2. 快速响应
  3. 并发压力传递 —-可以在高峰时减少并发,低分时增加并发
  4. 便于扩展

什么是消息队列

  • 消息队列是实现应用程序和应用程序之间通信的中间件产品

消息队列底层实现的两大主流方式

  • 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要
  • 目前主流的消息队列通信协议标准包括:
    • AMQP(Advanced Message Queuing Protocol):通用协议,IBM公司研发
    • JMS(Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成的Java标准
      code
      code

1.2RabbitMQ简介

官网地址:https://www.rabbitmq.com/
RabbitMQ是一款基于AMQP、由Erlang语言开发的消息队列产品,2007年Rabbit技术公司发布了它的1.0版本

  • 交换机不存放消息,如果Connection没有指定正确的交换机则会导致消息丢失
  • 消息存放在队列里
    code

1.3RabbitMQ使用

安装RabbitMQ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 拉取镜像
docker pull rabbitmq:3.13-management

# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management

访问后台管理界面:http://192.168.200.100:15672

HelloWorld

生产者发送消息,消费者接收消息,用最简单的方式实现
官网说明:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

  1. 添加依赖
    1
    2
    3
    4
    5
    6
    7
    <dependencies>
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
    </dependency>
    </dependencies>
  2. 生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.atguigu.rabbitmq.simple;  

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {

    public static void main(String[] args) throws Exception {

    // 创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // 设置主机地址
    connectionFactory.setHost("192.168.200.100");

    // 设置连接端口号:默认为 5672
    connectionFactory.setPort(5672);

    // 虚拟主机名称:默认为 /
    connectionFactory.setVirtualHost("/");

    // 设置连接用户名;默认为guest
    connectionFactory.setUsername("guest");

    // 设置连接密码;默认为guest
    connectionFactory.setPassword("123456");

    // 创建连接
    Connection connection = connectionFactory.newConnection();

    // 创建频道
    Channel channel = connection.createChannel();

    // 声明(创建)队列
    // queue 参数1:队列名称
    // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
    // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
    // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
    // arguments 参数5:队列其它参数
    channel.queueDeclare("simple_queue", true, false, false, null);

    // 要发送的信息
    String message = "你好;小兔子!";

    // 参数1:交换机名称,如果没有指定则使用默认Default Exchange
    // 参数2:路由key,简单模式可以传递队列名称
    // 参数3:配置信息
    // 参数4:消息内容
    channel.basicPublish("", "simple_queue", null, message.getBytes());

    System.out.println("已发送消息:" + message);

    // 关闭资源
    channel.close();
    connection.close();

    }

    }
    code
    code
  3. 消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    package com.atguigu.rabbitmq.simple;  

    import com.rabbitmq.client.*;

    import java.io.IOException;

    public class Consumer {

    public static void main(String[] args) throws Exception {

    // 1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    // 2. 设置参数
    factory.setHost("192.168.200.100");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("123456");

    // 3. 创建连接 Connection
    Connection connection = factory.newConnection();

    // 4. 创建Channel
    Channel channel = connection.createChannel();

    // 5. 创建队列
    // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
    // 参数1. queue:队列名称
    // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
    // 参数3. exclusive:是否独占。
    // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
    // 参数5. arguments:其它参数。
    channel.queueDeclare("simple_queue",true,false,false,null);

    // 接收消息
    DefaultConsumer consumer = new DefaultConsumer(channel){

    // 回调方法,当收到消息后,会自动执行该方法
    // 参数1. consumerTag:标识
    // 参数2. envelope:获取一些信息,交换机,路由key...
    // 参数3. properties:配置信息
    // 参数4. body:数据
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("consumerTag:"+consumerTag);
    System.out.println("Exchange:"+envelope.getExchange());
    System.out.println("RoutingKey:"+envelope.getRoutingKey());
    System.out.println("properties:"+properties);
    System.out.println("body:"+new String(body));

    }

    };

    // 参数1. queue:队列名称
    // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
    // 参数3. callback:回调对象
    // 消费者类似一个监听程序,主要是用来监听消息
    channel.basicConsume("simple_queue",true,consumer);

    }

    }
    控制台打印

    consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA
    Exchange:
    RoutingKey:simple_queue
    properties:#contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
    body:你好;小兔子!
    code

1.4RabbitMQ经典用法

RabbitMQ官网,通过教程的形式,给我们列举了7种RabbitMQ用法
网址:https://www.rabbitmq.com/getstarted.html
code

Work Queues

HelloWorld本质上就是这种模式,只是多几个队列

多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

  1. 封装工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    package com.atguigu.rabbitmq.util;  

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ConnectionUtil {

    public static final String HOST_ADDRESS = "192.168.200.100";

    public static Connection getConnection() throws Exception {

    // 定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    // 设置服务地址
    factory.setHost(HOST_ADDRESS);

    // 端口
    factory.setPort(5672);

    //设置账号信息,用户名、密码、vhost
    factory.setVirtualHost("/");
    factory.setUsername("guest");
    factory.setPassword("123456");

    // 通过工程获取连接
    Connection connection = factory.newConnection();

    return connection;
    }



    public static void main(String[] args) throws Exception {

    Connection con = ConnectionUtil.getConnection();

    // amqp://guest@192.168.200.100:5672/
    System.out.println(con);

    con.close();

    }

    }
  2. 生产者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package com.atguigu.rabbitmq.work;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Producer {

    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME,true,false,false,null);

    for (int i = 1; i <= 10; i++) {

    String body = i+"hello rabbitmq~~~";

    channel.basicPublish("",QUEUE_NAME,null,body.getBytes());

    }

    channel.close();

    connection.close();

    }

    }
  3. 消费者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.atguigu.rabbitmq.work;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;

    import java.io.IOException;

    public class Consumer1 {

    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("Consumer1 body:"+new String(body));

    }

    };

    channel.basicConsume(QUEUE_NAME,true,consumer);

    }

    }

    code

Publish/Subscribe

生产者不是把消息直接发送到队列,而是发送到交换机
交换机接收消息,而如何处理消息取决于交换机的类型

交换机有如下3种常见类型
Fanout:广播,将消息发送给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列

  1. 生产者代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.atguigu.rabbitmq.fanout;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {

    public static void main(String[] args) throws Exception {

    // 1、获取连接
    Connection connection = ConnectionUtil.getConnection();

    // 2、创建频道
    Channel channel = connection.createChannel();

    // 参数1. exchange:交换机名称
    // 参数2. type:交换机类型
    // DIRECT("direct"):定向
    // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
    // TOPIC("topic"):通配符的方式
    // HEADERS("headers"):参数匹配
    // 参数3. durable:是否持久化
    // 参数4. autoDelete:自动删除
    // 参数5. internal:内部使用。一般false
    // 参数6. arguments:其它参数
    String exchangeName = "test_fanout";

    // 3、创建交换机
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

    // 4、创建队列
    String queue1Name = "test_fanout_queue1";
    String queue2Name = "test_fanout_queue2";

    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);

    // 5、绑定队列和交换机
    // 参数1. queue:队列名称
    // 参数2. exchange:交换机名称
    // 参数3. routingKey:路由键,绑定规则
    // 如果交换机的类型为fanout,routingKey设置为""
    channel.queueBind(queue1Name,exchangeName,"");
    channel.queueBind(queue2Name,exchangeName,"");

    String body = "日志信息:张三调用了findAll方法...日志级别:info...";

    // 6、发送消息
    channel.basicPublish(exchangeName,"",null,body.getBytes());

    // 7、释放资源
    channel.close();
    connection.close();

    }

    }
  2. 消费者代码2个

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.atguigu.rabbitmq.fanout;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer1 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String queue1Name = "test_fanout_queue1";

    channel.queueDeclare(queue1Name,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));
    System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");

    }

    };

    channel.basicConsume(queue1Name,true,consumer);

    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.atguigu.rabbitmq.fanout;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer2 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String queue2Name = "test_fanout_queue2";

    channel.queueDeclare(queue2Name,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));
    System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");

    }

    };

    channel.basicConsume(queue2Name,true,consumer);

    }

    }

    code

Routing

通过『路由绑定』的方式,把交换机和队列关联起来
交换机和队列通过路由键进行绑定
生产者发送消息时不仅要指定交换机,还要指定路由键
交换机接收到消息会发送到路由键绑定的队列
在编码上与 Publish/Subscribe发布与订阅模式的区别:
交换机的类型为:Direct • 队列绑定交换机的时候需要指定routing key。
如果一个交换机通过相同的routing key绑定了多个队列,就会有广播效果

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package com.atguigu.rabbitmq.routing;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String exchangeName = "test_direct";

    // 创建交换机
    channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);

    // 创建队列
    String queue1Name = "test_direct_queue1";
    String queue2Name = "test_direct_queue2";

    // 声明(创建)队列
    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);

    // 队列绑定交换机
    // 队列1绑定error
    channel.queueBind(queue1Name,exchangeName,"error");

    // 队列2绑定info error warning
    channel.queueBind(queue2Name,exchangeName,"info");
    channel.queueBind(queue2Name,exchangeName,"error");
    channel.queueBind(queue2Name,exchangeName,"warning");

    String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";

    // 发送消息
    channel.basicPublish(exchangeName,"warning",null,message.getBytes());
    System.out.println(message);

    // 释放资源
    channel.close();
    connection.close();

    }

    }
  2. 消费者2个

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.atguigu.rabbitmq.routing;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer1 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String queue1Name = "test_direct_queue1";

    channel.queueDeclare(queue1Name,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));
    System.out.println("Consumer1 将日志信息打印到控制台.....");

    }

    };

    channel.basicConsume(queue1Name,true,consumer);

    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    package com.atguigu.rabbitmq.routing;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer2 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String queue2Name = "test_direct_queue2";

    channel.queueDeclare(queue2Name,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));
    System.out.println("Consumer2 将日志信息存储到数据库.....");

    }

    };

    channel.basicConsume(queue2Name,true,consumer);

    }

    }

code

Topics

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符
Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,
例如:item.insert

通配符规则:
井号:匹配零个或多个词
*:匹配一个词
Routingkey可以一次匹配多条规则,并加入

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    package com.atguigu.rabbitmq.topic;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String exchangeName = "test_topic";

    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

    String queue1Name = "test_topic_queue1";
    String queue2Name = "test_topic_queue2";

    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);

    // 绑定队列和交换机
    // 参数1. queue:队列名称
    // 参数2. exchange:交换机名称
    // 参数3. routingKey:路由键,绑定规则
    // 如果交换机的类型为fanout ,routingKey设置为""
    // routing key 常用格式:系统的名称.日志的级别。
    // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
    channel.queueBind(queue1Name,exchangeName,"#.error");
    channel.queueBind(queue1Name,exchangeName,"order.*");
    channel.queueBind(queue2Name,exchangeName,"*.*");

    // 分别发送消息到队列:order.info、goods.info、goods.error
    String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
    channel.basicPublish(exchangeName,"order.info",null,body.getBytes());

    body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
    channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());

    body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
    channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

    channel.close();
    connection.close();

    }

    }
  2. 消费者2个

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.atguigu.rabbitmq.topic;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer1 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String QUEUE_NAME = "test_topic_queue1";

    channel.queueDeclare(QUEUE_NAME,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));

    }

    };

    channel.basicConsume(QUEUE_NAME,true,consumer);

    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.atguigu.rabbitmq.topic;  

    import com.atguigu.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer2 {

    public static void main(String[] args) throws Exception {

    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();

    String QUEUE_NAME = "test_topic_queue2";

    channel.queueDeclare(QUEUE_NAME,true,false,false,null);

    Consumer consumer = new DefaultConsumer(channel){

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("body:"+new String(body));

    }

    };

    channel.basicConsume(QUEUE_NAME,true,consumer);

    }

    }

    code

RPC

  • 远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样
  • 所以这不是典型的消息队列工作方式,我们就不展开说明了

Publisher Confirms

  • 发送端消息确认,是我们在进阶篇要探讨的『消息可靠性投递』的一部分

工作模式小结

  • 直接发送到队列:底层使用了默认交换机
  • 经过交换机发送到队列
  • Fanout:没有Routing key直接绑定队列
  • Direct:通过Routing key绑定队列,消息发送到绑定的队列上
  • 一个交换机绑定一个队列:定点发送
  • 一个交换机绑定多个队列:广播发送
  • Topic:针对Routing key使用通配符

2.整合SpringBoot

2.1基本使用

  1. 消费者

    1. 配置pom文件
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>3.1.5</version>
      </parent>

      <dependencies>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      </dependency>
      </dependencies>
    2. 配置yaml文件
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      spring:
      rabbitmq:
      host: 192.168.200.100
      port: 5672
      username: guest
      password: 123456
      virtual-host: /
      logging:
      level:
      com.atguigu.mq.listener.MyMessageListener: info
    3. 监听器
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      package com.atguigu.mq.listener;

      import lombok.extern.slf4j.Slf4j;
      import com.rabbitmq.client.Channel;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.Exchange;
      import org.springframework.amqp.rabbit.annotation.Queue;
      import org.springframework.amqp.rabbit.annotation.QueueBinding;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;

      @Component
      @Slf4j
      public class MyMessageListener {

      public static final String EXCHANGE_DIRECT = "exchange.direct.order";
      public static final String ROUTING_KEY = "order";
      public static final String QUEUE_NAME = "queue.order";

      @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = QUEUE_NAME, durable = "true"),
      exchange = @Exchange(value = EXCHANGE_DIRECT),
      key = {ROUTING_KEY}
      ))
      public void processMessage(String dateString,
      Message message,
      Channel channel) {
      log.info(dateString);
      }

      }
    4. @RabbitListener注解属性对比

      bindings属性

      • 表面作用:
      • 指定交换机和队列之间的绑定关系
      • 指定当前方法要监听的队列
      • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
        queues属性
      • @RabbitListener(queues = {QUEUE_ATGUIGU})
      • 作用:指定当前方法要监听的队列
      • 注意:此时框架不会创建相关交换机和队列,必须提前创建好
  2. 生产者

    1. 配置pom文件
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>3.1.5</version>
      </parent>

      <dependencies>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      </dependency>
      </dependencies>
    2. 配置yaml文件
      1
      2
      3
      4
      5
      6
      7
      spring: 
      rabbitmq:
      host: 192.168.200.100
      port: 5672
      username: guest
      password: 123456
      virtual-host: /
    3. 测试程序
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      package com.atguigu.mq.test;

      import org.junit.jupiter.api.Test;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;

      @SpringBootTest
      public class RabbitMQTest {

      public static final String EXCHANGE_DIRECT = "exchange.direct.order";
      public static final String ROUTING_KEY = "order";

      @Autowired
      private RabbitTemplate rabbitTemplate;

      @Test
      public void testSendMessage() {
      rabbitTemplate.convertAndSend(
      EXCHANGE_DIRECT,
      ROUTING_KEY,
      "Hello atguigu");
      }

      }

2.2可靠性

  • 故障情况1:消息没有发送到消息队列
    • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
    • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
  • 故障情况2:消息队列服务器宕机导致内存中消息丢失
    • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
  • 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
    • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
    • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
生产者端消息确认机制
  • 添加yaml配置

    • 注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      spring:
      rabbitmq:
      host: 192.168.200.100
      port: 5672
      username: guest
      password: 123456
      virtual-host: /
      publisher-confirm-type: CORRELATED # 交换机的确认
      publisher-returns: true # 队列的确认
      logging:
      level:
      com.atguigu.mq.config.MQProducerAckConfig: info
  • 创建配置类

    @PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。
    使用@PostConstruct注解的方法必须满足以下条件:

    • 方法不能有任何参数
    • 方法必须是非静态的
    • 方法不能返回任何值
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      package com.atguigu.mq.config;
      import jakarta.annotation.PostConstruct;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.ReturnedMessage;
      import org.springframework.amqp.rabbit.connection.CorrelationData;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;

      @Component
      @Slf4j
      public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

      @Autowired
      private RabbitTemplate rabbitTemplate;

      @PostConstruct
      public void init() {
      rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.setReturnsCallback(this);
      }

      //确认消息是否发送到交换机
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      if (ack) {
      log.info("消息发送到交换机成功!数据:" + correlationData);
      } else {
      log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
      }
      }

      //确认消息是否发送到队列
      @Override
      public void returnedMessage(ReturnedMessage returned) {
      log.info("消息主体: " + new String(returned.getMessage().getBody()));
      log.info("应答码: " + returned.getReplyCode());
      log.info("描述:" + returned.getReplyText());
      log.info("消息使用的交换器 exchange : " + returned.getExchange());
      log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
      }
      }
  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package com.atguigu.mq.test;

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    public class RabbitMQTest {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
    rabbitTemplate.convertAndSend(
    EXCHANGE_DIRECT,
    ROUTING_KEY,
    "Hello atguigu");
    }

    }
备份交换机

code
code
code

  • 针对备份队列创建消费端监听器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP = "queue.order.backup";

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
    exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
    key = {""}
    ))
    public void processMessageBackup(String dateString,
    Message message,
    Channel channel) {
    log.info("BackUp: " + dateString);
    }
    code
持久化

默认持久化

消费端消息确认

ACK是acknowledge的缩写,表示已确认
默认情况下,消费端取回消息后,默认会自动返回ACK确认消息

  • 要点1:把消息确认模式改为手动确认
    • 要点2:调用Channel对象的方法返回信息
      • ACK:Acknowledgement,表示消息处理成功
      • NACK:Negative Acknowledgement,表示消息处理失败
      • Reject:拒绝,同样表示消息处理失败
    • 要点3:后续操作
      • requeue为true:重新放回队列,重新投递,再次尝试
      • requeue为false:不放回队列,不重新投递
    • 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据
      code
  1. yaml配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:
    rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
    simple:
    acknowledge-mode: manual # 把消息确认模式改为手动确认
  2. 创建监听器类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    package com.atguigu.mq.listener;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    @Component
    @Slf4j
    public class MyMessageListener {

    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    public static final String QUEUE_NAME = "queue.order";

    // 修饰监听方法
    @RabbitListener(
    // 设置绑定关系
    bindings = @QueueBinding(

    // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
    value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),

    // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
    exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),

    // 配置路由键信息
    key = {ROUTING_KEY}
    ))
    public void processMessage(String dataString, Message message, Channel channel) throws IOException {

    // 1、获取当前消息的 deliveryTag 值备用
    // 唯一标识
    long deliveryTag = message.getMessageProperties().getDeliveryTag();

    try {
    // 2、正常业务操作
    log.info("消费端接收到消息内容:" + dataString);

    // System.out.println(10 / 0);

    // 3、给 RabbitMQ 服务器返回 ACK 确认信息
    channel.basicAck(deliveryTag, false);
    } catch (Exception e) {

    // 4、获取信息,看当前消息是否曾经被投递过
    // redelivered 为 true:说明当前消息已经重复投递过一次了
    // redelivered 为 false:说明当前消息是第一次投递
    Boolean redelivered = message.getMessageProperties().getRedelivered();

    // 核心操作失败:返回 NACK 信息
    // requeue 参数:控制消息是否重新放回队列
    // 取值为 true:重新放回队列,broker 会重新投递这个消息
    // 取值为 false:不重新放回队列,broker 会丢弃这个消息
    if (!redelivered) {
    // 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
    //第二个参数:multiple:是否批量处理
    //第三个参数:requeue:是否重新放回队列
    channel.basicNack(deliveryTag, false, true);
    } else {
    // 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
    // 第二个参数:requeue
    channel.basicReject(deliveryTag, false);
    //这两个方法相差 是否批量处理
    //channel.basicNack(deliveryTag, false,false)
    }

    }
    }

    }

2.3消费端限流

  • 在yaml设置prefetch参数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spring:
    rabbitmq:
    host: 192.168.200.100
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
    simple:
    acknowledge-mode: manual
    prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息

    2.4消息超时

    给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
    我们可以从两个层面来给消息设定过期时间:

  • 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
  • 消息本身:给具体的某个消息设定过期时间
    如果两个层面都做了设置,那么哪个时间短,哪个生效

  • 消息层面
    code

  • 队列层面
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;

    @Test
    public void testSendMessageTTL() {

    // 1、创建消息后置处理器对象
    MessagePostProcessor messagePostProcessor = (Message message) -> {

    // 设定 TTL 时间,以毫秒为单位
    message.getMessageProperties().setExpiration("5000");

    return message;
    };

    // 2、发送消息
    rabbitTemplate.convertAndSend(
    EXCHANGE_DIRECT,
    ROUTING_KEY,
    "Hello atguigu", messagePostProcessor);
    }

2.5死信

概念:当一个消息无法被消费,它就变成了死信

  • 死信产生的原因大致有下面三种:
    • 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
    • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
    • 超时:消息到达超时时间未被消费
  • 死信的处理方式大致有下面三种:
    • 丢弃:对不重要的消息直接丢弃,不做处理
    • 入库:把死信写入数据库,日后处理
    • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)