java - WordCount with Guaranteeing-message-processing


I am trying to run the WordCount example with guaranteed message processing.

There is one spout:

  1. WSpout - emits random sentences, each with a msgId.

and two bolts:

  1. SplitSentence - splits the sentence into words and emits them with anchoring

  2. WordCount - prints the word counts.

What I want to achieve with the code below is this: once all the words of a sentence have been counted, the spout tuple for that sentence must be acknowledged.

I am acknowledging with _collector.ack(tuple) at the last bolt, WordCount, only. What I see is strange: in spite of ack() getting called in WordCount.execute(), the corresponding WSpout.ack() is not getting called. The tuple always fails after the default timeout.

I don't understand what's wrong with the code. Please help me understand the problem. Any help is appreciated.

Below is the complete code.

    // Imports assume Storm 1.0 or later (org.apache.storm.*);
    // older releases use the backtype.storm.* package names instead.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class TestTopology {

        public static class WSpout implements IRichSpout {
            SpoutOutputCollector _collector;
            Integer msgId = 0;

            @Override
            public void nextTuple() {
                Random _rand = new Random();
                String[] sentences = new String[] { "there 2 things benefit",
                        " storms reliability capabilities",
                        "specifying link in the",
                        " tuple tree " + "called anchoring",
                        " anchoring done @ ",
                        "the same time emit " + "new tuple" };

                String message = sentences[_rand.nextInt(sentences.length)];
                // Emitting with a message id makes Storm track this tuple.
                _collector.emit(new Values(message), msgId);
                System.out.println(msgId + " " + message);

                msgId++;
            }

            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                System.out.println("open");
                _collector = collector;
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
            }

            @Override
            public void ack(Object msgId) {
                System.out.println("ack ------------------- " + msgId);
            }

            @Override
            public void fail(Object msgId) {
                System.out.println("fail ----------------- " + msgId);
            }

            @Override
            public void activate() {
                // TODO Auto-generated method stub
            }

            @Override
            public void close() {
            }

            @Override
            public void deactivate() {
                // TODO Auto-generated method stub
            }

            @Override
            public Map<String, Object> getComponentConfiguration() {
                // TODO Auto-generated method stub
                return null;
            }
        }

        public static class SplitSentence extends BaseRichBolt {
            OutputCollector _collector;

            public void prepare(Map conf, TopologyContext context,
                    OutputCollector collector) {
                _collector = collector;
            }

            public void execute(Tuple tuple) {
                String sentence = tuple.getString(0);
                for (String word : sentence.split(" ")) {
                    System.out.println(word);
                    // Anchor each word tuple to the incoming sentence tuple.
                    _collector.emit(tuple, new Values(word));
                }
                //_collector.ack(tuple);
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }

        public static class WordCount extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();

            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                System.out.println("wordcount msgid : " + tuple.getMessageId());
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                System.out.println(word + " ===> " + count);
                counts.put(word, count);
                collector.emit(new Values(word, count));
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }

        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new WSpout(), 2);
            builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
                    "spout");
            builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
                    new Fields("word"));
            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                // A name was passed on the command line: submit to a real cluster.
                conf.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], conf,
                        builder.createTopology());
            } else {
                // No arguments: run in an in-process local cluster for 10 seconds.
                conf.setMaxTaskParallelism(3);

                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-count", conf, builder.createTopology());
                Thread.sleep(10000);
                cluster.shutdown();
            }
        }
    }

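WordCount extends BaseBasicBolt, which ensures that tuples are acked automatically in that bolt, as stated in the comment. However, SplitSentence extends BaseRichBolt, which requires you to ack tuples manually. You're not acking the sentence tuple there, so the tuple tree is never completed and the tuples time out. Uncommenting the _collector.ack(tuple) line at the end of SplitSentence.execute() should fix this.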
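To see why a single missing ack is enough to fail the whole sentence, here is a minimal plain-Java sketch. It is not Storm's acker, which is implemented differently. The names `TupleTreeModel`, `spoutEmit`, `anchoredEmit`, and `run` are hypothetical and exist only for this illustration. The one assumption is that a spout tuple counts as fully processed only once every tuple in its tree has been acked.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of tuple-tree tracking (hypothetical, NOT Storm's real acker).
 * It counts the tuples still outstanding for each spout message id.
 */
final class TupleTreeModel {
    private final Map<Integer, Integer> outstanding = new HashMap<>();

    /** The spout emits a root tuple with a message id: one tuple to ack. */
    void spoutEmit(int msgId) {
        outstanding.put(msgId, 1);
    }

    /** A bolt emits a new tuple anchored to the tree of msgId. */
    void anchoredEmit(int msgId) {
        outstanding.merge(msgId, 1, Integer::sum);
    }

    /** A bolt acks one tuple of the tree; true once the whole tree is acked. */
    boolean ack(int msgId) {
        Integer left = outstanding.get(msgId);
        if (left == null) {
            return false; // unknown or already completed tree
        }
        if (left == 1) {
            outstanding.remove(msgId);
            return true;
        }
        outstanding.put(msgId, left - 1);
        return false;
    }

    /** Simulates one sentence; returns whether the spout's ack() would fire. */
    static boolean run(int wordCount, boolean splitBoltAcks) {
        TupleTreeModel tree = new TupleTreeModel();
        int msgId = 0;
        boolean spoutAcked = false;
        tree.spoutEmit(msgId);                 // the sentence tuple
        for (int i = 0; i < wordCount; i++) {
            tree.anchoredEmit(msgId);          // split bolt: one anchored tuple per word
        }
        if (splitBoltAcks) {
            spoutAcked |= tree.ack(msgId);     // split bolt acks the sentence tuple
        }
        for (int i = 0; i < wordCount; i++) {
            spoutAcked |= tree.ack(msgId);     // counting bolt acks each word tuple
        }
        return spoutAcked;
    }

    public static void main(String[] args) {
        System.out.println("split bolt never acks: spout acked = " + run(3, false));
        System.out.println("split bolt acks:       spout acked = " + run(3, true));
    }
}
```

In this model `run(3, false)` returns false because the sentence tuple itself stays outstanding even though every word tuple was acked, which corresponds to the timeout and the fail() callback seen in the question. `run(3, true)` returns true once the split bolt also acks its input.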

