在storm中使用非Java语言



storm由Java实现,但通过multilang protocl(多语言协议),能够使用php,python,ruby或者javascript来写spout和bolt。 多语言协议是storm中实现的一种特殊协议,它使用标准输入和标准输出作为与执行spout和bolt任务的进程之间通信的信道。消息以json格式或者普通的文本行通过信道传输。

多语言协议的实现细节


该协议依赖于作为进程之间通信信道的标准输入和标准输出。一个脚本想要生效需要采取下列步骤

  • 初始化握手
  • 开始循环
  • 读、写tuple

初始化握手


要控制进程(启动或者停止),storm需要知道的正在执行脚本的进程PID。根据多语言协议,当处理过程开始的第一件事情就是storm会发射一个带有配置,拓扑上下文和PID目录的Json对象到标准输入。它看上去跟下面的代码块差不多:

{
    "conf": {
        "topology.message.timeout.secs": 3,
},
"context": {
    "task->component": {
        "1": "example-spout",
        "2": "__acker",
        "3": "example-bolt"
    },
    "taskid": 3
},
"pidDir": "..."
}

进程必须在pidDir指示的目录创建一个空文件,文件名为进程ID,然后将PID作为JSON对象写到标准输出。

{"pid" : 1234}

例如,如果接收到/tmp/example\n,脚本的执行PID为123,那么创建空文件/tmp/example/123,并打印行{"pid":123}n和end\n到标准输出。storm采用这种方式来跟踪PID,以及在关闭的时候杀死进程。

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

这里已经实现了函数read_msg,用于从标准输入读取消息。多语言协议中消息是json格式的单行或多行文本。当storm发送单行内容为end\n消息时说明消息结束。

function read_msg() {
    $msg = "";
    while(true) {
        $l = fgets(STDIN);
        $line = substr($l,0,-1);
        if($line=="end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    return substr($msg, 0, -1);
}
function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
}
function write_line($line) {
    echo("$line\n");
}

使用flush()是非常关键的,有可能由于指定字符数量未满足导致缓冲区的内容不会flush。这意味脚本会挂起等待storm输入,由于storm同样在等待脚本输出,所以脚本是不会接收到输入的。所以一定要保证当脚本输出内容后立即flush。

开启循环与消息读写


这是最关键的一步,因为所有的工作都在这里完成。这一步的实现取决于要实现spout还是bolt。在spout的情形中,应该发送消息。在bolt的情形中,循环,读取消息,处理它们,然后发射,确认或者失败。
发送数字的spout的实现如下。

$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
    $msg = read_msg();
    $cmd = json_decode($msg, true);
    if ($cmd['command']=='next') {
        if ($from<$to) {
            storm_emit(array("$from"));
            $task_ids = read_msg();
            $from++;
        } else {
            sleep(1);
        }
    }
    storm_sync();
}

从命令行参数中获得from和to,开始循环。每当从storm取得消息next,就说明可以发射新tuple。一旦所有的tuple发送完成,没有更多的tuple发送,挂起。

要保证脚本准备好下一个tuple,storm在发射下一个之前会等待文本行sync\n。要读取命令,直接调用read_msg()来解码。在bolts中有些小区别。

while(true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

循环,从标准输入读取消息。一旦接收到消息,json解析。如果是一个tuple,对它做处理,即判断是否是素数。在任何情形中,都要消息确认。
在函数json_decode中使用JSON_BIGINT_AS_STRING来规避Java和PHP之间的格式问题。Java中发送的大数,在PHP中获取时会丢失精度,而这可能会导致问题。要解决这个问题,告诉PHP将大数当作字符串处理,在json消息中打印时不使用双引号。PHP 5.4.0 或更高版本必须使用这个参数。

相关资料

https://github.com/nathanmarz/storm/wiki/Using-non-JVM-languages-with-Storm

https://github.com/lazyshot/storm-php

https://github.com/nathanmarz/storm-starter

《Getting Started with Storm》


相关内容