Calling the HDFS API from Java


This post shows how to read data from HDFS through the Java API. The sample program below reads a tab-separated item-similarity file from HDFS, converts each line into a JSON document, and writes it to MongoDB.
package mongodb;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.util.ReflectionUtils;


// Wraps an "itemId:weight" pair; sorts in descending order of weight.
class Item implements Comparable<Item>
{
	String value;
	double weight;

	public Item(String v)
	{
		value = v;
		weight = Double.parseDouble(value.split(":")[1]);
	}

	@Override
	public int compareTo(Item o)
	{
		// Descending: larger weights come first.
		return Double.compare(o.weight, this.weight);
	}
}

public class BatchUpdateSim {
	// Converts a line like "key\tid:weight,id:weight,..." into a JSON string,
	// with the id:weight pairs sorted by descending weight.
	public static String parse(String str)
	{
		//String str="90003	20718001:1.0,2077635:1.0,2053809:1.0";
		String[] fields = str.split("\t");
		String[] valueArray = fields[1].split(",");
		Item[] items = new Item[valueArray.length];
		for (int i = 0; i < items.length; i++)
		{
			items[i] = new Item(valueArray[i]);
		}
		Arrays.sort(items);
		for (int i = 0; i < valueArray.length; i++)
		{
			valueArray[i] = "{" + items[i].value + "}";
		}
		String valueStr = StringUtils.join(valueArray, ",");
		return "{\"key\":" + fields[0] + ",\"values\":[" + valueStr + "]}";
	}
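	// For example, the sample line in the comment above parses to:
	//   {"key":90003,"values":[{20718001:1.0},{2077635:1.0},{2053809:1.0}]}
	// (all three weights are equal, so the sort leaves the original order intact)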
	public static void main(String[] args) throws IOException, ClassNotFoundException {
		Configuration conf = new Configuration();
		// Load the cluster configuration shipped with the local Hadoop install.
		conf.addResource("/usr/local/hadoop/conf/core-site.xml");
		conf.addResource("/usr/local/hadoop/conf/hdfs-site.xml");
		conf.addResource("/usr/local/hadoop/conf/mapred-site.xml");
		//String HDFS="hdfs://webdm-cluster";
		//String HDFS="hdfs://localhost:9000";
		String HDFS = args[0];
		FileSystem hdfs = FileSystem.get(URI.create(HDFS), conf);
		String filePath = args[1];
		FSDataInputStream fin = hdfs.open(new Path(filePath));

		//CompressionCodecFactory factory = new CompressionCodecFactory(conf);
		//CompressionCodec codec = factory.getCodec(new Path(filePath)); // detect the codec from the file name suffix
		//Class<?> codecClass = Class.forName("com.hadoop.compression.lzo.LzoCodec");
		//CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
		BufferedReader reader = null;
		String line;
		int count = 0;
		RecsysDb db = RecsysDb.getInstance();
		String itemSimName = args[2];
		try
		{
			// if (codec == null)
			// {
			reader = new BufferedReader(new InputStreamReader(fin)); // or: new InputStreamReader(fin, "UTF-8")
			// }
			/*
			else
			{
				System.out.println("Detected a compression codec");
				CompressionInputStream comInputStream = codec.createInputStream(fin);
				reader = new BufferedReader(new InputStreamReader(comInputStream));
			}
			*/

			while ((line = reader.readLine()) != null)
			{
				//if(count==0) System.out.println(line);
				String strJson = parse(line);
				db.updateItemSim(itemSimName, strJson);
				count++;
				if (count % 1000 == 0) System.out.println("count:" + count);
			}
		}
		finally
		{
			if (reader != null)
			{
				reader.close();
			}
			System.out.println(count);
		}
	}

}
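
The RecsysDb class is not shown in the post; it wraps the MongoDB connection (note that the shell script below puts mongo-java-driver-2.12.4.jar on the classpath). As a rough idea of what it might look like, here is a minimal sketch against that 2.x driver API. Only the class name and the updateItemSim(collectionName, json) call come from the code above; the host, port, database name, and the upsert-by-"key" behavior are assumptions.

package mongodb;

import java.net.UnknownHostException;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.util.JSON;

// Hypothetical singleton wrapper around the MongoDB connection.
class RecsysDb {
	private static RecsysDb instance;
	private final DB db;

	private RecsysDb() throws UnknownHostException {
		// Host, port, and database name are assumptions.
		MongoClient client = new MongoClient("localhost", 27017);
		db = client.getDB("recsys");
	}

	public static synchronized RecsysDb getInstance() {
		if (instance == null) {
			try {
				instance = new RecsysDb();
			} catch (UnknownHostException e) {
				throw new RuntimeException(e);
			}
		}
		return instance;
	}

	// Upserts the JSON document into the named collection, keyed on "key".
	public void updateItemSim(String collName, String strJson) {
		DBCollection coll = db.getCollection(collName);
		DBObject doc = (DBObject) JSON.parse(strJson);
		DBObject query = new BasicDBObject("key", doc.get("key"));
		coll.update(query, doc, true /* upsert */, false /* multi */);
	}
}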



Below is a shell script for running the program with the java command, which requires loading the various Hadoop-related jars by hand. It appends them to the boot classpath via -Xbootclasspath/a because java -jar ignores -cp/CLASSPATH. (PS: I later found a better way: launch it with the hadoop command instead, because hadoop loads those jars itself, so there is no need to add them manually; see the sketch after the script.)
hd_core="/usr/local/hadoop/hadoop-core-1.1.2.jar";
s4j="/usr/local/hadoop/lib/slf4j-log4j12-1.6.1.jar";
s4japi="/usr/local/hadoop/lib/slf4j-api-1.6.1.jar";
log4j="/usr/local/hadoop/lib/log4j-1.2.17.jar";
guava="/usr/local/hadoop/lib/guava-11.0.2.jar";
clog="/usr/local/hadoop/lib/commons-logging-1.1.1.jar";
cconf="/usr/local/hadoop/lib/commons-configuration-1.6.jar";
cl="/usr/local/hadoop/lib/commons-lang-2.5.jar";
ccli="/usr/local/hadoop/lib/commons-cli-1.2.jar";
protobuf="/usr/local/hadoop/lib/protobuf-java-2.4.0a.jar";
hdfs="/usr/local/hadoop/lib/hadoop-hdfs-1.1.2.jar";
mongodb="/data/home/liulian/linger/jars/mongo-java-driver-2.12.4.jar";
libs=(
$hd_core
$s4j
$s4japi
$log4j
$guava
$clog
$cconf
$cl
$ccli
$protobuf
$hdfs
$mongodb
)
libstr=""
for jarlib in ${libs[@]};
do
    libstr=${jarlib}":"${libstr}
done
echo $libstr;
java -Xbootclasspath/a:${libstr}  -jar ../jars/updateSim.jar hdfs://10.200.91.164:9000 tv_sim/result/000000_0 tvItemsimColl
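
For comparison, the hadoop-command variant mentioned above needs none of the jar bookkeeping: the hadoop launcher builds the Hadoop classpath itself and honors the jar's Main-Class manifest entry just like java -jar. Only the MongoDB driver still has to be made visible; supplying it through HADOOP_CLASSPATH is an assumption about this Hadoop 1.1.2 setup.

export HADOOP_CLASSPATH=/data/home/liulian/linger/jars/mongo-java-driver-2.12.4.jar
hadoop jar ../jars/updateSim.jar hdfs://10.200.91.164:9000 tv_sim/result/000000_0 tvItemsimColl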



Original post: http://blog.csdn.net/lingerlanlan/article/details/42178675  Author: linger

