fil币今日快讯最新,filter

  

  废话少说,上才艺跳羚版本:2 .3 .3 .releaserockemq-客户端版本:4 .7 .一时钟搭建rockerMq教程:https://博客。csdn。net/itjavaee/article/details/108280613码云链接:后面补上文章参考:https://博客。csdn。net/u 010975270/文章/详情/104911353/非常感谢1.首先导入专家依赖(请自行选择对应的版本)!-注意:这里的版本,要和部署在服务器上的版本号一致-依赖关系groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-客户端/artifactId版本4 .7 .1/版本/依赖关系   

  

  2.消息队列基本使用流程消息发送   

  

  创建生产者制片人,并指定生产者组名指定名称服务器(rockerMq链接地址)启动生产者创建消息对象,指定主题主题,标签和消息体发送消息关闭生产者消息消费   

  

  创建消费者消费者,指定消费组名指定名称服务器订阅主题主题和标签注册消息监听器,设置回调函数,处理消息启动消费者消费者3.1基础消息发送包com。zqh。com。火箭MQ。basemsg导入org。阿帕奇。火箭MQ。客户。例外。mqbrokerexception导入org。阿帕奇。火箭MQ。客户。例外。MQ客户端异常;导入组织。阿帕奇。火箭MQ。客户。制片人。默认MQ生产者;导入组织。阿帕奇。火箭MQ。客户。制片人。发送回拨;导入org。阿帕奇。火箭MQ。客户。制片人。发送结果;导入org。阿帕奇。火箭MQ。客户。制片人。发送状态;导入org。阿帕奇。火箭MQ。常见。消息。消息;导入组织。阿帕奇。火箭MQ。远程处理。例外。远程处理异常;导入Java。util。并发。countdownlatch导入Java。util。并发。时间单位;/** *发送_确认:消息发送成功*刷新磁盘超时:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失*刷新_从设备_超时:消息发送成功,但是服务器同步到奴隶时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失*奴隶_不可用:消息发送成功,但是此时奴隶不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失*/public class BaseMsgProducer { final static String name SRV _ ADDR=' 192。168 .0 .3609876 ';最终静态字符串productor _ GROUP=' base _ GROUP final静态字符串TOPIC=' base _ topicfinal静态字符串TAGS=' base _ tags公共静态void main(字符串参数)抛出InterruptedException、RemotingException、MQClientException、MQBrokerException { sendSyncMsg();//sendASyncMsg();//sendOnewayMsg();} /** * 发送同步消息需要接到消息结果之后再发送下一个消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知*公共静态void sendSyncMsg()抛出InterruptedException、RemotingException、MQClientException、MQBrokerException {   

// 创建生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_syncMsg"); //指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅) producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 1000; i++) { //创建消息对象,指定主题topic,tag和消息体 Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes()); //发送消息 SendResult result = producer.send(message); //发送状态 SendStatus sendStatus = result.getSendStatus(); //消息ID String msgId = result.getMsgId(); //消息队列ID int queueId = result.getMessageQueue().getQueueId(); System.out.println("发送结果:" + result); } //6.- 关闭producer producer.shutdown(); } /** * 异步消息不必等待返回结果,立即发送下一个消息,可以通过send(Message msg, SendCallback sendCallback)中的回调函数,对返回结果进行处理。异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 */ public static void sendASyncMsg() throws InterruptedException, RemotingException, MQClientException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_async"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); final CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { Message message = new Message(TOPIC, TAGS + "_async", System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes()); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.println("发送结果:" + sendResult); } @Override public void onException(Throwable throwable) { countDownLatch.countDown(); System.out.println("异常结果:" + throwable.getMessage()); } }); } countDownLatch.await(5, TimeUnit.SECONDS); // 避免提前关闭了,所以等发送完再关闭 producer.shutdown(); } /** * 单向消息通俗来说,就是发送消息不必等待返回结果,也无需执行回调函数。这种方式主要用在不特别关心发送结果的场景,例如日志发送。 */ public static void sendOnewayMsg() throws InterruptedException, RemotingException, MQClientException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_oneway"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10; i++) { Message message = new Message(TOPIC, TAGS + "_oneway", System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes()); producer.sendOneway(message); System.out.println("发送完成"); } producer.shutdown(); }}

  

3.2 消费者package com.zqh.com.rocketmq.baseMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.stereotype.Component;@Componentpublic class BaseMsgConsumer { private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "base_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "base_topic"; private final static String TAGS = "base_tags_async || base_tags_oneway || base_tags"; public BaseMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, TAGS); // 分散消费 MessageModel.CLUSTERING 同一个 Consumer ID 所标识的所有 Consumer 分散消费消息。 // 广播消费 MessageModel.BROADCASTING 同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 并发消费模式 (MessageListenerConcurrently) consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { // 业务实现 System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}

  

4.1 批量信息package com.zqh.com.rocketmq.batchMsg;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;/** * 批量信息 * 前面发送的消息都是一条一条发送,批量消息则是一次发送多条消息,这一批消息总大小(无论是单条消息还是消息总大小)不应该超过4MB(默认大小) * 批量消息一般有两种使用情况: * 达到一定消息数量之后发送 * 一定时间发送(比如1分钟发送一次) */public class BatchMsgProducer { private final static Logger logger = LoggerFactory.getLogger(BatchMsgProducer.class); private static final String PRODUCER_GROUP = "batch_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "batch_topic"; private final static String TAGS = "batch_tags"; public static void sendBatchMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10; i++) { // 创建消息集合 List<Message> messageList = new ArrayList<>(); // CODE: 13 DESC: the message body size over max value, MAX: 4194304 byte<> bytes = new byte<1024 * 1024 * 3>; messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes())); messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes())); messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes())); SendResult sendResult = producer.send(messageList); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } public static void main(String<> args) throws Exception { sendBatchMsg(); }}

  

4.2 消费者package com.zqh.com.rocketmq.batchMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Componentpublic class BatchMsgConsumer { private final static Logger logger = LoggerFactory.getLogger(BatchMsgConsumer.class); private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "batch_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "batch_topic"; private final static String TAGS = "batch_tags"; public BatchMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, TAGS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 并发消费模式 (MessageListenerConcurrently) consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { // 业务实现 System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}

  

5.1 延时消费package com.zqh.com.rocketmq.delayMsg;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 延时信息 * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */public class DelayMsgProducer { private final static Logger logger = LoggerFactory.getLogger(DelayMsgProducer.class); private static final String PRODUCER_GROUP = "delay_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "delay_topic"; private final static String TAGS = "delay_tags"; public static void sendDelayMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10; i++) { Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, "rocketMq".getBytes()); // 3对应的10s message.setDelayTimeLevel(3); // 发送消息 SendResult sendResult = producer.send(message); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } public static void main(String<> args) throws Exception { sendDelayMsg(); }}

  

5.2 消费端

  

package com.zqh.com.rocketmq.delayMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Componentpublic class DelayMsgConsumer { private final static Logger logger = LoggerFactory.getLogger(DelayMsgConsumer.class); private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "delay_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "delay_topic"; private final static String TAGS = "delay_tags"; public DelayMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, TAGS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 并发消费模式 (MessageListenerConcurrently) consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { // 业务实现 System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}

  

6.1 过滤消息

  

package com.zqh.com.rocketmq.filterMsg;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 过滤信息 */public class FilterMsgProducer { private final static Logger logger = LoggerFactory.getLogger(FilterMsgProducer.class); private static final String PRODUCER_GROUP = "filter_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "filter_topic"; private final static String TAGS = "filter_tags"; public static void sendFilterMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10; i++) { Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, "bytes".getBytes()); // 绑定自定义属性 message.putUserProperty("i", i + ""); SendResult sendResult = producer.send(message); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } public static void main(String<> args) throws Exception { sendFilterMsg(); }}

  

6.2 消费端

  

package com.zqh.com.rocketmq.filterMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.MessageSelector;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Componentpublic class FileterMsgConsumer { private final static Logger logger = LoggerFactory.getLogger(FileterMsgConsumer.class); private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "filter_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "filter_topic"; // * 的话代表全部接收 private final static String TAGS = "filter_tags"; public FileterMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR);// 通过tag来过滤需要的// consumer.subscribe(TOPIC, TAGS); //通过sql过滤消息,只要i>=3的消息 consumer.subscribe(TOPIC, MessageSelector.bySql("i>=3")); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 并发消费模式 (MessageListenerConcurrently) consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { // 业务实现 System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}

  

7.1 顺序消息

  

package com.zqh.com.rocketmq.orderMsg;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.ArrayList;import java.util.List;/** * SEND_OK:消息发送成功 * FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 * FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 * SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 Slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 * <p> * 保证消息的顺序性 全局顺序消息 * 一般通过建立单队列的方式,消息全在一个队列里,保证消息的顺序性(FIFO) * 局部顺序消息 * 不需要限定单个队列,将需要按序消费的消息,放入到同一个队列中即可。 * 比如:顺序消息A1,B2,C3,D4,都放入到队列1中,就能保证消息的顺序性。 */public class OrderMsgProducer { final static String NAMESRV_ADDR = "192.168.0.30:9876"; final static String PRODUCER_GROUP = "order_group"; final static String TOPIC = "order_topic"; final static String TAGS = "order_tags"; public static void main(String<> args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { sendOrderMsg(); } /** * 发送同步消息需要接到消息结果之后再发送下一个消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 */ public static void sendOrderMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 创建生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); // 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅) producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); List<OrderMsg> orderMsgList = OrderMsg.orderMsgList(); int i = 0; for (OrderMsg orderMsg : orderMsgList) { Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("hello rocketMq" + i++).getBytes()); // 发送消息 // 参数一:消息对象 // 参数二:消息队列的选择器 // 参数三:选择队列的业务标识(此处为订单ID) // 其实就是:使用业务标识,进行一定的规则,选出该标识对应存储的队列 SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * @param list 消息队列 * @param message 消息对象(即上面传递过来的message) * @param o 业务标识的参数(即传递过来的orderId) * @return: org.apache.rocketmq.common.message.MessageQueue */ @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { int index = Math.abs(o.hashCode()) % (list.size()); return list.get(index); } }, orderMsg.getId()); // 上面的orderMsg.getId()这个orderid就是用来确认使用的队列,一般是业务订单id,如果是0的话,就是全局顺序 System.out.println("发送结果:" + sendResult); } producer.shutdown(); }}class OrderMsg { private String id; private String msg; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public OrderMsg(String id, String msg) { this.id = id; this.msg = msg; } public static List<OrderMsg> orderMsgList() { List<OrderMsg> objects = new ArrayList<>(); for (int i = 0; i < 20; i++) { objects.add(new OrderMsg("12", i + "")); } return objects; }}

  

7.2 消费端

  

package com.zqh.com.rocketmq.orderMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Componentpublic class OrderMsgConsumer { private final static Logger logger = LoggerFactory.getLogger(OrderMsgConsumer.class); private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "order_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "order_topic"; private final static String TAGS = "order_tags"; public OrderMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, TAGS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 注册消息监听器 // 顺序消息监听 - MessageListenerOrderly 使用该监听器时,只启用一个线程对一个队列进行消费 // 即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费) consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { System.out.println(Thread.currentThread().getName() + "\t" + LocalDateTime.now() + "\t\t消费消息:" + new String(msg.getBody())); } ); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); }}

  

8.1 事务消息

  

package com.zqh.com.rocketmq.transactionMsg;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 事务信息 */public class TransactionMsgProducer { private final static Logger logger = LoggerFactory.getLogger(TransactionMsgProducer.class); private static final String PRODUCER_GROUP = "transaction_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "transaction_topic"; private final static String TAGS = "transaction_tags"; public static void sendTransactionMsg() throws MQClientException { TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.setTransactionListener(new TransactionListener() { /** * @Description 在该方法中执行本地事务 * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id * @param o 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 * @return: org.apache.rocketmq.client.producer.LocalTransactionState * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { //这边可以创建一个全局共享变量concurrentHashMap,用来存储transactionId以及对应的值(比如正常执行1,异常执行-1等),便于回查时进行判断,这里就不赘述了 System.out.print("执行本地事务" + message.getTransactionId()); int i = (int) o; if (i % 3 == 0) { //提交消息 System.out.println("提交"); return LocalTransactionState.COMMIT_MESSAGE; } else if (i % 3 == 1) { //回滚消息 System.out.println("回滚"); return LocalTransactionState.ROLLBACK_MESSAGE; } //消息回查 System.out.println("回调"); return LocalTransactionState.UNKNOW; } /** * @Description 事务消息状态回查 * @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态 * @return: org.apache.rocketmq.client.producer.LocalTransactionState * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("消息回查:" + messageExt.getTransactionId() + "\t" + new String(messageExt.getBody())); //可以根据消息体中的信息去数据库中查询该消息是否已经被执行 //或者根据上方执行本地事务时concurrentHashMap中存储的transactionId对应的值进行判断,返回对应的操作值 //这里演示就直接提交了 return LocalTransactionState.COMMIT_MESSAGE; } }); // 启动生产者 producer.start(); for (int i = 0; i < 10; i++) { Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("rocketMq" + i).getBytes()); // 发送事务消息 // 参数一:消息对象 // 参数二:附加参数,可用于事务监听器中执行本地事务时的获取参数 SendResult sendResult = producer.sendMessageInTransaction(message, i); System.out.println("发送消息:" + sendResult); } // 由于MQ要回查生产者,所以不需要关闭生产者// producer.shutdown(); } public static void main(String<> args) throws Exception { sendTransactionMsg(); }}

  

8.2 消费端

  

package com.zqh.com.rocketmq.transactionMsg;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Componentpublic class TransactionMsgConsumer { private final static Logger logger = LoggerFactory.getLogger(TransactionMsgConsumer.class); private DefaultMQPushConsumer consumer; private static final String CONSUMER_GROUP = "transaction_group"; private final static String NAMESRV_ADDR = "192.168.0.30:9876"; private final static String TOPIC = "transaction_topic"; private final static String TAGS = "transaction_tags"; public TransactionMsgConsumer() throws MQClientException { System.out.println("初始化成功"); consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(TOPIC, TAGS); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 并发消费模式 (MessageListenerConcurrently) consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size()); list.forEach(msg -> { // 业务实现 System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); }}

  

题外话1. 事务流程图   

  

  

相关文章