版本说明
JDK 1.8
RabbitMQ 3.7.15 Erlang 22.0SpringBoot 2.3.3.RELEASE
// TODO 2021年1⽉8⽇ 整理CentOS安装RabbitMQ流程1. 在RabbitMQ的Web管理界⾯,创建test队列
参数的含义
durability:是否持久化(重启或宕机后消息依然保存)
durable 持久transient 暂时新建maven项⽬。2. pom.xml
xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\">
3. application.yaml
server:
port: 20002spring: rabbitmq:
# 这⾥我改了本地的hosts,实际地址是192.168.0.121 host: vm.com port: 5672 virtual-host: / username: admin password: admin # 开启消息确认模式
# 消息发送到交换机确认机制,是否确认回调 # publisher-confirms: true # 是否返回回调
publisher-returns: true template:
#开启mandatory: true, basic.return⽅法将消息返还给⽣产者 mandatory: true listener: simple: # ⼿动应答
acknowledge-mode: manual # 最少消费者数量 concurrency: 1 # 最多消费者数量 max-concurrency: 10 # ⽀持重试 retry:
enabled: true
端⼝
5672:RabbitMQ的通信端⼝15672:Web管理界⾯端⼝4. RabbitmqDemo.java
@SpringBootApplication@EnableRabbit
public class RabbitmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args); }}
5. RabbitConfig.java
@Configuration@Slf4j
public class RabbitConfig {
private RabbitTemplate rabbitTemplate;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; }}
配置RabbitMQ的消息模板。6. 消息⽣产者 produce.java
@Component
public class Producer {
// @Qualifier(\"rabbitTemplate\") @Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 0; i < 5; i++) {
System.out.println(\"⽣产者发送消息,序号为: \" + i);
rabbitTemplate.convertAndSend(\"test\ } }}
初始化消息发送模板RabbitTemplate,@Qualifier注解⽤于限定具体的实现类,这⾥可以不指定。7. 消息消费者 consumer.java消费者1和消费者2均监听test队列。
不同的是,消费者1收到消息后返回确认应答basicAck。
⽽消费者2收到消息后返回拒绝应答basicRegect,消息被消费者拒绝后重新回到test队列中,等待下次发送给消费者。
@Component@Slf4j
public class Consumer {
/**
* 消费者1 模拟正常处理消息的情况,消息处理完毕发送确认应答 * @param message * @param channel
* @throws IOException */
@RabbitListener(queues = \"test\")
public void process1(Message message, Channel channel) throws IOException { log.info(\"消费者1 接收消息: \" + new String(message.getBody()));
log.info(\"消费者1 确认应答消息:\" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
/**
* 消费者2 模拟处理消息出错的情况,消费者2向rabbitmq发送拒绝应答。
* 处理失败的消息会被重新放⼊ready中,再次发送给消费者,直⾄收到确认应答 * @param message * @param channel
* @throws IOException */
@RabbitListener(queues = \"test\")
public void process2(Message message, Channel channel) throws IOException { log.info(\"消费者2 接收消息:\" + new String(message.getBody()));
log.info(\"消费者2 拒绝应答消息:\" + new String(message.getBody()));
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); }}
8. 测试RabbitMqController.java
@RestController
@RequestMapping(\"\")
public class RabbitMqController { @Autowired
private Producer producer; @GetMapping(\"/send\") public String send() { producer.send(); return \"发送完成\"; }}
9. 测试
⽣产者发送消息,序号为: 0⽣产者发送消息,序号为: 1⽣产者发送消息,序号为: 2⽣产者发送消息,序号为: 3⽣产者发送消息,序号为: 4
可以看出序号为2的消息3次被消费者2接收,消费者2也3次发送拒绝应答,直到第4次才被消费者1接收,并返回确认应答。到此这篇关于SpringBoot整合RabbitMQ ⼿动应答 简单demo的⽂章就介绍到这了,更多相关SpringBoot整合RabbitMQ 内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!
因篇幅问题不能全部显示,请点此查看更多更全内容