说说hadoop,hadoop


Hadoop目前我是只看过猪跑,没有吃过猪肉的状态。我在微信和微博上零星地看过一些文章,在udacity上的”Intro to Hadoop and MapReduce”课程上看过几段视频。本想下载安装好Hadoop环境的虚拟机试验一下的,结果虚拟机太大,总是出现网络错误不能下载。目前又没有一定要使用Hadoop的项目,也就没有太多动力继续钻研了。不掌握细节可以宏观把握嘛。本着一门技术要“想透彻、写清楚、讲明白”的步骤,还是写点什么,看自己理解了多少,也为以后提个醒。

Hadoop的核心(Hadoop core)包含MapReduce和HDFS(Hadoop Distributed File System)。围绕Hadoop的核心,又有很多其他工具,如PIG, HIVE, IMPALA, scoop, Flume, HBase, HUE, dozie, mahout等,这些工具是为了方便使用Hadoop的核心的,他们一起构成了Hadoop的生态系统(Ecosystem)。例如HIVE提供了类似SQL的HiveQL,用来生成MapReduce算法。(CDH, Cloudera Distribution Including Apache Hadoop包含整个Hadoop生态系统涉及的工具。)本文只想涉及Hadoop的核心。

首先明白MapReduce的思想从哪里来。我们解决一个大的问题,通常都希望把他分解成小的部分,分而治之,然后汇总。MapReduce就是这样一种分而治之的模型。比如归并排序(merge sort)算法,就利用了分治法。先把大的序列分成两个小的序列,然后对小的序列进行排序,最后把排好序的子序列合并,构成有序的大的序列。map和reduce是通常函数式编程语言里面的两种操作。以下面的Clojure程序为例。(Clojure语言是Lisp语言的在Java虚拟机上的一种方言,也包含CLR和JavaScript版本,是函数式编程语言。此程序在tryclj试验运行通过)。

> (def bigdata [1,2,3,4,5])

#'sandbox6657/bigdata

> bigdata

[1 2 3 4 5]

> (def mapped-data (map (partial * 2) bigdata))

#'sandbox6657/mapped-data

> mapped-data

(2 4 6 8 10)

> (reduce + mapped-data)

30
>             

在这段程序中,首先有一堆数据bigdata,他们是1 2 3 4 5。对他进行map操作,每个元素乘以2,中间结果是mapped-data,他们是2 4 6 8 10。最后对中间结果进行reduce操作,把他们加起来,得到最终结果30。这只是个简单的例子,但我们可以脑补一下现实中处理数据可能遇到的情况。首先数据量可能很大,这个bigdata可能有tera或peta量级。第二我们这里只是对数据进行加倍操作,现实中可能这个操作很耗时,比如对一组数据进行线性拟合;现实中也可能对数据处理的响应速度要求很高,比如收到从多个传感器传来的数据很快做出响应。第三我们这里只是处理简单的整数,现实中处理的数据可能是数据库、日志、图片或音视频文件等结构化、半结构化(semi-structured)、非结构化数据。这里就引入了大数据的三个特点,大、快、多 —— 数据量大,处理速度要求快,数据类型多样化。(英文的说法是大数据的3个V,Volume, Velocity, Variety)。Hadoop就是来解决大数据的大、快、多的。

HDFS解决了大数据的存储问题。他提供了一种分布式的文件系统,用来把大文件存在不同的节点,不同的节点构成集群。节点的类型有数据节点(Data Node)和名称节点(Name Node)。(名称节点是否叫索引节点更准确?)一份大的文件,放到HDFS中,会被分成小份(Chunk),缺省是64M一份。这些小份会分配到不同的数据结点中,而且有冗余。缺省每一小份会保存在3个数据结点中,这样即使有些数据结点不工作了,也能保证数据是安全的,计算可以继续。名称节点用来记录文件在不同数据结点的分配情况的,可以看出他比较重要,是单点失败(single point of failure)的位置,所以通常名称节点要用高可靠性的机器,采用NFS备份,也可以考虑让名称节点工作在主备(Active Standby)模式。HDFS不光是为了存大数据,更重要的是这些数据需要经常用来计算。如果只是存大数据的话,磁盘阵列(SAN, Storage Area Network)也可以做到。

MapReduce解决了大数据的运算问题。MapReduce算法分Map部分和Reduce部分,Map部分来分解问题,Reduce部分来整合问题。Hadoop缺省有一个Reducer服务,有多个Mapper服务。如果配置有多个Reducer服务,则需要额外的步骤来汇总结果。Hadoop的运算节点类型有JobTracker和TaskTracker。JobTracker来给TaskTracker分配任务,有多个TaskTracker,有一个JobTracker。TaskTracker和数据节点可以是一个节点。JobTracker在给TaskTracker分配任务的时候,会优先分配同一节点上的TaskTracker处理本节点上的数据或离得近的节点上的数据。

Map算法通常处理每一条数据然后把中间结果输出为键(Key)和值(Value)的形式。在所有这些中间结果到达Reduce算法之前,进行了汇总和排序(Shuffle and Sort)。Reducer对这些键值对进行处理,也把结果输出为键值对的形式,得到最终结果,所以最终结果可以认为是个哈希表。(mapping -> intermediate records->shuffle and sort -> reducers -> result)

Hadoop是Java语言写的,所以通常Map和Reduce算法需要用Java语言来写。但是Hadoop也提供了Streaming技术。采用Streaming,Map和Reduce算法程序的输入来自stdin,输出到stdout,这样就可以使用任何语言来写Map和Reduce算法。缺省Key和Value之间用tab键分隔,每条数据用换行符分隔。

下面举一个例子,增加一些感性认识。下面是采用Python语言写的Map算法,此算法的输入是包含销售数据的文本文件,文本文件每一行包含tab键分隔的6个字段,分别是date, time, store, item, cost, payment。我们这里关心每个store销售的总额,所以此算法的key是store,value是cost。

def mapper():
    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) != 6: continue #defensive coding 
        date, time, store, item, cost, payment = data
        print "{0}\t{1}".format(store,cost)

下面是采用Python语言写的Reduce算法。此算法的输入是上面map算法输出的中间结果,即tab键分隔的store和cost。reducer得到这些中间结果时,是排过序的(因为有shuffle and sort步骤),所以他可以通过key值的变化知道某个store的所有数据处理完了。

def reducer():
    salesTotal = 0
    oldKey = None
    for line in sys.stdin:
        data = line.strip().split("\t")
        if len(data) != 2:
            continue
        thisKey, thisSale = data
        if oldKey and oldKey != thisKey:
            print "{0}\t{1}".format(oldKey,salesTotal)
            salesTotal = 0
        oldKey = thisKey
        salesTotal += float(thisSale)

    if oldKey != None:  
        print(print "{0}\t{1}".format(oldKey,salesTotal))  

执行这个MapReduce任务的命令大概如下:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
 -mapper mapper.py \
 -reducer reducer.py \
 -file mapper.py \
 -file reducer.py \
 -input myinput \
 -output joboutput

通常我们也不是写完了MapReduce程序就交给Hadoop跑,而是先测验一下确保功能正确。前面我们知道了MapReduce算法运行的顺序(mapping -> intermediate records->shuffle and sort -> reducers -> result),根据这个顺序测试MapReduce算法程序就容易了。比如测试上面我们的例子程序,可以采用下面的命令:

cat input.txt | mapper.py | sort | reducer.py 

由此我们也可以得知写MapReduce算法不一定需要有Hadoop的环境哦。

下面附一些Hadoop HDFS的命令,大多和linux系统的命令有对应。

hadoop fs -ls
hadoop fs -put
hadoop fs -tail 
hadoop fs -mv  
hadoop fs -rm 
hadoop fs -mkdir 
hadoop fs -get 

好了,关于Hadoop我听说了,我了解了,然后我记下来了上面的内容。因为没有机会用,也许很快就忘了。最近在看俞敏洪的《在绝望中寻找希望》(本书可以了解俞敏洪的一些励志故事,当然是心灵鸡汤,但偶尔也需要补补的嘛),里面说到,一个谈过恋爱的人总比没谈过的自信点。由此推之,学习任何技术,即使没机会用到,也还是有那么一点点价值的。

相关内容