RocketMQ SpringBoot 实践

About 4 mintooljava

官网:https://rocketmq.apache.org/

消息队列简介

  • 可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ
  • 消息队列的优点:
    • 通过异步处理提高系统性能(削峰、减少响应所需时间);
    • 降低系统耦合性。
  • 消息队列缺点:
    • 系统可用性降低: 要担心MQ队列挂掉和消息丢失
    • 系统复杂性提高:需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
    • 一致性问题:消息的真正消费者没有正确消费消息,导致数据不一致。
  • 名词解释:
    • Topic: 主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
    • 生产者(消息发布者):负责生产消息并发送消息到Topic
    • 消费者(消息订阅者): 负责从Topic接收消息并消费
    • 消息:生产者向消息到Topic,消费者接受到的数据
    • 消息属性: 生产者给消息定义的属性,包含Message Key 和 Tag
    • Group: 给一类生产者和消费者分一个组,消息发布或订阅的逻辑一致
  • 应用场景:削峰填谷、异步解耦、顺序收发、分布式事务一致性、大数据分析、分布式缓存同步
  • RocketMQ特色功能:消息查询、消息轨迹、集群消费、广播消费、重置消费位点、死信队列、全球消息路由、资源报表、监控报警
  • 消息类型:普通消息、事务消息、延时消息、顺序消息

MAC安装

下载链接

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.1.0-incubating/rocketmq-all-4.1.0-incubating-source-release.zip

命令行安装

cd  rocketmq-all-4.1.0-incubating
mvn -Prelease-all -DskipTests clean install -U    // maven依赖
cd distribution/target/apache-rocketmq
sh bin/mqnamesrv &  // 启动 Name Server服务
sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & // 启动 broker

docker-compose安装

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

docker-compose安装成功后,springboot链接的时候一直报错无法正常链接,网上说系统物理内存需要空闲20G,因为安装了rmqconsole可能导致内存不够,故单独安装了RrocketMQ。

SpringBoot简单集成

配置文件

server.port=8081
apache.rocketmq.consumer.PushConsumer=PushConsumer
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=localhost:9876

pom.xml依赖

  <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.13</version>
        </dependency>

生产者服务

package com.example.roketmq.server;


import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @ClassName ProducerService
 * @Author gaoqisen
 * @Date 2019-10-31
 * @Version 1.0
 */
@Service
public class ProducerService {
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer;

    @PostConstruct
    public void initProducer() {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
            System.out.println("[Producer 已启动]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String send(String topic, String tags, String msg) {
        SendResult result = null;
        try {
            Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            result = producer.send(message);
            System.out.println("[生产者:] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
    }

    @PreDestroy
    public void shutDownProducer() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}

消费者

package com.example.roketmq.server;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @ClassName ConsumerService
 * @Author gaoqisen
 * @Date 2019-10-31
 * @Version 1.0
 */
@Component
public class ConsumerService {
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            consumer.subscribe("test1", "push");

            // 如果是第一次启动,从队列头部开始消费
            // 如果不是第一次启动,从上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {
                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("[消费者] msgID(" + messageExt.getMsgId() + ") msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            System.out.println("[Consumer 已启动]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

测试Controller

package com.example.roketmq.server;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName TestController
 * @Author gaoqisen
 * @Date 2019-10-31
 * @Version 1.0
 */
@RestController
public class TestController {
    @Autowired
    private ProducerService producer;

    @RequestMapping("/push")
    public String pushMsg(String msg) {
        return producer.send("test1", "push", msg);
    }
}

如果报错No route info of this topic

  • 更改启动方式为:sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
  • jar包的版本要对应上,如rocketmq的版本为4.1.0-incubating,那么jar包的版本也应该为4.1.0-incubating(按照这种方法以解决)。
Last update:
Contributors: gaoqisen