_00017 Kafka Architecture Overview and Kafka Getting-Started Examples (Basic Example + Java API Usage)


Author: 妳那伊抹微笑
Signature: The greatest distance in the world is neither the end of the sky nor the corner of the sea, but standing right in front of you while you cannot feel that I exist.
Focus: Flume + Kafka + Storm + Redis/HBase + Hadoop + Hive + Mahout + Spark ... cloud computing technologies
Reposting notice: Reposting is allowed, but you must credit the original source and author with a hyperlink and keep this copyright notice. Thanks for your cooperation!
QQ group: 214293307  云计算之嫣然伊笑 (looking forward to learning and improving together with you)


# Kafka Architecture

# Preface

My entire Kafka learning process came down to reading the official documentation, running into all sorts of errors, and searching Baidu and Google until each one was resolved. English is a weak spot for me (time to memorize more vocabulary!), so reading the official docs took some effort, but pressure is motivation. I've been a bit busy these past few days and finally finished organizing these notes today, in the hope of helping others who want to learn these new technologies. Putting this together was tiring, but worth it. If you are interested in cloud computing, feel free to join QQ group 214293307 mentioned above, so we can learn and improve together ...

# About Kafka

# Introduction

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

What does all of that mean?

First let's review some basic messaging terminology:

- Kafka maintains feeds of messages in categories called topics.
- Processes that publish messages to a Kafka topic are called producers.
- Processes that subscribe to topics and consume the published messages are called consumers.
- Kafka is run as a cluster of one or more servers, each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers, like this:

(diagram: producers → Kafka cluster → consumers)

Communication between the clients and the servers is done with a simple, high-performance, language-agnostic TCP protocol. A Java client is provided for Kafka, but clients are available in many languages.

# Topics and Logs

Let's first dive into the high-level abstraction Kafka provides: the topic.

A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:

(diagram: the partitioned log of a topic)

Each partition is an ordered, immutable sequence of messages that is continually appended to, a commit log. The messages in each partition are assigned a sequential id number, called the offset, that uniquely identifies each message within the partition.

The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it is discarded to free up space. Kafka's performance is effectively constant with respect to data size, so retaining lots of data is not a problem.

In fact, the only metadata retained on a per-consumer basis is the consumer's position in the log, called the offset. The offset is controlled by the consumer: normally a consumer advances its offset linearly as it reads messages, but the position is really under the consumer's control, and it can consume messages in any order it likes. For example, a consumer can reset to an older offset to reprocess messages.
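The consumer-controlled offset can be modeled in a few lines of plain Java. This is a toy sketch, not the actual Kafka consumer API: the "broker" side is just an immutable list, and the consumer's only state is an integer it is free to rewind.

```java
import java.util.List;

public class OffsetDemo {
    // Toy model of one partition as an append-only, immutable sequence.
    // Reading never mutates the log; the offset is purely consumer state.
    static String read(List<String> partition, int offset) {
        return partition.get(offset);
    }

    public static void main(String[] args) {
        List<String> partition = List.of("m0", "m1", "m2"); // messages in append order

        // Normal consumption: advance the offset linearly.
        for (int offset = 0; offset < partition.size(); offset++) {
            System.out.println(offset + ": " + read(partition, offset));
        }

        // Reprocessing: the consumer simply resets its offset to an older
        // value; nothing on the "broker" side changes.
        int offset = 1;
        System.out.println("replay " + offset + ": " + read(partition, offset));
    }
}
```

Because rewinding is just resetting an integer, replaying old data is as cheap for the consumer as reading new data, which is exactly why Kafka consumers are so lightweight.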

This combination of features means that Kafka consumers are very cheap: they can come and go with little impact on the cluster or on other consumers. For example, you can use the command-line tools to "tail" the contents of a topic without changing what is consumed by any existing consumer.

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that fits on a single server. Each individual partition must fit on the server that hosts it, but a topic may have many partitions, so it can handle an arbitrary amount of data. Second, partitions act as the unit of parallelism.


# Distribution

The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance. Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server acts as a leader for some of its partitions and as a follower for others, so load is well balanced within the cluster.

# Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which partition within the topic each message is assigned to. This can be done in a simple round-robin fashion to balance load, or according to some semantic partition function (say, based on a key in the message). More on the use of partitioning in a second.
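A key-based partition function can be illustrated as follows. This is a hypothetical helper, not Kafka's built-in partitioner (though hashing the key modulo the partition count is the same idea): every message with the same key lands in the same partition.

```java
import java.util.List;

public class KeyPartitioner {
    // Map a message key to one of numPartitions partitions — a sketch of
    // the "semantic partition function" mentioned above.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String key : List.of("user-1", "user-2", "user-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
        // The same key always maps to the same partition, so all messages
        // for that key share one partition and keep their relative order.
    }
}
```

This is what makes per-key ordering possible: ordering is only guaranteed within a partition, and the partition function pins each key to one partition.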

# Consumers

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these: the consumer group.

Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

More commonly, however, we find that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.

(diagram) A two-server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.

Kafka has stronger ordering guarantees than a traditional messaging system.

A traditional queue retains messages in order on the server, and if multiple consumers consume from the queue then the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of messages is lost in the presence of parallel consumption. Messaging systems often work around this with the notion of an "exclusive consumer" that allows only one process to consume from a queue, but of course that means there is no parallelism in processing.

Kafka does it better. By having a notion of parallelism, the partition, within a topic, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions of a topic to the consumers in a consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that that consumer is the only reader of its partitions and consumes the data in order. Since there are many partitions, this still balances the load over many consumer instances. Note, however, that there cannot be more consumer instances than partitions.

Kafka only provides a total order over messages within a partition, not between different partitions in a topic. Per-partition ordering, combined with the ability to partition data by key, is sufficient for most applications. However, if you require a total order over all messages in a topic, you can achieve it with a topic that has only one partition, though that will mean only one consumer process.
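The "each partition is consumed by exactly one group member" rule can be sketched as a simple round-robin assignment. This is a hypothetical illustration, not Kafka's actual rebalancing algorithm:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignment {
    // Assign each partition to exactly one consumer of the group,
    // round-robin. This is how ordering-per-partition coexists with
    // load balancing: no partition ever has two readers in one group.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String c : consumers) out.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 consumers: each consumer owns 2 partitions.
        System.out.println(assign(List.of("c1", "c2"), 4));
        // More consumers than partitions: the extra instances sit idle,
        // which is why you can't usefully have more consumers than partitions.
        System.out.println(assign(List.of("c1", "c2", "c3", "c4", "c5"), 4));
    }
}
```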

# Kafka Guarantees

At a high level, Kafka gives the following guarantees:

- Messages sent by a producer to a particular topic partition are appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
- A consumer instance sees messages in the order they are stored in the log.
- For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any messages committed to the log.

More details on these guarantees are given in the design section of the documentation.

# Kafka Design Goals

# Installing Kafka

# Download kafka_2.9.2-0.8.1.1.tgz (the official site recommends Scala 2.9.2)

[root@rs229 ~]# wget -c -P /root http://mirrors.cnnic.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

# Extract kafka_2.9.2-0.8.1.1.tgz

[root@rs229 kafka]# pwd

/usr/local/adsit/yting/apache/kafka

[root@rs229 kafka]# ll

total 14716

-rw-r--r-- 1 root root 15067175 Jun 26 21:50 kafka_2.9.2-0.8.1.1.tgz

[root@rs229 kafka]# tar -zxvf kafka_2.9.2-0.8.1.1.tgz

[root@rs229 kafka]# ll

total 14720

drwxr-xr-x 5 root root     4096 Apr 23 03:37 kafka_2.9.2-0.8.1.1

-rw-r--r-- 1 root root 15067175 Jun 26 21:50 kafka_2.9.2-0.8.1.1.tgz

[root@rs229 kafka]# cd kafka_2.9.2-0.8.1.1

[root@rs229 kafka_2.9.2-0.8.1.1]# ll

total 28

drwxr-xr-x 3 root root  4096 Apr 23 03:37 bin

drwxr-xr-x 2 root root  4096 Apr 23 03:37 config

drwxr-xr-x 2 root root  4096 Apr 23 03:37 libs

-rw-rw-r-- 1 root root 11358 Apr 23 02:37 LICENSE

-rw-rw-r-- 1 root root   162 Apr 23 02:37 NOTICE  (quite a tidy layout, isn't it!)

# Configure environment variables

[root@rs229 bin]# pwd

/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin

[root@rs229 bin]# vi /etc/profile.d/yting.sh

# yousmile

# env configure start

JAVA_HOME=/usr/local/adsit/yting/jdk/jdk1.7.0_60

HADOOP_HOME=/usr/local/adsit/yting/apache/hadoop/hadoop-2.2.0

HBASE_HOME=/usr/local/adsit/yting/apache/hbase/hbase-0.96.2-hadoop2

ZOOKEEPER_HOME=/usr/local/adsit/yting/apache/zookeeper/zookeeper-3.4.6

HIVE_HOME=/usr/local/adsit/yting/apache/hive/apache-hive-0.13.1-bin

MAVEN_HOME=/usr/local/adsit/yting/apache/maven/apache-maven-3.2.1

MAHOUT_HOME=/usr/local/adsit/yting/apache/mahout/mahout-distribution-0.9

MAHOUT_LOCAL=MAHOUT_HOME

SCALA_HOME=/usr/local/adsit/yting/apache/scala/scala-2.10.4

STORM_HOME=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.1-incubating

REDIS_HOME=/usr/local/adsit/yting/apache/redis/redis-2.8.9

FLUME_HOME=/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/

KAFKA_HOME=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1

 

PATH=.:$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$MAVEN_HOME/bin:$MAHOUT_HOME/bin:$SCALA_HOME/bin:$STORM_HOME/bin:$REDIS_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin

 

export JAVA_HOME HADOOP_HOME HBASE_HOME ZOOKEEPER_HOME HIVE_HOME MAVEN_HOME MAHOUT_HOME MAHOUT_LOCAL SCALA_HOME STORM_HOME REDIS_HOME FLUME_HOME KAFKA_HOME PATH

export HADOOP_HOME_WARN_SUPPRESS=1

# env configure end

 

 

 

# alias configure start

alias cls='clear'

alias wai='who am i'

alias cdd='cd ..'

alias cdp='cd -'

alias sep='source /etc/profile'

alias yousister='reboot'

# alias configure end

# Kafka Getting-Started Example 1 (the Quick Start example from the official site)

# Preface

The sentence at the end of this simple example is easy to trip over. The English original is:

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.

All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.

In other words: run each of the commands above in a separate terminal, then type messages into the producer terminal and watch them appear in the consumer terminal. All of the command-line tools take additional options; run a command with no arguments to see detailed usage information.

If you don't know Linux and keep running all of these commands in a single terminal, you'll hit errors everywhere and be ready to smash your computer!

# Download kafka_2.9.2-0.8.1.1.tgz

# Start the server

# Start the ZooKeeper bundled with Kafka (an external ZooKeeper works too)

Start it in the background here (note the trailing `&`), otherwise it stops as soon as you leave the terminal. Yes, I'm a little lazy...

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh config/zookeeper.properties &

[2014-06-26 22:07:43,287] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2014-06-26 22:07:43,288] WARN Either no config or noquorum defined in config, running  instandalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

[2014-06-26 22:07:43,316] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2014-06-26 22:07:43,317] INFO Starting server(org.apache.zookeeper.server.ZooKeeperServerMain)

[2014-06-26 22:07:43,326] INFO Serverenvironment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,326] INFO Serverenvironment:host.name=rs229 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.version=1.7.0_60 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.vendor=Oracle Corporation(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Server environment:java.class.path=:/…/kafka_2.8.0*.jar(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:java.compiler=<NA>(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Serverenvironment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,327] INFO Server environment:os.version=2.6.32-279.el6.x86_64(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,328] INFO Serverenvironment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,328] INFO Serverenvironment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,328] INFO Serverenvironment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,338] INFO tickTime set to 3000(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,338] INFO minSessionTimeout setto -1 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,338] INFO maxSessionTimeout setto -1 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-26 22:07:43,358] INFObinding to port 0.0.0.0/0.0.0.0:2181(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:07:43,379] INFO Snapshotting: 0(org.apache.zookeeper.server.persistence.FileTxnSnapLog)

---------------------------------------------------------------------------------------------------------------------------------

[root@rs229 kafka_2.9.2-0.8.1.1]# ps -ef | grep zookeeper

root     1260211904  3 22:13 pts/0    00:00:00/usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx512M -Xms512M -server-XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC-XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark-XX:+DisableExplicitGC -Djava.awt.headless=true-Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/zookeeper-gc.log-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps-Dcom.sun.management.jmxremote-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs-Dlog4j.configuration=file:bin/../config/log4j.properties -cp :/…/kafka_2.8.0*.jarorg.apache.zookeeper.server.quorum.QuorumPeerMain config/zookeeper.properties

root     1264911904  0 22:13 pts/0    00:00:00 grep zookeeper

# Start the Kafka server

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties &

[2] 12678

[root@rs229 kafka_2.9.2-0.8.1.1]# [2014-06-26 22:17:10,854] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,907] INFO Property broker.id isoverridden to 0 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,907] INFO Propertylog.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,907] INFO Property log.dirs isoverridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,907] INFO Propertylog.retention.check.interval.ms is overridden to 60000(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,908] INFO Property log.retention.hoursis overridden to 168 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,908] INFO Propertylog.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,908] INFO Propertynum.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,908] INFO Propertynum.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,908] INFO Propertynum.partitions is overridden to 2 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Property port isoverridden to 9092 (kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Propertysocket.receive.buffer.bytes is overridden to 1048576(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Propertysocket.request.max.bytes is overridden to 104857600(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Propertysocket.send.buffer.bytes is overridden to 1048576(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Propertyzookeeper.connect is overridden to localhost:2181(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,909] INFO Propertyzookeeper.connection.timeout.ms is overridden to 1000000(kafka.utils.VerifiableProperties)

[2014-06-26 22:17:10,925] INFO [Kafka Server 0],starting (kafka.server.KafkaServer)

[2014-06-26 22:17:10,927] INFO [Kafka Server 0],Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2014-06-26 22:17:10,938] INFO Starting ZkClientevent thread. (org.I0Itec.zkclient.ZkEventThread)

[2014-06-26 22:17:10,945] INFO Clientenvironment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT(org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre(org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:java.class.path=:/…/kafka_2.8.0*.jar(org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,945] INFO Clientenvironment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib(org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:os.name=Linux (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:user.name=root (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:user.home=/root (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,946] INFO Clientenvironment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1(org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,947] INFO Initiating clientconnection, connectString=localhost:2181 sessionTimeout=6000watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)

[2014-06-26 22:17:10,960] INFO Opening socket connection to serverlocalhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)

[2014-06-26 22:17:10,967] INFO Socket connection established tolocalhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)

[2014-06-26 22:17:10,969] INFO Accepted socket connection from /127.0.0.1:43490(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:17:10,976] INFO Client attempting toestablish new session at /127.0.0.1:43490(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:17:10,980] INFO Creating new log file:log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

[2014-06-26 22:17:10,994] INFO Established session0x146d885fce70000 with negotiated timeout 6000 for client /127.0.0.1:43490(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:17:10,997] INFO Session establishment complete on serverlocalhost/127.0.0.1:2181, sessionid = 0x146d885fce70000, negotiated timeout =6000 (org.apache.zookeeper.ClientCnxn)

[2014-06-26 22:17:11,001] INFO zookeeper state changed (SyncConnected)(org.I0Itec.zkclient.ZkClient)

[2014-06-26 22:17:11,042] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:createcxid:0x4 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:17:11,057] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:createcxid:0xa zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/configError:KeeperErrorCode = NoNode for /config(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:17:11,064] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:createcxid:0x10 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/adminError:KeeperErrorCode = NoNode for /admin(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:17:11,119] INFO Log directory'/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)

[2014-06-26 22:17:11,133] INFO Starting log cleanupwith a period of 60000 ms. (kafka.log.LogManager)

[2014-06-26 22:17:11,137] INFO Starting log flusherwith a default period of 9223372036854775807 ms. (kafka.log.LogManager)

SLF4J: Failed to load class"org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) loggerimplementation

SLF4J: Seehttp://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2014-06-26 22:17:11,169] INFO Awaiting socketconnections on 0.0.0.0:9092. (kafka.network.Acceptor)

[2014-06-26 22:17:11,171] INFO [Socket Server onBroker 0], Started (kafka.network.SocketServer)

[2014-06-26 22:17:11,240] INFO Will not load MX4J,mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)

[2014-06-26 22:17:11,275] INFO 0 successfully electedas leader (kafka.server.ZookeeperLeaderElector)

[2014-06-26 22:17:11,281] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:setDatacxid:0x19 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:17:11,347] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:deletecxid:0x27 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for/admin/preferred_replica_election(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:17:11,369] INFO Registered broker 0 atpath /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)

[2014-06-26 22:17:11,380] INFO [Kafka Server 0],started (kafka.server.KafkaServer)

[2014-06-26 22:17:11,470] INFO New leader is 0(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

---------------------------------------------------------------------------------------------------------------------------------

[root@rs229 kafka_2.9.2-0.8.1.1]# ps -ef | grep kafka

root     12602 11904  0 22:13 pts/0    00:00:01/usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx512M -Xms512M -server-XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC-XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark-XX:+DisableExplicitGC -Djava.awt.headless=true-Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/zookeeper-gc.log-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps-Dcom.sun.management.jmxremote-Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs-Dlog4j.configuration=file:bin/../config/log4j.properties -cp:/…/kafka_2.8.0*.jar org.apache.zookeeper.server.quorum.QuorumPeerMainconfig/zookeeper.properties

root     12678 11904  0 22:17 pts/0    00:00:03/usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -Xmx1G -Xms1G -server-XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC-XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark-XX:+DisableExplicitGC -Djava.awt.headless=true-Xloggc:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs/kafkaServer-gc.log-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false-Dcom.sun.management.jmxremote.ssl=false-Dkafka.logs.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../logs-Dlog4j.configuration=file:bin/../config/log4j.properties -cp:/…/kafka_2.8.0*.jar kafka.Kafka config/server.properties

root     12811 11904  0 22:27 pts/0    00:00:00 grep kafka

[root@rs229 kafka_2.9.2-0.8.1.1]#

# Create a topic

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test &

[3] 12867

[root@rs229 kafka_2.9.2-0.8.1.1]# [2014-06-26 22:35:26,593] INFO Accepted socket connection from /127.0.0.1:43553 (org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:35:26,598] INFO Client attempting toestablish new session at /127.0.0.1:43553(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:35:26,600] INFO Established session0x146d885fce70001 with negotiated timeout 30000 for client /127.0.0.1:43553(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:35:26,741] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70001 type:setDatacxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/config/topics/test Error:KeeperErrorCode = NoNode for /config/topics/test(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:35:26,753] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70001 type:createcxid:0x4 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/config/topics Error:KeeperErrorCode = NodeExists for /config/topics(org.apache.zookeeper.server.PrepRequestProcessor)

Created topic "test".

[2014-06-26 22:35:26,768] INFO Processed sessiontermination for sessionid: 0x146d885fce70001(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:35:26,770] INFO Closed socketconnection for client /127.0.0.1:43553 which had sessionid 0x146d885fce70001(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:35:26,816] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:createcxid:0x3b zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/brokers/topics/test/partitions/0 Error:KeeperErrorCode = NoNode for/brokers/topics/test/partitions/0(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:35:26,818] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70000 type:createcxid:0x3c zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/brokers/topics/test/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/test/partitions(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:35:26,904] INFO [ReplicaFetcherManageron broker 0] Removed fetcher for partitions [test,0](kafka.server.ReplicaFetcherManager)

[2014-06-26 22:35:26,956] INFO Completed load of logtest-0 with log end offset 0 (kafka.log.Log)

[2014-06-26 22:35:26,968] INFO Created log forpartition [test,0] in /tmp/kafka-logs with properties {segment.index.bytes-> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912,flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000,index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy ->delete, segment.ms -> 604800000, max.message.bytes -> 1000012,flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5,retention.ms -> 604800000}. (kafka.log.LogManager)

[2014-06-26 22:35:26,969] WARN Partition [test,0] onbroker 0: No checkpointed highwatermark is found for partition [test,0](kafka.cluster.Partition

# List topics; you can see the test topic we just created

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181

test

# Send some messages (发送一些消息)

Type a message (This is a message: The you smile until forever), then press Ctrl+C to exit the shell.

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message: The you smile until forever

[2014-06-26 22:43:29,572] INFO Closing socketconnection to /127.0.0.1. (kafka.network.Processor)

^C[2014-06-2622:43:30,528] INFO Closing socket connection to /116.255.224.229.(kafka.network.Processor)

# Start a consumer(开启一个消费者)

After the command prints some startup output, the message we just entered appears at the end: This is a message: The you smile until forever


[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

[2014-06-26 22:45:14,066] INFO Accepted socketconnection from /127.0.0.1:43617 (org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,070] INFO Client attempting toestablish new session at /127.0.0.1:43617 (org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,072] INFO Established session0x146d885fce70010 with negotiated timeout 6000 for client /127.0.0.1:43617(org.apache.zookeeper.server.NIOServerCnxn)

SLF4J: Failed to load class"org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) loggerimplementation

SLF4J: Seehttp://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2014-06-26 22:45:14,110] INFO Accepted socketconnection from /127.0.0.1:43618 (org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,110] INFO Client attempting toestablish new session at /127.0.0.1:43618(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,111] INFO Established session0x146d885fce70011 with negotiated timeout 30000 for client /127.0.0.1:43618(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,125] INFO Processed sessiontermination for sessionid: 0x146d885fce70011(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:45:14,126] INFO Closed socketconnection for client /127.0.0.1:43618 which had sessionid 0x146d885fce70011(org.apache.zookeeper.server.NIOServerCnxn)

[2014-06-26 22:45:14,225] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70010 type:createcxid:0x2 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/consumers/console-consumer-55455/ids Error:KeeperErrorCode = NoNode for/consumers/console-consumer-55455/ids(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:45:14,227] INFO Got user-level KeeperExceptionwhen processing sessionid:0x146d885fce70010 type:create cxid:0x3zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/consumers/console-consumer-55455 Error:KeeperErrorCode = NoNode for/consumers/console-consumer-55455 (org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:45:14,485] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70010 type:createcxid:0x16 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/consumers/console-consumer-55455/owners/test Error:KeeperErrorCode =NoNode for /consumers/console-consumer-55455/owners/test(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:45:14,487] INFO Got user-levelKeeperException when processing sessionid:0x146d885fce70010 type:createcxid:0x17 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a ErrorPath:/consumers/console-consumer-55455/owners Error:KeeperErrorCode = NoNodefor /consumers/console-consumer-55455/owners(org.apache.zookeeper.server.PrepRequestProcessor)

[2014-06-26 22:45:14,623] INFO Closing socketconnection to /116.255.224.229. (kafka.network.Processor)

This is a message: The you smileuntil forever

# Kafka Getting-Started Example 2 (Using the Java API)

# Preface

This is a standalone (single-node) example, not a cluster.

# Starting ZooKeeper (the bundled one)

# The zookeeper.properties config file (ZooKeeper is started with this file)

[root@rs229 config]# pwd

/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config

[root@rs229 config]# cat zookeeper.properties

# Licensed to the Apache Software Foundation (ASF)under one or more

# contributor license agreements.  See the NOTICE file distributed with

# this work for additional information regardingcopyright ownership.

# The ASF licenses this file to You under the ApacheLicense, Version 2.0

# (the "License"); you may not use thisfile except in compliance with

# the License. You may obtain a copy of the License at

#

#   http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to inwriting, software

# distributed under the License is distributed on an"AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eitherexpress or implied.

# See the License for the specific language governingpermissions and

# limitations under the License.

# the directory where the snapshot is stored.

dataDir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper

# the port at which the clients will connect

clientPort=2181

# disable the per-ip limit on the number ofconnections since this is a non-production config

maxClientCnxns=0

# Starting ZooKeeper

[root@rs229 kafka_2.9.2-0.8.1.1]# pwd

/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/zookeeper-server-start.sh config/zookeeper.properties

[2014-06-29 21:43:05,531] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2014-06-29 21:43:05,532] WARN Either no config or noquorum defined in config, running  instandalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)

[2014-06-29 21:43:05,561] INFO Reading configurationfrom: config/zookeeper.properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2014-06-29 21:43:05,561] INFO Starting server(org.apache.zookeeper.server.ZooKeeperServerMain)

[2014-06-29 21:43:05,570] INFO Serverenvironment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT(org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-29 21:43:05,571] INFO Serverenvironment:host.name=rs229 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-29 21:43:05,571] INFO Serverenvironment:java.version=1.7.0_60 (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-29 21:43:05,571] INFO Serverenvironment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)

[2014-06-29 21:43:05,571] INFO Server environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,571] INFO Server environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,572] INFO Server environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,582] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:05,602] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-06-29 21:43:05,621] INFO Reading snapshot /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileSnap)
[2014-06-29 21:43:05,650] INFO Snapshotting: 40 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2014-06-29 21:43:12,000] INFO Expiring session 0x146e71987810001, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2014-06-29 21:43:12,002] INFO Processed session termination for sessionid: 0x146e71987810001 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-06-29 21:43:12,003] INFO Creating new log file: log.41 (org.apache.zookeeper.server.persistence.FileTxnLog)

# Starting the Kafka Server

# The Kafka Server configuration file (the broker is started with this file)

[root@rs229 config]# cat server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
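The broker reads server.properties as an ordinary java.util.Properties file, which is where the "Property X is overridden to Y" lines in the startup log below come from. A small sketch of parsing the same key=value format (the class name and the fragment used are illustrative, not Kafka code):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ServerConfigDemo {
    // Parse key=value text the same way java.util.Properties loads a .properties file.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) { // cannot actually happen for a StringReader
            throw new RuntimeException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // A fragment of the server.properties shown above.
        String file = "broker.id=0\n"
                + "port=9092\n"
                + "num.partitions=2\n"
                + "zookeeper.connect=localhost:2181\n";
        Properties props = parse(file);
        System.out.println(props.getProperty("broker.id") + " "
                + props.getProperty("port") + " "
                + props.getProperty("zookeeper.connect"));
    }
}
```

Lines starting with `#` are comments and are skipped by the loader, which is why the license header and section banners above are harmless.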

# Starting the Kafka Server

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties

[2014-06-29 21:44:30,062] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,125] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/logs (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,126] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,127] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,128] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,129] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,130] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-06-29 21:44:30,156] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-06-29 21:44:30,158] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2014-06-29 21:44:30,170] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-06-29 21:44:30,177] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.class.path=:/…/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,177] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,178] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
[2014-06-29 21:44:30,191] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,198] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,215] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x146e7dd68ba0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-06-29 21:44:30,218] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-06-29 21:44:30,441] INFO Loading log 'page_visits-1' (kafka.log.LogManager)
[2014-06-29 21:44:30,464] INFO Recovering unflushed segment 0 in log page_visits-1. (kafka.log.Log)
[2014-06-29 21:44:30,472] INFO Completed load of log page_visits-1 with log end offset 0 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-29 21:44:30,485] INFO Loading log 'page_visits-0' (kafka.log.LogManager)
[2014-06-29 21:44:30,486] INFO Recovering unflushed segment 0 in log page_visits-0. (kafka.log.Log)
[2014-06-29 21:44:30,486] INFO Completed load of log page_visits-0 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,487] INFO Loading log 'test-0' (kafka.log.LogManager)
[2014-06-29 21:44:30,488] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log)
[2014-06-29 21:44:30,488] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,489] INFO Loading log 'page_visits-4' (kafka.log.LogManager)
[2014-06-29 21:44:30,490] INFO Recovering unflushed segment 0 in log page_visits-4. (kafka.log.Log)
[2014-06-29 21:44:30,491] INFO Completed load of log page_visits-4 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,492] INFO Loading log 'page_visits-3' (kafka.log.LogManager)
[2014-06-29 21:44:30,493] INFO Recovering unflushed segment 0 in log page_visits-3. (kafka.log.Log)
[2014-06-29 21:44:30,493] INFO Completed load of log page_visits-3 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,494] INFO Loading log 'page_visits-2' (kafka.log.LogManager)
[2014-06-29 21:44:30,495] INFO Recovering unflushed segment 0 in log page_visits-2. (kafka.log.Log)
[2014-06-29 21:44:30,495] INFO Completed load of log page_visits-2 with log end offset 0 (kafka.log.Log)
[2014-06-29 21:44:30,496] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-06-29 21:44:30,500] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-06-29 21:44:30,517] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-06-29 21:44:30,518] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-06-29 21:44:30,588] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-06-29 21:44:30,614] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-06-29 21:44:31,038] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-06-29 21:44:31,042] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
[2014-06-29 21:44:31,059] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-06-29 21:44:31,319] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visits,4],[page_visits,2],[page_visits,0],[page_visits,3],[page_visits,1],[test,0] (kafka.server.ReplicaFetcherManager)
[2014-06-29 21:44:31,392] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visits,4],[page_visits,2],[page_visits,0],[page_visits,3],[page_visits,1],[test,0] (kafka.server.ReplicaFetcherManager)

 

# Deleting an old Topic

[root@rs229 kafka_2.9.2-0.8.1.1]# kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yting_page_visits --zookeeper localhost:2181

deletion succeeded!

The delete command given on the official site appears to target version 0.8.0 and does not work under 0.8.1, which was frustrating. So a kafka-run-class.sh invocation like the one above is used to delete the topic instead.

Enter the ZooKeeper shell to confirm that the topic was removed from the broker metadata:

[root@rs229 kafka_2.9.2-0.8.1.1]# zookeeper-shell.sh

USAGE: /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/zookeeper-shell.sh zookeeper_host:port[/path] [args...]

[root@rs229 kafka_2.9.2-0.8.1.1]# zookeeper-shell.sh localhost:2181

Connecting to localhost:2181

Welcome to ZooKeeper!

JLine support is disabled

 

WATCHER::

 

WatchedEvent state:SyncConnected type:None path:null

ls /

[consumers, config, controller, zookeeper, brokers, admin, controller_epoch]

ls /brokers

[topics, ids]

ls /brokers/topics

[test]

This confirms the topic has been removed from ZooKeeper as well (only "test" remains under /brokers/topics). OK.

# Creating a Topic from the Linux shell

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic yting_page_visits

Created topic "yting_page_visits".

[root@rs229 kafka_2.9.2-0.8.1.1]#

# A failed topic creation, with the cause and the fix (single-node setup)

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic yting_page_visits
Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

Cause: this is a single-node test. The topic was created with --replication-factor 3, but only one broker is running, which is exactly what the error says: replication factor: 3 larger than available brokers: 1.

Fix: change --replication-factor 3 to --replication-factor 1.
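The rule behind this failure is simple: each replica of a partition must live on a distinct broker, so the replication factor can never exceed the number of available brokers. A minimal sketch of that check (the class name is made up for illustration; the real check lives in kafka.admin.AdminUtils, and IllegalArgumentException stands in for AdminOperationException):

```java
public class ReplicationCheck {
    // Replication factor must not exceed the broker count, since replicas
    // of one partition are assigned to distinct brokers.
    public static void checkReplicationFactor(int replicationFactor, int availableBrokers) {
        if (replicationFactor > availableBrokers) {
            throw new IllegalArgumentException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + availableBrokers);
        }
    }

    public static void main(String[] args) {
        checkReplicationFactor(1, 1); // fine: one broker, one replica
        try {
            checkReplicationFactor(3, 1); // reproduces the failure above
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```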

 

# TestProducer.java: run the producer code in Eclipse


Console output:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

# Viewing the data on the server

Run the following command in a Linux shell to see the data the producer wrote.

Note that this output covers all 5 partitions.

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic yting_page_visits --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

id---> 3 --->1404281896546,www.ytingxmei1129.com,13.14.20.179

id---> 6 --->1404281897616,www.ytingxmei1129.com,13.14.20.114

id---> 8 --->1404281897786,www.ytingxmei1129.com,13.14.20.64

id---> 19 --->1404281898657,www.ytingxmei1129.com,13.14.20.54

id---> 24 --->1404281898874,www.ytingxmei1129.com,13.14.20.19

id---> 30 --->1404281899132,www.ytingxmei1129.com,13.14.20.189

id---> 42 --->1404281899657,www.ytingxmei1129.com,13.14.20.99

id---> 48 --->1404281900467,www.ytingxmei1129.com,13.14.20.154

id---> 62 --->1404281901328,www.ytingxmei1129.com,13.14.20.129

id---> 63 --->1404281901374,www.ytingxmei1129.com,13.14.20.29

id---> 65 --->1404281901461,www.ytingxmei1129.com,13.14.20.139

id---> 74 --->1404281901856,www.ytingxmei1129.com,13.14.20.69

id---> 80 --->1404281902158,www.ytingxmei1129.com,13.14.20.94

id---> 81 --->1404281902204,www.ytingxmei1129.com,13.14.20.74

id---> 83 --->1404281902289,www.ytingxmei1129.com,13.14.20.204

id---> 84 --->1404281902330,www.ytingxmei1129.com,13.14.20.59

id---> 87 --->1404281902462,www.ytingxmei1129.com,13.14.20.154

id---> 89 --->1404281902548,www.ytingxmei1129.com,13.14.20.229

id---> 92 --->1404281902676,www.ytingxmei1129.com,13.14.20.104

id---> 98 --->1404281903106,www.ytingxmei1129.com,13.14.20.49

id---> 10 --->1404281897959,www.ytingxmei1129.com,13.14.20.118

id---> 12 --->1404281898130,www.ytingxmei1129.com,13.14.20.173

id---> 17 --->1404281898517,www.ytingxmei1129.com,13.14.20.98

id---> 18 --->1404281898610,www.ytingxmei1129.com,13.14.20.173

id---> 21 --->1404281898742,www.ytingxmei1129.com,13.14.20.158

id---> 34 --->1404281899306,www.ytingxmei1129.com,13.14.20.83

id---> 35 --->1404281899353,www.ytingxmei1129.com,13.14.20.58

id---> 40 --->1404281899571,www.ytingxmei1129.com,13.14.20.53

id---> 46 --->1404281900298,www.ytingxmei1129.com,13.14.20.118

id---> 53 --->1404281900848,www.ytingxmei1129.com,13.14.20.198

id---> 54 --->1404281900933,www.ytingxmei1129.com,13.14.20.118

id---> 58 --->1404281901154,www.ytingxmei1129.com,13.14.20.228

id---> 60 --->1404281901238,www.ytingxmei1129.com,13.14.20.223

id---> 61 --->1404281901287,www.ytingxmei1129.com,13.14.20.8

id---> 67 --->1404281901550,www.ytingxmei1129.com,13.14.20.18

id---> 71 --->1404281901728,www.ytingxmei1129.com,13.14.20.133

id---> 75 --->1404281901901,www.ytingxmei1129.com,13.14.20.133

id---> 78 --->1404281902070,www.ytingxmei1129.com,13.14.20.58

id---> 82 --->1404281902247,www.ytingxmei1129.com,13.14.20.68

id---> 85 --->1404281902374,www.ytingxmei1129.com,13.14.20.48

id---> 86 --->1404281902416,www.ytingxmei1129.com,13.14.20.183

id---> 90 --->1404281902592,www.ytingxmei1129.com,13.14.20.143

id---> 91 --->1404281902634,www.ytingxmei1129.com,13.14.20.183

id---> 94 --->1404281902764,www.ytingxmei1129.com,13.14.20.228

id---> 95 --->1404281902850,www.ytingxmei1129.com,13.14.20.233

id---> 96 --->1404281902934,www.ytingxmei1129.com,13.14.20.143

id---> 1 --->1404281895240,www.ytingxmei1129.com,13.14.20.16

id---> 2 --->1404281896467,www.ytingxmei1129.com,13.14.20.96

id---> 7 --->1404281897700,www.ytingxmei1129.com,13.14.20.246

id---> 14 --->1404281898302,www.ytingxmei1129.com,13.14.20.1

id---> 16 --->1404281898472,www.ytingxmei1129.com,13.14.20.116

id---> 25 --->1404281898916,www.ytingxmei1129.com,13.14.20.106

id---> 28 --->1404281899047,www.ytingxmei1129.com,13.14.20.121

id---> 29 --->1404281899090,www.ytingxmei1129.com,13.14.20.141

id---> 36 --->1404281899395,www.ytingxmei1129.com,13.14.20.86

id---> 44 --->1404281899789,www.ytingxmei1129.com,13.14.20.56

id---> 49 --->1404281900511,www.ytingxmei1129.com,13.14.20.111

id---> 51 --->1404281900679,www.ytingxmei1129.com,13.14.20.111

id---> 72 --->1404281901770,www.ytingxmei1129.com,13.14.20.21

id---> 77 --->1404281902028,www.ytingxmei1129.com,13.14.20.6

id---> 79 --->1404281902116,www.ytingxmei1129.com,13.14.20.16

id---> 97 --->1404281903019,www.ytingxmei1129.com,13.14.20.21

id---> 4 --->1404281896631,www.ytingxmei1129.com,13.14.20.62

id---> 9 --->1404281897873,www.ytingxmei1129.com,13.14.20.27

id---> 20 --->1404281898701,www.ytingxmei1129.com,13.14.20.167

id---> 22 --->1404281898785,www.ytingxmei1129.com,13.14.20.182

id---> 32 --->1404281899220,www.ytingxmei1129.com,13.14.20.32

id---> 33 --->1404281899263,www.ytingxmei1129.com,13.14.20.32

id---> 37 --->1404281899438,www.ytingxmei1129.com,13.14.20.177

id---> 38 --->1404281899485,www.ytingxmei1129.com,13.14.20.92

id---> 39 --->1404281899529,www.ytingxmei1129.com,13.14.20.72

id---> 41 --->1404281899616,www.ytingxmei1129.com,13.14.20.47

id---> 45 --->1404281899874,www.ytingxmei1129.com,13.14.20.252

id---> 52 --->1404281900765,www.ytingxmei1129.com,13.14.20.217

id---> 55 --->1404281901017,www.ytingxmei1129.com,13.14.20.47

id---> 57 --->1404281901110,www.ytingxmei1129.com,13.14.20.177

id---> 64 --->1404281901419,www.ytingxmei1129.com,13.14.20.242

id---> 68 --->1404281901596,www.ytingxmei1129.com,13.14.20.62

id---> 70 --->1404281901684,www.ytingxmei1129.com,13.14.20.37

id---> 88 --->1404281902506,www.ytingxmei1129.com,13.14.20.72

id---> 5 --->1404281897529,www.ytingxmei1129.com,13.14.20.40

id---> 11 --->1404281898044,www.ytingxmei1129.com,13.14.20.110

id---> 13 --->1404281898215,www.ytingxmei1129.com,13.14.20.20

id---> 15 --->1404281898387,www.ytingxmei1129.com,13.14.20.165

id---> 23 --->1404281898832,www.ytingxmei1129.com,13.14.20.75

id---> 26 --->1404281898959,www.ytingxmei1129.com,13.14.20.30

id---> 27 --->1404281899004,www.ytingxmei1129.com,13.14.20.55

id---> 31 ---> 1404281899176,www.ytingxmei1129.com,13.14.20.180

id---> 43 --->1404281899703,www.ytingxmei1129.com,13.14.20.200

id---> 47 --->1404281900381,www.ytingxmei1129.com,13.14.20.40

id---> 50 --->1404281900596,www.ytingxmei1129.com,13.14.20.95

id---> 56 --->1404281901064,www.ytingxmei1129.com,13.14.20.20

id---> 59 --->1404281901196,www.ytingxmei1129.com,13.14.20.30

id---> 66 --->1404281901502,www.ytingxmei1129.com,13.14.20.155

id---> 69 --->1404281901640,www.ytingxmei1129.com,13.14.20.55

id---> 73 --->1404281901811,www.ytingxmei1129.com,13.14.20.120

id---> 76 --->1404281901944,www.ytingxmei1129.com,13.14.20.125

id---> 93 --->1404281902719,www.ytingxmei1129.com,13.14.20.0

id---> 99 --->1404281903191,www.ytingxmei1129.com,13.14.20.65

id---> 100 --->1404281903277,www.ytingxmei1129.com,13.14.20.75

^C[2014-07-02 14:18:59,463] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

Consumed 100 messages

[root@rs229 kafka_2.9.2-0.8.1.1]#

First: the data comes out unordered (Kafka only guarantees ordering within a single partition, not across the whole topic).

Second: this one is left for you to figure out (hint: read the TestSimplePartitioner.java class).
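Following the hint: the producer keys each message with its IP address, and TestSimplePartitioner picks a partition from the IP's last octet modulo the partition count, so messages are ordered only within a partition and the console consumer interleaves the five partitions. The mapping, reimplemented without Kafka dependencies (the class name here is made up for this sketch):

```java
public class PartitionDemo {
    // Same rule as TestSimplePartitioner: last octet of the IP, mod partition count.
    public static int partitionFor(String ip, int numPartitions) {
        int offset = ip.lastIndexOf('.');
        if (offset > 0) {
            return Integer.parseInt(ip.substring(offset + 1)) % numPartitions;
        }
        return 0; // keys without a dot fall back to partition 0
    }

    public static void main(String[] args) {
        // 13.14.20.179 -> 179 % 5 = 4: this message goes to partition 4.
        System.out.println(partitionFor("13.14.20.179", 5));
        // 13.14.20.20 -> 20 % 5 = 0: partition 0.
        System.out.println(partitionFor("13.14.20.20", 5));
    }
}
```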

TestSimpleConsumer.java: run the consumer code in Eclipse to get the per-partition breakdown below.

# Partition 0 data: 0~19 (20 messages)

# Partition 1 data: 0~15 (16 messages)

# Partition 2 data: 0~17 (18 messages)

# Partition 3 data: 0~25 (26 messages)

# Partition 4 data: 0~19 (20 messages)

# Summary

20+16+18+26+20 adds up to exactly 100 messages, so nothing was lost. The partitions are also fairly evenly sized; with consistent hashing the distribution would be even more balanced.
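On the consistent-hashing remark, the idea can be sketched as a hash ring: each partition is placed on the ring many times (virtual nodes), and a key goes to the first partition clockwise from its hash. This is a toy illustration only (String.hashCode and 100 virtual nodes are arbitrary choices, and Kafka's own partitioners do not work this way):

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashDemo {
    private final SortedMap<Integer, String> ring = new TreeMap<Integer, String>();

    // Place each partition on the ring at 100 virtual-node positions to
    // smooth out the distribution of keys.
    public ConsistentHashDemo(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            for (int v = 0; v < 100; v++) {
                ring.put(("partition-" + p + "#vnode-" + v).hashCode(), "partition-" + p);
            }
        }
    }

    // Walk clockwise on the ring to the first node at or after the key's hash,
    // wrapping around to the first node if none follows.
    public String partitionFor(String key) {
        SortedMap<Integer, String> tail = ring.tailMap(key.hashCode());
        Integer point = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(point);
    }

    public static void main(String[] args) {
        ConsistentHashDemo ch = new ConsistentHashDemo(5);
        // The same key always lands on the same partition.
        System.out.println(ch.partitionFor("13.14.20.179"));
        System.out.println(ch.partitionFor("13.14.20.179"));
    }
}
```

The payoff of a ring over plain modulo is that changing the partition count remaps only a fraction of the keys instead of nearly all of them.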

# Messages that look like errors but are not

# The Kafka server prints messages like this:

[2014-07-01 22:54:07,583] ERROR Closing socket for /116.255.224.229 because of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
       at sun.nio.ch.IOUtil.write(IOUtil.java:65)
       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
       at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:67)
       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
       at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
       at kafka.network.MultiSend.writeTo(Transmission.scala:102)
       at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
       at kafka.network.Processor.write(SocketServer.scala:375)
       at kafka.network.Processor.run(SocketServer.scala:247)
       at java.lang.Thread.run(Thread.java:745)

# The ZooKeeper server prints messages like this:

[2014-07-01 22:53:22,561] INFO Got user-level KeeperException when processing sessionid:0x146f265ce780006 type:setData cxid:0x37 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-84796/offsets/yting_page_visits/4 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-84796/offsets/yting_page_visits/4 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:53:22,563] INFO Got user-level KeeperException when processing sessionid:0x146f265ce780006 type:create cxid:0x38 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a Error Path:/consumers/console-consumer-84796/offsets/yting_page_visits Error:KeeperErrorCode = NodeExists for /consumers/console-consumer-84796/offsets/yting_page_visits (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,589] INFO Processed session termination for sessionid: 0x146f265ce780006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,591] INFO Closed socket connection for client /127.0.0.1:38575 which had sessionid 0x146f265ce780006 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,594] INFO Accepted socket connection from /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,594] INFO Client attempting to establish new session at /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,596] INFO Established session 0x146f265ce780008 with negotiated timeout 30000 for client /127.0.0.1:38586 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-07-01 22:54:07,628] INFO Processed session termination for sessionid: 0x146f265ce780008 (org.apache.zookeeper.server.PrepRequestProcessor)
[2014-07-01 22:54:07,629] INFO Closed socket connection for client /127.0.0.1:38586 which had sessionid 0x146f265ce780008 (org.apache.zookeeper.server.NIOServerCnxn)

# Analysis of the "error" messages

Analysis: the two logs above, one from the Kafka server and one from ZooKeeper, appear after running bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic yting_page_visits --from-beginning and then pressing Ctrl+C. Killing the console consumer drops its socket abruptly, so the broker reports "Connection reset by peer"; it is not a broker fault. As for the ZooKeeper messages, I do not fully understand them yet and will come back to them later.

# Source code for this example

# Producer

package com.yting.cloud.kafka.producer;

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * The Producer example from the Kafka site. I run it from Eclipse against a
 * remote server, so some code was changed.
 *
 * @Author 王扬庭
 * @Time 2014-07-01
 */
public class TestProducer {
    public static void main(String[] args) {
//      long events = Long.parseLong(args[0]);
        long events = 100;
        Random rnd = new Random();

        Properties props = new Properties();
//      props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("metadata.broker.list", "rs229:9092"); // rs229 must also be in the local hosts file on the Eclipse machine, or use the IP directly
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.yting.cloud.kafka.partition.TestSimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "13.14.20." + rnd.nextInt(255);
            String msg = runtime + ",www.ytingxmei1129.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("yting_page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

# Partitioner

package com.yting.cloud.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * The SimplePartitioner example from the Kafka site. The site's version
 * targets 0.8.0, whose Partitioner interface differs from 0.8.1, so it was
 * adapted here.
 *
 * @Author 王扬庭
 * @Time 2014-07-01
 */
public class TestSimplePartitioner implements Partitioner {
    public TestSimplePartitioner(VerifiableProperties props) {
    }

    // 0.8.0 signature, kept for reference:
    // public int partition(String key, int a_numPartitions) {
    //     int partition = 0;
    //     int offset = key.lastIndexOf('.');
    //     if (offset > 0) {
    //         partition = Integer.parseInt(key.substring(offset + 1)) % a_numPartitions;
    //     }
    //     return partition;
    // }

    @Override
    public int partition(Object obj, int a_numPartitions) {
        String key = obj.toString();
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}

# Consumer

See: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

package com.yting.cloud.kafka.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * The SimpleConsumer example from the Kafka website, modified slightly so it can be run from Eclipse against a remote server.
 *
 * @Author 王扬庭
 * @Time 2014-06-29 15:09:21
 */
public class TestSimpleConsumer {
    public static void main(String args[]) {
        TestSimpleConsumer example = new TestSimpleConsumer();
        // long maxReads = Long.parseLong(args[0]);
        // String topic = args[1];
        // int partition = Integer.parseInt(args[2]);
        // seeds.add(args[3]);
        // int port = Integer.parseInt(args[4]);
        long maxReads = 100;
        String topic = "yting_page_visits";
        int partition = 1;
        List<String> seeds = new ArrayList<String>();
        seeds.add("rs229");
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public TestSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
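Inside the fetch loop, each message's payload arrives as a `ByteBuffer` that has to be copied out before it can be printed. That decoding step, isolated from the Kafka classes (a standalone sketch for illustration):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecodeDemo {
    // Same steps as the consumer: size a byte[] from the buffer's limit,
    // drain the buffer into it, then decode the bytes as UTF-8.
    static String decode(ByteBuffer payload) {
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a fetched payload carrying one producer record.
        ByteBuffer buf = ByteBuffer.wrap(
                "2014-07-01 12:00:00,www.example.com,192.168.2.113".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(buf));
    }
}
```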

# zookeeper.properties (the configuration file that must be passed when starting ZooKeeper)

[root@rs229 config]# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config
[root@rs229 config]# cat zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

# server.properties (the configuration file that must be passed when starting the Kafka server)

[root@rs229 config]# cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
#log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
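The retention settings above are easier to read once converted into human units: log.segment.bytes=536870912 works out to 512 MB per segment, and log.retention.hours=168 is a 7-day window. A quick arithmetic check:

```java
public class RetentionMathDemo {
    // Convert the raw byte count from log.segment.bytes into megabytes.
    static long segmentMegabytes(long segmentBytes) {
        return segmentBytes / (1024 * 1024);
    }

    // Convert log.retention.hours into days.
    static int retentionDays(int retentionHours) {
        return retentionHours / 24;
    }

    public static void main(String[] args) {
        // log.segment.bytes=536870912 -> 512 MB; log.retention.hours=168 -> 7 days
        System.out.println(segmentMegabytes(536870912L) + " MB, " + retentionDays(168) + " days");
    }
}
```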

# This is just an introduction, of course; the more advanced parts will have to wait for now, as things have been a bit busy!

Time: 2014-07-02 14:39:21
