中易网

storm0.8 kafka0.8 怎样整合

答案:1  悬赏:10  
解决时间 2021-01-16 18:07
  • 提问者网友:半生酒醒
  • 2021-01-16 01:01
storm0.8 kafka0.8 怎样整合
最佳答案
  • 二级知识专家网友:夜风逐马
  • 2021-01-16 02:17
public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition); //注册partition到connections,并生成simpleconsumer
_state = state;
_stormConf = stormConf;

String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
try {
Map json = _state.readJSON(path);
LOG.info("Read partition information from: " + path + " --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset"); // 从zk中读出commited offset
}
} catch (Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}

if (jsonTopologyId == null || jsonOffset == null) { // zk中没有记录,那么根据spoutConfig.startOffsetTime设置offset,Earliest或Latest
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
}

_emittedToOffset = _committedTo; // 初始化时,中间状态都是一致的
}
我要举报
如以上回答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
点此我要举报以上问答信息!
大家都在看
推荐信息