上篇文章,我们讲到使用Redis的订阅发布实现延时消息队列。
现在主要讲解一下真正的消息队列,市面上主流的消息队列有:
- RabbitMQ
- RocketMQ
- kafka
其中RocketMQ自身就带有延时消息队列功能,kafka、RabbitMQ也可以实现延迟消息队列,但需要安装插件。
消息队列未必一定会100%执行,如果出现未成功,可以做脚本扫描来弥补。
消息队列的好处
1、解耦,作为中间件。例如A作为生成订单,B代表扣减金额等;A生成后,生成消息队列,由B去消费消息队列。
2、异步(削峰即处理高并发):例如下单成功后 发短信、邮件、仓储调度等。
正常的消息队列
例如 订单生成消息,然后加入到queue(队列),由然后消息消费者消费
延迟消息队列
订单生成消息,然后一定时间后再加入消息队列中
使用场景:延迟支付的订单自动取消;定期清除日志;
延迟消息队列的时间精度
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SpringBoot引入RecketMQ
1、在pom.xml中引入依赖
mvnrepository.com/search?q=recoketmq
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
</dependency>
创建消息生产者
2、在manager/rocketmq下创建ProducerSchedule
package com.fcors.fcors.manager.rocketmq;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class ProducerSchedule {
private DefaultMQProducer producer;
@Value("${rocketmq.producer.producer-group}")
private String producerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
public ProducerSchedule() {
}
@PostConstruct
public void defaultMQProducer() {
if (this.producer == null) {
this.producer = new DefaultMQProducer(this.producerGroup);
this.producer.setNamesrvAddr(this.namesrvAddr);
}
try {
this.producer.start();
System.out.println("-------producer start");
} catch (MQClientException e) {
e.printStackTrace();
}
}
public String send(String topic, String messageText) {
Message message = new Message(topic, messageText.getBytes());
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message.setDelayTimeLevel(4);
SendResult result = null;
try {
result = this.producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(result.getMsgId());
System.out.println(result.getSendStatus());
return result.getMsgId();
}
}
3、配置文件中添加配置
rocketmq:
consumer:
consumer-group: SleeveConsumerGroup
producer:
producer-group: SleeveProducerGroup
namesrv-addr: 192.168.8.21:9876
4、在TestController添加接口
@Autowired
private ProducerSchedule producerSchedule;
@GetMapping("/push")
public void pushMessageToMQ() throws Exception {
producerSchedule.send("TopicTest", "test");
}
运行正常:
打开RocketMQ查看
5、创建消息消费者
在manager/rocketmq下创建类ConsumerSchedule
package com.fcors.fcors.manager.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class ConsumerSchedule implements CommandLineRunner {
@Value("${rocketmq.consumer.consumer-group}")
private String consumerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
public void messageListener() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
/* 匹配消息主题Topic,匹配的正则表达式 */
consumer.subscribe("TopicTest", "*");
/*设置消费条数*/
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (Message message : messages) {
System.out.println("消息:" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
@Override
public void run(String... args) throws Exception {
this.messageListener();
}
}