scala - Getting NullPointerException when running Spark job with naive Bayes implementation


I am trying to implement naive Bayes on Spark, but it doesn't work well. The problem seems to be caused by this line in the temp function: each_prob.map(_.take(1)). If you have any idea what the problem is, please help me...

This is the main function:

object DoNaive extends App {

  val file_pathes =
    Vector(
      ("plus",  "resource/doc1.txt"),
      ("plus",  "resource/doc2.txt"),
      ("plus",  "resource/doc3.txt"),
      ("minus", "resource/doc4.txt"),
      ("minus", "resource/doc5.txt"),
      ("minus", "resource/doc6.txt")
    )

  val pn = ParallelNaive(file_pathes)

  val cached_rdd = read.rdds("resource/examine.txt")

  val each_prob: Vector[RDD[String]] =
    pn.allClassNames.map { class_name =>
      cached_rdd.map { elt => (pn.eachProbWord(elt._1, class_name) * elt._2).toString }
    }

  val head_prob = each_prob.head

  println(pn.docs.map(elt => elt._2.take(1).head))

  pn.temp("resource/examine.txt")
}

And this is the ParallelNaive class; the temp function is where the problem shows up:

import org.apache.spark.rdd.RDD
import scala.math.log

case class ParallelNaive(file_pathes: Vector[(String, String)]) extends Serializable {

  val docs: Vector[(String, RDD[(String, Int)])] =
    file_pathes.map(class_path => (class_path._1, read.rdds(class_path._2, false)))

  val wholeClass: Map[String, Vector[(String, RDD[(String, Int)])]] = docs.groupBy(elt => elt._1)

  val allClassNames: Vector[String] = wholeClass.map(elt => elt._1).toVector

  val eachNumDocs: Map[String, Int] = wholeClass.map(elt => (elt._1, elt._2.length))

  val sumNumDocs: Int = docs.size

  def eachNumWord(word: String, class_name: String): Int = {
    var doc_count = 0
    wholeClass(class_name).foreach { class_rdd => // == (class, rdd)
      val filtered = class_rdd._2.filter { word_occur => word_occur._1 == word }.take(1)
      if (filtered.size != 0) doc_count += 1
    }
    doc_count
  }

  def eachProbWord(word: String, class_name: String, alpha: Int = 2): Double = {
    val nwc = eachNumWord(word, class_name).toDouble
    val nc  = eachNumDocs(class_name).toDouble
    log((nwc + (alpha - 1)) / (nc + 2 * (alpha - 1)))
  }

  def eachProbClass(class_name: String): Double = {
    val nc = eachNumDocs(class_name).toDouble
    log((nc + 1) / (sumNumDocs + numClass))
  }

  val numClass = wholeClass.size

  def temp(doc_path: String) = {
    val cached_rdd = read.rdds(doc_path)
    val each_prob: Vector[RDD[Double]] =
      allClassNames.map { class_name =>
        cached_rdd.map { elt => eachProbWord(elt._1, class_name) * elt._2 }
      }
    each_prob.map(_.take(1))
  }

  def classify(doc_path: String, alpha: Int = 2) = {
    val cached_rdd = read.rdds(doc_path) // cached because it is used many times

    val probPerClass =
      allClassNames.map { class_name =>
        val each_prob =
          cached_rdd.map { elt => eachProbWord(elt._1, class_name, alpha) * elt._2 }
        val sum_prob: Double = each_prob.reduce { (a, b) => a + b }
        sum_prob + eachProbClass(class_name)
      } // the probability that the document belongs to each class

    println(" max_class---------------------------------------------")
    println("probPerClass : " + probPerClass)
    val max_class: (Double, Int) = probPerClass.zipWithIndex.max // (probability, index of class)
    println(" return estimation class---------------------------------------------")
    allClassNames(max_class._2)
  }

}

And I got this error:

14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Starting task 16.0:0 as TID 15 on executor localhost: localhost (PROCESS_LOCAL)
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Serialized task 16.0:0 as 2941 bytes in 0 ms
14/04/06 13:55:50 INFO executor.Executor: Running task ID 15
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_7 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_1 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_3 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_4 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_5 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block rdd_57_0 locally
eachProbWord---------------------------------------------
eachNumWord---------------------------------------------
14/04/06 13:55:50 ERROR executor.Executor: Exception in task ID 15
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
    at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
    at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Lost TID 15 (task 16.0:0)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
    [same stack trace as above]
14/04/06 13:55:50 ERROR scheduler.TaskSetManager: Task 16.0:0 failed 1 times; aborting job
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 16.0 from pool
14/04/06 13:55:50 INFO scheduler.DAGScheduler: Failed to run take at ParallelNaive.scala:89
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/04/06 13:55:50 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 47 s, completed 2014/04/06 13:55:50

Well, it's a bit difficult to understand what's going on, because the code is badly formatted and there is a huge amount of code for something that could be done in a few lines. Anyway, I think we can narrow the problem down to this line of code:

val filtered = class_rdd._2.filter{word_occur=> word_occur._1==word}.take(1) 

which is more readable if written as:

val filtered = class_rdd._2.filter(_._1 == word).take(1)

Anyway, you are getting an NPE there, so the RDD class_rdd contains a null tuple. It's not a problem with Spark, it's a problem with your code.
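One more observation from the stack trace: the NPE is thrown by RDD.filter inside eachNumWord, which is itself being called from within a map over another RDD (eachProbWord is invoked at ParallelNaive.scala:84, inside the map in temp). Spark does not support invoking RDD operations from inside a task running on an executor; the training RDDs referenced there are not usable at that point, which would produce exactly this kind of NullPointerException. Below is a minimal sketch of one way around it, assuming the same (word, count) training RDDs as in the question; the names LocalNaive, build and wordSetsPerClass are mine, not from the original code. The idea is to collect each training document's vocabulary to the driver once, so the per-word counts become plain local lookups and the map over the test RDD no longer touches other RDDs.

import org.apache.spark.rdd.RDD
import scala.math.log

case class LocalNaive(wordSetsPerClass: Map[String, Vector[Set[String]]]) extends Serializable {

  val eachNumDocs: Map[String, Int] =
    wordSetsPerClass.map { case (c, ds) => (c, ds.length) }

  // Number of documents of the class containing the word: now a local
  // computation, safe to call from inside a map over the test RDD.
  def eachNumWord(word: String, className: String): Int =
    wordSetsPerClass(className).count(_.contains(word))

  def eachProbWord(word: String, className: String, alpha: Int = 2): Double = {
    val nwc = eachNumWord(word, className).toDouble
    val nc  = eachNumDocs(className).toDouble
    log((nwc + (alpha - 1)) / (nc + 2 * (alpha - 1)))
  }
}

object LocalNaive {
  // Build the model on the driver: collect() each training document's
  // vocabulary once, so later closures capture only plain Scala
  // collections -- no RDD references escape the driver.
  def build(docs: Vector[(String, RDD[(String, Int)])]): LocalNaive = {
    val wordSets = docs.groupBy(_._1).map { case (className, classDocs) =>
      className -> classDocs.map(_._2.map(_._1).distinct().collect().toSet)
    }
    LocalNaive(wordSets)
  }
}

With that in place, temp could map over the test RDD without nesting RDD operations, along these lines (reusing pn.docs and cached_rdd from the question):

val model = LocalNaive.build(pn.docs)
val each_prob: Vector[RDD[Double]] =
  model.wordSetsPerClass.keys.toVector.map { class_name =>
    cached_rdd.map { elt => model.eachProbWord(elt._1, class_name) * elt._2 }
  }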

