Storm初体验


package mapstorm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class StormMain {

	/**
	 * Builds and submits the word-count topology:
	 * word-reader (spout) -> word-normalizer -> word-counter.
	 * The input file path is passed to the spout via the "wordsFile" config entry.
	 *
	 * @param args args[0] = path to the words file to count
	 * @throws Exception if topology submission fails
	 */
	public static void main(String[] args) throws Exception {
		// Guard against a missing argument instead of crashing with
		// ArrayIndexOutOfBoundsException at args[0] below.
		if (args.length < 1) {
			System.err.println("Usage: StormMain <wordsFile>");
			System.exit(1);
		}

		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("word-reader", new WordReader());
		builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
		// fieldsGrouping on "word" so the same word is always routed to the same counter task.
		builder.setBolt("word-counter", new WordCounter(), 1).fieldsGrouping("word-normalizer", new Fields("word"));

		// Configuration
		Config conf = new Config();
		conf.put("wordsFile", args[0]);
		conf.setDebug(true);

		// Submit to the cluster. For local testing, a LocalCluster with
		// submitTopology/shutdown can be used instead.
		StormSubmitter.submitTopology("wordCounterTopology", conf, builder.createTopology());
	}

}

package mapstorm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt that tallies word occurrences per task and prints the
 * totals when the topology shuts down.
 */
public class WordCounter extends BaseBasicBolt {
	private static final long serialVersionUID = 5678586644899822142L;

	// Task id of this bolt instance; set in prepare().
	Integer id;
	// Component id of this bolt instance; set in prepare().
	String name;
	// Per-task tally: word -> occurrence count.
	Map<String, Integer> counters;

	/**
	 * Increments the counter for the incoming word (tuple field 0).
	 */
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String str = input.getString(0);
		System.out.println("WordCounter word "+ str);
		// Single map lookup instead of containsKey() followed by get().
		Integer current = counters.get(str);
		counters.put(str, current == null ? 1 : current + 1);
	}

	/** Terminal bolt: emits nothing, so no output fields are declared. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {}

	/**
	 * Prints the accumulated counts.
	 * NOTE(review): cleanup() is only reliably invoked in local mode — confirm
	 * against the Storm version in use before relying on this for output.
	 */
	@Override
	public void cleanup() {
		System.out.println("-- Word Counter ["+name+"-"+id+"] --");
		for (Map.Entry<String, Integer> entry : counters.entrySet()) {
			System.out.println(entry.getKey()+": "+entry.getValue());
		}
		System.out.println("finish-----------");
	}

	/** Initializes per-task state before any tuples are processed. */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new HashMap<String, Integer>();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}
}

package mapstorm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that splits incoming lines into individual lower-cased words and
 * emits one "word" tuple per non-empty token.
 */
public class WordNormalizer extends BaseBasicBolt {

	/** Called on shutdown; just logs completion. */
	@Override
	public void cleanup() {
		System.out.println("finish");
	}

	/**
	 * Splits the incoming line (tuple field 0) on single spaces, trims and
	 * lower-cases each token, and emits every non-empty result.
	 */
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String sentence = input.getString(0);
		String[] words = sentence.split(" ");
		// Fixed typo in log message: "recevie" -> "receive".
		System.out.println("WordNormalizer receive  "+ sentence);
		for (String word : words) {
			word = word.trim();
			if (!word.isEmpty()) {
				word = word.toLowerCase();
				System.out.println("WordNormalizer receive "+ sentence+"words  "+ word);
				collector.emit(new Values(word));
			}
		}
	}

	/** Declares the single "word" field consumed by WordCounter's fieldsGrouping. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}



package mapstorm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that reads a text file once, emitting one "line" tuple per line.
 * The file path is taken from the "wordsFile" topology config entry.
 */
public class WordReader extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private String filePath;
	// Set after the file has been fully emitted so nextTuple becomes a no-op.
	private boolean completed = false;

	/** Called when a downstream bolt acks a tuple; msgId is the emitted line. */
	@Override
	public void ack(Object msgId) {
		System.out.println("OK:"+msgId);
	}

	@Override
	public void close() {}

	/** Called when a tuple fails downstream; this spout does not replay lines. */
	@Override
	public void fail(Object msgId) {
		System.out.println("FAIL:"+msgId);
	}

	/**
	 * Opens the file named by the "wordsFile" config entry.
	 *
	 * @throws RuntimeException if the file cannot be opened
	 */
	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.filePath = conf.get("wordsFile").toString();
		try {
			this.fileReader = new FileReader(filePath);
		} catch (FileNotFoundException e) {
			// Bug fix: the original message read the wrong key ("wordFile",
			// printing null) and dropped the cause exception.
			throw new RuntimeException("Error reading file ["+filePath+"]", e);
		}
		this.collector = collector;
	}

	/**
	 * Emits every line of the file on the first invocation; afterwards it
	 * sleeps briefly so the spout does not busy-spin.
	 * Each line doubles as its own message id so ack/fail can report it.
	 */
	@Override
	public void nextTuple() {
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Restore the interrupt flag so the worker thread can shut down cleanly.
				Thread.currentThread().interrupt();
			}
			return;
		}
		String str;
		BufferedReader reader = new BufferedReader(fileReader);
		try {
			while ((str = reader.readLine()) != null) {
				System.out.println("WordReader read"+ str);
				this.collector.emit(new Values(str), str);
				System.out.println("WordReader out"+ str);
			}
		} catch (Exception e) {
			throw new RuntimeException("Error reading tuple", e);
		} finally {
			completed = true;
			try {
				// Bug fix: the reader (and underlying FileReader) was never closed.
				reader.close();
			} catch (Exception ignored) {
				// Best-effort close; nothing useful to do if it fails.
			}
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("line"));
	}

}



   

相关内容

    暂无相关文章