Java 实时监听日志并写入 Kafka


目的

实时监听某目录下的日志文件：如有新文件产生，则切换到新文件继续读取，并将内容同步写入 Kafka；同时记录当前日志文件及已读取的行号，以便进程异常退出后能从上次记录的位置继续读取（考虑到效率，这里每 100 条记录一次位置，可调整；因此重启后最多可能重复发送约 100 条）。

源码:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  


/*
 * 在源服务器上自行编写生产者向 Kafka 写入数据。注意：程序从当前工作目录读取 "producer.properties"，因此应在该文件所在目录（通常与 jar 文件同一目录）下启动。
 * 监听某个目录下的文件数据然后写入kafka
 * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
 * 
 * 
 */
/**
 * Tails the newest "access" log file in a directory and forwards every line to a Kafka topic.
 *
 * <p>Progress is checkpointed to a position file as {@code <absolute path>,<line count>} every
 * {@code POSITION_FLUSH_INTERVAL} lines and whenever the tail switches to a newer file, so a
 * restarted process resumes close to where it stopped. Delivery is at-least-once: roughly up to
 * {@code POSITION_FLUSH_INTERVAL} lines may be sent again after a crash.
 *
 * <p>Limitations: the "newest" file is chosen purely by modification time, so if several matching
 * files are written concurrently the tail keeps switching between them (re-reading each from
 * line 0); truncation of the current file in place is not detected.
 */
public class PortalLogTail_Line {

    /** Lines processed between two position checkpoints. */
    private static final int POSITION_FLUSH_INTERVAL = 100;

    /** Pause between two polls of the log directory, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100;

    /** Charset used to decode the log files (the article assumes UTF-8 logs). */
    private static final String LOG_CHARSET = "UTF-8";

    /** Each message gets a random key in [0, KEY_COUNT) so the partitioner spreads the load. */
    private static final int KEY_COUNT = 9;

    private Producer<String, String> inner;

    /** Source of the random message keys. */
    java.util.Random ran = new Random();

    // State of the tail thread below; only used by that thread once realtimeShowLog() started it.

    /** Reader over the file currently being tailed, or null if none could be opened yet. */
    LineNumberReader randomFile = null;
    /** Unused; kept so code referring to this field still compiles. */
    String newfile = null;
    /** Absolute path of the file currently being tailed ("" if no matching file was found). */
    String thisfile = null;
    /** Path the tail last opened; compared with the newest file to detect a switch. */
    String prefile = null;
    /** Lines processed since the last checkpoint. */
    int ln = 0;
    /** Lines of the current file that were already sent before a restart; they are skipped. */
    int beginln = 0;

    /** Complete (newline-terminated) lines seen so far in the current file. */
    private int linesRead = 0;
    /** Characters of a line whose terminator has not been written yet. */
    private final StringBuilder pendingLine = new StringBuilder();
    /** Scratch buffer for bulk reads from {@code randomFile}. */
    private final char[] readBuffer = new char[8192];

    /**
     * Creates a Kafka producer configured from {@code producer.properties}.
     *
     * <p>The file name is relative, so it is resolved against the process working directory.
     *
     * @throws FileNotFoundException if {@code producer.properties} does not exist
     * @throws IOException if the file cannot be read
     */
    public PortalLogTail_Line() throws FileNotFoundException, IOException {
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream("producer.properties");
        try {
            properties.load(in);
        } finally {
            in.close();
        }
        inner = new Producer<String, String>(new ProducerConfig(properties));
    }

    /**
     * Sends one message with a random key so that messages are hashed across partitions.
     * Does nothing if either argument is null.
     */
    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        inner.send(new KeyedMessage<String, String>(topicName, randomKey(), message));
    }

    /**
     * Sends a batch of messages, each with a random key (same partitioning as the single-message
     * overload). Does nothing if either argument is null or the collection is empty.
     */
    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null || messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms =
                new ArrayList<KeyedMessage<String, String>>(messages.size());
        for (String entry : messages) {
            kms.add(new KeyedMessage<String, String>(topicName, randomKey(), entry));
        }
        inner.send(kms);
    }

    /** Returns a random key in [0, KEY_COUNT) as a string. */
    private String randomKey() {
        return String.valueOf(ran.nextInt(KEY_COUNT));
    }

    /** Closes the underlying Kafka producer. */
    public void close() {
        inner.close();
    }

    /**
     * Returns the absolute path of the most recently modified regular file in {@code file} whose
     * name contains "access", or "" if there is none or the directory cannot be listed.
     */
    public String getNewFile(File file) {
        File[] candidates = file.listFiles();
        if (candidates == null) {
            // Not a directory, or an I/O error while listing it.
            return "";
        }
        long maxtime = 0;
        String newfilename = "";
        for (File candidate : candidates) {
            if (!candidate.isFile() || !candidate.getName().contains("access")) {
                continue;
            }
            long modified = candidate.lastModified();
            if (modified > maxtime) {
                maxtime = modified;
                newfilename = candidate.getAbsolutePath();
            }
        }
        return newfilename;
    }

    /**
     * Overwrites {@code positionpath} with the single line {@code path + "," + rn}.
     *
     * <p>Best effort: failures are reported on stderr, because a missed checkpoint only means more
     * lines are re-sent after a restart. The file is truncated and rewritten in place, so a crash
     * during the write can leave it empty; the tail then restarts from the newest file.
     */
    public void writePosition(String path, int rn, String positionpath) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new FileWriter(positionpath));
            out.write(path + "," + rn);
            out.close(); // close (and flush) here so that a failure is reported below
            out = null;
        } catch (IOException e) {
            System.err.println("failed to write position file " + positionpath + ": " + e);
        } finally {
            closeQuietly(out);
        }
    }

    /**
     * Starts a background thread that tails the newest matching file in {@code file} and sends
     * each complete line to {@code topicname}, polling every {@code POLL_INTERVAL_MS} ms. Call at
     * most once per instance: the tail state lives in this object's fields.
     *
     * @param file directory containing the log files
     * @param topicname Kafka topic to write to
     * @param positionpath file used to persist the "path,line" checkpoint
     * @throws IOException never thrown; kept for compatibility with existing callers
     */
    public void realtimeShowLog(final File file, final String topicname, final String positionpath)
            throws IOException {
        new Thread(new Runnable() {
            public void run() {
                tail(file, topicname, positionpath);
            }
        }).start();
    }

    /** Body of the tail thread: restore the checkpoint, then poll until interrupted. */
    private void tail(File dir, String topic, String positionpath) {
        restorePosition(dir, positionpath);
        randomFile = openLog(thisfile);
        try {
            while (true) {
                Thread.sleep(POLL_INTERVAL_MS);
                if (randomFile == null && !reopenNewest(dir)) {
                    continue; // nothing to read yet; try again on the next poll
                }
                drain(topic, positionpath);
                String newest = getNewFile(dir);
                if (newest.length() > 0 && !newest.equals(prefile)) {
                    switchTo(newest, topic, positionpath);
                }
            }
        } catch (InterruptedException e) {
            // Stop tailing, but persist progress and keep the interrupt status.
            if (randomFile != null) {
                checkpoint(positionpath);
            }
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            closeQuietly(randomFile);
        }
    }

    /**
     * Sets thisfile/prefile/beginln from the position file if it names an existing file;
     * otherwise selects the newest matching file in {@code dir}, starting at line 0.
     */
    private void restorePosition(File dir, String positionpath) {
        thisfile = getNewFile(dir);
        prefile = thisfile;
        beginln = 0;
        File positionFile = new File(positionpath);
        if (!positionFile.isFile()) {
            return; // first run: no checkpoint yet
        }
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(positionFile));
            String line = br.readLine();
            // The path itself may contain ',', so split at the last one.
            int comma = (line == null) ? -1 : line.lastIndexOf(',');
            if (comma <= 0) {
                return;
            }
            String recordedFile = line.substring(0, comma);
            int recordedLine = Integer.parseInt(line.substring(comma + 1).trim());
            if (new File(recordedFile).isFile()) {
                thisfile = recordedFile;
                prefile = recordedFile;
                beginln = recordedLine;
            } else {
                System.err.println("file in " + positionpath + " no longer exists: " + recordedFile);
            }
        } catch (IOException e) {
            System.err.println("cannot read position file " + positionpath + ": " + e);
        } catch (NumberFormatException e) {
            System.err.println("malformed position file " + positionpath + ": " + e);
        } finally {
            closeQuietly(br);
        }
    }

    /** Opens the newest matching file from line 0; returns false if none could be opened. */
    private boolean reopenNewest(File dir) {
        thisfile = getNewFile(dir);
        prefile = thisfile;
        beginln = 0;
        linesRead = 0;
        pendingLine.setLength(0);
        randomFile = openLog(thisfile);
        return randomFile != null;
    }

    /** Opens {@code path} for reading as LOG_CHARSET, or returns null if that is not possible. */
    private static LineNumberReader openLog(String path) {
        if (path == null || path.length() == 0) {
            return null;
        }
        try {
            return new LineNumberReader(
                    new InputStreamReader(new FileInputStream(path), LOG_CHARSET));
        } catch (IOException e) {
            System.err.println("cannot open log file " + path + ": " + e);
            return null;
        }
    }

    /**
     * Reads everything currently available from {@code randomFile} and sends each complete line.
     * A trailing line without its '\n' yet stays in {@code pendingLine} until the next poll.
     */
    private void drain(String topic, String positionpath) throws IOException {
        int n;
        while ((n = randomFile.read(readBuffer, 0, readBuffer.length)) > 0) {
            for (int i = 0; i < n; i++) {
                char c = readBuffer[i];
                if (c == '\n') {
                    completeLine(topic, positionpath);
                } else {
                    pendingLine.append(c);
                }
            }
        }
    }

    /** Handles the line accumulated in {@code pendingLine}: send it and checkpoint periodically. */
    private void completeLine(String topic, String positionpath) {
        int len = pendingLine.length();
        if (len > 0 && pendingLine.charAt(len - 1) == '\r') {
            pendingLine.setLength(len - 1); // CRLF line ending
        }
        linesRead++;
        // Lines up to beginln were already sent before the last restart.
        if (linesRead > beginln) {
            send(topic, pendingLine.toString());
        }
        pendingLine.setLength(0);
        ln++;
        // Writing the position after every line is slow, so only checkpoint periodically.
        if (ln >= POSITION_FLUSH_INTERVAL) {
            checkpoint(positionpath);
            ln = 0;
        }
    }

    /** Persists the current file and how many of its lines have been sent. */
    private void checkpoint(String positionpath) {
        // While still skipping already-sent lines, beginln is the further position.
        writePosition(thisfile, Math.max(linesRead, beginln), positionpath);
    }

    /** Finishes the current file and continues with {@code newest} from line 0. */
    private void switchTo(String newest, String topic, String positionpath) throws IOException {
        // Pick up lines appended to the old file since the last poll so they are not lost.
        drain(topic, positionpath);
        if (pendingLine.length() > 0) {
            // The old file ended without a line terminator; send what is there rather than drop it.
            completeLine(topic, positionpath);
        }
        randomFile.close();
        thisfile = newest;
        prefile = newest;
        beginln = 0;
        linesRead = 0;
        ln = 0;
        randomFile = openLog(newest); // null here is retried by the next poll
        // Checkpoint the switch so that a restart resumes in the new file, not the old one.
        checkpoint(positionpath);
    }

    /** Closes {@code c} if non-null, reporting (not propagating) a failure. */
    private static void closeQuietly(Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (IOException e) {
            System.err.println("close failed: " + e);
        }
    }

    /**
     * Entry point: {@code java -jar ... <topicname> <pathname> <positionpath>}.
     *
     * @param args topic name, directory containing the logs, position file path
     * @throws Exception if {@code producer.properties} cannot be loaded
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage:topicname pathname positionpath");
            System.exit(1);
        }
        String topicname = args[0];
        String pathname = args[1];
        String positionpath = args[2];
        File logDir = new File(pathname);
        if (!logDir.isDirectory()) {
            System.err.println("not a directory: " + pathname);
            System.exit(1);
        }
        // Connect to Kafka only once the arguments are known to be valid.
        PortalLogTail_Line producer = new PortalLogTail_Line();
        producer.realtimeShowLog(logDir, topicname, positionpath);
    }
}

producer.properties 文件放在启动命令的当前工作目录下（通常与 jar 同级），内容如下：
metadata.broker.list=xxx:10909,xxx:10909

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
#producer.type=async

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
compression.codec=none
#compression.codec=gzip

# message encoder
serializer.class=kafka.serializer.StringEncoder



测试

最后执行:
 nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &


相关内容