基于Hive元数据存储和Avro序列化的MapReduce日志解析
一,相关名词解释:
Avro:Apache 提供的数据序列化框架
序列化:将内存中的对象转换为二进制字节流,以便存储或在网络上传输。在 HDFS 等分布式环境中,网络带宽是一种稀缺资源,使用紧凑的二进制序列化格式可以减少传输的数据量,实现数据的高效传输。
Avro VS others:
Avro 提供了与 Thrift、Protocol Buffers 等系统相似的功能,但在一些基础方面仍有区别,主要包括:
1. 动态类型:Avro 不需要生成代码。模式(schema)和数据存放在一起,因此整个数据处理过程既不需要生成代码,也不需要声明静态数据类型。这方便了数据处理系统和各语言实现的构建。
2. 未标记的数据:由于读取数据时模式是已知的,需要和数据一起编码的类型信息就很少,序列化后的数据体积也就更小。
3. 不需要用户指定字段号:即使模式发生改变,处理数据时新旧模式都是已知的,因此可以通过字段名称解决差异问题。

二,实现步骤

1,定义一个 Avro 模式文件,例如记录日志中异常信息的 ex.avsc:
{
  "type": "record",
  "name": "Ex",
  "namespace": "com.tianrandai.test",
  "fields": [
    {"name":"AppKey","type":"string"},
    {"name":"SessionId","type":"string"},
    {"name":"IsValidStart","type":"string"},
    {"name":"SerialNumber","type":"int"},
    {"name":"Strategy","type":"string"},
    {"name":"IsSessionStop","type":"boolean"},
    {"name":"SessionDuration","type":"int"},
    {"name":"ClientTime","type":"string"},
    {"name":"isCaught","type":"boolean"},
    {"name":"ExceptionName","type":"string"},
    {"name":"CustomExceptionName","type":"string"},
    {"name":"Stack","type":"string"}
  ]
}

2,建立异常日志对应的 Hive 元数据表。由于 MapReduce 的输出采用了 Avro 序列化框架,在 Hive 中同样使用 AvroSerDe 来读取这些数据。下面演示如何在 Hive 中创建一张动态加载 schema 的外部表:
a,将预先编写好的 avsc 文件上传到 HDFS
# Make sure the target directory exists first: if /test does not exist, "-put ex.avsc /test"
# would create a FILE named /test instead of /test/ex.avsc, and avro.schema.url would not resolve.
hadoop fs -mkdir -p /test
hadoop fs -put ex.avsc /test/ex.avsc
b,使用hive语句创建表
-- External Hive table over Avro container files under hdfs:///test/ex.
-- No column list is given: AvroSerDe derives the columns from the schema file
-- referenced by avro.schema.url, so the table follows whatever ex.avsc defines.
CREATE EXTERNAL TABLE Ex
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS
    INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  LOCATION 'hdfs:///test/ex'
  TBLPROPERTIES ('avro.schema.url' = 'hdfs:///test/ex.avsc');
3,日志分析
a,自定义输入文件过滤器
/**
 * Input path filter for FileInputFormat.
 *
 * <p>Files are accepted only when their full path string matches the regular expression stored
 * under {@code ReaderUtils.INPUT_FILTER_REGEX}. Directories are always accepted so that recursive
 * input listing can descend into them. When no regex (or no configuration) is set, every path is
 * accepted.
 */
public class InputFileFilter extends Configured implements PathFilter {

    /**
     * Compiled filter regex, or {@code null} to accept everything.
     *
     * <p>Deliberately has no initializer: {@code Configured}'s constructor calls {@link #setConf}
     * before this class's field initializers run, and an initializer would overwrite that value.
     */
    private java.util.regex.Pattern fileNamePattern;

    /**
     * Stores the configuration and compiles the filter regex once, instead of on every
     * {@link #accept} call.
     *
     * @param conf job configuration; may be {@code null} (the super constructor passes null)
     */
    @Override
    public void setConf(Configuration conf) {
        super.setConf(conf);
        String regex = (conf == null) ? null : conf.get(ReaderUtils.INPUT_FILTER_REGEX);
        fileNamePattern = (regex == null) ? null : java.util.regex.Pattern.compile(regex);
    }

    /**
     * Decides whether {@code path} is part of the job input.
     *
     * @param path candidate input path (file or directory)
     * @return {@code true} for directories, for all paths when no regex is configured, and for
     *     files whose full path string matches the regex
     * @throws java.io.UncheckedIOException if the path's file system cannot be queried; failing
     *     loudly avoids silently dropping input files from the job
     */
    @Override
    public boolean accept(Path path) {
        if (fileNamePattern == null) {
            return true;
        }
        try {
            // Resolve the file system from the path itself, not the default FS, so inputs on
            // another cluster or scheme are checked against the right file system.
            if (path.getFileSystem(getConf()).isDirectory(path)) {
                return true;
            }
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Cannot determine whether input path is a directory: " + path, e);
        }
        return fileNamePattern.matcher(path.toString()).matches();
    }
}
b,构建一个Pair
Schema.Parser parser = new Schema.Parser(); Schema exSchema = parser.parse(RecordBuilder.class.getResourceAsStream("/ex.avsc")) GenericRecord record = new GenericData.Record(exSchema); record.put("AppKey",common.getAppKey()); record.put("IsValidStart",common.getIsValidStart()); record.put("SerialNumber",common.getSerialNumber()); record.put("SessionDuration",common.getSessionDuration()); record.put("SessionId",common.getSessionId()); record.put("Strategy",common.getStrategy()); record.put("ClientTime",common.getClientTime()); record.put("IsSessionStop",common.isSessionStop()); record.put("DurTime",pageView.getDurTime()); record.put("Tile",pageView.getTitle()); Pair exPair = new ImmutablePair<String, GenericRecord>(ex,record);c,map过程
// Emit the pair as Avro-wrapped map output: a string key and a GenericRecord value, matching the
// map-output key/value schemas configured on the job in the runner.
context.write(
        new AvroKey<String>(pair.getKey()),
        new AvroValue<GenericRecord>(pair.getValue()));
d,reduce过程
/** File-name prefix used under each named output's directory; a fixed placeholder date here. */
private static final String OUTPUT_DATE = "19000101";

private AvroMultipleOutputs avroMultipleOutputs;

/** Reused wrapper for the record being written; its datum is replaced for each value. */
private AvroKey<GenericRecord> outputKey;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    outputKey = new AvroKey<GenericRecord>();
    avroMultipleOutputs = new AvroMultipleOutputs(context);
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    // Closes all named-output writers opened during reduce().
    avroMultipleOutputs.close();
}

/**
 * Writes every record of one key to the named output matching that key.
 *
 * <p>The runner registers one named output per record schema, so routing by key lets other record
 * types reach their own writer instead of all being forced into the "ex" output.
 * NOTE(review): assumes the map key is the named-output name (e.g. "ex"); confirm against the mapper.
 *
 * @param key record-type key produced by the mapper
 * @param values records of that type
 * @param context reducer context (unused; output goes through AvroMultipleOutputs)
 */
@Override
protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context)
        throws IOException, InterruptedException {
    // Read the datum as Object: NOTE(review): Avro may deserialize string data as
    // org.apache.avro.util.Utf8 rather than java.lang.String; this avoids a cast either way.
    String namedOutput = String.valueOf((Object) key.datum());
    // Loop-invariant: all values of this key share the same base output path.
    String baseOutputPath = namedOutput + "/" + OUTPUT_DATE;
    for (AvroValue<GenericRecord> value : values) {
        outputKey.datum(value.datum());
        avroMultipleOutputs.write(namedOutput, outputKey, NullWritable.get(), baseOutputPath);
    }
}
e,Runner(作业配置与提交)
// Job setup fragment (the enclosing method header and part of the body are elided in the article).
// Default input filter: only paths matching ".*\.log" are read, unless the caller already set a
// regex under ReaderUtils.INPUT_FILTER_REGEX (read by InputFileFilter).
conf.setIfUnset(ReaderUtils.INPUT_FILTER_REGEX, ".*\\.log");
// Enable output compression and choose Snappy as the Avro container-file codec.
conf.setBoolean("mapreduce.output.fileoutputformat.compress",true);
conf.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);
// Put the job's own jars ahead of Hadoop's on the task classpath.
// NOTE(review): presumably to avoid clashes with the Avro version bundled with Hadoop -- confirm.
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Job job = Job.getInstance(conf, this.getClass().getName());
........
job.setInputFormatClass(TextInputFormat.class);
// A map output value may be any registered record type, so its schema is the union of all of them.
Schema union = Schema.createUnion(new ArrayList<Schema>(RecordBuilder.schemas.values()));
AvroJob.setMapOutputKeySchema(job,Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job,union);
// Register one Avro named output per record type, each written with that type's schema.
for(Map.Entry<String,Schema> entry : RecordBuilder.schemas.entrySet()){
    AvroMultipleOutputs.addNamedOutput(job, entry.getKey(), AvroKeyOutputFormat.class, entry.getValue(), null);
}
// args[0]: a single input path or a comma-separated list of input paths.
// NOTE(review): an empty entry (e.g. "a,,b" or a trailing comma) yields new Path(""), which Hadoop rejects.
Path[] pathArr ;
if(args[0].contains(",")){
    List<Path> listPath = new ArrayList<Path>();
    String [] paths = args[0].split(",");
    for(String path : paths){
        listPath.add(new Path(path));
    }
    pathArr = listPath.toArray(new Path[listPath.size()]);
}else {
    pathArr = new Path[]{new Path(args[0])};
}
// args[1]: output directory. WARNING: it is deleted recursively before the job runs.
Path outputPath = new Path(args[1]);
outputPath.getFileSystem(conf).delete(outputPath,true);
FileInputFormat.setInputPaths(job,pathArr);
// Recurse into input sub-directories; InputFileFilter accepts directories and filters files by regex.
FileInputFormat.setInputDirRecursive(job,true);
FileInputFormat.setInputPathFilter(job,InputFileFilter.class);
FileOutputFormat.setOutputPath(job,outputPath);
// Run the job synchronously with progress output; returns 0 on success, 1 on failure.
return job.waitForCompletion(true) ? 0 : 1; }
4,将 MapReduce 生成的文件导入 Hive
-- Move the files produced for the "ex" named output into table Ex; OVERWRITE replaces the
-- table's existing data. NOTE(review): assumes the job's output directory (args[1]) was
-- hdfs:///test/output.
LOAD DATA INPATH 'hdfs:///test/output/ex'
  OVERWRITE INTO TABLE Ex;
评论暂时关闭