alook哪里可以下载,alook可以缓存视频的版本

  

  大家好,我是君哥。今天来聊一聊消息队列的延时消息是怎么实现的。   

  

  延时消息是指发送到消息队列后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。   

  

  延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。   

  

  

1 生产者

  

  

   首先看一个生产者发送延时消息的官方示例代码:   

  

  公共静态void main(字符串参数)引发异常{ //实例化一个生成器以发送计划消息default MQ producer producer=new default MQ producer(' ExampleProducerGroup ');//启动制片人制片人。start();int totalMessagesToSend=100 for(int I=0;我totalMessagesToSendi){ Message Message=新消息(' TestTopic ',(' Hello scheduled message ' i).getBytes());//这条消息将在10秒钟后发送给消费者消息。setdelaytimelevel(3);//发送消息制片人。发送(消息);} //使用后关闭生成器生产。关闭();}   

  

  从上面的代码可以看到,跟普通消息不一样的是,消息设置setDelayTimeLevel属性值,这里设置为3,这里最终将3这个延时级别复制给了耽搁属性。   

  

  关于延时级别,可以看下面这个定义:   

  

  //MessageStoreConfig类私有字符串消息延迟级别=' 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ';   

  

  这里延时级别有18个,上面的示例代码中延迟级别是3,消息会延迟10s后消费者才能拉取。   

  

  

2 Broker 处理

  

  

  经纪人收到消息后,会将消息写入承诺日志。在写入时,会判断消息耽搁属性是否大于0。代码如下:   

  

  //CommitLog类如果(消息。getdelaytimelevel()0){ if(msg。getdelaytimelevel()this。defaultmessagestore。getschedulemessageservice().getMaxDelayLevel()){ msg。setdelaytimelevel(这个。defaultmessagestore。getschedulemessageservice().getMaxDelayLevel());} topic=TopicValidator .RMQ _系统_时间表_主题;int queue id=schedulemessageservice。延迟级别2队列id(消息。getdelaytimelevel());//Backup real topic,queueId消息访问器。put属性(msg,MessageConst .属性_真实_主题,消息。gettopic());消息访问器。put属性(msg,MessageConst .属性_真实_队列_ID,字符串。(消息的值。getqueueid());味精。setpropertiestring(消息解码器。messageproperties2字符串(消息。get properties())));msg.setTopic(主题);味精。setqueueid(队列id);}   

  

  从上面的代码可以看到,提交日志写入时并没有直接写入,而是把主题改为日程_话题_XXXX,把queueId改为延时级别减1。因为延时级别有18个,所以这里有18个队列。如下图:   

  

     

  

  

2.1 写入消息

  

  

   延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类调度消息服务。这个类的初始化   

代码如下:

  

public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); //省略部分逻辑 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分逻辑 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的逻辑 }}

  

上面的 load() 方法会加载一个 delayLevelTable(ConcurrentHashMap类型),key 保存延时级别(从 1 开始),value 保存延时时间(单位是 ms)。

  

load() 方法结束后,创建了一个有 18 个核心线程的定时线程池,然后遍历 delayLevelTable,创建 18 个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:

  

public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分逻辑 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); //省略部分逻辑 long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown > 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); //事务消息判断省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}

  

这段代码可以参考下面的流程图来进行理解:

  

  

上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:

  

private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; } return result;}

  

注意:消息从 CommitLog 转发到 ConsumeQueue 时,会判断是否是延时消息(Topic = SCHEDULE_TOPIC_XXXX 并且延时级别大于 0),如果是延时消息,就会修改 tagsCode 值为消息投递的时间戳,而 tagsCode 原值是 tag 的 HashCode。代码如下:

  

//CommitLog类checkMessageAndReturnSize方法if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);}

  

如下图:

  

  

而 ScheduleMessageService 调度线程将消息从 ConsumeQueue 重新投递到原始队列中时,会把 tagsCode 再次修改为 tag 的 HashCode,代码如下:

  

//类MessageExtBrokerInner,这个方法被 messageTimeup 方法调用。public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode();}

  

如下图:

  

  

2.3 一个问题

如果有一个业务场景,要求延时消息 3 小时才能消费,而 RocketMQ 的延时消息最大延时级别只支持延时 2 小时,怎么处理?

  

这里提供两个思路供大家参考:

  

在 Broker 上修改 messageDelayLevel 的默认配置;在客户端缓存 msgId,先设置延时级别是 18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是 17(1h),如果没有缓存则进行消费。

3 总结

经过上面的讲解,延时消息的处理流程如下:

  

最后,延时消息的延时时间并不精确,这个时间是 Broker 调度线程把消息重新投递到原始的 MessageQueue 的时间,如果发生消息积压或者 RocketMQ 客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间。

相关文章

管理员 管理员
在区块链的世界,每个人都是一个节点。
最近文章
  • alook哪里可以下载,alook可以缓存视频的版本
  • abs区块链最新资讯,混合区块链是什么
  • 2021msi全部赛程图,msi季中赛2021rng赛程回放
  • 2021免费追剧公众号,2021免费影视网站推荐
  • 2021年非法采矿罪判决书,2021年非婚生子上户口罚款吗
  • amd6700有矿卡吗,amd5800可以玩什么
  • aieiui拼音教学第二课时,aoouiu拼音教学设计
  • 2021手游人气平台,2021手游人气排行
  • 2021最值钱古币,2021最值钱的十三个证书
  • apexsteam和origin怎么加好友,apexsteam怎么更新
  • am3和am4扣具孔距尺寸,am3和am4原装扣具有什么区别
  • airpods声音比平时小了,airpods声音太小解决方法
  • 2021 金价走势图,2021 金价走势
  • 2022款奔驰gle300报价及图片,奔驰gle350时尚型西安最低价
  • 2022年黄金价格走势预测表,2022年黄金价格走势预测图
  • abey最新报道,abey最新活动
  • 2g显存,2g显卡坦克世界总是出现显卡错误
  • 300兆的算力一天能挖多少币,算力230一天能有多少收益
  • apart的用法及短语例句,与apart有关的英语短语
  • 2020款阿特兹和奔驰cla,奔驰cla200l哪里出厂的
  • 2022年大雪预报,2022年大牛股排行榜
  • a family dinner翻译,a family reunion dinner翻译
  • 2分米40毫米等于多少毫米,一分米减9毫米等于多少毫米
  • 2020手机排行榜5g手机,2020手机排行榜2000左右
  • 100美元等于多少印尼盾,100美元等于多少缅币
  • adc挖矿最新消息,澳大利亚挖矿最新消息
  • abc慈善币如何,abc慈善币多少钱一个
  • 2021年贺岁金银纪念币市场价格,2021年贺岁纪念币兑换
  • 2020年etc打折吗,2020年etc收费比人工贵吗
  • 9.0roll币在哪换,roll币各版本通用吗
  • 2021年贺岁银质纪念币5枚套装,2021年贺岁银质纪念币5枚套装价值
  • 2021年a股涨停次数排行榜,2021年a股正式收官
  • 2021年小篮球潍坊赛区,2021年小篮球联赛山东省赛区
  • 2021金银币有收藏价值吗,2021金银币分级抢购是什么意思
  • 2021免费师范录取分数临沂大学,2021免费师范录取分数聊城大学
  • 2022年还有哪些潜力币推荐,怎么才能快速的收集新币
  • 2021买xr还是xsmax,2021买xr二手有必要吗
  • 1斤银子有几克,1斤银要多少钱
  • 2020年是牛市上涨最快的一年,2020年是牛市还是大级别反弹
  • 4s店不按时交车投诉,4s店不按时交车投诉12315