scala在spark上操作hdfs,scalasparkhdfs


scala在spark上操作hdfs


package tester


import java.io.BufferedInputStream
import java.io.File
import java.io.FileInputStream
import java.io.InputStream

import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Path._


/*
 * 需求:用scala操作hdfs,但是又不想用RDD。
 * http://bleibinha.us/blog/2013/09/accessing-the-hadoop-distributed-filesystem-hdfs-with-scala
 * http://www.linuxboy.net/Linux/2014-04/100545.htm HDFS——如何列出目录下的所有文件
 */
object Hdfs {
    def ls(fileSystem:FileSystem,path:String)=
    {
      println("list path:"+path)
      val fs = fileSystem.listStatus(new Path(path))
      val listPath = FileUtil.stat2Paths(fs)
      for( p <- listPath)
      {
        println(p)
      }
      println("----------------------------------------")
    }
    def main(args: Array[String]) {
      val conf = new Configuration()
      //val hdfsCoreSitePath = new Path("core-site.xml")
     // val hdfsHDFSSitePath = new Path("hdfs-site.xml")     
      //conf.addResource(hdfsCoreSitePath)
      //conf.addResource(hdfsHDFSSitePath)
      println(conf)//Configuration: core-default.xml, core-site.xml
      //根据这个输出,在这个程序进来之前,conf已经被设置过了
      
      //目前我知道,定位具体的hdfs的位置,有两种方式
      //一种是在conf配置,一个域名可以绑定多个ip.我们通过这个域名来定位hdfs.
      //另一种是在调用FileSystem.get时指定一个域名或者一个ip,当然仅限一个.
      
      val fileSystem = FileSystem.get(conf)
      //如果conf设置了hdfs的host和port,此处可以不写
      //hadoop的配置都是一层一层的,后面的会覆盖前面的.
      
      //String HDFS="hdfs://localhost:9000";
      //FileSystem hdfs = FileSystem.get(URI.create(HDFS),conf);
      //这种写法 只能用一个ip或者域名了.不推荐.
      
      ls(fileSystem,"/")
      ls(fileSystem,".")     
      ls(fileSystem,"svd")
      
    }
}

 

利用spark提交任务

spark-submit --class"tester.Hdfs" Hdfs.jar

 

 

Spark assembly has been built with Hive,including Datanucleus jars on classpath

Configuration: core-default.xml,core-site.xml

15/01/30 09:52:36 WARNutil.NativeCodeLoader: Unable to load native-hadoop library for yourplatform... using builtin-java classes where applicable

list path:/

hdfs://myhadoop-cluster/sparkTest

hdfs://myhadoop-cluster/user

----------------------------------------

list path:.

hdfs://myhadoop-cluster/user/liulian/.Trash

hdfs://myhadoop-cluster/user/liulian/svd

----------------------------------------

list path:svd

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.dat

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.random

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.svd_input

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.svd_input2

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.svd_input2.5

hdfs://myhadoop-cluster/user/liulian/svd/movie_data_set.svd_input3

hdfs://myhadoop-cluster/user/liulian/svd/svd_out2.5

 

 

 

根据输出可以看到,

spark自动加载了hadoop的配置文件core-default.xml,core-site.xml

 

可以到hadoop的根目录下搜索这两个文件

find -name "core*.xml"

 

在core-default.xml可以看到文件系统是本地文件系统

<property>

 <name>fs.defaultFS</name>

 <value>file:///</value>

 <description>The name of the default file system.  A URI whose

 scheme and authority determine the FileSystem implementation.  The

 uri's scheme determines the config property (fs.SCHEME.impl) naming

  theFileSystem implementation class.  Theuri's authority is used to

 determine the host, port, etc. for a filesystem.</description>

</property>

 

<property>

 <name>fs.default.name</name>

 <value>file:///</value>

 <description>Deprecated. Use (fs.defaultFS) property

 instead</description>

</property>

 

在core-site.xml可以看到文件系统是hdfs

 <property>

   <name>fs.defaultFS</name>

   <value>hdfs://myhadoop-cluster</value>

 </property>

 

 

显然core-site.xml的配置会覆盖掉core-default.xml的配置。

 

令我不解的是,

这里只配置了hdfs的域名,而没有配置hdfs的ip。(也没有改动系统的hosts文件)

她是如何找到具体的ip的呢?

 

我看了hdfs-site.xml文件,里面可以看到hdfs的域名绑定了两个ip(通过dfs.nameservices,dfs.ha.namenodes,dfs.namenode等属性来配置)。

但是问题又来了,程序又没有加载hdfs-site.xml文件。

 

真是不懂hadoop的配置文件加载机制啊。

 

 

此前我用java来操作hdfs,如果用域名总是出现unknownhost的异常。

明显是根据域名找不到具体ip。

网上有人说改动系统的hosts文件。当然这是一种方法,但是不太好,因为hdfs的域名其实可以跟多个ip绑定的,改hosts文件的方法一个域名只能对应一个ip。

其实当时我压根连hdfs的ip是啥都不知道,问了很多人他们也说错了,因为他们压根没有用过真正的集群。后来我用linux命令netstat –apn查看端口连接,发现有一个ip是用9000端口跟当前服务器(其实是hadoop的client,用于提交job)连接的,然后将这个ip和9000当作hdfs的ip和port,一试,果然可以。

 

 

 

 

无论如何,现在总算可以成功调用hdfs了。

而且得出了一些有用的结论:

1 在core-default.xml和core-site.xml可以配置文件系统

2 在hdfs-site.xml可以绑定hdfs的域名和ip(可以是多个ip)

3 操作hdfs时,方法1在core-site.xml指定域名,在hdfs-site.xml绑定域名和ip(可以是多个),然后加载到conf中。方法2是在调用FileSystem.get时指定一个域名或者一个ip。

4 用spark-submit或者hadoop执行job时,会自动载入一些hadoop等jar包,而直接用java执行job时,则不会自动,如果用到必须手动指定。

 

 

本文链接:http://blog.csdn.net/lingerlanlan/article/details/43314651

本文作者:linger

 

 

 


相关内容