K天熟悉Apache Storm (三),apachestorm


软件版本:Storm:0.9.3 ,Redis:2.8.19;jedis:2.6.2;

代码及Jedis下载:Storm实时单词计数

Storm应用场景--实时单词计数,有点类似《Getting Started with Storm》中的chapter6的real-life app。

场景描述:

    1. 使用一个java程序每间隔一定时间向Redis数据库A中存入数据;

    2. Storm的Spout读取Redis数据库A中的数据,读取后删除Redis中的数据;

    3. Storm的SplitBolt读取Spout的输出,对其进行解析,并输出;

    4. Storm的CountBolt对SplitBolt的数据进行计数,并每隔一定间隔把数据存储在Redis数据库B中;

    5. 另外的java程序定时读取Redis数据库B中的数据,并打印;

具体实现:

1. Java定时向Redis发送数据

while(true){// 每次发送3个数据
			try {
				Thread.sleep(200);// 每200毫秒产生一次数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			} 
			interval ++; 
			int index = random.nextInt(normal.length);
			if(!jedis.exists("0")){// 如果不存在key说明已经被取走了,就再次产生,否则不产生
				jedis.set("0",normal[index]);
			}
				index = random.nextInt(normal.length);
			if(!jedis.exists("1")){	
				jedis.set("1", normal[index]);
			}
			index = random.nextInt(normal.length);
			if(!jedis.exists("2")){
				jedis.set("2", normal[index]);
			}
			
			if(interval*200/1000==2*60) {// 每间隔200毫秒产生数据后,产生了2分钟,共2*60*1000/200*3 个数据记录
				// 暂停 5分钟
				System.out.println(new java.util.Date()+":数据暂定5分钟产生...");
				try {
					interval=0;
					Thread.sleep(5*60*1000);
					
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(new java.util.Date()+":5分钟暂停完成,继续产生数据...");
			
			}
		}

这里使用一个固定的字符串数组,每次从里面随机抽取三个字符串,使用Jedis存储到Redis的数据库中;

2. Spout读取Redis数据

@Override
	public void nextTuple() {
		long interval =0;
		while(true){// 获取数据
			interval++;
			String zero = getItem("0");
			String one = getItem("1");
			String two =  getItem("2");
			
			try {
				Thread.sleep(200);// 每200毫秒发送一次数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			} 
			if(zero==null||one==null||two==null){
				// do nothing
				// 没有数据
//				if(interval%15==0){
//				}
			}else{
				String tmpStr =zero+","+one+","+two;
				if(thisTaskId==tmpStr.hashCode()%numTasks){ // spout负载均衡
					this.collector.emit(new Values(tmpStr));
				
					if(interval%15==0&&"fast".equals(slow_fast)){
						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
								taskId, "Spout:["+zero+","+one+","+two+"]"));
					}else if("slow".equals(slow_fast)){
						System.out.println(RedisUtils.getCurrDateWithInfo(String.valueOf(thisTaskId),
								taskId, "Spout:["+zero+","+one+","+two+"]"));
					}else{
						new RuntimeException("Wrong argument!");
					}
				}
			}
			
		}	
	}
这里使用了负载均衡,Spout处理的数据按task进行分隔。

getItem用于从Redis中获取数据,并删除对应的数据,代码如下:

/**
	 * Redis中获取键值并删除对应的键
	 * @param index
	 */
	private String getItem(String index){
		if(!jedis.exists(index)){
			return null;
		}
		String val = jedis.get(index);
//		if(val==null||"null".equals("null")){
//			return ;
//		}
		
		jedis.del(index);
		return val;
	}
3. SplitBolt就是一般的单词分隔代码:

public void execute(Tuple input, BasicOutputCollector collector) {
        interval++;
		String sentence = input.getString(0);
		if(interval%15==0&&"fast".equals(slow_fast)){
//        System.out.println(new java.util.Date()+":ConponentId:"+conponentId+",taskID:"+taskId+
//        		"splitBolt:"+sentence);
			System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));
		}else if("slow".equals(slow_fast)){
			System.out.println(RedisUtils.getCurrDateWithInfo(conponentId, taskId, "splitBolt:"+sentence));
		}
        String[] words = sentence.split(",");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
	}

4. CountBolt进行单词计数,并向Redis数据库中存储单词的计数

public void execute(Tuple input, BasicOutputCollector collector) {
		interval++;
		String str = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1 
		 */
		if(!counters.containsKey(str)){
			counters.put(str, 1);
		}else{
			Integer c = counters.get(str) + 1;
			counters.put(str, c);
		}
		
		// 每records条数据则向向数据库中更新
		if(interval%records==0){
			for(Map.Entry<String , Integer> m :counters.entrySet()){	
				jedis.set(m.getKey(), String.valueOf(m.getValue()));// 
				
			}
		}
}
5. Java程序定时读取Redis中单词计数,并打印

private void read() {
		System.out.println("数据获取开始。。。,10s后打印。。。");
		long interval =0;
		while(true){// 获取数据
			interval++;
			Set<String> keys = jedis.keys("*");
			for(String key:keys){
				push2Map(key);
			}
//			push2Map("one");
			try {
				Thread.sleep(200);// 每200毫秒获取一次数据
			} catch (InterruptedException e) {
				e.printStackTrace();
			} 
			if(interval*200/1000==10) {// 每10秒打印一次
				interval=0;
				printMap();
			}
		}		
	}

Storm作为实时大数据处理框架,从这个小例子中就可以感受一二。


ps:相关调用接口:

System.out.println("\nwc.redis.WCTopology <storeFrequent> <num_works>" +
					" <parallel_spout> <parallel_split_bolt> <parallel_count_bolt> <slow|fast>"+
					" <printWC>");
打包使用storm jar命令运行的时候,其中的参数解释如下:

storeFrequent : CountBolt每多少条记录往Redis数据库中存储一次数据;

num_works :  worker的数量;

parallel_spout :Spout并行的数量;

parallel_split_bolt :SplitBolt并行的数量;

parallel_count_bolt :CountBolt并行的数量;

slow|fast :在Spout/SplitBolt/CountBolt中日志打印的频率;

printWC:在CountBolt中的日志是否打印(true|false);


分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990


相关内容