MapReduce Log Parsing with Hive Metastore and Avro Serialization


I. Terminology

Avro: a data serialization framework from Apache.

Serialization: in HDFS, network bandwidth is a scarce resource. To transfer data efficiently, objects must be serialized into a compact binary stream.
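To make the definition concrete, here is a minimal sketch (not from the original article; the demo schema and its two fields are invented) of Avro turning a record into an untagged binary stream:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroSerializeSketch {
    public static void main(String[] args) throws IOException {
        // A throwaway demo schema; the real pipeline uses ex.avsc below.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":[" +
            "{\"name\":\"AppKey\",\"type\":\"string\"}," +
            "{\"name\":\"SerialNumber\",\"type\":\"int\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("AppKey", "demo-app");
        record.put("SerialNumber", 42);

        // Encode the record as untagged binary: only the field values are
        // written, because the schema is assumed known on the read side.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        System.out.println("serialized size: " + out.size() + " bytes");
    }
}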

Avro vs. Thrift and Protocol Buffers:

Avro provides functionality similar to systems such as Thrift and Protocol Buffers, but it differs in a few fundamental ways:

1. Dynamic typing: Avro does not require code generation. The schema is stored together with the data, so the data can be processed without generated code or static data types. This makes it easier to build generic data-processing systems and languages.

2. Untagged data: since the schema is known when the data is read, very little type information needs to be encoded alongside the data, so the serialized output is smaller.

3. No user-assigned field IDs: when a schema changes, both the old and new schemas are available at processing time, so differences are resolved by field name.

II. Implementation Steps

1. Define an Avro schema file, for example ex.avsc, which describes the exception records in the logs:
{
		"type": "record",
		"name": "Ex",
		"namespace": "com.tianrandai.test",
		"fields": [
			{"name":"AppKey","type":"string"},
			{"name":"SessionId","type":"string"},									
			{"name":"IsValidStart","type":"string"},						
			{"name":"SerialNumber","type":"int"},									
			{"name":"Strategy","type":"string"},			
			{"name":"IsSessionStop","type":"boolean"},
			{"name":"SessionDuration","type":"int"},
			{"name":"ClientTime","type":"string"},
			{"name":"isCaught","type":"boolean"},
			{"name":"ExceptionName","type":"string"},
			{"name":"CustomExceptionName","type":"string"},
			{"name":"Stack","type":"string"}
		]
   }
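As a quick sanity check of the schema, and an illustration of Avro's dynamic typing, the following standalone sketch (hypothetical, not part of the pipeline below) parses ex.avsc without any generated code and writes one record to a snappy-compressed Avro container file, the same kind of file the MapReduce job produces. The writer schema is embedded in the file itself, which is what later allows Hive's AvroSerDe to read it:

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class ExFileSketch {
    public static void main(String[] args) throws IOException {
        // Parse the schema at runtime -- no generated classes needed.
        Schema schema = new Schema.Parser()
            .parse(ExFileSketch.class.getResourceAsStream("/ex.avsc"));

        GenericRecord record = new GenericData.Record(schema);
        record.put("AppKey", "demo-app");
        record.put("SessionId", "s-001");
        record.put("IsValidStart", "true");
        record.put("SerialNumber", 1);
        record.put("Strategy", "default");
        record.put("IsSessionStop", true);
        record.put("SessionDuration", 120);
        record.put("ClientTime", "2016-01-01 00:00:00");
        record.put("isCaught", false);
        record.put("ExceptionName", "java.lang.NullPointerException");
        record.put("CustomExceptionName", "");
        record.put("Stack", "...");

        // The container file embeds the writer schema alongside the data.
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, new File("ex.avro"));
        writer.append(record);
        writer.close();
    }
}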
2. Create the metadata table for the exception records. Because the output is compressed and serialized with the Avro framework, the Hive table must use the Avro SerDe. The following shows how to create an external table in Hive whose schema is loaded dynamically from the avsc file.

a. Upload the prepared avsc file to HDFS:
hadoop fs -put ex.avsc /test
b. Create the table with a Hive statement:
CREATE EXTERNAL TABLE Ex
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION 'hdfs:///test/ex'
    TBLPROPERTIES (
	'avro.schema.url'='hdfs:///test/ex.avsc'
    );
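Because the table resolves its schema through avro.schema.url at query time, the schema can evolve by replacing ex.avsc in HDFS; the table does not need to be dropped and recreated, as long as the change follows Avro's schema-resolution rules.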

3. Log analysis

a. Custom input file filter:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class InputFileFilter extends Configured implements PathFilter {
    private Configuration conf = null;
    private FileSystem fs;

    @Override
    public Configuration getConf() {
        return conf;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
        // setConf is invoked by the framework right after instantiation,
        // so this is the place to open the FileSystem handle.
        if (conf != null) {
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public boolean accept(Path path) {
        String regex = conf.get(ReaderUtils.INPUT_FILTER_REGEX);
        if (regex == null) {
            return true;
        }
        try {
            // Directories must always be accepted, otherwise the recursive
            // listing would never descend into them; only files are matched
            // against the configured regex.
            if (fs.isDirectory(path)) {
                return true;
            } else {
                return path.toString().matches(regex);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
b. Build a Pair of (named output, record). Every field written with record.put must be declared in ex.avsc, or GenericData.Record will reject it as "Not a record field"; the exception accessor object below is illustrative:
Schema.Parser parser = new Schema.Parser();
Schema exSchema = parser.parse(RecordBuilder.class.getResourceAsStream("/ex.avsc"));
GenericRecord record = new GenericData.Record(exSchema);
record.put("AppKey", common.getAppKey());
record.put("IsValidStart", common.getIsValidStart());
record.put("SerialNumber", common.getSerialNumber());
record.put("SessionDuration", common.getSessionDuration());
record.put("SessionId", common.getSessionId());
record.put("Strategy", common.getStrategy());
record.put("ClientTime", common.getClientTime());
record.put("IsSessionStop", common.isSessionStop());
// The remaining fields come from the parsed exception itself; the accessor
// object is hypothetical -- use whatever holds these values in your parser.
record.put("isCaught", exception.isCaught());
record.put("ExceptionName", exception.getExceptionName());
record.put("CustomExceptionName", exception.getCustomExceptionName());
record.put("Stack", exception.getStack());

// "ex" is the named-output key that routes this record in the reducer.
Pair<String, GenericRecord> exPair = new ImmutablePair<String, GenericRecord>(ex, record);
c. Map phase:
AvroKey<String> outputKey = new AvroKey<String>();
AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();
outputKey.datum(pair.getKey());
outputValue.datum(pair.getValue());
context.write(outputKey, outputValue);
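For orientation, the fragments above sit inside a Mapper roughly like the following sketch; the class name and RecordBuilder.build are assumptions made for illustration, not the article's actual code:

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical Mapper skeleton: everything except the Avro/Hadoop types is
// an assumption made for illustration.
public class LogMapper extends Mapper<LongWritable, Text, AvroKey<String>, AvroValue<GenericRecord>> {

    private final AvroKey<String> outputKey = new AvroKey<String>();
    private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse one log line into (named output, record) pairs; RecordBuilder
        // is assumed to encapsulate the per-record-type logic shown above.
        for (Pair<String, GenericRecord> pair : RecordBuilder.build(value.toString())) {
            outputKey.datum(pair.getKey());
            outputValue.datum(pair.getValue());
            context.write(outputKey, outputValue);
        }
    }
}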
d. Reduce phase:
private AvroMultipleOutputs avroMultipleOutputs;
private AvroKey<GenericRecord> outputKey;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    outputKey = new AvroKey<GenericRecord>();
    avroMultipleOutputs = new AvroMultipleOutputs(context);
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    // Must be closed, or the named-output files are never flushed.
    avroMultipleOutputs.close();
}

@Override
protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context)
        throws IOException, InterruptedException {
    for (AvroValue<GenericRecord> value : values) {
        outputKey.datum(value.datum());
        // "ex" must match a name registered with AvroMultipleOutputs.addNamedOutput
        // in the runner; the last argument is the base output path, so the files
        // land under <output>/ex/19000101-r-nnnnn.avro.
        avroMultipleOutputs.write("ex", outputKey, NullWritable.get(), "ex" + "/" + "19000101");
    }
}

e. Runner:
conf.setIfUnset(ReaderUtils.INPUT_FILTER_REGEX, ".*\\.log");
conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

Job job = Job.getInstance(conf, this.getClass().getName());
........

job.setInputFormatClass(TextInputFormat.class);

// The map output value schema is a union of all record schemas, so a single
// shuffle can carry every record type.
Schema union = Schema.createUnion(new ArrayList<Schema>(RecordBuilder.schemas.values()));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, union);

// Register one named output per record type; the reducer routes records by name.
for (Map.Entry<String, Schema> entry : RecordBuilder.schemas.entrySet()) {
    AvroMultipleOutputs.addNamedOutput(job, entry.getKey(), AvroKeyOutputFormat.class, entry.getValue(), null);
}

// args[0] may contain several comma-separated input paths.
Path[] pathArr;
if (args[0].contains(",")) {
    List<Path> listPath = new ArrayList<Path>();
    String[] paths = args[0].split(",");
    for (String path : paths) {
        listPath.add(new Path(path));
    }
    pathArr = listPath.toArray(new Path[listPath.size()]);
} else {
    pathArr = new Path[]{new Path(args[0])};
}
Path outputPath = new Path(args[1]);
// Remove any previous output so the job can be rerun.
outputPath.getFileSystem(conf).delete(outputPath, true);
FileInputFormat.setInputPaths(job, pathArr);
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.setInputPathFilter(job, InputFileFilter.class);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
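The runner references RecordBuilder.schemas, which is never shown. A plausible minimal sketch (class layout and resource paths are assumptions) keeps a static map from named-output name to parsed schema:

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.avro.Schema;

// Hypothetical sketch of the RecordBuilder referenced above: it exposes one
// parsed schema per named output (here only "ex" is registered).
public class RecordBuilder {

    public static final Map<String, Schema> schemas = new LinkedHashMap<String, Schema>();

    static {
        try {
            Schema.Parser parser = new Schema.Parser();
            schemas.put("ex", parser.parse(RecordBuilder.class.getResourceAsStream("/ex.avsc")));
            // Additional record types (page views, sessions, ...) would be
            // registered here the same way.
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}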

4. Load the files generated by the MapReduce job into Hive:
LOAD DATA INPATH 'hdfs:///test/output/ex' OVERWRITE INTO TABLE Ex;
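LOAD DATA INPATH moves (rather than copies) the generated .avro files into the table's LOCATION; afterwards the exception records can be queried with ordinary HiveQL.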