漫游Kafka实战篇之客户端API,kafka实战篇api
漫游Kafka实战篇之客户端API,kafka实战篇api
原文地址:http://blog.csdn.net/honglei915/article/details/37697655
Kafka Producer APIs
Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:
class Producer { /* 将消息发送到指定分区 */ public void send(kafka.javaapi.producer.ProducerData<K,V> producerData); /* 批量发送一批消息 */ public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData); /* 关闭producer */ public void close(); }Producer API提供了以下功能:
KafKa Consumer APIs
Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。
低级别的API
class SimpleConsumer { /*向一个broker发送读取请求并得到消息集 */ public ByteBufferMessageSet fetch(FetchRequest request); /*向一个broker发送读取请求并得到一个相应集 */ public MultiFetchResponse multifetch(List<FetchRequest> fetches); /** * 得到指定时间之前的offsets * 返回值是offsets列表,以倒序排序 * @param time: 时间,毫秒, * 如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset. * 如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset. */ public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets); }低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。
高级别的API
/* 创建连接 */ ConsumerConnector connector = Consumer.create(consumerConfig); interface ConsumerConnector { /** * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic) * Input: a map of <topic, #streams> * Output: a map of <topic, list of message streams> */ public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); /** * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代, * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。 */ public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams); /* 提交目前消费到的offset */ public commitOffsets() /* 关闭连接 */ public shutdown() }这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。
每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。
src/core/target/scala_2.8.0/下
开发相应的功能呀 你 想怎么用
评论暂时关闭