scala - How to convert RX Observable to Play Enumerator
I set up a WebSocket in Play using its native Enumerator construct, calling some code that returns a String:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
    Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}
Now I want my operation function to return an rx.lang.scala.Observable[String] instead of a String, and I want to output each String as soon as it enters. How can I map this Observable to a play.api.libs.iteratee.Enumerator?
You can use the implicit conversion from Bryan Gilbert. It works fine, but be careful to use the updated version of Bryan Gilbert's conversions! unsubscribe is never called in the answer from Jeroen Kransen (and that's bad!).
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.{Concurrent, Enumerator, Input}
import play.api.libs.iteratee.Concurrent.Channel
import rx.lang.scala.Observable

/*
 * Observable to Enumerator
 */
implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
  // unicast creates a channel where you can push data and returns an Enumerator
  Concurrent.unicast { channel =>
    val subscription = obs.subscribe(new ChannelObserver(channel))
    val onComplete = { () => subscription.unsubscribe }
    val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
    (onComplete, onError)
  }
}

// Forwards every Observable event to the Play channel
class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
  override def onNext(elem: T): Unit = channel.push(elem)
  override def onCompleted(): Unit = channel.end()
  override def onError(e: Throwable): Unit = channel.end(e)
}
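To tie this back to the question, here is a minimal sketch of the WebSocket action fed by an Observable. It assumes the Play 2.2/2.3 iteratee API and RxScala. The names RxBridge, toEnumerator and StatusController, as well as the stand-in operation, are hypothetical and only there for illustration. In that Play API, as far as I can tell, Concurrent.unicast takes onStart, onComplete and onError as separate parameters, so a tuple returned from the onStart block is discarded. This variant therefore passes the callbacks explicitly; check it against your Play version before relying on it.

```scala
import java.util.concurrent.atomic.AtomicReference

import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.{Concurrent, Enumerator, Input, Iteratee}
import play.api.mvc.{Controller, WebSocket}
import rx.lang.scala.{Observable, Subscription}

// Hypothetical helper, not part of Play or RxScala.
object RxBridge {
  def toEnumerator[T](obs: Observable[T]): Enumerator[T] = {
    // Holds the subscription so that the callbacks below can cancel it.
    // Assumption: the enumerator is applied to a single iteratee.
    val subscription = new AtomicReference[Option[Subscription]](None)
    def cancel(): Unit = subscription.getAndSet(None).foreach(_.unsubscribe())

    Concurrent.unicast[T](
      // Subscribe once the channel exists and forward every event to it.
      onStart = channel => subscription.set(Some(obs.subscribe(
        (elem: T) => channel.push(elem),
        (err: Throwable) => channel.end(err),
        () => channel.end()
      ))),
      // Called when the iteratee (the socket) is done.
      onComplete = cancel(),
      // Called when the iteratee reports an error.
      onError = (_: String, _: Input[T]) => cancel()
    )
  }
}

object StatusController extends Controller {
  // Stand-in for the question's `operation`; replace with the real Observable.
  def operation: Observable[String] =
    Observable.from(List("started", "running", "done"))

  def operationStatusFeed = WebSocket.using[String] { implicit request =>
    val in = Iteratee.ignore[String]               // ignore messages from the client
    val out = RxBridge.toEnumerator(operation)     // push each String as it arrives
    (in, out)
  }
}
```

Two caveats with this sketch: it keeps one subscription per enumerator, so it is only meant for an enumerator that is applied once (as in a WebSocket action), and if the socket closes while a synchronous Observable is still emitting inside subscribe, the cancellation can be missed.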
For completeness, here is the conversion from Enumerator to Observable:
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
import rx.lang.scala.{Observable, Observer, Subscription}
import scala.util.{Failure, Success}

/*
 * Enumerator to Observable
 */
implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
  // creating the Observable that we return
  Observable({ observer: Observer[T] =>
    // keeping a way to unsubscribe from the observable
    var cancelled = false

    // enumerator input is tested with this predicate
    // once cancelled is set to true, the enumerator will stop producing data
    val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

    // applying the iteratee on the producer, passing data to the observable
    cancellableEnum (
      Iteratee.foreach(observer.onNext(_))
    ).onComplete { // passing completion or error to the observable
      case Success(_) => observer.onCompleted()
      case Failure(e) => observer.onError(e)
    }

    // unsubscription changes the var to stop the enumerator above via the breakE function
    new Subscription { override def unsubscribe() = { cancelled = true } }
  })
}
Rx for WebSockets in Play

On the other hand, you may notice that most of the time you deal with Iteratees and Enumerators in Play is when you work with WebSockets (as here). We can agree that Iteratees are less intuitive than Observables, and that is probably why you are using Rx in your Play project.
From that observation, I've built a library called WidgetManager that does exactly this: it integrates Rx in Play and gets rid of the Iteratee manipulation.
Using that library, your code could be:
def operationStatusFeed = WebSocket.using[String] { implicit request =>
  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when the connection is closed (onClientClose)
  val w = new WidgetManager()
  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}
The library is on GitHub here: RxPlay (contributions are welcome)