Spout的实现步骤,Spout实现步骤


Spout的实现步骤: ·        对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。 ·        在数据得到了字段的说明后,将其转换成tuple。 ·        声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。 Spout的具体编码在Listing Three中显示。 Listing ThreeSpoutopennextTupledelcareOutputFields方法的逻辑。 1.  public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   2.  {    3.             _collector = collector;   4.           try   5.           {   6.           fileReader  =  new BufferedReader(new FileReader(new File(file))); 7.           } 8.           catch (FileNotFoundException e) 9.           { 10.          System.exit(1);    11.          } 12. }                                                          13.   14. public void nextTuple() 15. {   16.          protected void ListenFile(File file) 17.          { 18.          Utils.sleep(2000);   19.          RandomAccessFile access = null; 20.          String line = null;   21.             try   22.             { 23.                 while ((line = access.readLine()) != null) 24.                 { 25.                     if (line !=null) 26.                     {   27.                          String[] fields=null; 28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   29.                           else   30.                           fields = line.split  (tupleInfo.getDelimiter());   31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields)); 32.                     } 33.                } 34.             } 35.             catch (IOException ex){ } 36.             } 37. }   38.   39. public void declareOutputFields(OutputFieldsDeclarer declarer) 40. {   41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()]; 42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++) 43.       { 44.               fieldsArr = tupleInfo.getFieldList().get(i).getColumnName(); 45.       } 46. declarer.declare(new Fields(fieldsArr)); 47. }     
declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。
更多精彩内容请关注:http://bbs.superwu.cn 关注超人学院微信二维码:

相关内容