Python 写的Hadoop小程序


该程序是在 Python 2.3 上完成的；不同 Python 版本之间存在差异，在其他版本上运行时可能需要做相应修改。

Mapper:

import sys

# Running totals for this mapper: lines seen, tab separators counted,
# and the summed pv / clk values.
line_number = tab_number = pv_number = clk_number = 0
# Format-detection state: if_compressed_tested becomes 1 once the input
# format has been decided; if_compressed is 1 for tab-separated input.
if_compressed_tested = if_compressed = 0

#functions:
def compressed_stat(line):
    """Accumulate statistics for one line of tab-separated input.

    The line is split on tab characters. The first field is skipped; each
    following field must start with two space-separated integers, read as
    "<pv> <clk>" (extra tokens in a field are ignored).

    ``line_number`` is incremented and ``tab_number`` grows by the number
    of tab separators for every line, valid or not. ``pv_number`` and
    ``clk_number`` are updated only when every field parses, so a
    malformed line never contributes a partial sum.

    A malformed line (non-integer token, or a field with fewer than two
    tokens) is written to stdout followed by a space, a tab and the word
    ERROR; nothing is raised.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    line_number += 1
    fields = line.split("\t")
    tab_number += len(fields) - 1
    pv_total = 0
    clk_total = 0
    try:
        for field in fields[1:]:
            counts = field.strip().split(" ")
            pv_total += int(counts[0])
            # IndexError here means the field had no clk token.
            clk_total += int(counts[1])
    except (ValueError, IndexError):
        # Same bytes the Python 2 statement `print line, "\tERROR"` emits.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Commit only after the whole line has parsed.
    pv_number += pv_total
    clk_number += clk_total


def before_compress_stat(line):
    """Accumulate statistics for one line of uncompressed input.

    After stripping surrounding whitespace the line must start with two
    space-separated integers, read as "<pv> <clk>" (extra tokens are
    ignored). ``line_number`` is incremented for every line, valid or not;
    ``pv_number`` and ``clk_number`` are updated only when both values
    parse, so a malformed line never contributes a partial sum.

    A malformed line (non-integer token, or fewer than two tokens) is
    written to stdout, stripped, followed by a space, a tab and the word
    ERROR; nothing is raised.
    """
    global line_number
    global pv_number
    global clk_number
    line_number += 1
    line = line.strip()
    tokens = line.split(" ")
    try:
        pv_value = int(tokens[0])
        # IndexError here means the line had no clk token.
        clk_value = int(tokens[1])
    except (ValueError, IndexError):
        # Same bytes the Python 2 statement `print line, "\tERROR"` emits.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    pv_number += pv_value
    clk_number += clk_value
#end functions  
  

# Mapper driver: read records from stdin, decide the record format once,
# feed every line to the matching accumulator, then emit one summary line.
for line in sys.stdin:
    line = line.strip()
    # Decide the format from the first non-empty line; a blank leading line
    # must not lock the whole run into uncompressed mode.
    if if_compressed_tested == 0 and line:
        if_compressed_tested = 1
        # After strip() a tab can only sit between fields.
        if "\t" in line:
            if_compressed = 1
    try:
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except (ValueError, IndexError):
        # Report the record in the same "<line> <TAB>ERROR" form the
        # accumulators use instead of dropping it or aborting the task.
        sys.stdout.write("%s \tERROR\n" % line)

# Summary: "lines tabs pv clk" for tab-separated input, else "lines pv clk".
if if_compressed == 1:
    print ("%d %d %d %d" % (line_number, tab_number, pv_number, clk_number))
else:
    print ("%d %d %d" % (line_number, pv_number, clk_number))

Reducer:
import sys


# Grand totals built up by adding the per-line counts read from stdin.
line_number = tab_number = pv_number = clk_number = 0
# Format-detection state: if_compressed_tested becomes 1 once the record
# format has been decided; if_compressed is 1 for 4-field records.
if_compressed_tested = if_compressed = 0

def compressed_stat(line):
    """Add one 4-field partial result to the global totals.

    ``line`` must be four space-separated integers in the order
    line count, tab count, pv, clk. They are added to ``line_number``,
    ``tab_number``, ``pv_number`` and ``clk_number`` respectively.

    A line with any other number of fields is written to stdout followed
    by a space, a tab and the word ERROR, and the totals are untouched.

    Raises:
        ValueError: if a field is not an integer. All four fields are
            converted before any total is modified, so the totals are
            unchanged when this is raised.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    parts = line.split(" ")
    if len(parts) != 4:
        # Same bytes the Python 2 statement `print line, "\tERROR"` emits
        # for a stripped line.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Convert first, commit afterwards: no partial update on bad input.
    values = [int(part) for part in parts]
    line_number += values[0]
    tab_number += values[1]
    pv_number += values[2]
    clk_number += values[3]

def before_compress_stat(line):
    """Add one 3-field partial result to the global totals.

    ``line`` must be three space-separated integers in the order
    line count, pv, clk. They are added to ``line_number``,
    ``pv_number`` and ``clk_number`` respectively.

    A line with any other number of fields is written to stdout followed
    by a space, a tab and the word ERROR, and the totals are untouched.

    Raises:
        ValueError: if a field is not an integer. All three fields are
            converted before any total is modified, so the totals are
            unchanged when this is raised.
    """
    global line_number
    global pv_number
    global clk_number
    parts = line.split(" ")
    if len(parts) != 3:
        # Same bytes the Python 2 statement `print line, "\tERROR"` emits
        # for a stripped line.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Convert first, commit afterwards: no partial update on bad input.
    values = [int(part) for part in parts]
    line_number += values[0]
    pv_number += values[1]
    clk_number += values[2]
#

# Reducer driver: pass through lines already flagged as errors, decide the
# record format from the first well-formed record, sum the rest, and print
# one labelled summary line.
for line in sys.stdin:
    line = line.strip()
    # Lines that already carry an ERROR marker are forwarded unchanged.
    if "ERROR" in line:
        sys.stdout.write(line + "\n")
        continue

    if if_compressed_tested == 0:
        field_count = len(line.split(" "))
        if field_count == 4:
            if_compressed = 1
        elif field_count == 3:
            if_compressed = 0
        else:
            # Unrecognised shape: report it and keep looking, so a single
            # malformed leading record cannot fix the format for the run.
            sys.stdout.write("%s \tERROR\n" % line)
            continue
        # Mark detection done only after a valid record has been seen.
        if_compressed_tested = 1

    try:
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except ValueError:
        # A field was not an integer; report the record and carry on.
        sys.stdout.write("%s \tERROR\n" % line)

# The labels are emitted exactly as before (TAB_NUMBER has no colon).
if if_compressed == 0:
    sys.stdout.write(
        "LINE_NUMBER: %d TOTAL_PV_NUMBER: %d TOTAL_CLK_NUMBER: %d\n"
        % (line_number, pv_number, clk_number))
else:
    sys.stdout.write(
        "LINE_NUMBER: %d TAB_NUMBER %d TOTAL_PV_NUMBER: %d TOTAL_CLK_NUMBER: %d\n"
        % (line_number, tab_number, pv_number, clk_number))

相关内容