NullPointerException в Scala Spark, по-видимому, вызвано типом коллекции?

Тип sessionIdList имеет тип:

scala> sessionIdList res19: org.apache.spark.rdd.RDD [String] = MappedRDD [17] при разных значениях: 30

Когда я пытаюсь запустить код ниже:

val x = sc.parallelize(List(1,2,3)) val cartesianComp = x.cartesian(x).map(x => (x)) val kDistanceNeighbourhood = sessionIdList.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) 

Я получаю исключение:

 14/05/21 16:20:46 ERROR Executor: Exception in task ID 80 java.lang.NullPointerException at org.apache.spark.rdd.RDD.filter(RDD.scala:261) at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:38) at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:36) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) 

Однако, если я использую:

 val l = sc.parallelize(List("1","2")) val kDistanceNeighbourhood = l.map(s => { cartesianComp.filter(v => v != null) }) kDistanceNeighbourhood.take(1) 

Тогда исключение не отображается

Разница между двумя fragmentами кода заключается в том, что в первом fragmentе sessionIdList имеет тип:

 res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at :30 

и во втором fragmentе «l» имеет тип

 scala> l res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at :12 

Почему эта ошибка возникает?

Мне нужно преобразовать sessionIdList в ParallelCollectionRDD, чтобы исправить это?

Spark не поддерживает вложенность RDD (см. https://stackoverflow.com/a/14130534/590203 для другого возникновения той же проблемы), поэтому вы не можете выполнять преобразования или действия на RDD внутри других операций RDD.

В первом случае вы видите исключение NullPointerException, которое бросает работник, когда он пытается получить доступ к объекту SparkContext, который присутствует только на драйвере, а не на рабочих.

Во втором случае моя догадка заключается в том, что работа выполнялась локально на драйвере и работала просто случайно.

Это разумный вопрос, и я слышал, что он достаточно часто спрашивал об этом. Я собираюсь попытаться объяснить, почему это так, потому что это может помочь.

Вложенные RDD всегда будут генерировать исключение в производстве. Вложенные вызовы функций, как я думаю, вы описываете их здесь, если это означает вызов операции RDD внутри операции RDD, вызовет также сбои причин, поскольку на самом деле это одно и то же. (RDD являются неизменяемыми, поэтому выполнение операции RDD, такой как «карта», эквивалентно созданию нового RDD.) Способность создавать вложенные RDD является необходимым следствием того, как определяется RDD и как приложение Spark настроить.

RDD – это распределенная коллекция объектов (называемых разделами), которые живут на Excrors Spark. Исполнители Spark не могут общаться друг с другом, только с помощью драйвера Spark. Операции RDD вычисляются по частям на этих разделах. Поскольку среда-исполнитель RDD не является рекурсивной (т. Е. Вы можете настроить драйвер Spark на исполнитель искры с суб-исполнителями), ни RDD не может быть.

В вашей программе вы создали распределенный набор разделов целых чисел. Затем вы выполняете операцию сопоставления. Когда драйвер Spark видит операцию сопоставления, он отправляет инструкции для выполнения сопоставления исполнителям, которые выполняют преобразование на каждом разделе параллельно. Но ваше сопоставление не может быть выполнено, потому что на каждом разделе вы пытаетесь вызвать «весь RDD» для выполнения другой распределенной операции. Этого не может быть сделано, потому что каждый раздел не имеет доступа к информации на других разделах, если это так, вычисление не может выполняться параллельно.

Что вы можете сделать вместо этого, потому что данные, которые вам нужны на карте, вероятно, невелики (поскольку вы делаете фильтр, а фильтр не требует никакой информации о sessionIdList) – это сначала отфильтровать список идентификаторов сеанса. Затем соберите этот список с драйвером. Затем передайте его исполнителям, где вы можете использовать его на карте. Если список sessionID слишком велик, вам, вероятно, потребуется выполнить соединение.

  • Scala "<-" для понимания
  • Являются ли HLists не более чем сложным способом написания кортежей?
  • Как получить экземпляр classа типа, связанного с привязкой к контексту?
  • Самый простой способ получить верхние n элементов Scala Iterable
  • Как переопределить применение в компаньоне classа case
  • Настройка sbt для использования Java 7 для компиляции?
  • (Почему) нам нужно вызвать кеш или сохранить на RDD
  • Стратифицированная выборка в Spark
  • Как развернуть Spark DataFrame?
  • Доступ к базовому ActorRef streamа akka Источник, созданный Source.actorRef
  • Spark UDF с varargs
  • Interesting Posts

    Автоматическое переключение на рабочий стол после входа в Windows 8.1

    Получить общий тип java.util.List

    C # – элегантный способ разделения списка?

    У вас возникли проблемы с настройками Windows XP для чтения только в папке

    Почему закрытие windows консоли сразу после отображения моего вывода?

    Есть ли вложенные мастер-страницы в ASP.NET MVC?

    Отключить автокоррекцию в libreoffice calc для конкретного документа, столбца, ячейки

    Статическое ключевое слово в c #

    Как получить часть файла после строки, которая соответствует выражению grep? (первое совпадение)

    Как решить проблему 32-байтового выравнивания для операций загрузки / хранения AVX?

    можно ли вызвать переопределенный метод из родительской структуры в golang?

    Совместное использование одного и того же `ssh-agent` между несколькими сеансами входа в систему

    Событие jQuery: обнаружение изменений в html / text div

    Сиро против SpringSecurity

    Список всех специальных символов, которые должны быть экранированы в регулярном выражении

    Давайте будем гением компьютера.