Hadoop处理HDF文件,


1、前言

HDF文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用Hadoop处理HDF文件这个问题困扰过相当长的一段时间。于是Google各种解决方案,但都没有找到一种理想的处理办法。也曾参考过HDFGroup官方发的一篇帖子(网址在这里),里面提供了使用Hadoop针对大、中、小HDF文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用Hadoop处理HDF文件这个问题,但个人感觉方法偏复杂且需要对HDF的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。


2、MapReduce主程序

这里主要使用到了netcdf的库进行hdf数据流的反序列化工作(netcdf库的下载地址)。与HDF官方提供的Java库不同,netcdf仅利用Java进行HDF文件的读写操作,且这个库支持多种科学数据,包括HDF4、HDF5等多种格式。而HDF的官方Java库中,底层实际仍是用C进行HDF文件的操作。下面贴出MapReduce的Mapper函数代码:

package example;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;

public class ReadMapper extends
		Mapper<Text, BytesWritable, Text, BytesWritable> {

	public void map(Text key, BytesWritable value, Context context)
			throws IOException, InterruptedException {		
		String fileName = key.toString();
		NetcdfFile file = NetcdfFile.openInMemory("hdf4", value.get());
		Group dataGroup = (file.findGroup("MOD_Grid_monthly_1km_VI")).findGroup("Data_Fields");
		//读取到1_km_monthly_red_reflectance的变量
		Variable redVar = dataGroup.findVariable("1_km_monthly_red_reflectance");
		short[][] data = new short[1200][1200];
		if(dataGroup != null){			
			ArrayShort.D2 dataArray;
			//读取redVar中的影像数据
			dataArray = (ArrayShort.D2) redVar.read();
			List<Dimension> dimList = file.getDimensions();
			//获取影像的y方向像元个数
			Dimension ydim = dimList.get(0);
			//获取影像的x方向像元个数
			Dimension xdim = dimList.get(1);
			//遍历整个影像,读取出像元的值
			for(int i=0;i<xdim.getLength();i++){
				for(int j=0;j<ydim.getLength();j++){
					data[i][j] = dataArray.get(i, j);					
				}				
			}									
		}		
		System.out.print(file.getDetailInfo());
	}
}

注意程序中的NetcdfFile.openInMemory方法,该静态方法支持从byte[]中构造HDF文件,从而实现了HDF文件的反序列化操作。下面贴出主程序的示例代码:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import example.WholeFileInputFormat;


public class ReadMain {
	public boolean runJob(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		// conf.set("mapred.job.tracker", Utils.JOBTRACKER);
		String rootPath= "/opt/hadoop-2.3.0/etc/hadoop";
		//String rootPath="/opt/hadoop-2.3.0/etc/hadoop/";
		conf.addResource(new Path(rootPath+"yarn-site.xml"));
		conf.addResource(new Path(rootPath+"core-site.xml"));
		conf.addResource(new Path(rootPath+"hdfs-site.xml"));
		conf.addResource(new Path(rootPath+"mapred-site.xml"));
		Job job = new Job(conf);

		job.setJobName("Job name:" + args[0]);
		job.setJarByClass(ReadMain.class);

		job.setMapperClass(ReadMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);
		
		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(NullOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		boolean flag = job.waitForCompletion(true);
		return flag;
	}

	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {
		String[] inputPaths = new String[] { "normalizeJob",
				"hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf",
				"hdfs://192.168.168.101:9000/user/hduser/test/" };
		ReadMain test = new ReadMain();
		test.runJob(inputPaths);
	}

}
关于MapReduce主程序有几点值得说明一下:

1、MapReduce数据的输入格式为WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇博客:如何通过Java程序提交Yarn的计算任务,这里不再赘述。

2、本人用的是Yarn2.3.0来执行计算任务,如果用老版本的hadoop,如1.2.0,则把以上主程序中的conf.addResource部分的代码删掉即可。

3、以上MapReduce程序中,只用到了Map函数,未设置Reduce函数。

4、以上程序用到的为HDF4格式的数据,按理说,HDF5格式的数据应该也是支持的。


3、HDF数据的格式

由于HDF数据高度结构化,因此在netcdf库的使用中,需要使用类似于"标签"的方式来访问HDF中的具体数据。下面贴出netcdf中读出来的HDF数据的具体格式信息(即使用file.getDetailInfo()函数,打印出来的信息):

注意,ReadMapper函数中出现的类似于“MOD_Grid_monthly_1km_VI”、"Data_Fields"等信息,即根据以下HDF数据的格式信息得到的。

netcdf D:/2005-274/MOD13A3.A2005274.h00v08.005.2008079142757.hdf {
  variables:
    char StructMetadata.0(32000);

    char CoreMetadata.0(40874);

    char ArchiveMetadata.0(6530);


  group: MOD_Grid_monthly_1km_VI {
    variables:
      short _HDFEOS_CRS;
        :Projection = "GCTP_SNSOID";
        :UpperLeftPointMtrs = -2.0015109354E7, 1111950.519667; // double
        :LowerRightMtrs = -1.8903158834333E7, -0.0; // double
        :ProjParams = 6371007.181, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0; // double
        :SphereCode = "-1";


    group: Data_Fields {
      dimensions:
        YDim = 1200;
        XDim = 1200;
      variables:
        short 1_km_monthly_NDVI(YDim=1200, XDim=1200);
          :long_name = "1 km monthly NDVI";
          :units = "NDVI";
          :valid_range = -2000S, 10000S; // short
          :_FillValue = -3000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_EVI(YDim=1200, XDim=1200);
          :long_name = "1 km monthly EVI";
          :units = "EVI";
          :valid_range = -2000S, 10000S; // short
          :_FillValue = -3000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_VI_Quality(YDim=1200, XDim=1200);
          :_Unsigned = "true";
          :long_name = "1 km monthly VI Quality";
          :units = "bit field";
          :valid_range = 0S, -2S; // short
          :_FillValue = -1S; // short
          :Legend = "\n\t Bit Fields Description (Right to Left): \n\t[0-1] : MODLAND_QA [2 bit range]\n\t\t 00: VI produced, good quality \n\t\t 01: VI produced, but check other QA \n\t\t 10: Pixel produced, but most probably cloudy \n\t\t 11: Pixel not produced due to other reasons than clouds \n\t[2-5] : VI usefulness [4 bit range]  \n\t\t 0000: Highest quality  \n\t\t 0001: Lower quality  \n\t\t 0010..1010: Decreasing quality  \n\t\t 1100: Lowest quality  \n\t\t 1101: Quality so low that it is not useful \n\t\t 1110: L1B data faulty \n\t\t 1111: Not useful for any other reason/not processed \n\t[6-7] : Aerosol quantity [2 bit range] \n\t\t 00: Climatology \n\t\t 01: Low \n\t\t 10: Average \n\t\t 11: High (11) \n\t[8] : Adjacent cloud detected; [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[9] : Atmosphere BRDF correction performed [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[10] : Mixed clouds  [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[11-13] : Land/Water Flag [3 bit range]   \n\t\t 000: Shallow ocean \n\t\t 001: Land (Nothing else but land) \n\t\t 010: Ocean coastlines and lake shorelines \n\t\t 011: Shallow inland water \n\t\t 100: Ephemeral water \n\t\t 101: Deep inland water \n\t\t 110: Moderate or continental ocean \n\t\t 111: Deep ocean \n\t[14] : Possible snow/ice [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[15] : Possible shadow [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n";

        short 1_km_monthly_red_reflectance(YDim=1200, XDim=1200);
          :long_name = "1 km monthly red reflectance";
          :units = "reflectance";
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_NIR_reflectance(YDim=1200, XDim=1200);
          :long_name = "1 km monthly NIR reflectance";
          :units = "reflectance";
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_blue_reflectance(YDim=1200, XDim=1200);
          :long_name = "1 km monthly blue reflectance";
          :units = "reflectance";
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_MIR_reflectance(YDim=1200, XDim=1200);
          :long_name = "1 km monthly MIR reflectance";
          :units = "reflectance";
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :Legend = "\n\t The MIR band saved in the VI product is MODIS band 7 \n\t\t Bandwidth : 2105-2155 nm \n\t\t Band center: 2130 nm \n";
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_view_zenith_angle(YDim=1200, XDim=1200);
          :long_name = "1 km monthly view zenith angle";
          :units = "degrees";
          :valid_range = -9000S, 9000S; // short
          :_FillValue = -10000S; // short
          :scale_factor = 100.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_sun_zenith_angle(YDim=1200, XDim=1200);
          :long_name = "1 km monthly sun zenith angle";
          :units = "degrees";
          :valid_range = -9000S, 9000S; // short
          :_FillValue = -10000S; // short
          :scale_factor = 100.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_relative_azimuth_angle(YDim=1200, XDim=1200);
          :long_name = "1 km monthly relative azimuth angle";
          :units = "degrees";
          :valid_range = -3600S, 3600S; // short
          :_FillValue = -4000S; // short
          :scale_factor = 10.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        byte 1_km_monthly_pixel_raliability(YDim=1200, XDim=1200);
          :long_name = "1 km monthly pixel raliability";
          :units = "rank";
          :valid_range = 0B, 3B; // byte
          :_FillValue = -1B; // byte
          :Legend = "\n\t Rank Keys: \n\t\t[-1]:  Fill/No Data-Not Processed. \n\t\t [0]:  Good data     - Use with confidence \n\t\t [1]:  Marginal data - Useful, but look at other QA information \n\t\t [2]:  Snow/Ice      - Target covered with snow/ice\n\t\t [3]:  Cloudy        - Target not visible, covered with cloud \n";

    }
  }
  // global attributes:
  :HDFEOSVersion = "HDFEOS_V2.9";
  :_History = "Direct read of HDF4 file through CDM library; HDF-EOS StructMetadata information was read";
  :HDF4_Version = "4.2.1 (NCSA HDF Version 4.2 Release 1-post3, January 27, 2006)";
  :featureType = "GRID";
}

相关内容