win系统下启动linux上的kafka集群及使用,linuxkafka


一、首先在win系统下C:\Windows\System32\drivers\etc目录中hosts文件添加如下内容:

10.61.6.167 slaves1
10.61.6.168 slaves2
10.61.6.169 slaves3


二、启动kafka集群类

package com.conn.server.start;


import java.io.IOException;


import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;




public class StartKafka {  
 
    public static void main(String[] args) {
    StartKafka startzk=new StartKafka();
    //启动kafka自带zookeeper集群
    startzk.startZk1();
    startzk.startZk2();
    startzk.startZk3();
    //启动kafka
    startzk.startKafka1();
    startzk.startKafka2();
    startzk.startKafka3();
    }
          
    public static void startZk1(){
        String hostname = "10.61.6.166";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
                //使用多个命令用分号隔开  

                ssh.execCommand("cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&");  
              
                //只允许使用一行命令,即ssh对象只能使用一次execCommand这个方法,多次使用则会出现异常  
            }  
            //连接的Session和Connection对象都需要关闭  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
          
    }  
  
    public static void startZk2(){
        String hostname = "10.61.6.168";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
               ssh.execCommand("cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&");  
              
            }  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    
    public static void startZk3(){
        String hostname = "10.61.6.169";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
               ssh.execCommand("cd /home/lee/kafka/kafka;bin/zookeeper-server-start.sh config/zookeeper.properties&");  
               }  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    
    public static void startKafka1(){
        String hostname = "10.61.6.167";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
                ssh.execCommand("cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&");  
            }  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    
    public static void startKafka2(){
        String hostname = "10.61.6.168";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
                ssh.execCommand("cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&");  
            }  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    
    public static void startKafka3(){
        String hostname = "10.61.6.169";  
        String username = "root";  
        String password = "pass@word2";  
        //指明连接主机的IP地址  
        Connection conn = new Connection (hostname);  
        Session ssh = null;  
        try {  
            //连接到主机  
            conn.connect();  
            //使用用户名和密码校验  
            boolean isconn = conn.authenticateWithPassword(username, password);  
            if(!isconn){  
                System.out.println("用户名称或者是密码不正确");  
            }else{  
                System.out.println("已经连接OK");  
                ssh = conn.openSession();  
                ssh.execCommand("cd /home/lee/kafka/kafka;bin/kafka-server-start.sh config/server.properties&");  
            }  
            ssh.close();  
            conn.close();  
              
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    
}  

三、生产者类

package com.performanceTest;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class ProducerSample {
public static void main(String[] args) throws FileNotFoundException {  
        //ProducerSample ps = new ProducerSample();  
  
        Properties props = new Properties();  
        props.put("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182");    //这里也可以改为集群各节点的ip地址
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        props.put("metadata.broker.list","slaves1:9092,slaves2:9092,slaves3:9092");
        props.put("request.required.acks", "1");
        props.put("batch.num.messages","200");
        ProducerConfig config = new ProducerConfig(props);  
        Producer<String, String> producer = new Producer<String, String>(config);  


                 File file=new File("E:/test","110-140_1.txt");
        BufferedReader readtxt=new BufferedReader(new FileReader(file));
        String line=null;
        byte[] item=null;
try {
while((line=readtxt.readLine())!=null){
line = line.replaceAll("\\t", ",");
item=line.getBytes();
String str = new String(item);
producer.send(new KeyedMessage<String, String>("mykafka",str));
}
} catch (IOException e) {
e.printStackTrace();
}
     
    }  
}

四、消费者类

package com.performanceTest;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;


import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


public class ConsumerTest extends Thread {
private final ConsumerConnector consumer;  
    private final String topic;  
    private  final List<String> messages = new ArrayList<String>();
      
    public static void main(String[] args) {  
        ConsumerTest consumerThread = new ConsumerTest("mykafka");  
        consumerThread.start();  
    }  
      
    public ConsumerTest(String topic) {  
    System.out.println(topic);
        consumer = kafka.consumer.Consumer  
                .createJavaConsumerConnector(createConsumerConfig());  
        this.topic = topic;  
    }  
      
    private static ConsumerConfig createConsumerConfig() {  
        Properties props = new Properties();  
       props.put("zookeeper.connect", "slaves1:2182,slaves2:2182,slaves3:2182"); 
        props.put("group.id", "0");  
        props.put("zookeeper.session.timeout.ms", "400000");  
        props.put("zookeeper.sync.time.ms", "200");  
        props.put("auto.commit.interval.ms", "1000");  
      
        return new ConsumerConfig(props);  
      
    }  
      

版权声明:本文为博主原创文章,未经博主允许不得转载。

相关内容