Flume 负载平衡配置(Flume load balancing configuration)和测试


2.1 Flume配置

集群DNS配置如下:

hadoop-master 192.168.177.162
machine-0 192.168.177.158
machine-1 192.168.177.167

配置主Flume,在hadoop-master机上。配置文件为loadbalance.properties。

# Master (load-balancing) agent: one Avro source, one memory channel and
# two Avro sinks that forward events to the downstream agents.
agent.sources=s1
agent.channels=c1
agent.sinks=k1 k2
 
# Sink group g1 spreads the events taken from the channel across k1 and k2.
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k2 
agent.sinkgroups.g1.processor.type =load_balance 
# Sinks are picked in round-robin order.
agent.sinkgroups.g1.processor.selector =round_robin 
# Back off from a sink that failed (see the Flume User Guide for the exact timing).
agent.sinkgroups.g1.processor.backoff =true 
 
# Avro source listening on all interfaces, port 51515.
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
# The timestamp interceptor adds a "timestamp" header to every event; the
# downstream HDFS sinks need it to resolve the %Y/%m escapes in their path.
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type=timestamp
 
# In-memory channel with default sizing.
agent.channels.c1.type=memory
 
# k1 forwards to the agent on machine-0, k2 to the agent on machine-1.
agent.sinks.k1.channel = c1 
agent.sinks.k1.type = avro 
agent.sinks.k1.hostname = machine-0
agent.sinks.k1.port = 51515
agent.sinks.k2.channel = c1 
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = machine-1
agent.sinks.k2.port = 51515


配置machine-0,定义文件conf/loadbalance.properties。

 
# loadbalance.properties
# Downstream agent: receives Avro events from the master agent and writes
# them to HDFS.
agent.sources=s1
agent.channels=c1
agent.sinks=k1
 
# Avro source listening on all interfaces, port 51515 (the port the master's
# Avro sinks connect to).
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
 
# In-memory channel: up to 10000 events, up to 10000 events per transaction.
agent.channels.c1.type=memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity =10000
# Byte-based limits on the channel contents (see the Flume User Guide,
# "Memory Channel", for how the buffer percentage is applied).
agent.channels.c1.byteCapacityBufferPercentage= 20
agent.channels.c1.byteCapacity = 800000
 
agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
# If no Hadoop node is installed on this machine, configure the full URI instead:
# hdfs://ip-address:8020/flume/%Y/%m
agent.sinks.k1.hdfs.path=/flume/%Y/%m
agent.sinks.k1.hdfs.filePrefix=flume
agent.sinks.k1.hdfs.fileSuffix=.log
# Roll the file on time only (every 3600 seconds); 0 disables rolling by
# event count and by file size.
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
# Write plain text records rather than a SequenceFile.
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text
# Resolve %Y/%m from the event's timestamp header, not from the local clock.
agent.sinks.k1.hdfs.useLocalTimeStamp=false


与machine-0一样,配置machine-1。


 

启动flume-ng。进入flume目录下(将flume配置到linux环境变量中,不需要这一步骤,直接启动就可以了),输入下列命令。

# Start the agent named "agent" using conf/loadbalance.properties and send
# DEBUG-level agent logging to the console.
bin/flume-ng agent -n agent -c conf -f conf/loadbalance.properties \
    -Dflume.root.logger=DEBUG,console

在三台机上,启动flume。

2.2 客户端应用

这里采用flume-ng-log4jappender作为客户端,演示这个数据传递的实例。准备如下jar包。


pom.xml配置:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Flume-master</groupId>
    <artifactId>Flume-master</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <name>Flume-master</name>
    <properties>
        <!-- Versions referenced elsewhere in this POM via ${...} -->
        <jackson.version>1.9.3</jackson.version>
        <flume.version>1.4.0</flume.version>
        <tomcat.version>7.0.42</tomcat.version>
        <jetty.version>8.1.13.v20130916</jetty.version>
        <!-- Hadoop properties -->
        <!-- Apache snapshot and staging repository coordinates -->
        <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
        <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
        <distMgmtSnapshotsUrl>https://repository.apache.org/content/repositories/snapshots</distMgmtSnapshotsUrl>
        <distMgmtStagingId>apache.staging.https</distMgmtStagingId>
        <distMgmtStagingName>Apache Release Distribution Repository</distMgmtStagingName>
        <!-- Host corrected from "repository.apasche.org" -->
        <distMgmtStagingUrl>https://repository.apache.org/service/local/staging/deploy/maven2</distMgmtStagingUrl>
    </properties>
    <repositories>
        <repository>
          <id>${distMgmtSnapshotsId}</id>
          <name>${distMgmtSnapshotsName}</name>
          <url>${distMgmtSnapshotsUrl}</url>
        </repository>
        <repository>
          <id>repository.jboss.org</id>
          <url>http://repository.jboss.org/nexus/content/groups/public/</url>
          <snapshots>
           <enabled>false</enabled>
          </snapshots>
        </repository>
        <repository>
            <id>oracleReleases</id>
            <name>Oracle Released Java Packages</name>
            <url>http://download.oracle.com/maven</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>java.net2</id>
            <name>Repository hosting the jee6 artifacts</name>
            <url>http://download.java.net/maven/2</url>
        </repository>
    </repositories>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
                <version>${jackson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>${jackson.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-web-api</artifactId>
            <version>6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>${flume.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.mortbay.jetty</groupId>
                    <artifactId>jetty</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.mortbay.jetty</groupId>
                    <artifactId>jetty-util</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <finalName>FlumeWebApp</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.tomcat.maven</groupId>
                    <artifactId>tomcat7-maven-plugin</artifactId>
                    <version>2.1</version>
                </plugin>
                 <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-dependency-plugin</artifactId>
                 <version>2.4</version>
               </plugin>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-enforcer-plugin</artifactId>
                 <version>1.3.1</version>
                 <configuration>
                    <rules>
                     <requireMavenVersion>
                        <version>[3.0.2,)</version>
                      </requireMavenVersion>
                      <requireJavaVersion>
                        <version>1.6</version>
                      </requireJavaVersion>
                    </rules>
                 </configuration>
               </plugin>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>2.3</version>
               </plugin>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-deploy-plugin</artifactId>
                 <version>2.5</version>
               </plugin>
               <plugin>
                 <groupId>org.apache.rat</groupId>
                 <artifactId>apache-rat-plugin</artifactId>
                 <version>0.7</version>
               </plugin>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-antrun-plugin</artifactId>
                 <version>1.6</version>
               </plugin>
               <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-site-plugin</artifactId>
                 <version>3.0</version>
                 <dependencies>
                    <dependency><!-- add support for ssh/scp -->
                      <groupId>org.apache.maven.wagon</groupId>
                      <artifactId>wagon-ssh</artifactId>
                      <version>1.0</version>
                    </dependency>
                 </dependencies>
               </plugin>
               <plugin>
                 <groupId>com.atlassian.maven.plugins</groupId>
                 <artifactId>maven-clover2-plugin</artifactId>
                 <version>3.0.5</version>
               </plugin>
            </plugins>
        </pluginManagement>
 
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <inherited>true</inherited>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>2.1.1</version>
                <configuration>
                    <failOnMissingWebXml>false</failOnMissingWebXml>
                </configuration>
            </plugin>
            <!--
            <plugin>
               <groupId>org.apache.tomcat.maven</groupId>
               <artifactId>tomcat7-maven-plugin</artifactId>
            </plugin>
            -->
            <plugin>
                <groupId>org.mortbay.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <version>${jetty.version}</version>
                <configuration>
                    <stopPort>9999</stopPort>
                    <stopKey>stop</stopKey>
                    <webApp>
                        <contextPath>/${project.build.finalName}</contextPath>
                    </webApp>
                </configuration>
            </plugin>
            <plugin>
               <groupId>org.apache.maven.plugins</groupId>
               <artifactId>maven-enforcer-plugin</artifactId>
               <inherited>false</inherited>
               <executions>
                 <execution>
                    <id>clean</id>
                    <goals>
                      <goal>enforce</goal>
                    </goals>
                    <phase>pre-clean</phase>
                 </execution>
                 <execution>
                    <id>default</id>
                    <goals>
                      <goal>enforce</goal>
                    </goals>
                    <phase>validate</phase>
                 </execution>
                 <execution>
                    <id>site</id>
                    <goals>
                      <goal>enforce</goal>
                    </goals>
                    <phase>pre-site</phase>
                 </execution>
               </executions>
             </plugin>
           <plugin>
               <groupId>org.apache.rat</groupId>
               <artifactId>apache-rat-plugin</artifactId>
              <configuration>
                 <excludes>
                    <exclude>.gitattributes</exclude>
                    <exclude>.gitignore</exclude>
                    <exclude>.git/**</exclude>
                    <exclude>.idea/**</exclude>
                </excludes>
              </configuration>
             </plugin>
           <plugin>
               <artifactId>maven-site-plugin</artifactId>
               <version>3.0</version>
               <executions>
                 <execution>
                    <id>attach-descriptor</id>
                    <goals>
                      <goal>attach-descriptor</goal>
                    </goals>
                    <configuration>
                      <generateReports>true</generateReports>
                    </configuration>
                 </execution>
               </executions>
           </plugin>
        </plugins>
    </build>
    <!-- Build profiles: each one only sets the flume.Port property. -->
    <profiles>
        <!-- Default profile: flume.Port = 51515. -->
        <profile>
            <id>hdfs</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <properties>
                <flume.Port>51515</flume.Port>
            </properties>
        </profile>
        <!-- Opt-in profile (e.g. mvn -Phbase): flume.Port = 61616. -->
        <profile>
            <id>hbase</id>
            <activation>
                <activeByDefault>false</activeByDefault>
            </activation>
            <properties>
                <flume.Port>61616</flume.Port>
            </properties>
        </profile>
    </profiles>
</project>


Java代码:

 

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
import org.apache.log4j.Logger;
 
 
 
public class Worker implements Runnable{
 
   
    private staticfinalLogger LOG= Logger.getLogger(Worker.class); 
    private String command;
   
    /**
     * @param args
     */
    public staticvoidmain(String[] args) {
        new Worker("0").init();
    }
 
    public voidinit(){
        int numWorkers = 1;
        int threadPoolSize = 3 ;
 
        ScheduledExecutorServicescheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);
       
        //schedule to run after sometime
        System.out.println("Current Time = "+new Date());
        Worker worker = null;
        for(int i=0; i< numWorkers; i++){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            worker = new Worker("do heavy processing");
//             scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
            //scheduleAtFixedRate
//             scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);
           scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10,
                    TimeUnit.SECONDS);
       
        }
        
        //add some delay to let some threads spawn by scheduler
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        scheduledThreadPool.shutdown();
        while(!scheduledThreadPool.isTerminated()){
            //wait for all tasks to finish
        }
        LOG.info("Finished all threads");
    }
    public Worker(String command){
        this.command = command;
    }
    @Override
    public void run() {
        LOG.info(Thread.currentThread().getName()+" Start. Command = "+command);
        processCommand();
        LOG.info(Thread.currentThread().getName()+" End.");
    }
 
    private void processCommand() {
        try {
            for(int i = 1000; i < 2000; i++){
              LOG.info("sequence:"+ i);
            }
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
   
    @Override
    public String toString(){
        return this.command;
    }
}

 

Log4j配置文件log4j.properties

# Root logger: DEBUG and above go to the console and to a rolling file.
log4j.rootLogger=DEBUG,stdout,rootLog
 
# Console appender (DEV environment)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
 
# Rolling file appender rootLog: rootLog.log, 5000KB per file, 20 backups
log4j.appender.rootLog=org.apache.log4j.RollingFileAppender
log4j.appender.rootLog.File=rootLog.log
log4j.appender.rootLog.MaxFileSize=5000KB
log4j.appender.rootLog.MaxBackupIndex=20
log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout
log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
 
# File Appender boentel (disabled)
#log4j.logger.com.boentel=DEBUG,boentel
#log4j.additivity.com.boentel=true
#log4j.appender.boentel=org.apache.log4j.RollingFileAppender
#log4j.appender.boentel.File= boentel.log
#log4j.appender.boentel.MaxFileSize=2000KB
#log4j.appender.boentel.MaxBackupIndex=20
#log4j.appender.boentel.layout=org.apache.log4j.PatternLayout
#log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
 
# Client-side load balancing across several agents (disabled alternative
# to the single-host appender at the end of this file)
#log4j.logger.com.loadbalance= DEBUG,loadbalance
#log4j.additivity.com.loadbalance= true
 
#log4j.appender.loadbalance =org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
#log4j.appender.loadbalance.Hosts = machine-0:51515 machine-1:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
#log4j.appender.loadbalance.MaxBackoff = 60000
# Selector: FQDN of a selector class, or RANDOM; default is ROUND_ROBIN
#log4j.appender.loadbalance.Selector = ROUND_ROBIN
#log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
#log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
 
# Loggers named com.loadbalance (and below) are also sent to the flume
# appender; additivity=true keeps them going to the root appenders as well.
# NOTE(review): the client class must log under the com.loadbalance package
# (or this logger name must be changed) for events to reach Flume - confirm.
log4j.logger.com.loadbalance=DEBUG,flume
log4j.additivity.com.loadbalance=true
 
# Single-node Flume appender: sends events to the Avro source on hadoop-master:51515
log4j.appender.flume= org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname= hadoop-master
log4j.appender.flume.Port= 51515
# UnsafeMode=true: the appender does not throw when sending an event fails
log4j.appender.flume.UnsafeMode= true
log4j.appender.flume.layout= org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern= %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
 


 

启动app,一段时间后,观察数据是否有丢失。我使用几个case测试过,没有发现任何问题。

待续:

相关内容