scala - Akka TCP Comand Failed -
i'm writing client , server applications using akka tcp , i'm having issue high throughput. when write many messages on client side, i'm having many commandfailed messages , can't figure out why... here's server:
class server(listener: actorref) extends actor { import tcp._ import context.system io(tcp) ! bind(self, new inetsocketaddress("localhost", 9090)) def receive = { case commandfailed(_: bind) => { println("command failed error") context stop self } case c@tcp.connected(remote, local) => listener ! gatlingconnected(c.remoteaddress.tostring) println("connected: " + c.remoteaddress) val handler = context.actorof(props(classof[serverhandler], listener, c.remoteaddress.tostring)) val connection = sender connection ! register(handler) } } class serverhandler(listener: actorref, remote: string) extends actor { import tcp._ override def receive: receive = { case received(data) => listener ! data.utf8string case peerclosed => { listener ! finished(remote) context stop self } } } message , finished case classes i've createad. here's client (where think source of problem):
private class tcpmessagesender(listener: actorref) extends baseactor { final val message_delimiter = "\n" val buffer = new listbuffer[any] val failedmessages = new listbuffer[write] io(tcp) ! connect(new inetsocketaddress(configuration.data.tcp.host, configuration.data.tcp.port)) override def receive = { case msg @ (_: usermessage | _: groupmessage | _: requestmessage) => logger.warn(s"received message ($msg) before connected. buffering...") buffer += msg case commandfailed(_: connect) => logger.warn("can't connect. messages ignored") listener ! terminate context stop self case c @ connected(remote, local) => logger.info("connected " + c.remoteaddress) val connection = sender connection ! register(self) logger.info("sending previous received messages: " + buffer.size) buffer.foreach(msg => { val msgstring: string = jsonhelper.tojson(map[string, any]("message_type" -> msg.getclass.getsimplename, "message" -> msg)) connection ! write(bytestring(msgstring + message_delimiter)) }) buffer.clear logger.info("sent") context become { case msg @ (_: usermessage | _: groupmessage | _: requestmessage) => val msgstring: string = jsonhelper.tojson(map[string, any]("message_type" -> msg.getclass.getsimplename, "message" -> msg)) logger.trace(s"sending message: $msgstring") connection ! write(bytestring(msgstring + message_delimiter)) case commandfailed(w: write) => logger.error("command failed. buffering message...") failedmessages += w connection ! resumewriting case commandfailed(c) => logger.error(s"command $c failed. don't know do...") case received(data) => logger.warn(s"i not supposed receive data: $data") case "close" => connection ! close case _: connectionclosed => logger.info("connection closed") context stop self case writingresumed => { logger.info("sending failed messages") failedmessages.foreach(write => connection ! write) failedmessages.clear } } } } sometimes receive lot of commandfailed messages , call resumewrite , never receive writingresumed (and connection never closes in cases). doing wrong?
i think problem when register actor sending register message, have set useresumewriting parameter true:
connection ! register(handler, false, true) the doc of resumewriting command states:
when `useresumewriting` in effect indicated in [[tcp.register]] message command needs sent connection actor in order re-enable writing after [[tcp.commandfailed]] event. [[tcp.writecommand]] processed connection actor between first [[tcp.commandfailed]] , subsequent reception of message rejected [[tcp.commandfailed]].
Comments
Post a Comment