scala - Getting NullPointerException when running Spark job with naive Bayes implementation
I am trying to implement naive Bayes on Spark, but it doesn't work well. The failure comes from this line in the temp function: each_prob.map(_.take(1))
If you have any idea what the problem is, please help me...
This is the main function:
object doNaive extends App {
  val file_pathes = Vector(
    ("plus",  "resource/doc1.txt"),
    ("plus",  "resource/doc2.txt"),
    ("plus",  "resource/doc3.txt"),
    ("minus", "resource/doc4.txt"),
    ("minus", "resource/doc5.txt"),
    ("minus", "resource/doc6.txt")
  )
  val pn = ParallelNaive(file_pathes)
  val cached_rdd = read.rdds("resource/examine.txt")
  val each_prob: Vector[RDD[String]] = pn.allClassNames.map { class_name =>
    cached_rdd.map { elt => (pn.eachProbWord(elt._1, class_name) * elt._2).toString }
  }
  val head_prob = each_prob.head
  println(pn.docs.map(elt => elt._2.take(1).head))
  pn.temp("resource/examine.txt")
}
And this is the ParallelNaive class; the temp function is where I found the problem:
case class ParallelNaive(file_pathes: Vector[(String, String)]) extends Serializable {

  val docs: Vector[(String, RDD[(String, Int)])] =
    file_pathes.map(class_path => (class_path._1, read.rdds(class_path._2, false)))

  val wholeClass: Map[String, Vector[(String, RDD[(String, Int)])]] = docs.groupBy(elt => elt._1)
  val allClassNames: Vector[String] = wholeClass.map(elt => elt._1).toVector
  val eachNumDocs: Map[String, Int] = wholeClass.map(elt => (elt._1, elt._2.length))
  val sumNumDocs: Int = docs.size

  def eachNumWord(word: String, class_name: String): Int = {
    var doc_count = 0
    wholeClass(class_name).foreach { class_rdd => // == (class, rdd)
      val filtered = class_rdd._2.filter { word_occur => word_occur._1 == word }.take(1)
      if (filtered.size != 0) doc_count += 1
    }
    doc_count
  }

  def eachProbWord(word: String, class_name: String, alpha: Int = 2): Double = {
    val nwc = eachNumWord(word, class_name).toDouble
    val nc  = eachNumDocs(class_name).toDouble
    log((nwc + (alpha - 1)) / (nc + 2 * (alpha - 1)))
  }

  def eachProbClass(class_name: String): Double = {
    val nc = eachNumDocs(class_name).toDouble
    log((nc + 1) / (sumNumDocs + numClass))
  }

  val numClass = wholeClass.size

  def temp(doc_path: String) = {
    val cached_rdd = read.rdds(doc_path)
    val each_prob: Vector[RDD[Double]] = allClassNames.map { class_name =>
      cached_rdd.map { elt => eachProbWord(elt._1, class_name) * elt._2 }
    }
    each_prob.map(_.take(1))
  }

  def classify(doc_path: String, alpha: Int = 2) = {
    val cached_rdd = read.rdds(doc_path) // cached because it is used many times
    val probPerClass = allClassNames.map { class_name =>
      val each_prob = cached_rdd.map { elt => eachProbWord(elt._1, class_name, alpha) * elt._2 }
      val sum_prob: Double = each_prob.reduce { (a, b) => a + b }
      sum_prob + eachProbClass(class_name)
    } // list of the probabilities that the document belongs to each class
    println(" max_class---------------------------------------------")
    println("probPerClass : " + probPerClass)
    val max_class: (Double, Int) = probPerClass.zipWithIndex.max // (probability, index of class)
    println(" return estimation class---------------------------------------------")
    allClassNames(max_class._2)
  }
}
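For reference, eachProbWord computes a smoothed log-probability of a word given a class, and eachProbClass a smoothed log class prior. Restated from the code above (my reading of it, not part of the original post), with N_wc the number of documents of class c containing word w, N_c the number of documents of class c, N the total number of documents, and K the number of classes:

\[
\log P(w \mid c) = \log \frac{N_{wc} + (\alpha - 1)}{N_c + 2(\alpha - 1)},
\qquad
\log P(c) = \log \frac{N_c + 1}{N + K}
\]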
I got this error:
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Starting task 16.0:0 as TID 15 on executor localhost: localhost (PROCESS_LOCAL)
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Serialized task 16.0:0 as 2941 bytes in 0 ms
14/04/06 13:55:50 INFO executor.Executor: Running task ID 15
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_7 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_1 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_3 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_4 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_5 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block rdd_57_0 locally
eachProbWord---------------------------------------------
eachNumWord---------------------------------------------
14/04/06 13:55:50 ERROR executor.Executor: Exception in task ID 15
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
    at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Lost TID 15 (task 16.0:0)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
    at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 ERROR scheduler.TaskSetManager: Task 16.0:0 failed 1 times; aborting job
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 16.0 from pool
14/04/06 13:55:50 INFO scheduler.DAGScheduler: Failed to run take at ParallelNaive.scala:89
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/04/06 13:55:50 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 47 s, completed 2014/04/06 13:55:50
Well, it's a bit difficult to understand what's going on, because the code is badly formatted and there is a huge amount of code for something that can be done in a few lines. Anyway, I think we can narrow the problem down to this line of code:
val filtered = class_rdd._2.filter{word_occur=> word_occur._1==word}.take(1)
which would be more readable written as:
val filtered = classRdd._2.filter(_._1 == word).take(1)
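Applying the same idea to the whole method, eachNumWord could be written more compactly, roughly like this (my sketch, assuming the wholeClass field of the posted ParallelNaive class; it is not code from the original answer):

def eachNumWord(word: String, className: String): Int =
  wholeClass(className).count { case (_, rdd) =>
    // a document counts if it contains at least one occurrence of `word`
    rdd.filter(_._1 == word).take(1).nonEmpty
  }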
Anyway, you are getting an NPE there, and therefore the RDD class_rdd must contain a null tuple. This is not a problem with Spark; it's a problem with your code.
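If you want to confirm that, you could check the class RDDs for null entries directly (a quick sketch of mine, not code from the original answer; it assumes the docs field of ParallelNaive):

// Count null tuples in each class RDD; a nonzero count would confirm the diagnosis.
docs.foreach { case (className, rdd) =>
  println(className + ": " + rdd.filter(_ == null).count() + " null entries")
}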