scala - How to convert RX Observable to Play Enumerator -


i set websocket in play using native enumerator construct, calling code returns string:

def operationstatusfeed = websocket.using[string] { implicit request =>   val in = iteratee.ignore[string]   val out = enumerator.repeatm {    promise.timeout(operation, 3 seconds)   }   (in, out) } 

now want operation function return rx.lang.scala.observable[string] instead of string, , want output string enters. how can map observable play.api.libs.iteratee.enumerator?

you can use implicit conversion bryan gilbert. work fine, careful use updated version of bryan gilbert's conversions ! unsubscribe never called in answer jeroen kransen (and that's bad!).

  /*    * observable enumerator    */   implicit def observable2enumerator[t](obs: observable[t]): enumerator[t] = {     // unicast create channel can push data , returns enumerator     concurrent.unicast { channel =>       val subscription = obs.subscribe(new channelobserver(channel))       val oncomplete = { () => subscription.unsubscribe }       val onerror = { (_: string, _: input[t]) => subscription.unsubscribe }       (oncomplete, onerror)     }   }    class channelobserver[t](channel: channel[t]) extends rx.lang.scala.observer[t] {     override def onnext(elem: t): unit = channel.push(elem)     override def oncompleted(): unit = channel.end()     override def onerror(e: throwable): unit = channel.end(e)   } 

to complete, here conversion enumerator observable :

  /*    * enumerator observable    */   implicit def enumerator2observable[t](enum: enumerator[t]): observable[t] = {     // creating observable return     observable({ observer: observer[t] =>       // keeping way unsubscribe observable       var cancelled = false        // enumerator input tested predicate       // once cancelled set true, enumerator stop producing data       val cancellableenum = enum through enumeratee.breake[t](_ => cancelled)        // applying iteratee on producer, passing data observable       cancellableenum (         iteratee.foreach(observer.onnext(_))       ).oncomplete { // passing completion or error observable         case success(_) => observer.oncompleted()         case failure(e) => observer.onerror(e)       }        // unsubscription change var stop enumerator above via breake function       new subscription { override def unsubscribe() = { cancelled = true } }     })   } 

rx websockets in play

on other hand, may remark of time deal iteratees , enumerators in play when work websockets (as here). agree iteratees less intuitive observables , why using rx in play project.

from observation, i've built library called widgetmanager : integration of rx in play getting rid of iteratees manipulation.

using library, code :

def operationstatusfeed = websocket.using[string] { implicit request =>    // can optionally give function process data client (processclientdata)   // , function execute when connection closed (onclientclose)   val w = new widgetmanager()    w.addobservable("op", operation)    // subscribe , push data in socket client (automatic js callback called)   w.subscribepush("op")    // deals iteratees , enumerators , returns what's needed   w.websocket } 

the library on github here : rxplay (contributions welcome)


Comments

Popular posts from this blog

apache - Remove .php and add trailing slash in url using htaccess not loading css -

javascript - jQuery show full size image on click -