数学之路-分布式计算-disco(4),数学之路计算-disco


第一个参数iter是一个迭代器,涉及被map函数产生的键和值,它们是reduce实例。

在本例中,单词随机被委托给不同的reduce实例,然后,要单词相同,处理它的reduce也相同,可确保最终合计是正确的。

第二个参数params与map函数中一致,在此,仅简单使用disco.util.kvgroup()来提取每个单词统计计数,累计计数,yield(产生)结果。

运行作业

下面开始运行作业,可使用大量参数定制作业,但通常来说,对于简单的任务来说 ,仅使用其中3个即可。除了启动作业之外,我们还需要输出结果,首先,我们在作业完成前要等待,通过调用wait等待调用完毕,完成后会返回结果,为方便起见,通过job对象调用wait及其它相关方法。

result_iterator()函数取结果文件地址列表,它被wait()函数返回,iterates(迭代)遍历所有结果中的键值对。

 

defmap(line, params):

    for word in line.split():

        yield word, 1

 

defreduce(iter, params):

    from disco.util import kvgroup

    for word, counts in kvgroup(sorted(iter)):

        yield word, sum(counts)

 

if__name__ == '__main__':

    job =Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],

                    map=map,

                    reduce=reduce)

    for word, count inresult_iterator(job.wait(show=True)):

        print(word, count)

 

本博客所有内容是原创,如果转载请注明来源

http://blog.csdn.net/myhaspl/


如果一切妥当,可看到作业执行,输入从tagdata:bigtxt中读入,这个是开始时刻创建的最终打印输出,在job运行时,可打开(或运行disco master的端口),查看作业的实时进程。

python count_words.py

也可在控制台上查看作业进程,如下: 

DISCO_EVENTS=1 python count_words.py

正如您所看到的,创建一个新的迪斯科的工作是相当简单的。你可以在任意数量的方面扩展这个简单的例子。例如,通过使用params对象包括停用词的列表。

如果你把迪斯科分布式文件系统的数据,你可以试试改变输到tag://data:bigtxt,以及加上map_reader =disco.worker.task_io.chain_reader。

你可以试着用sum_combiner(),使job更有效率。

你也可以尝试自定义功能分区和读取函数,用与map和reduce函数一样的方式编写,然后,你可以试着链接job在一起,以便之前的job输出成为下一个的输入。

disco是设计得尽可能的简单,这样你就可以专注于你自己的问题,而不是框架。

相关内容