通过TelnetClient获取Zookeeper监控数据,zookeeperclient


如果想编写一个监控Zookeeper的Java程序,可以通过两种方式 : (1)通过TelnetClient发送命令 ,命令的详解参考:http://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html#sc_zkCommands (2)通过JMX,说明请参考:http://zookeeper.apache.org/doc/trunk/zookeeperJMX.html
本文通过一个简单的例子来演示如何通过TelnetClient发送mntr命令获取Zookeeper的监控数据

写一个Telnet的工具类


package com.eric.agent.utils;
import org.apache.commons.net.telnet.TelnetClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
/**
 * Telnet操作器,基于commons-net-2.2.jar
 *
 * @author aihua.sun
 * @date 2015/4/9
 * @since V1.0
 */
public class TelnetTools {
    private String prompt = ">"; //结束标识字符串,Windows中是>,Linux中是#
    private char promptChar = '>';   //结束标识字符
    private TelnetClient telnet;
    private InputStream in;     // 输入流,接收返回信息
    private PrintStream out;    // 向服务器写入 命令
    /**
     * @param termtype 协议类型:VT100、VT52、VT220、VTNT、ANSI
     * @param prompt   结果结束标识
     */
    public TelnetTools(String termtype, String prompt) {
        telnet = new TelnetClient(termtype);
        setPrompt(prompt);
    }
    public TelnetTools(String termtype) {
        telnet = new TelnetClient(termtype);
    }
    public TelnetTools() {
        telnet = new TelnetClient();
    }
    /**
     * 登录到目标主机
     *
     * @param ip
     * @param port
     */
    public void login(String ip, int port) {
        try {
            telnet.connect(ip, port);
            in = telnet.getInputStream();
            out = new PrintStream(telnet.getOutputStream());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    /**
     * 读取分析结果
     *
     * @param pattern 匹配到该字符串时返回结果
     * @return
     */
    public String readUntil(String pattern) {
        StringBuffer sb = new StringBuffer();
        try {
            char lastChar = (char) -1;
            boolean flag = pattern != null && pattern.length() > 0;
            if (flag)
                lastChar = pattern.charAt(pattern.length() - 1);
            char ch;
            int code = -1;
            while ((code = in.read()) != -1) {
                ch = (char) code;
                sb.append(ch);
                //匹配到结束标识时返回结果
                if (flag) {
                    if (ch == lastChar && sb.toString().endsWith(pattern)) {
                        return sb.toString();
                    }
                } else {
                    //如果没指定结束标识,匹配到默认结束标识字符时返回结果
                    if (ch == promptChar)
                        return sb.toString();
                }
                //登录失败时返回结果
                if (sb.toString().contains("Login Failed")) {
                    return sb.toString();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sb.toString();
    }
    /**
     * 发送命令
     *
     * @param value
     */
    public void write(String value) {
        try {
            out.println(value);
            out.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 发送命令,返回执行结果
     *
     * @param command
     * @return
     */
    public String sendCommand(String command) {
        try {
            write(command);
            return readUntil(prompt);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 关闭连接
     */
    public void distinct() {
        try {
            if (telnet != null && !telnet.isConnected())
                telnet.disconnect();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void setPrompt(String prompt) {
        if (prompt != null) {
            this.prompt = prompt;
            this.promptChar = prompt.charAt(prompt.length() - 1);
        }
    }
}


调用类

package com.tscloud.agent.flume.source.dataprovider;

/**
 * 通过HTTP作为HDFS Master监控信息的source
 * 明细可参考http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html#sc_zkCommands
 * @author aihua.sun
 * @date 2015/4/6
 * @since V1.0
 */

import com.eric.agent.flume.model.ZookeeperRoleInfo;
import com.eric.agent.flume.source.base.IClusterServiceRoleDataProvider;
import com.eric.agent.utils.AgentConstants;
import com.eric.agent.utils.TelnetTools;
import com.eric.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.*;

public class ZookeeperDataProvider{
    protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
    private static final String zk_avg_latency = "zk_avg_latency";
    private static final String zk_max_latency = "zk_max_latency";
    private static final String zk_min_latency = "zk_min_latency";
    private static final String zk_packets_received = "zk_packets_received";
    private static final String zk_packets_sent = "zk_packets_sent";
    private static final String zk_server_state = "zk_server_state";
    private static final String zk_znode_count = "zk_znode_count";
    private static final String zk_followers = "zk_followers";
    private static final String zk_open_file_descriptor_count = "zk_open_file_descriptor_count";

    public String extractMonitorData() {
        //TODO 通过调用API获得IP以及参数
        ZookeeperRoleInfo monitorDataPoint = new ZookeeperRoleInfo();
        String IP = "192.168.40.242";
        int port = 2181;
        TelnetTools telnet=null;
        try {
            telnet = new TelnetTools();
            telnet.login(IP, port);
            String rs = telnet.sendCommand("mntr");
            Map<String, String> telnetResultMap = parseTelnetResult(rs);
            monitorDataPoint.setZkAvgLatency(translateStrToLong(telnetResultMap.get(zk_avg_latency)));
            monitorDataPoint.setZkMaxLatency(translateStrToLong(telnetResultMap.get(zk_max_latency)));
            monitorDataPoint.setZkMinLatency(translateStrToLong(telnetResultMap.get(zk_min_latency)));
            monitorDataPoint.setZkPacketsReceived(translateStrToLong(telnetResultMap.get(zk_packets_received)));
            monitorDataPoint.setZkPacketsSent(translateStrToLong(telnetResultMap.get(zk_packets_sent)));
            monitorDataPoint.setZkServerState(telnetResultMap.get(zk_server_state));
            monitorDataPoint.setZkZnodeCount(translateStrToLong(telnetResultMap.get(zk_znode_count)));
            monitorDataPoint.setZkFollowers(translateStrToLong(telnetResultMap.get(zk_followers)));
            monitorDataPoint.setZkOpenFileDescriptorCount(translateStrToLong(telnetResultMap.get(zk_open_file_descriptor_count)));
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            telnet.distinct();
        }

        return monitorDataPoint.toString();
    }

    private Long translateStrToLong(String value) {
        if (org.apache.commons.lang.StringUtils.isAlphanumeric(value)) {
            return Long.valueOf(value);
        }
        return 0L;
    }

    private Map<String, String> parseTelnetResult(String rs) {
        //The output contains multiple lines with the following format:
        //key \t value
        String[] resultArray = rs.split("\n");
        Map<String, String> resultMap = new HashMap<String, String>();
        for (String recordLine : resultArray) {
            String[] recordKeyValue = recordLine.split("\t");
            LOGGER.debug("############recordKeyValue.size:" + recordKeyValue.length + " recordKeyValue:" + Arrays.toString(recordKeyValue));
            if (recordKeyValue != null && recordKeyValue.length == 2) {
                resultMap.put(recordKeyValue[0], recordKeyValue[1]);
            }
        }
        return resultMap;
    }


    public static void main(String[] args) {
        System.out.println(new ZookeeperDataProvider().extractMonitorData());
    }

}


相关内容