kafka集群搭建,kafka集群


第一步

先去官网下载 kafka_2.9.2-0.8.1.1.tgz 并解压再进入到安装目录(也可以自己配置路径,方法跟配置java、hadoop等路径是一样的).
> tar -xzf kafka_2.9.2-0.8.1.1.tgz 
> cd kafka_2.9.2-0.8.1.1


第二步
zeekeeper集群搭建(用的是kafka自带的zeekeeper,一共准备了三台机器)
1、关闭各台机器的防火墙(一定要切记,我搭建的时候以为能ping通就ok了,就没关心防火墙的问题了,最后白白浪费了一天的时间)
命令 /ect/init.d/iptables stop

2、进入到打开/ect下的hosts文件
修改为
127.0.0.1 localhost
10.61.5.66 host1
10.61.5.67 host2
10.61.5.68 host3
(ip和机器名根据个人实际情况修改)

3、修改zeekeeper 配置文件
进入到kafka安装目录下的config文件,打开zookeeper.properties
修改dataDir={kafka安装目录}/zookeeper/logs/
注释掉maxClientCnxns=0
在文件末尾添加如下语句
tickTime=2000
initLimit=5
syncLimit=2
#host1、2、3为主机名,可以根据实际情况更改,端口号也可以更改
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888

4、在dataDir目录下的建立一个myid文件
命令   echo 1 >myid
另外两台机子分别设置为2、3,依次类推。


第三步
启动zookeeper服务(每台机子的zeekeeper都要启)
> bin/zookeeper-server-start.sh config/zookeeper.properties
在三台机子的zeekeeper都启动好之前,先启动的机子会有错误日志,这是正常的

 

第四步
配置kafka
1、在kafka安装目录下的config目录下打开server.properties文件
修改
zookeeper.connect=host1:2181,host2:2181,host3:2181    (2181为端口号,可以根据自己的实际情况更改)
其他两台机子的server.properties文件中的broker.id也要改,反正三台机子的broker.id不能有重复
2、修改producer.properties文件
修改
metadata.broker.list=host1:9092,host2:9092,host3:9092
prodeucer.type=async

3、修改consumer.properties文件
修改
zeekeeper.connect=host1:2181,host2:2181,host3:2181

4、在每台机子启动kafka服务
> bin/kafka-server-start.sh config/server.properties

 

第四步:建立一个主题
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic my-replicated-test
factor大小不能超过broker数

通过以下命令查看主题
> bin/kafka-topics.sh --list --zookeeper host1:2181 (也可以是host2:2181等)
my-replicated-test

通过下述命令可以看到该主题详情
> bin/kafka-topics.sh --describe --zookeeper host1:2181 --topic my-replicated-test


第五步:发送消息
在host2上建立生产者角色,并发送消息(其实可以是三台机子中的任何一台)
> bin/kafka-console-producer.sh --broker-list host1:9092 --topic my-replicated-test 
This is a message
This is another message

在host3上建立消费者角色(在该终端窗口内可以看到生产者发布这消息)
> bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic my-replicated-test --from-beginning
This is a message
This is another message

至此,一个kafka集群就搭好了,可以作为kafka服务器了

 

 测试程序(在win系统上)

切记要去C:\Windows\system32\drivers\etc\hosts作如下配置,否则测试程序无法访问kafka服务器!

10.61.5.66 host1
10.61.5.67 host2
10.61.5.68 host3

记得将kafka安装目录下libs里的所有包导入项目里去

//生产者测试程序

public class ProducerTest {
 public static void main(String[] args) throws FileNotFoundException {  
        Properties props = new Properties();  
        props.put("zookeeper.connect", "slaves7:2182,slaves8:2182,slaves9:2182");  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        props.put("metadata.broker.list","slaves7:9092,slaves8:9092,slaves9:9092");
      
        ProducerConfig config = new ProducerConfig(props);  
        Producer<String, String> producer = new Producer<String, String>(config);  
         File file=new File("E:/test","test.txt");
         BufferedReader readtxt=new BufferedReader(new FileReader(file));
          String line=null;
          byte[] item=null;
   try {
      while((line=readtxt.readLine())!=null){
      item=line.getBytes();
      String str = new String(item);
      System.out.println(str);
      producer.send(new KeyedMessage<String, String>("my-replicated-topic",str));
      }
   } catch (IOException e) {
    e.printStackTrace();
   }
       }  
}
//消费者测试程序

public class ConsumerTest extends Thread {
 private final ConsumerConnector consumer;  
    private final String topic;  
      
    public static void main(String[] args) {  
        ConsumerTest consumerThread = new ConsumerTest("my-replicated-topic");  
        consumerThread.start();  
    }  
      
    public ConsumerTest(String topic) {  
     System.out.println(topic);
        consumer = kafka.consumer.Consumer  
                .createJavaConsumerConnector(createConsumerConfig());  
        this.topic = topic;  
    }  
      
    private static ConsumerConfig createConsumerConfig() {  
        Properties props = new Properties();  
        props.put("zookeeper.connect", "slaves7:2182,slaves8,slaves9:2182");  
        props.put("group.id", "0");  
        props.put("zookeeper.session.timeout.ms", "400000");  
        props.put("zookeeper.sync.time.ms", "200");  
        props.put("auto.commit.interval.ms", "1000");  
      
        return new ConsumerConfig(props);  
      
    }  
      
    public void run() {  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, new Integer(1));  
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer  
                .createMessageStreams(topicCountMap);  
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  
        ConsumerIterator<byte[], byte[]> it = stream.iterator();  
        while (it.hasNext())  
            System.out.println(new String(it.next().message()));  
    }  
}

当两个测试程序都运行后,生产者程序会从本机读取txt文件的内容,消费者程序会显示出这些内容

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容