SpringBoot系列(二十三)RocketMQ

上篇文章,我们讲到使用Redis的订阅发布实现延时消息队列。

现在主要讲解一下真正的消息队列,市面上主流的消息队列有:

  • RabbitMQ
  • RocketMQ
  • kafka

其中RocketMQ自身就带有延时消息队列功能,kafka、RabbitMQ也可以实现延迟消息队列,但需要安装插件。

消息队列未必一定会100%执行,如果出现未成功,可以做脚本扫描来弥补。

消息队列的好处
1、解耦,作为中间件。例如A作为生成订单,B代表扣减金额等;A生成后,生成消息队列,由B去消费消息队列。
2、异步(削峰即处理高并发):例如下单成功后 发短信、邮件、仓储调度等。

http://www.fcors.com/%e6%8a%80%e6%9c%af%e4%b8%8e%e6%a1%86%e6%9e%b6/linux%e5%ae%89%e8%a3%85rocketmq/

正常的消息队列
例如 订单生成消息,然后加入到queue(队列),由然后消息消费者消费
延迟消息队列
订单生成消息,然后一定时间后再加入消息队列中
使用场景:延迟支付的订单自动取消;定期清除日志;
延迟消息队列的时间精度
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

SpringBoot引入RecketMQ

1、在pom.xml中引入依赖

mvnrepository.com/search?q=recoketmq

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.9.3</version>
		</dependency>

创建消息生产者

2、在manager/rocketmq下创建ProducerSchedule

package com.fcors.fcors.manager.rocketmq;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class ProducerSchedule {
    private DefaultMQProducer producer;

    @Value("${rocketmq.producer.producer-group}")
    private String producerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    public ProducerSchedule() {

    }

    @PostConstruct
    public void defaultMQProducer() {
        if (this.producer == null) {
            this.producer = new DefaultMQProducer(this.producerGroup);
            this.producer.setNamesrvAddr(this.namesrvAddr);
        }
        try {
            this.producer.start();
            System.out.println("-------producer start");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public String send(String topic, String messageText)  {
        Message message = new Message(topic, messageText.getBytes());
//      messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        message.setDelayTimeLevel(4);

        SendResult result = null;
        try {
            result = this.producer.send(message);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(result.getMsgId());
        System.out.println(result.getSendStatus());
        return result.getMsgId();
    }
}

3、配置文件中添加配置

rocketmq:
  consumer:
    consumer-group: SleeveConsumerGroup
  producer:
    producer-group: SleeveProducerGroup
  namesrv-addr: 192.168.8.21:9876

4、在TestController添加接口

    @Autowired
    private ProducerSchedule producerSchedule;

    @GetMapping("/push")
    public void pushMessageToMQ() throws Exception {
        producerSchedule.send("TopicTest", "test");
    }

运行正常:

JAVA、基础技术、技术与框架SpringBoot系列(二十三)RocketMQ插图

打开RocketMQ查看

JAVA、基础技术、技术与框架SpringBoot系列(二十三)RocketMQ插图1

5、创建消息消费者

在manager/rocketmq下创建类ConsumerSchedule

package com.fcors.fcors.manager.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class ConsumerSchedule implements CommandLineRunner {

    @Value("${rocketmq.consumer.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    public void messageListener() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        consumer.setNamesrvAddr(namesrvAddr);
        /* 匹配消息主题Topic,匹配的正则表达式 */
        consumer.subscribe("TopicTest", "*");
        /*设置消费条数*/
        consumer.setConsumeMessageBatchMaxSize(1);

        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            for (Message message : messages) {
                System.out.println("消息:" + new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
    }


    @Override
    public void run(String... args) throws Exception {
        this.messageListener();
    }
}
JAVA、基础技术、技术与框架SpringBoot系列(二十三)RocketMQ插图2