tp钱包兑换失败怎么处理,tp钱包兑换超时不到账

  

  今天我们就开始学习下默认消息发送流程,学习他的实现思路,也帮助我们工作中,遇到了问题不会手足无措。   

  

     

  

  思考问题消息发送者是如何做负载均衡的?消息发送者是如何保证高可用的?消息发送批量消息如何保证一致性的?默认发送流程-工作原理源码入口:org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者#发送(组织。阿帕奇。火箭MQ。常见。消息。消息)   

  

  启动演示:   

  

  DefaultMQProducer producer=new DefaultMQProducer(' group 1 ');制片人。setnamesrvaddr(' xxx:9876 ');producer.start()。消息消息=新消息(' TopicTest' /* Topic */,' TagA' /* Tag */,(' Hello RocketMQ ' i).getBytes(RemotingHelper .DEFAULT_CHARSET) /*消息体*/);发送结果发送结果=productor。发送(msg);流程:   

  

  1.校验主题,设置主题   

  

  味精。设置主题(命名空间为(消息。gettopic()));命名空间为(字符串资源)的公共字符串{返回命名空间util。包装名称空间(this。获取命名空间(),资源);}2.默认发送方式为同步发送,默认超时时间为3s   

  

  private int sendMsgTimeout=3000 public send result send(消息msg,长超时)抛出MQClientException,RemotingException,MQBrokerException,中断的异常{返回此。senddefaultimpl(消息,通信模式.同步、空、超时);}3.确认 producer service 运行状态是否为运行中   

  

  入口:org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl # makeSureStateOK   

  

  //检查状态,如果不是运转状态则抛出异常私有void makeSureStateOK()抛出MQClientException { if(this。服务状态!=服务状态正在运行){ throw new MQClientException('生产者服务状态不正常,‘这个。服务状态错误。建议ttodo(FAQUrl .CLIENT_SERVICE_NOT_OK),null);} }4.校验信息   

  

  主题长达是否大于主题最大长度,主题是否为空是否通过正则校验,正文是否为空,正文大小是否超过4m公共静态void checkTopic(字符串主题)抛出MQClientException { if(utilall。为空(topic)){ throw new MQ client exception('指定主题为空,null);}如果(题目。LENGTH()TOPIC _ MAX _ LENGTH){ throw new MQ client exception(string。格式('指定主题长于主题最大长度%d . ',TOPIC _ MAX _ LENGTH),null);} if(isTopicOrGroupIllegal(topic)){ throw new MQClientException(string。格式(   

"The specified topic<%s> contains illegal characters, allowing only %s", topic, "^<%|a-zA-Z0-9_->+$"), null); }}// body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());5.找到主题发布的信息,未找到则抛出异常

  

入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

  

消息生产者更新和维护路由信息缓存

  

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 消息生产者更新和维护路由信息缓存 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}6.通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息

  

入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue

  

获取到BrokerName对应的MessageQueue信息

  

public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); }}如果lastBrokerName为null,通过对 sendWhichQueue 方法获取一个队列

  

取余,然后从messageQueueList中获取一个MessageQueue

  

public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos);}7.最后消息发送

  

入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

  

1.根绝BrokerName获取到broker地址

  

在启动阶段,对BrokerAddrTable信息进行了维护

  

public String findBrokerAddressInPublish(final String brokerName) { HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); } return null;}如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常

  

if (null == brokerAddr) { // 1.1 如果未找到,则通过主题查找主题信息,通过更新路由信息后,在尝试获取,如果还未找到则抛出异常 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}2.为消息分配全局唯一ID

  

// 为消息分配全局唯一IDif (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg);}在RocketMQ消息发送-请求与响应文章中,我们已经学习了请求参数中,创建了全局唯一的MsgId,可以回头看一看

  

3.注册钩子消息发送钩子函数

  

这里主要做了三件事情,确认MsgType类型、是否为延迟消息、调用钩子函数内的方法

  

if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 3.1 通过isTrans来确定MsgType类型 if ("true".equals(isTrans)) { context.setMsgType(MessageType.Trans_Msg_Half); } // 3.2 如果msg里面 __STARTDELIVERTIME 或者 DELAY 不为空,则设置为延迟消息 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } // 3.3 调用钩子函数里的方法 this.executeSendMessageHookBefore(context);}4.设置发送信息请求头SendMessageRequestHeader

  

  

最后根据默认发送方式,进行消息的发送

  

主要利用NettyRemotingClient进行发送,这里就先不展开来说了 入口: MQClientAPIImpl.sendMessage()

  

问题答复消息发送者是如何做负载均衡的?默认采用轮询,每一个消息发送者全局会维护一个 Topic 上一次选择的队列,然后基于这个序号进行递增轮询AllocateMessageQueueAveragely平均分配,按照总数除以消费者个数进行,对每个消费者进行分配AllocateMessageQueueAveragelyByCircle 轮流平均分配,按照消费者个数,进行轮询分配消息发送者是如何保证高可用的?在上面的步骤中通过TopicPublishInfo 找到对应的MessageQueue下的,BrokerName信息,利用参数sendLatencyFaultEnable来开启关闭故障规避机制sendLatencyFaultEnable 设置为 true:开启延迟规避机制,一旦消息发送失败会将 broker-a “悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该 Broker 发送消息。使用本次消息发送延迟时间来计算Broker故障规避时长,不参与消息发送队列负载final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq;}但是这样子做可能带来的后果是Broker没有可用的情况,或者是某个Broker数据激增,增加消费者的压力,所以默认不开启规避机制,遇到消息发送失败,规避 broker-a,但是在下一次消息发送时,即再次调用broker-a。

  

消息发送批量消息如何保证一致性的?将一个Topic下的消息,通过batch方法包一起发送客户端ID与使用陷阱摘自丁威老师的文章

  

  

总结这段时间主要学习了RocketMQ的消息发送,主要是以源码为主,深入了解了消息发送的启动和消息发送的流程,以及认识到客户端ID与使用陷阱 一图总结

  

  

  

作者:叫我小郭_
链接:https://juejin.cn/post/7105315713157431332
来源:稀土掘金

相关文章