博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm ack和fail机制再论
阅读量:7055 次
发布时间:2019-06-28

本文共 1054 字,大约阅读时间需要 3 分钟。

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下

 

首先开启storm tracker机制的前提是,

1. 在spout emit tuple的时候,要加上第3个参数messageid

2. 在配置中acker数目至少为1
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路

 

流程,

1. 当tuple具有messageid时,spout会把该tuple加到pending list里面

   并发消息给acker,通知acker开始tracker这条tuple

2. 然后再后续的bolt的处理逻辑中,你必须显式的ack或fail所有处理的tuple

   如果这条tuple在整个DAG图上都成功执行了,那么acker会发现该tuple的track异或值为0
   于是acker会发ack_message给spout
   当然如果在DAG图上任意一个节点bolt上fail,那么acker会认为该tuple fail
   于是acker会发fail_message给spout

3. 当spout收到ack或fail message如何处理,

    首先是从pending list里面删掉这条tuple,因为无论ack或fail,只要得到结果,这条tuple就没有继续被cache的必要了
    然后做的事是调用spout.ack或spout.fail
    所以系统默认是不会做任何事的,甚至是fail后的重发,你也需要在fail里面自己实现
    如何实现后面看

4. 如果一条tuple没有被ack或fail,最终是会超时的

    Spout会根据system tick去rotate pending list,对于每个过时的tuple,都调用spout.fail

 

下面的问题就是如何做fail重发,

这个必须用户通过自己处理fail来做,系统是不会自己做的,

public void fail(Object msgId)

看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg

貌似需要自己cache,然后用这个msgId去查询,太坑爹了

阿里自己的Jstorm会提供

public interface IFailValueSpout { void fail(Object msgId, Listvalues); }

这样更合理一些, 可以直接取得系统cache的msg values

转载地址:http://qoool.baihongyu.com/

你可能感兴趣的文章
使用HeadlessChrome做单页应用SEO
查看>>
[iOS]Core Data浅析二 -- 转换实体(Entity)为模型对象
查看>>
thinkpad 系列恢复F1-F12原始功能,切换ctrl和fn的位置
查看>>
JavaScript算法 ,Python算法,Go算法,java算法,系列之归并排序
查看>>
基于 React 的前端项目开发总结
查看>>
VR进化论|教你搭建通用的WebVR工程
查看>>
如何把要想保存的文章转为 Markdown 格式
查看>>
ThinkPHP3.2.3 关联模型
查看>>
高效的 itertools 模块
查看>>
简单意义上的桶排序
查看>>
解决向github提交代码不用输入帐号密码
查看>>
夏日葵电商:微信分销系统开发运营误区及技巧
查看>>
UXCore 组件单测的一些事儿
查看>>
2018 re:Invent回顾篇:前线开发者眼中AWS的创新版图
查看>>
Git Submodule新漏洞已修复
查看>>
广深IT之行:传统模式与技术创新的融合
查看>>
QCon演讲速递:异步处理在分布式系统中的优化作用
查看>>
GitLab可完全管理Google Kubernetes Engine
查看>>
独家解密:阿里大规模数据中心性能分析
查看>>
使用GitHub的十个最佳实践
查看>>