(6) storm-kafka Source Code Walkthrough: PartitionManager


PartitionManager is arguably the core class of storm-kafka, so let's walk through it briefly. As stated before, the metrics code is not covered here.

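PartitionManager is mainly responsible for emitting messages and for fault tolerance, so it maintains three collections:

  • _pending: the offsets of messages that have been read from Kafka but not yet acked; a TreeSet<Long>
  • failed: the offsets of messages that failed; a TreeSet<Long>
  • _waitingToEmit: the messages waiting to be emitted; a LinkedList. Everything read from Kafka goes here first
A few other variables are worth explaining:
  • Long _emittedToOffset: the offset read up to from Kafka. Messages read from Kafka are put into _waitingToEmit, and once a message is in that list we assume it will be emitted, so _emittedToOffset can be treated as the offset read up to from Kafka
  • Long _committedTo: the offset already written to ZK
  • lastCompletedOffset(): the offset up to which messages have been processed successfully. Messages are processed inside Storm and may fail there, so the offsets still being processed are cached in _pending
    If _pending is empty, lastCompletedOffset = _emittedToOffset
    If _pending is not empty, lastCompletedOffset is the first (smallest) offset in _pending, because that message is still waiting for an ack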
public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }
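
As a minimal sketch of this rule (a standalone toy class, not the storm-kafka code; `OffsetTracker` and its method names are hypothetical), the following shows how the smallest pending offset holds back the completed offset until it is acked:

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for the offset bookkeeping in PartitionManager.
class OffsetTracker {
    private final SortedSet<Long> pending = new TreeSet<>();
    private long emittedToOffset;

    OffsetTracker(long startOffset) {
        this.emittedToOffset = startOffset;
    }

    // Record a message read at `offset`; the next read starts at offset + 1.
    void emitted(long offset) {
        pending.add(offset);
        emittedToOffset = Math.max(offset + 1, emittedToOffset);
    }

    void ack(long offset) {
        pending.remove(offset);
    }

    // Same rule as lastCompletedOffset() above.
    long lastCompletedOffset() {
        return pending.isEmpty() ? emittedToOffset : pending.first();
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker(10L);
        t.emitted(10L);
        t.emitted(11L);
        t.emitted(12L);
        t.ack(11L);
        System.out.println(t.lastCompletedOffset()); // 10: offset 10 is still pending
        t.ack(10L);
        System.out.println(t.lastCompletedOffset()); // 12: only 12 is pending
        t.ack(12L);
        System.out.println(t.lastCompletedOffset()); // 13: nothing pending
    }
}
```

Note that acking offset 11 out of order does not move the completed offset; only acking the smallest pending offset does.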

Now the initialization:
     public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;
        numberAcked = numberFailed = 0;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path); // read the offset from ZK
            LOG.info("Read partition information from: " + path +  "  --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

		/**
		 * Look up an offset according to the user-configured startOffsetTime (-2: from the beginning of Kafka; -1: from the latest; 0: none, start from ZK)
		 **/
        Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }

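		/**
		 * The check below: if the offset just looked up and the offset committed to ZK differ by more than maxOffsetBehind (Long.MAX_VALUE),
		 * a large span of messages is considered emitted but never committed; that span is dropped entirely to avoid replaying it,
		 * by setting _committedTo = currentOffset. This is a recently fixed bug: maxOffsetBehind used to be 100000 (I believe that was the value, and it was too small). The issue is STORM-399.
		 **/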
        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
            LOG.info("Last commit offset from zookeeper: " + _committedTo);
            _committedTo = currentOffset;
            LOG.info("Commit offset " + _committedTo + " is more than " +
                    spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }
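
The three-way branch plus the maxOffsetBehind check can be restated as a pure function. This is only a sketch under assumptions: `StartOffsetChooser`, `choose`, and its parameters are hypothetical names, and the two Kafka lookups (`currentOffset` and `startTimeOffset`) are passed in as already-computed numbers rather than fetched from a broker.

```java
// Hypothetical, simplified restatement of the constructor's offset selection.
class StartOffsetChooser {
    static long choose(Long zkOffset, String zkTopologyId, String topologyInstanceId,
                       boolean forceFromStart, long currentOffset,
                       long startTimeOffset, long maxOffsetBehind) {
        long committedTo;
        if (zkTopologyId == null || zkOffset == null) {
            // Nothing usable in ZK: fall back to the configured offset.
            committedTo = currentOffset;
        } else if (!topologyInstanceId.equals(zkTopologyId) && forceFromStart) {
            // New topology instance and a forced restart: use startOffsetTime.
            committedTo = startTimeOffset;
        } else {
            // Normal case: resume from what was committed to ZK.
            committedTo = zkOffset;
        }
        // Too far behind (or a non-positive offset): jump to currentOffset.
        if (currentOffset - committedTo > maxOffsetBehind || committedTo <= 0) {
            committedTo = currentOffset;
        }
        return committedTo;
    }

    public static void main(String[] args) {
        // No state in ZK.
        System.out.println(choose(null, null, "topo-2", false, 500L, 40L, Long.MAX_VALUE)); // 500
        // State in ZK, no forced restart.
        System.out.println(choose(120L, "topo-1", "topo-2", false, 500L, 40L, Long.MAX_VALUE)); // 120
        // Topology changed and forceFromStart is set.
        System.out.println(choose(120L, "topo-1", "topo-2", true, 500L, 40L, Long.MAX_VALUE)); // 40
        // ZK offset is more than maxOffsetBehind behind.
        System.out.println(choose(120L, "topo-1", "topo-1", false, 500L, 40L, 100L)); // 500
    }
}
```

The last case shows why a small maxOffsetBehind is risky: a spout that is only 380 messages behind silently skips them.
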
At the start, messages have to be read and placed into _waitingToEmit. This is the fill step; here is the code:
private void fill() {
        long start = System.nanoTime();
        long offset;
		
		// First check whether there are failed offsets. If so, messages must be read again starting from that offset, so replays are possible here
        final boolean had_failed = !failed.isEmpty(); 

        // Are there failed tuples? If so, fetch those first.
        if (had_failed) {
            offset = failed.first(); // take the smallest failed offset
        } else {
            offset = _emittedToOffset;
        }

        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        if (msgs != null) {
            int numMessages = 0;

            for (MessageAndOffset msg : msgs) {
                final Long cur_offset = msg.offset();
                if (cur_offset < offset) {
                    // Skip any old offsets.
                    continue;
                }
				
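				/**
				 * If nothing had failed, or the failed set contains this offset (there may be many failed messages; we only start reading from the smallest failed offset),
				 * add the message to the waiting-to-emit list
				 **/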
                if (!had_failed || failed.contains(cur_offset)) {
                    numMessages += 1;
                    _pending.add(cur_offset);
                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                    if (had_failed) { // the failed set contained this offset; remove it, since it is about to be re-emitted
                        failed.remove(cur_offset);
                    }
                }
            }
            _fetchAPIMessageCount.incrBy(numMessages);
        }
    }
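
To see the replay filter in action without a broker, here is a toy simulation. It is a sketch under assumptions: `FillSketch` is a hypothetical class, the "log" is simply the offsets 0 to logEndOffset - 1, a fetch returns at most `batchSize` consecutive offsets, and the queue holds offsets instead of messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical simulation of fill(); no Kafka involved.
class FillSketch {
    final SortedSet<Long> pending = new TreeSet<>();
    final SortedSet<Long> failed = new TreeSet<>();
    final List<Long> waitingToEmit = new ArrayList<>();
    long emittedToOffset = 0L;

    void fill(long logEndOffset, int batchSize) {
        boolean hadFailed = !failed.isEmpty();
        // Start from the smallest failed offset if there is one.
        long offset = hadFailed ? failed.first() : emittedToOffset;
        long end = Math.min(offset + batchSize, logEndOffset);
        for (long cur = offset; cur < end; cur++) {
            // During a replay, only offsets that actually failed are re-queued.
            if (!hadFailed || failed.contains(cur)) {
                pending.add(cur);
                waitingToEmit.add(cur);
                emittedToOffset = Math.max(cur + 1, emittedToOffset);
                if (hadFailed) {
                    failed.remove(cur);
                }
            }
        }
    }

    public static void main(String[] args) {
        FillSketch s = new FillSketch();
        s.fill(100L, 8);              // first fetch queues offsets 0..7
        s.waitingToEmit.clear();      // pretend all eight were emitted
        for (long o = 0; o < 8; o++) {
            if (o != 3 && o != 5) {
                s.pending.remove(o);  // everything except 3 and 5 is acked
            }
        }
        s.failed.add(3L);
        s.failed.add(5L);
        s.fill(100L, 8);              // refetches from offset 3
        System.out.println(s.waitingToEmit);   // [3, 5]
        System.out.println(s.emittedToOffset); // 8
    }
}
```

The refetch re-reads offsets 3 to 10 from the log but re-queues only 3 and 5, and _emittedToOffset does not move backwards.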

=================== Latest code on GitHub ====================== The following line
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);

is changed to:
ByteBufferMessageSet msgs = null;
        try {
            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        } catch (UpdateOffsetException e) {
            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
            LOG.warn("Using new offset: {}", _emittedToOffset);
            // fetch failed, so don't update the metrics
            return;
        }


====================2014-11-10========================
The fetchMessages process:
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        for (int errors = 0; errors < 2 && msgs == null; errors++) { // at most two attempts
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).maxWait(config.fetchMaxWait).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException ||
                        e instanceof SocketTimeoutException ||
                        e instanceof IOException ||
                        e instanceof UnresolvedAddressException
                        ) {
                    LOG.warn("Network error when fetching messages:", e);
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) { // mainly handles the offset-out-of-range case: use getOffset to restart from the earliest or latest offset
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }
====================== fetchMessages has since been changed on GitHub; the new code is as follows ====================================
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws UpdateOffsetException {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        FetchRequestBuilder builder = new FetchRequestBuilder();
        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
        FetchResponse fetchResponse;
        try {
            fetchResponse = consumer.fetch(fetchRequest);
        } catch (Exception e) {
            if (e instanceof ConnectException ||
                    e instanceof SocketTimeoutException ||
                    e instanceof IOException ||
                    e instanceof UnresolvedAddressException
                    ) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            } else {
                throw new RuntimeException(e);
            }
        }
        if (fetchResponse.hasError()) {
            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                        "retrying with default start offset time from configuration. " +
                        "configured start offset time: [" + config.startOffsetTime + "]");
                throw new UpdateOffsetException();
            } else {
                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                LOG.error(message);
                throw new FailedFetchException(message);
            }
        } else {
            msgs = fetchResponse.messageSet(topic, partitionId);
        }
        return msgs;
    }

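The retry loop (at most two attempts, per the errors < 2 condition) has been removed. For the reason behind this change, see issue STORM-511, which describes it as follows: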
With default behaviour (KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange == true) when Kafka returns the error about offset being out of range, storm.kafka.KafkaUtils.fetchMessages tries to fix offset in local scope and retry fetch request. But if there are no more messages appeared under that specified partition it will never update the PartitionManager, but keep sending tons of requests with invalid offset to Kafka broker. On both sides Storm and Kafka logs grow extremely quick during that time.

==========================================Added 2014-11-10=====================================================


Next, the next method, which is what KafkaSpout's nextTuple actually calls:
//returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill(); // fetch messages when nothing is waiting to be emitted
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst(); // take one message at a time
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
			// If you need a refresher, see the earlier post on custom schemes: http://blog.csdn.net/wzhg0508/article/details/40874155
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) { // this was covered in the post on custom schemes
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break; // break after each successfully emitted message and return the EmitState to KafkaSpout's nextTuple, which checks it and periodically commits the successfully processed offset to ZK
            } else {
                ack(toEmit.offset); // ack does the cleanup
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }
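
The three return values are easier to follow with a toy version of the loop. This is a sketch under assumptions: `NextSketch` is hypothetical, the queue holds strings, an empty string stands in for a message that the scheme cannot turn into tuples, and the fill() call and the ack of skipped messages are left out.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, broker-free version of the emit loop in next().
class NextSketch {
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    final LinkedList<String> waitingToEmit = new LinkedList<>();
    final List<String> emitted = new ArrayList<>();

    EmitState next() {
        while (true) {
            String msg = waitingToEmit.pollFirst();
            if (msg == null) {
                return EmitState.NO_EMITTED;      // queue drained without emitting
            }
            if (!msg.isEmpty()) {
                emitted.add(msg);                 // stands in for collector.emit(...)
                break;                            // one message per call
            }
            // Empty message: nothing to emit, skip it and try the next one.
        }
        return waitingToEmit.isEmpty() ? EmitState.EMITTED_END : EmitState.EMITTED_MORE_LEFT;
    }

    public static void main(String[] args) {
        NextSketch s = new NextSketch();
        s.waitingToEmit.add("a");
        s.waitingToEmit.add("");
        s.waitingToEmit.add("b");
        System.out.println(s.next()); // EMITTED_MORE_LEFT
        System.out.println(s.next()); // EMITTED_END
        System.out.println(s.next()); // NO_EMITTED
    }
}
```

The second call skips the empty message and emits "b" in the same invocation, so each call emits at most one message but may consume several.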

Now the ack operation:
public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending!
            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);
        numberAcked++;
    }
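
Besides removing the acked offset, ack also prunes pending offsets that have fallen more than maxOffsetBehind behind the offset being acked. A small sketch of that pruning with `SortedSet.headSet` (the class `AckSketch` and the sample numbers are made up for illustration):

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical standalone copy of the ack pruning logic.
class AckSketch {
    static void ack(SortedSet<Long> pending, long offset, long maxOffsetBehind) {
        if (!pending.isEmpty() && pending.first() < offset - maxOffsetBehind) {
            // headSet is a live view: clearing it removes every element
            // strictly below the threshold from `pending` itself.
            pending.headSet(offset - maxOffsetBehind).clear();
        }
        pending.remove(offset);
    }

    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<>(Arrays.asList(5L, 90L, 200L, 205L));
        ack(pending, 205L, 100L);    // threshold is 205 - 100 = 105
        System.out.println(pending); // [200]
    }
}
```

Offsets 5 and 90 are dropped without ever being acked, so with a finite maxOffsetBehind those messages are given up on rather than replayed.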

Next, the commit operation, which is also called from KafkaSpout:
public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (_committedTo != lastCompletedOffset) {
            LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            _state.writeJSON(committedPath(), data);

            _committedTo = lastCompletedOffset;
            LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        } else {
            LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }

commit is straightforward and needs no further explanation. The important part is how fail is handled:
public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info(
                    "Skipping failed tuple at offset=" + offset +
                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                            " behind _emittedToOffset=" + _emittedToOffset
            );
        } else {
            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
            failed.add(offset);
            numberFailed++;
            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }
        }
    }
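
The two guards in fail can be exercised in isolation. This is a sketch only: `FailSketch` is a hypothetical class that keeps just the fields the method touches, and it returns a boolean (which the real method does not) so the outcome is visible.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical standalone copy of the fail() bookkeeping.
class FailSketch {
    final SortedSet<Long> failed = new TreeSet<>();
    long emittedToOffset;
    long maxOffsetBehind;
    long numberAcked = 0;
    long numberFailed = 0;

    FailSketch(long emittedToOffset, long maxOffsetBehind) {
        this.emittedToOffset = emittedToOffset;
        this.maxOffsetBehind = maxOffsetBehind;
    }

    // Returns true if the offset was scheduled for replay, false if skipped.
    boolean fail(long offset) {
        if (offset < emittedToOffset - maxOffsetBehind) {
            return false; // too far behind: give up on this tuple
        }
        failed.add(offset);
        numberFailed++;
        // Nothing has ever been acked and failures keep piling up: stop the spout.
        if (numberAcked == 0 && numberFailed > maxOffsetBehind) {
            throw new RuntimeException("Too many tuple failures");
        }
        return true;
    }

    public static void main(String[] args) {
        FailSketch s = new FailSketch(1000L, 100L);
        System.out.println(s.fail(850L)); // false: 850 < 1000 - 100
        System.out.println(s.fail(950L)); // true
        System.out.println(s.failed);     // [950]
    }
}
```

Only the offset itself goes into the failed set; _emittedToOffset is untouched, which is what lets fill replay just the failed messages.
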
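The earlier storm-kafka-0.8-plus version worked like this (excerpted from the storm-kafka-0.8-plus source analysis listed under reference):

First, the author caches only offsets, not messages.
So on a fail there is nothing to replay directly; according to his comments, the reason for not caching messages is the fear of running out of memory.

His approach is therefore: when an offset fails, roll _emittedToOffset straight back to that failed offset.
The next fetch from Kafka then starts reading from _emittedToOffset. The advantage is that replay relies on Kafka; the drawback is duplicates.
So before using it, be sure to consider whether duplicates are acceptable.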

public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }
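
A short sketch of the rollback approach makes the duplicate problem concrete (again a hypothetical toy class, `RollbackFailSketch`, with made-up offsets):

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical standalone copy of the old rollback-style fail().
class RollbackFailSketch {
    final SortedSet<Long> pending = new TreeSet<>();
    long emittedToOffset;

    void fail(long offset) {
        if (emittedToOffset > offset) {
            emittedToOffset = offset;        // the next fetch restarts here
            pending.tailSet(offset).clear(); // forget everything from here on
        }
    }

    public static void main(String[] args) {
        RollbackFailSketch s = new RollbackFailSketch();
        s.pending.addAll(Arrays.asList(4L, 5L, 6L, 7L));
        s.emittedToOffset = 8L;
        s.fail(5L);
        System.out.println(s.emittedToOffset); // 5
        System.out.println(s.pending);         // [4]
    }
}
```

After fail(5), offsets 5, 6 and 7 are all fetched and emitted again, even if 6 and 7 were already being processed successfully. The newer version re-emits only the offsets recorded in the failed set.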

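Compare the two for yourself.
I will keep updating the points that were not explained clearly above; thanks for your understanding.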

reference

http://www.cnblogs.com/fxjwind/p/3808346.html
