SpringCloud
1.基础篇
1.1消息队列简介
为什么要使用消息队列
- 消息队列相当于菜鸟驿站,可以暂时存放消息,等消费者自己去取
同步与异步的区别
- 线性操作,要所有操作完成后在返回响应
- 异步操作,把消息传入消息队列后直接返回响应
异步的好处
- 耦合度低 —-不会因为后续操作失败,导致整个功能的失败
- 快速响应
- 并发压力传递 —-可以在高峰时减少并发,低分时增加并发
- 便于扩展
什么是消息队列
- 消息队列是实现应用程序和应用程序之间通信的中间件产品
消息队列底层实现的两大主流方式
- 由于消息队列执行的是跨应用的信息传递,所以制定底层通信标准非常必要
- 目前主流的消息队列通信协议标准包括:
- AMQP(Advanced Message Queuing Protocol):通用协议,IBM公司研发
- JMS(Java Message Service):专门为Java语言服务,SUN公司研发,一组由Java接口组成的Java标准
1.2RabbitMQ简介
官网地址:https://www.rabbitmq.com/
RabbitMQ是一款基于AMQP、由Erlang语言开发的消息队列产品,2007年Rabbit技术公司发布了它的1.0版本
- 交换机不存放消息,如果Connection没有指定正确的交换机则会导致消息丢失
- 消息存放在队列里
1.3RabbitMQ使用
安装RabbitMQ
1 | 拉取镜像 |
访问后台管理界面:http://192.168.200.100:15672
HelloWorld
生产者发送消息,消费者接收消息,用最简单的方式实现
官网说明:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
- 添加依赖
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies> - 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package com.atguigu.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("192.168.200.100");
// 设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/");
// 设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
// 设置连接密码;默认为guest
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
} - 消费者控制台打印
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package com.atguigu.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("192.168.200.100");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建Channel
Channel channel = connection.createChannel();
// 5. 创建队列
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// 参数1. queue:队列名称
// 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// 参数3. exclusive:是否独占。
// 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// 参数5. arguments:其它参数。
channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}consumerTag:amq.ctag-8EB87GaZFP52LKSMcj98UA
Exchange:
RoutingKey:simple_queue
properties:#contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:你好;小兔子!
1.4RabbitMQ经典用法
RabbitMQ官网,通过教程的形式,给我们列举了7种RabbitMQ用法
网址:https://www.rabbitmq.com/getstarted.html
Work Queues
HelloWorld本质上就是这种模式,只是多几个队列
多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。
封装工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package com.atguigu.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String HOST_ADDRESS = "192.168.200.100";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection con = ConnectionUtil.getConnection();
// amqp://guest@192.168.200.100:5672/
System.out.println(con);
con.close();
}
}生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.atguigu.rabbitmq.work;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.atguigu.rabbitmq.work;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
Publish/Subscribe
生产者不是把消息直接发送到队列,而是发送到交换机
交换机接收消息,而如何处理消息取决于交换机的类型交换机有如下3种常见类型
Fanout:广播,将消息发送给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package com.atguigu.rabbitmq.fanout;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
String exchangeName = "test_fanout";
// 3、创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}消费者代码2个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.atguigu.rabbitmq.fanout;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.atguigu.rabbitmq.fanout;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
Routing
通过『路由绑定』的方式,把交换机和队列关联起来
交换机和队列通过路由键进行绑定
生产者发送消息时不仅要指定交换机,还要指定路由键
交换机接收到消息会发送到路由键绑定的队列
在编码上与 Publish/Subscribe发布与订阅模式的区别:
交换机的类型为:Direct • 队列绑定交换机的时候需要指定routing key。
如果一个交换机通过相同的routing key绑定了多个队列,就会有广播效果
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51package com.atguigu.rabbitmq.routing;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName,"warning",null,message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}消费者2个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.atguigu.rabbitmq.routing;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package com.atguigu.rabbitmq.routing;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
Topics
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符
Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,
例如:item.insert通配符规则:
井号:匹配零个或多个词
*:匹配一个词
Routingkey可以一次匹配多条规则,并加入
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
channel.close();
connection.close();
}
}消费者2个
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
RPC
- 远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样
- 所以这不是典型的消息队列工作方式,我们就不展开说明了
Publisher Confirms
- 发送端消息确认,是我们在进阶篇要探讨的『消息可靠性投递』的一部分
工作模式小结
- 直接发送到队列:底层使用了默认交换机
- 经过交换机发送到队列
- Fanout:没有Routing key直接绑定队列
- Direct:通过Routing key绑定队列,消息发送到绑定的队列上
- 一个交换机绑定一个队列:定点发送
- 一个交换机绑定多个队列:广播发送
- Topic:针对Routing key使用通配符
2.整合SpringBoot
2.1基本使用
消费者
- 配置pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies> - 配置yaml文件
1
2
3
4
5
6
7
8
9
10spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
com.atguigu.mq.listener.MyMessageListener: info - 监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.atguigu.mq.listener;
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
public void processMessage(String dateString,
Message message,
Channel channel) {
log.info(dateString);
}
} - @RabbitListener注解属性对比
bindings属性
- 表面作用:
- 指定交换机和队列之间的绑定关系
- 指定当前方法要监听的队列
- 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
queues属性 - @RabbitListener(queues = {QUEUE_ATGUIGU})
- 作用:指定当前方法要监听的队列
- 注意:此时框架不会创建相关交换机和队列,必须提前创建好
- 配置pom文件
生产者
- 配置pom文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies> - 配置yaml文件
1
2
3
4
5
6
7spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: / - 测试程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.atguigu.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
private RabbitTemplate rabbitTemplate;
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu");
}
}
- 配置pom文件
2.2可靠性
- 故障情况1:消息没有发送到消息队列
- 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
- 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机
- 故障情况2:消息队列服务器宕机导致内存中消息丢失
- 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失
- 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
- 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
- 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)
生产者端消息确认机制
添加yaml配置
- 注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置
1
2
3
4
5
6
7
8
9
10
11
12spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config.MQProducerAckConfig: info
- 注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置
创建配置类
@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。
使用@PostConstruct注解的方法必须满足以下条件:- 方法不能有任何参数。
- 方法必须是非静态的。
- 方法不能返回任何值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42package com.atguigu.mq.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
private RabbitTemplate rabbitTemplate;
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
//确认消息是否发送到交换机
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到交换机成功!数据:" + correlationData);
} else {
log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
}
}
//确认消息是否发送到队列
public void returnedMessage(ReturnedMessage returned) {
log.info("消息主体: " + new String(returned.getMessage().getBody()));
log.info("应答码: " + returned.getReplyCode());
log.info("描述:" + returned.getReplyText());
log.info("消息使用的交换器 exchange : " + returned.getExchange());
log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package com.atguigu.mq.test;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
private RabbitTemplate rabbitTemplate;
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu");
}
}
备份交换机
- 针对备份队列创建消费端监听器
1
2
3
4
5
6
7
8
9
10
11
12
13public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
public static final String QUEUE_NAME_BACKUP = "queue.order.backup";
public void processMessageBackup(String dateString,
Message message,
Channel channel) {
log.info("BackUp: " + dateString);
}
持久化
默认持久化
消费端消息确认
ACK是acknowledge的缩写,表示已确认
默认情况下,消费端取回消息后,默认会自动返回ACK确认消息
- 要点1:把消息确认模式改为手动确认
- 要点2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
- 要点3:后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
- 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据
- yaml配置
1
2
3
4
5
6
7
8
9
10spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认 - 创建监听器类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77package com.atguigu.mq.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
// 修饰监听方法
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 1、获取当前消息的 deliveryTag 值备用
// 唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
// System.out.println(10 / 0);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 4、获取信息,看当前消息是否曾经被投递过
// redelivered 为 true:说明当前消息已经重复投递过一次了
// redelivered 为 false:说明当前消息是第一次投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 核心操作失败:返回 NACK 信息
// requeue 参数:控制消息是否重新放回队列
// 取值为 true:重新放回队列,broker 会重新投递这个消息
// 取值为 false:不重新放回队列,broker 会丢弃这个消息
if (!redelivered) {
// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
//第二个参数:multiple:是否批量处理
//第三个参数:requeue:是否重新放回队列
channel.basicNack(deliveryTag, false, true);
} else {
// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
// 第二个参数:requeue
channel.basicReject(deliveryTag, false);
//这两个方法相差 是否批量处理
//channel.basicNack(deliveryTag, false,false)
}
}
}
}
2.3消费端限流
- 在yaml设置prefetch参数
1
2
3
4
5
6
7
8
9
10
11spring:
rabbitmq:
host: 192.168.200.100
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息2.4消息超时
给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
我们可以从两个层面来给消息设定过期时间: - 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这个队列中的消息全部使用同一个过期时间。
消息本身:给具体的某个消息设定过期时间
如果两个层面都做了设置,那么哪个时间短,哪个生效消息层面
- 队列层面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public void testSendMessageTTL() {
// 1、创建消息后置处理器对象
MessagePostProcessor messagePostProcessor = (Message message) -> {
// 设定 TTL 时间,以毫秒为单位
message.getMessageProperties().setExpiration("5000");
return message;
};
// 2、发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu", messagePostProcessor);
}
2.5死信
概念:当一个消息无法被消费,它就变成了死信
- 死信产生的原因大致有下面三种:
- 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
- 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
- 超时:消息到达超时时间未被消费
- 死信的处理方式大致有下面三种:
- 丢弃:对不重要的消息直接丢弃,不做处理
- 入库:把死信写入数据库,日后处理
- 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)