java - WordCount with Guaranteeing-message-processing
I am trying to run the WordCount example with guaranteed message processing.
There is one spout:
- WSpout - emits random sentences, each with a msgId.
And two bolts:
- SplitSentence - splits each sentence into words and emits them with anchoring.
- WordCount - prints each word with its count.
What I want to achieve with the code below is that, once all the words of a sentence have been counted, the spout tuple for that sentence is acknowledged.
I am acknowledging with _collector.ack(tuple) at the last bolt, WordCount, only. Strangely, even though ack() is called in WordCount.execute(), the corresponding WSpout.ack() is never called. The tuple always fails after the default timeout.
I don't understand what is wrong with the code. Please help me understand the problem. Any help is appreciated.
Below is the complete code.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Storm imports assume the org.apache.storm packages (Storm 1.0+);
// older releases use backtype.storm instead.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class TestTopology {

    public static class WSpout implements IRichSpout {
        SpoutOutputCollector _collector;
        Integer msgId = 0;

        @Override
        public void nextTuple() {
            Random _rand = new Random();
            String[] sentences = new String[] {
                "there 2 things benefit",
                " storms reliability capabilities",
                "specifying link in the",
                " tuple tree " + "called anchoring",
                " anchoring done @ ",
                "the same time emit " + "new tuple" };
            String message = sentences[_rand.nextInt(sentences.length)];
            // Emitting with a message id makes Storm track this tuple tree.
            _collector.emit(new Values(message), msgId);
            System.out.println(msgId + " " + message);
            msgId++;
        }

        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            System.out.println("open");
            _collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("ack ------------------- " + msgId);
        }

        @Override
        public void fail(Object msgId) {
            System.out.println("fail ----------------- " + msgId);
        }

        @Override
        public void activate() {
            // TODO Auto-generated method stub
        }

        @Override
        public void close() {
        }

        @Override
        public void deactivate() {
            // TODO Auto-generated method stub
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    }

    public static class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context,
                OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                System.out.println(word);
                // Anchor each word tuple to the incoming sentence tuple.
                _collector.emit(tuple, new Values(word));
            }
            //_collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("WordCount MSGID : " + tuple.getMessageId());
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            System.out.println(word + " ===> " + count);
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new WSpout(), 2);
        builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
                new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Submit to a real cluster under the name given on the command line.
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Run in-process for ten seconds, then shut down.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
WordCount extends BaseBasicBolt, which ensures that tuples are acked automatically in that bolt, as stated in your comment. However, SplitSentence extends BaseRichBolt, which requires you to ack tuples manually. Since you're not acking there, the tuples time out.
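To see why acking only in WordCount is not enough, here is a minimal sketch of the bookkeeping involved. It is a toy model, not Storm code: the class TupleTreeSketch and the tuple names are hypothetical, and it tracks outstanding tuples in a plain set, whereas Storm's acker keeps an XOR checksum of random tuple ids. The effect should be the same in both cases: the spout's ack() fires only once every tuple in the tree, including the sentence tuple received by SplitSentence, has been acked.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a single tuple tree. Hypothetical class, not part of Storm. */
public class TupleTreeSketch {
    // Tuples that were emitted into the tree but not yet acked.
    private final Set<String> unacked = new HashSet<>();

    /** Records a tuple emitted into the tree (spout tuple or anchored bolt output). */
    public void emit(String tupleName) {
        unacked.add(tupleName);
    }

    /** Records an ack and returns true once no tuple in the tree is left unacked. */
    public boolean ack(String tupleName) {
        unacked.remove(tupleName);
        return unacked.isEmpty();
    }

    /** True when the spout's ack() would be allowed to fire in this model. */
    public boolean isComplete() {
        return unacked.isEmpty();
    }

    public static void main(String[] args) {
        TupleTreeSketch tree = new TupleTreeSketch();

        tree.emit("sentence");   // WSpout emits the sentence with a msgId
        tree.emit("word-1");     // SplitSentence emits anchored word tuples
        tree.emit("word-2");

        tree.ack("word-1");      // WordCount (BaseBasicBolt) acks each word
        tree.ack("word-2");
        System.out.println("SplitSentence never acks: complete = " + tree.isComplete());

        tree.ack("sentence");    // what _collector.ack(tuple) in SplitSentence would add
        System.out.println("SplitSentence acks:       complete = " + tree.isComplete());
    }
}
```

In the topology from the question, the corresponding change would be to uncomment _collector.ack(tuple) after the loop in SplitSentence.execute(), or to make SplitSentence extend BaseBasicBolt so that anchoring and acking are handled for you.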