hadoop集群算法调用--web平台2.0


1.配置

1.1首页

打开浏览器,输入项目发布的首页,即可看到下面的界面:

1.2 配置

配置功能其实就是让用户配置hadoop集群的,配置界面如下:
输入jobtracker 和namenode的机器名和端口号,点击提交,如果成功,就会给出红色提示信息。那么这样如何去验证的呢? 其实就是根据输入的参数得到一个jobclient对象,如果可以得到那么就说明集群是可用的,否则则说明不可用;具体代码如下:
public static boolean initialJobClient(){
		if(HOST==null||JOBTRACKER_PORT==0||NAMENODE_PORT==0){
			return false;
		}
		log.info("Initial  job client begins...");
		boolean flag=true;
		try {
			InetSocketAddress jobTracker=new InetSocketAddress(HOST,JOBTRACKER_PORT);
			jobClient=new JobClient(jobTracker, getConf());
		} catch (IOException e) {
			flag=false;
			log.info("Job client can't be got\n"+e.getMessage());
		}
		if(flag){
			log.info("Initial  job client done!!!");
		}
		return flag;
	}
但是,如果当集群不可用的时候,就会一直连接,后台一直打印重试连接,前台一直等待,暂时还没有进行相应的处理。

2. hadoop读取删除

hadoop读取删除其实就是hadoop文件的一般操作,上传下载主要是使用FileSystem做的。

2.1 写入

可以按照下面界面输入相应的参数:
点击提交后,会有红色字体提示写入成功,或者可以在云平台进行数据查看(50070界面)。这里写入使用的是FSDataOutputStream的write功能,这个类有好几个write(不同数据类型,这里使用的是string的),不同的write方式,读取需要使用相应的方式。写入的代码如下:
/**
	 * write string to hdfs file
	 * @param path
	 * @param data
	 * @return
	 */
	public static boolean writeToHdfs(String path,String data){
		boolean flag = true;
	    Path filePath=new Path(path);  
	    FileSystem fs;  
	    FSDataOutputStream out=null;  
	    try {  
	        fs = FileSystem.get(filePath.toUri(),getConf());
	        out = fs.create(filePath);  
	        out.writeUTF(data);  
	    } catch(Exception e){  
	        log.info("write to hdfs file"+filePath.toString()+" :"+e.getMessage());
	        flag=false;
	    }finally {  
	        try {
	        	if(out!=null){
	        		out.close();
	        	}
			} catch (IOException e) {
				log.info("close hdfs file "+filePath.toString()+" wrong\n"+e.getMessage());
				flag=false;
			}  
	    } 
		return flag;
	}

2.2 读取

这里读取的还是刚才写入的内容:
输入对应的目录,提交后即可看到内容。需要说明的是这里读取的数据一定要是用writeUTF的方式写入的,不然会是乱码。后面应该会修改这个,改的更加通用点。读取的代码如下:
/**
	 * read hdfs file to String
	 * @param file
	 * @return
	 */
	public static String readHdfs(String path){
		String data="";
		Path filePath= new Path(path);
		FileSystem fs =null;  
        FSDataInputStream in = null;  
        try {  
          fs=FileSystem.get(filePath.toUri(), getConf()); 
          in=fs.open(filePath);
          data= in.readUTF();  
        }catch(Exception e){
        	log.info("read hdfs file "+path+" error:\n"+e.getMessage());
        }finally {  
        	try {
	        	if(in!=null){
					in.close();
	        	}
	        	if(fs!=null){
	        		fs.close();
	        	}
        	} catch (IOException e) {
        		log.info("close FileSystem error:\n"+e.getMessage());
			}
        }  
		return data;
	}

2.3 上传

上传其实就是使用FileSystem的copy函数而已,主要代码如下:
Path in=new Path(localFile);
		Path out=new Path(hdfsFile);
		FileSystem fs=null;
		try {
			fs = FileSystem.get(URI.create(hdfsFile),getConf());
			fs.copyFromLocalFile(in,out);

2.4 下载

下载也是一样,使用的同样是FileSystem的函数,具体如下:
Path in=new Path(hdfsFileName);
Path out=new Path(localFileName);
try {
fs=FileSystem.get(URI.create(hdfsFileName),getConf());
fs.copyToLocalFile(in, out);

3. hadoop 算法

3.1 序列文件读取

hadoop算法本来是想写成MR的形式的,但是写MR又要上传代码到hadoop集群,感觉麻烦,所以没有写成MR的模式,也就是说没有了监控(其实,后面还是要把代码.class文件打包上传到云平台的)。这里的序列文件读取,其实就是使用了Sequence.Reader而已。需要说明的是,这个读取只是针对一般的<key,value>格式,比如LongWritable、IntWritable等等。读取后的文件一个如下所示:

并且,如果没有序列文件的话,可以按照页面的提示,生成一个序列文件。

3.2 序列文件转换为txt

其实,就是多了一个步骤,把3.1的文件写入一个本地文件而已。写入本地文件使用的是PrintStream,具体如下:
/**
	 * 写入字符串到本地文件
	 * @param localPath
	 * @param data
	 */
	public static boolean writeToLocal(String localPath,String data){
		boolean flag=true;
		File file=new File(localPath);
		if(!file.exists()){
			try {
				new File(file.getParent()).mkdirs();
				file.createNewFile();
			} catch (Exception e) {
				log.info("创建文件失败\n"+e.getMessage());
			}
			
		}
		PrintStream ps=null;
		try {
			ps = new PrintStream(new FileOutputStream(file));
			ps.print(data);
		} catch (FileNotFoundException e) {
			log.info("写入本地文件"+localPath+"失败\n"+e.getMessage());
			flag=false;
		}finally{
			ps.close();
		}
		return flag;
	}

4. mahout算法

mahout算法设计的思路其实和前面监控算法的程序是一样的,只是这里在读取云平台已有job列表的时候加了判断,如下:
JobStatus[] jobStatusAll=HadoopUtil.jobClient.getAllJobs();
		JobStatus jobStatus=null;
		int id =0;
		String jobIden="";
		/**
		 * 防止当前云平台是第一次启动,这个时候没有任务列表,获取的jobStatus是空;
		 */
		if(jobStatusAll==null||jobStatusAll.length<=0){
			//修改TaskTracker代码,把集群启动时间写入hdfs,然后在这里读取出来
			id=0;
			jobIden=readJTStartTime();
			
		}else{
			jobStatus=jobStatusAll[jobStatusAll.length-1];
			id=jobStatus.getJobID().getId();
			jobIden=jobStatus.getJobID().getJtIdentifier();
		}
这里,如果第一次启动的时候,文件列表是空的,这时就需要去读取启动时间,这里采取的方式可以参考:《hadoop 启动时间写入文件》。而这里的代码只是读取文件中的启动时间,然后去拼凑jobId而已。 监控的一般形式如下: 但是,也有可能会出现下面的界面:
看上面两个红色方框,第一个因为前一个任务刚启动然后就失败了,导致jobid没有写入到joblist中。第二个是因为页面刷新的时间和任务启动之间的间隔关系,如果job任务刚刚结束之前读取了一个job状态是running,然后隔了一段时间后,另外一个job任务启动了,那么之前的job任务的状态就会是running的状态,而没有更新。这里的逻辑还有点问题,有待更新。
目前mahout算法可以使用的是Canopy、Kmeans、协同过滤算法,模式挖掘还没有写。 另外需要注意的一点是,工程下载后需要把项目的.class文件打包上传到云平台才不会出错。因为在canopy、kmeans算法的时候,lz自己写了一个数据转换的程序,运行这两个算法的时候会首先调用数据转换,所以没有的话就会报错了。 最后说下数据格式吧: canopy和kmeans算法的都是一样的,一般如下:
1,133,8
5,122775,10
9,18297,6
9,50422,8
9,80503,10
9,110624,8
9,147283,10
9,218923,5
18,102606,2
25,58155,2
里面的逗号可以是其他字符,需要在算法参数页面设置。 协同过滤算法也是上面的数据格式,不过逗号不能改,且一定是userID,ItemID,prefValue的格式。

如果您觉得lz的blog或者资源还ok的话,可以选择给lz投一票,多谢。(投票地址:http://vote.blog.csdn.net/blogstaritem/blogstar2013/fansy1990 )



http://blog.csdn.net/fansy1990



相关内容

    暂无相关文章