Почему этот код Spark создает исключение NullPointerException?

У меня проблема с запуском приложения Spark.

Исходный код:

// Read table From HDFS val productInformation = spark.table("temp.temp_table1") val dict = spark.table("temp.temp_table2") // Custom UDF val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => dict.filter( (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7 ).count ) val result = productInformation.withColumn("positive_count", countPositiveSimilarity($"title", $"internal_category")) // Error occurs! result.show 

Сообщение об ошибке:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 5887, ip-10-211-220-33.ap-northeast-2.compute.internal, executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array, array) => bigint) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at $anonfun$1.apply(:45) at $anonfun$1.apply(:43) ... 16 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112) at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795) at org.apache.spark.sql.Dataset.head(Dataset.scala:2112) at org.apache.spark.sql.Dataset.take(Dataset.scala:2327) at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) at org.apache.spark.sql.Dataset.show(Dataset.scala:636) at org.apache.spark.sql.Dataset.show(Dataset.scala:595) at org.apache.spark.sql.Dataset.show(Dataset.scala:604) ... 48 elided Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array, array) => bigint) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) ... 3 more Caused by: java.lang.NullPointerException at $anonfun$1.apply(:45) at $anonfun$1.apply(:43) ... 16 more 

Я проверил, имеет ли значение productInformation и dict значение null в Columns . Но нет нулевых значений.

Может кто-нибудь мне помочь? Я приложил пример кода, чтобы сообщить вам более подробную информацию:

 case class Target(wordListOne: Seq[String], WordListTwo: Seq[String]) val targetData = Seq(Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")), Target(Seq("Java", "Scala"), Seq("Scala", "Banana")), Target(Seq(""), Seq("Grape", "Banana")), Target(Seq(""), Seq(""))) val targets = spark.createDataset(targetData) case class WordSimilarity(first: String, second: String, similarity: Double) val similarityData = Seq(WordSimilarity("Spark", "Java", 0.8), WordSimilarity("Scala", "Spark", 0.9), WordSimilarity("Java", "Scala", 0.9), WordSimilarity("Apple", "Grape", 0.66), WordSimilarity("Scala", "Apple", -0.1), WordSimilarity("Gine", "Spark", 0.1)) val dict = spark.createDataset(similarityData) val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) => dict.filter( (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7 ).count ) val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo")) 

Это пример кода и похож на мой исходный код. Пример кода работает хорошо. В какой точке я должен проверить исходный код и данные?

Очень интересный вопрос. Мне нужно сделать некоторые поиски, и вот мой. Надеюсь, это поможет вам немного.

Когда вы создаете Dataset через createDataset , искра назначит этот dataset логическим планом LocalRelation .

 def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = { val enc = encoderFor[T] val attributes = enc.schema.toAttributes val encoded = data.map(d => enc.toRow(d).copy()) val plan = new LocalRelation(attributes, encoded) Dataset[T](self, plan) } 

Следуйте по этой ссылке : LocalRelation is a leaf logical plan that allow functions like collect or take to be executed locally, ie without using Spark executors.

И это правда, поскольку метод isLocal указывает

  /** * Returns true if the `collect` and `take` methods can be run locally * (without any Spark executors). * * @group basic * @since 1.6.0 */ def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation] 

Очевидно, вы можете проверить, что ваши 2 набора данных являются локальными.

И метод show фактически набирает внутренне.

 private[sql] def showString(_numRows: Int, truncate: Int = 20): String = { val numRows = _numRows.max(0) val takeResult = toDF().take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) 

Таким образом, с учетом этих ожиданий, я думаю, что вызов countDF.show выполняется, он будет вести себя так же, как при вызове count на dict dataset из драйвера , количество раз вызовов – это количество записей targets . И, конечно, dataset dict конечно, не обязательно должен быть локальным для показа на countDF .

Вы можете попытаться сохранить countDF , это даст вам исключение, countDF первому случаю. org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array, array) => bigint)

Вы не можете использовать Dataframe внутри Dataframe . Вам нужно будет присоединиться к productInformation и dict и выполнить логику productInformation после объединения.

  • Scala: Неявный приоритет разрешения параметров
  • Объявление параметра Tuple и странность присваивания
  • Scala: короткая форма соответствия шаблону, которая возвращает Boolean
  • sbt-assembly: обнаружена ошибка дедупликации
  • Что означает параметр: _ * в Scala?
  • Существует ли реализация PriorityQueue с фиксированной пропускной способностью и пользовательским компаратором?
  • В Scala, как я могу подclassифицировать class Java с несколькими конструкторами?
  • Как перенести RDD в Spark
  • Spark Row для JSON
  • Итерация над compilationами Java в Scala
  • Смешивание в динамике динамически
  • Давайте будем гением компьютера.