JStorm,jstormgithub


本文介绍如何编写一个简单的HelloWorld拓扑(Topology),并将其提交到JStorm集群中运行。

1. 创建Maven工程

在Eclipse中创建Maven工程,默认方式创建即可。修改pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Excerpt of the project's pom.xml; the "..." lines appear to mark parts omitted from this listing. -->
	<modelVersion>4.0.0</modelVersion>
...
	<!-- Additional repositories declared for dependency resolution. -->
	<repositories>
		<repository>
			<id>github-releases</id>
			<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
		</repository>
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
		<repository>
			<id>twitter4j</id>
			<url>http://twitter4j.org/maven2</url>
		</repository>
	</repositories>

	<dependencies>
		...
		<!-- Storm 0.9.0 API that the HelloWorld classes compile against (backtype.storm packages).
		     Scope "provided": used for compilation but not packaged into the built jar;
		     the runtime environment is expected to supply these classes. -->
		<dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.9.0</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- Compile with Java 1.7 as both source and target level. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.0.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
由于JStorm的Maven依赖目前无法下载,这里改用storm的依赖(scope为provided,不会打入jar包),同样可以编译,打出的jar也可以在JStorm中运行。

2. 创建HelloWorldBolt

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that counts the tuples whose "sentence" field equals "Hello World".
 *
 * <p>For every matching tuple the per-instance counter is incremented and the
 * running total is printed to standard output. The bolt emits nothing.
 */
public class HelloWorldBolt extends BaseRichBolt {
	private static final long serialVersionUID = 1L;

	/** Sentence value that is counted. */
	private static final String HELLO_WORLD = "Hello World";

	/** Number of matching tuples seen by this bolt instance so far. */
	private int myCount = 0;

	/**
	 * prepare() => on create. Nothing to initialise: the bolt never emits, so
	 * the collector is not kept.
	 */
	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
	}

	/**
	 * execute() => most important method in the bolt, called once per tuple
	 * received. Counts the tuple if its "sentence" field is "Hello World".
	 *
	 * @param tuple incoming tuple; its "sentence" field is read as a String
	 */
	@Override
	public void execute(Tuple tuple) {
		String sentence = tuple.getStringByField("sentence");
		// Compare by value, not by reference: a tuple that was serialized and
		// deserialized carries a different String object than the literal, so
		// "==" would not match. Constant-first equals() is also null-safe.
		if (HELLO_WORLD.equals(sentence)) {
			myCount++;
			System.out.println("Found a Hello World! My Count is now: " + myCount);
		}
	}

	/**
	 * declareOutputFields => this bolt emits nothing, hence no fields are
	 * declared.
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
	}
}

3. 创建HelloWorldSpout

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that emits one-field tuples ("sentence") containing either
 * "Hello World" or "Other Random Word".
 *
 * <p>A reference number in [0, MAX_RANDOM) is drawn once in the constructor;
 * each call to {@link #nextTuple()} draws another number in the same range and
 * emits "Hello World" only when the two are equal.
 */
public class HelloWorldSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	private static final int MAX_RANDOM = 10;

	private SpoutOutputCollector collector;

	/** Value a draw has to hit for "Hello World" to be emitted. */
	private int referenceRandom;

	/**
	 * Generator used by nextTuple(). Transient and created in open(), so it is
	 * not serialized together with the spout object.
	 */
	private transient Random random;

	public HelloWorldSpout() {
		referenceRandom = new Random().nextInt(MAX_RANDOM);
	}

	/**
	 * declareOutputFields() => tells the Storm cluster which fields this spout
	 * emits: a single field named "sentence".
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	/**
	 * open() => the first method called in any spout. Stores the collector
	 * used to emit tuples and creates the random generator.
	 *
	 * @param conf            configuration created in the topology definition (unused)
	 * @param topologyContext topology data (unused)
	 * @param collector       collector through which tuples are emitted
	 */
	@Override
	public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext topologyContext,
			SpoutOutputCollector collector) {
		this.collector = collector;
		this.random = new Random();
	}

	/**
	 * nextTuple() => called repeatedly by the Storm cluster; emits exactly one
	 * tuple per call.
	 */
	@Override
	public void nextTuple() {
		// Reuse one generator instead of allocating a new Random on every call.
		boolean hit = random.nextInt(MAX_RANDOM) == referenceRandom;
		String sentence = hit ? "Hello World" : "Other Random Word";
		collector.emit(new Values(sentence));
	}
}

4. 创建HelloWorldTopology

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

/**
 * Entry point that wires HelloWorldSpout to HelloWorldBolt and submits the
 * resulting topology under the name given as the first argument.
 */
public class HelloWorldTopology {
	/** Nimbus Thrift port; 7627 is the default configured by a JStorm installation. */
	private static final int NIMBUS_THRIFT_PORT = 7627;

	/**
	 * Builds and submits the topology.
	 *
	 * @param args args[0] is the topology name; without it nothing is submitted
	 *             and a usage line is printed to standard error
	 * @throws AlreadyAliveException    propagated from StormSubmitter.submitTopology
	 * @throws InvalidTopologyException propagated from StormSubmitter.submitTopology
	 */
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		if (args == null || args.length == 0) {
			// Previously this case returned silently; tell the user what is missing.
			System.err.println("Usage: HelloWorldTopology <topology-name>");
			return;
		}

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);
		builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1).shuffleGrouping("randomHelloWorld");

		Config conf = new Config();
		conf.setDebug(true);
		// Settings for running in a JStorm cluster.
		conf.setNumWorkers(3);
		conf.put(Config.NIMBUS_THRIFT_PORT, NIMBUS_THRIFT_PORT);

		StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
	}
}

5. 编译打包

使用Eclipse的Maven,编译打包,更名为HelloWorld.jar

6. 提交至JStorm

# $JSTORM_HOME/bin/jstorm jar HelloWorld.jar com.test.jstorm.HelloWorldTopology HelloWorld
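提交后,再刷新JStorm的管理页面,可看到是否提交成功。
注意:命令中的主类必须是全限定类名。上面的示例代码省略了package声明,实际工程中这三个类应声明在com.test.jstorm包下;如果使用了其他包名,请相应修改命令中的类名。命令的最后一个参数HelloWorld是拓扑名称,即传给main方法的args[0]。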


相关内容