flume+kafka+storm


Configuring Flume:

       http://blog.csdn.net/desilting/article/details/22811593

The conf/flume-conf.properties file:

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.channels = c
producer.sources.s.type= netcat
producer.sources.s.bind= 192.168.40.134
producer.sources.s.port= 44444

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=192.168.40.134:9092
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.custom.topic.name=mykafka
producer.sinks.r.channel = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000

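Flume hands each component only the keys under its own prefix, with the prefix stripped, so the sink sees custom.topic.name rather than producer.sinks.r.custom.topic.name. Below is a rough sketch of that mapping using only java.util.Properties. The names SinkPropertiesSketch, subProperties and load are made up for illustration and are not Flume APIs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: approximates how the keys of one sink are selected from the agent file.
class SinkPropertiesSketch {

    // Returns the entries whose key starts with `prefix`, with the prefix removed.
    static Properties subProperties(Properties all, String prefix) {
        Properties sub = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                sub.setProperty(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return sub;
    }

    // Parses properties-file text held in a string.
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        String conf =
              "producer.sinks.r.type = org.apache.flume.plugins.KafkaSink\n"
            + "producer.sinks.r.metadata.broker.list=192.168.40.134:9092\n"
            + "producer.sinks.r.custom.topic.name=mykafka\n"
            + "producer.channels.c.type = memory\n";
        Properties sink = subProperties(load(conf), "producer.sinks.r.");
        System.out.println(sink.getProperty("custom.topic.name"));     // mykafka
        System.out.println(sink.containsKey("custom.partition.key")); // false: not set in this file
    }
}
```

Note that custom.partition.key is not set in the file above, so any code reading it has to cope with a missing value.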

Configuring Kafka:

       http://blog.csdn.net/desilting/article/details/22872839

Start ZooKeeper, Kafka, and Storm.

Create the topic:

        bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic  mykafka

Describe the topic:

       bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181

       Topic:mykafka    PartitionCount:1    ReplicationFactor:3    Configs:

       Topic: mykafka    Partition: 0    Leader: 134    Replicas: 133,134,132    Isr: 134,133,132

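partition: a topic can be split into several partitions, and its messages are distributed across them; the goal is to increase parallelism.
leader: the broker that handles all reads and writes for this partition; any broker may become the leader of some partition.
replicas: the brokers that hold a copy of this partition, listed regardless of whether each broker is currently alive.
isr: the in-sync replicas, i.e. the subset of replicas that are alive and caught up with the leader.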
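
As a small illustration of the replicas/isr distinction, the sketch below computes which replicas are missing from the ISR for a describe line like the one above. The class and method names are invented for this example, and the second call uses a hypothetical ISR (broker 133 dropped out) that does not come from the actual output.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative only: compares the Replicas and Isr lists printed by kafka-topics.sh --describe.
class PartitionStateSketch {

    // Parses a comma-separated list of broker ids such as "133,134,132".
    static Set<Integer> parseIds(String csv) {
        Set<Integer> ids = new LinkedHashSet<>();
        for (String part : csv.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                ids.add(Integer.parseInt(trimmed));
            }
        }
        return ids;
    }

    // Replicas that are assigned to the partition but are not currently in sync.
    static Set<Integer> outOfSync(String replicasCsv, String isrCsv) {
        Set<Integer> missing = parseIds(replicasCsv);
        missing.removeAll(parseIds(isrCsv));
        return missing;
    }

    public static void main(String[] args) {
        // Values from the describe output above: every replica is in sync.
        System.out.println(outOfSync("133,134,132", "134,133,132")); // prints []
        // Hypothetical case: broker 133 has fallen out of the ISR.
        System.out.println(outOfSync("133,134,132", "134,132"));     // prints [133]
    }
}
```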

Start Flume:

       bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console


The KafkaSink class:

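// The package must match the sink type configured in flume-conf.properties
package org.apache.flume.plugins;

import org.slf4j.Logger;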
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Context;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable {

    private Context context;
    private Properties parameters;
    private Producer<String, String> producer;

    private static final String PARTITION_KEY_NAME = "custom.partition.key";
    private static final String CUSTOME_TOPIC_KEY_NAME = "custom.topic.name";
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    public void configure(Context context) {
        this.context = context;
        ImmutableMap<String, String> props = context.getParameters();
        this.parameters = new Properties();
        for (Map.Entry<String,String> entry : props.entrySet()) {
            this.parameters.put(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void start() {
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
    }

    public Status process() {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            Event event = channel.take();
            if (event != null) {
                // custom.partition.key is optional and is not set in the sample config above
                String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
                String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOME_TOPIC_KEY_NAME),
                        "topic name is required");
                String eventData = new String(event.getBody(), DEFAULT_ENCODING);
                // Check for null first: calling isEmpty() on a missing key would throw a NullPointerException
                KeyedMessage<String, String> data = (partitionKey == null || partitionKey.isEmpty())
                        ? new KeyedMessage<String, String>(topic, eventData)
                        : new KeyedMessage<String, String>(topic, partitionKey, eventData);
                LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
                producer.send(data);
                transaction.commit();
                LOGGER.info("Send message success");
                status = Status.READY;
            } else {
                transaction.rollback();
                status = Status.BACKOFF;
            }
        } catch (Exception e) {
            // Log the failure with its stack trace through SLF4J instead of printing to stderr
            LOGGER.error("Send message failed!", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

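    @Override
    public synchronized void stop() {
        // start() may have failed before the producer was created
        if (producer != null) {
            producer.close();
        }
        super.stop();
    }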
}
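
The take / commit / rollback pattern in process() is what keeps an event in the channel when the send fails. The following is a toy model of that behaviour in plain Java, with no Flume or Kafka dependencies. ToyChannel, Sender and processOnce are hypothetical stand-ins written for this sketch; the real Flume channel and transaction classes are considerably more involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: a toy model of the transactional take used by the sink.
class TransactionSketch {

    // Hypothetical stand-in for the Kafka producer.
    interface Sender {
        void send(String message) throws Exception;
    }

    // Toy channel: rollback() puts the taken event back at the head of the queue.
    static final class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private String inFlight;

        void put(String event) { queue.addLast(event); }

        String take() {
            inFlight = queue.pollFirst();
            return inFlight;
        }

        void commit() { inFlight = null; }

        void rollback() {
            if (inFlight != null) {
                queue.addFirst(inFlight);
                inFlight = null;
            }
        }

        int size() { return queue.size(); }
    }

    // Returns true if one event was delivered, false if the caller should back off.
    static boolean processOnce(ToyChannel channel, Sender sender) {
        String event = channel.take();
        if (event == null) {
            channel.rollback();
            return false;
        }
        try {
            sender.send(event);
            channel.commit();
            return true;
        } catch (Exception e) {
            channel.rollback(); // the event stays in the channel for a later retry
            return false;
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.put("ASDF");
        boolean first = processOnce(channel, m -> { throw new Exception("broker unavailable"); });
        boolean second = processOnce(channel, m -> System.out.println("sent " + m));
        System.out.println(first + " " + second + " " + channel.size());
    }
}
```

In this model the failed first attempt leaves the event queued, and the second attempt delivers it, which is the at-least-once behaviour the rollback is meant to provide.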


The KafkaSpout is based on someone else's code:

https://github.com/HolmesNL/kafka-spout

It needs some modifications.


Storm test program:

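// Imports assume the backtype.storm packages used by Storm 0.9.x
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
// Also import the locally modified KafkaSpout here (its package is not given in this post)

public class ExclamationTopology {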

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;
    transient CountMetric _countMetric;
    transient MultiCountMetric _wordCountMetric;
    transient ReducedMetric _wordLengthMeanMetric;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
      initMetrics(context);
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
      updateMetrics(tuple.getString(0));
    }

    void updateMetrics(String word) {
      _countMetric.incr();
      _wordCountMetric.scope(word).incr();
      _wordLengthMeanMetric.update(word.length());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    void initMetrics(TopologyContext context) {
      _countMetric = new CountMetric();
      _wordCountMetric = new MultiCountMetric();
      _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());

      context.registerMetric("execute_count", _countMetric, 5);
      context.registerMetric("word_count", _wordCountMetric, 60);
      context.registerMetric("word_length", _wordLengthMeanMetric, 60);
    }
  }

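  public static void main(String[] args) throws Exception {
    // Usage: <topic> for a local run, or <topologyName> <topic> to submit to a cluster
    if (args == null || args.length == 0) {
      throw new IllegalArgumentException("usage: ExclamationTopology [topologyName] <topic>");
    }
    TopologyBuilder builder = new TopologyBuilder();
    String topic = args.length == 2 ? args[1] : args[0];
    KafkaSpout kafkaSpout = new KafkaSpout(topic,"testKafkaGroup","192.168.40.132:2181");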

    builder.setSpout("word", kafkaSpout, 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

    if (args != null && args.length == 2) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

Test results:

telnet 192.168.40.134 44444

Type some arbitrary strings made of the characters A, S, D, F in the telnet session:

ASDF
OK
ASD
OK
F
OK
ASDF
OK
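
One thing to watch for: telnet terminates each line with CR LF, and it appears that only the LF is consumed as the line separator, so the event body keeps a trailing carriage return (this would explain the ^M in the metrics output at the end of this post). A minimal sketch of stripping it before further processing is shown below. stripTrailingCr is a hypothetical helper written for this example, not part of Flume or Storm.

```java
// Illustrative only: removes a single trailing carriage return left over from CR LF line endings.
class LineEndingSketch {

    static String stripTrailingCr(String s) {
        return s.endsWith("\r") ? s.substring(0, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        String fromTelnet = "ASDF\r"; // what a line typed in telnet may look like after splitting on LF
        System.out.println(stripTrailingCr(fromTelnet) + "!!!"); // prints ASDF!!!
    }
}
```

Such a helper could be applied either in the sink before sending to Kafka or in the bolt before updating the metrics.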

Flume prints:

2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]
2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASD]
2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:F]
2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka : [mykafka:ASDF]

Finally, records like the following appear in Storm's logs/metrics.log file:

2014-04-08 01:30:28,446 495106 1396945828 ubuntu:6703 10:exclaim2 word_count {ADF^M!!!=1, SDF^M!!!=0, DF^M!!!=2, FDAS^M!!!=0, ^M!!!=0, AF^M!!!=1,D^M!!!=1, DS^M!!!=1, AS^M!!!=5, SAFSDF^M!!!=0, FD^M!!!=1, 224^M!!!=0, ASDF^M!!!=2, FG^M!!!=0, AD^M!!!=1, FS^M!!!=1, ASD^M!!!=5, F^M!!!=3, ASGAS^M!!!=0, SD^M!!!=1, ASFDAS^M!!!=0, FAS^M!!!=2, SDG^M!!!=0}
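
The entries with a count of 0 in this record are consistent with a scoped counter that resets each value after reporting but keeps the key. That is how I understand MultiCountMetric to behave, so treat it as an assumption. The sketch below models that behaviour in plain Java; ScopedCounterSketch is an illustrative class, not Storm's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a per-word counter whose values are reset on each report while the keys remain.
class ScopedCounterSketch {

    private final Map<String, Long> counts = new HashMap<>();

    // Increments the counter for one scope (here: one word).
    void incr(String scope) {
        counts.merge(scope, 1L, Long::sum);
    }

    // Returns the counts since the last report, then zeroes them without removing the keys.
    Map<String, Long> getValueAndReset() {
        Map<String, Long> snapshot = new TreeMap<>(counts);
        counts.replaceAll((scope, value) -> 0L);
        return snapshot;
    }

    public static void main(String[] args) {
        ScopedCounterSketch wordCount = new ScopedCounterSketch();
        wordCount.incr("ASD!!!");
        wordCount.incr("ASD!!!");
        wordCount.incr("F!!!");
        System.out.println(wordCount.getValueAndReset()); // prints {ASD!!!=2, F!!!=1}

        wordCount.incr("F!!!");
        System.out.println(wordCount.getValueAndReset()); // prints {ASD!!!=0, F!!!=1}
    }
}
```

Under this model, a word seen in an earlier reporting interval keeps showing up with =0 until it is seen again.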



