Решение проблем зависимостей в Apache Spark

Общие проблемы при создании и развертывании приложений Spark:

  • java.lang.ClassNotFoundException .
  • object x is not a member of package y ошибок компиляции object x is not a member of package y .
  • java.lang.NoSuchMethodError

Как это можно решить?

Путь classов Apache Spark построен динамически (для размещения кода пользователя для каждого приложения), что делает его уязвимым для таких проблем. @ user7337271 ответ правильный, но есть еще несколько проблем, в зависимости от используемого вами менеджера кластера («мастер»).

Во-первых, приложение Spark состоит из этих компонентов (каждый из них является отдельным JVM, поэтому потенциально содержит разные classы в его пути к classам):

  1. Драйвер : это ваше приложение, создающее SparkSession (или SparkContext ) и подключение к менеджеру кластера для выполнения фактической работы
  2. Диспетчер кластеров : служит в качестве «точки входа» в кластер, отвечающий за распределение исполнителей для каждого приложения. В Spark есть несколько различных типов: автономный, YARN и Mesos, о которых мы расскажем ниже.
  3. Исполнители : это процессы на узлах кластера, выполнение фактической работы (запуск задач Spark)

Соотношение между ними описано на этой диаграмме из обзора кластерного режима Apache Spark:

Обзор режима кластера

Теперь – какие classы должны находиться в каждом из этих компонентов?

На это можно ответить на следующую диаграмму:

Обзор размещения классов

Давайте разберем это медленно:

  1. Spark Code – это библиотеки Spark. Они должны существовать во всех трех компонентах, так как они include в себя клей, который позволяет Spark выполнять связь между ними. Кстати, авторы Spark внесли дизайнерское решение включить код для ВСЕХ компонентов во ВСЕХ компонентах (например, включить код, который должен запускаться только в Executor в драйвере), чтобы упростить это – так что «толстая банка Spark» (в версиях до 1.6 ) или «архив» (в версии 2.0, ниже) содержится необходимый код для всех компонентов и должен быть доступен во всех них.

  2. Код только для драйверов – это код пользователя, который не содержит ничего, что должно использоваться для исполнителей, то есть код, который не используется при каких-либо преобразованиях в RDD / DataFrame / Dataset. Это необязательно должно быть отделено от распределенного кода пользователя, но это может быть.

  3. Распределенный код – это код пользователя, который скомпилирован с кодом драйвера, но также должен выполняться на исполнителях – все используемые фактические преобразования должны быть включены в эту банку.

Теперь, когда мы получили это прямо, как мы получаем, чтобы classы правильно загружались в каждом компоненте, и какими правилами они должны следовать?

  1. Код искры : как и в предыдущих ответах, вы должны использовать те же версии Scala и Spark для всех компонентов.

    1.1 В автономном режиме существует «ранее существовавшая» установка Spark, к которой могут подключаться приложения (драйверы). Это означает, что все драйверы должны использовать ту же версию Spark, что и на главном и исполнительном устройствах.

    1.2 В YARN / Mesos каждое приложение может использовать другую версию Spark, но все компоненты одного и того же приложения должны использовать один и тот же. Это означает, что если вы использовали версию X для компиляции и упаковки вашего приложения драйвера, вы должны предоставить ту же версию при запуске SparkSession (например, через параметры spark.yarn.archive или spark.yarn.jars при использовании YARN). Банки / архив, которые вы предоставляете, должны включать все зависимости Spark ( включая транзитивные зависимости ), и он будет отправлен менеджером кластера каждому исполнителю при запуске приложения.

  2. Код драйвера : этот код полностью может быть отправлен в виде кучи банок или «толстой банки», если он включает все зависимости Spark + весь код пользователя

  3. Распределенный код : помимо присутствия в драйвере этот код должен быть отправлен исполнителям (опять же, вместе со всеми его транзитивными зависимостями). Это делается с использованием параметра spark.jars .

Подведем итог , вот предлагаемый подход к созданию и развертыванию Spark Application (в данном случае – с использованием YARN):

  • Создайте библиотеку с вашим распределенным кодом, упакуйте ее как «обычную» банку (с файлом .pom, описывающим ее зависимости), так и как «живую банку» (со всеми включенными транзитными зависимостями).
  • Создайте приложение драйвера с зависимостями компиляции от вашей библиотеки распределенных кодов и от Apache Spark (с определенной версией)
  • Упакуйте приложение драйвера в толстую банку, которая будет развернута водителю
  • Передайте правильную версию вашего распределенного кода как значение параметра spark.jars при запуске SparkSession
  • Передайте местоположение файла архива (например, gzip), содержащего все банки в папке lib/ из загруженных двоичных файлов Spark, как значение spark.yarn.archive

При создании и развертывании приложений Spark все зависимости требуют совместимых версий.

  • Версия Scala . Все пакеты должны использовать одну и ту же версию (2.10, 2.11, 2.12) Scala.

    Рассмотрим следующий (неверный) build.sbt :

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    Мы используем spark-streaming для Scala 2.10, а оставшиеся пакеты для Scala 2.11. Допустимым файлом может быть

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.11" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    но лучше указать версию в глобальном масштабе и использовать %% :

     name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-streaming" % "2.0.1", "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1" ) 

    Аналогично в Maven:

      com.example simple-project 4.0.0 Simple Project jar 1.0  2.0.1     org.apache.spark spark-core_2.11 ${spark.version}   org.apache.spark spark-streaming_2.11 ${spark.version}   org.apache.bahir spark-streaming-twitter_2.11 ${spark.version}    
  • Версия Spark Все пакеты должны использовать ту же самую основную версию Spark (1.6, 2.0, 2.1, …).

    Рассмотрим следующий (неверный) build.sbt:

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    Мы используем spark-core 1.6, в то время как остальные компоненты находятся в Spark 2.0. Допустимым файлом может быть

     name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" ) 

    но лучше использовать переменную:

     name := "Simple Project" version := "1.0" val sparkVersion = "2.0.1" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion, "org.apache.spark" % "spark-streaming_2.10" % sparkVersion, "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion ) 

    Аналогично в Maven:

      com.example simple-project 4.0.0 Simple Project jar 1.0  2.0.1 2.11     org.apache.spark spark-core_${scala.version} ${spark.version}   org.apache.spark spark-streaming_${scala.version} ${spark.version}   org.apache.bahir spark-streaming-twitter_${scala.version} ${spark.version}    
  • Версия Spark, используемая в зависимостях Spark, должна соответствовать версии Spark установки Spark. Например, если вы используете 1.6.1 в кластере, вы должны использовать 1.6.1 для сборки банок. Незначительные несоответствия версий не всегда принимаются.

  • Версия Scala, используемая для сборки jar, должна соответствовать версии Scala, используемой для создания развернутой Spark. По умолчанию (загружаемые двоичные файлы и сборки по умолчанию):

    • Spark 1.x -> Scala 2.10
    • Spark 2.x -> Scala 2.11
  • Дополнительные пакеты должны быть доступны на рабочих узлах, если они включены в жировую банку. Существует множество вариантов, в том числе:

    • --jars аргумент --jars для spark-submit – для распространения локальных файлов jar .
    • --packages аргумент для spark-submit – для извлечения зависимостей из репозитория Maven.

    При отправке в узел кластера вы должны включить приложение jar в --jars .

В дополнение к очень обширному ответу, уже заданному пользователем7337271, если проблема возникает из-за отсутствия внешних зависимостей, вы можете создать банку с вашими зависимостями, например, с помощью плагина сборки maven

В этом случае убедитесь, что все основные искровые зависимости отмечены как «предоставленные» в вашей системе сборки, и, как уже отмечалось, убедитесь, что они коррелируют с вашей версией искробезопасности.

Классы зависимостей вашего приложения должны быть указаны в параметре application-jar вашей команды запуска.

Более подробную информацию можно найти в документации Spark

Из документации:

application-jar: путь к объединенной банке, включая ваше приложение и все зависимости. URL-адрес должен быть глобально видимым внутри вашего кластера, например, путь hdfs: // или путь file: //, который присутствует на всех узлах

Я думаю, что эта проблема должна решить плагин сборки. Вам нужно построить жирную банку. Например, в sbt:

  • добавить файл $PROJECT_ROOT/project/assembly.sbt с кодом addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
  • to build.sbt added some libraries libraryDependencies ++ = Seq (“com.some.company” %% “some-lib”% “1.0.0”) `
  • в консоли sbt введите «assembly» и разверните сборную банку

Если вам нужна дополнительная информация, перейдите на страницу https://github.com/sbt/sbt-assembly

  • Отменить в начале складки
  • Пункты памяти и стиль кодирования по Java VM
  • Как использовать эту типизацию Scala, абстрактные типы и т. Д. Для реализации типа Self?
  • Определение UDF, который принимает массив объектов в Spark DataFrame?
  • Как создать и использовать multidimensional array в Scala?
  • Преобразовать java.util.HashMap в scala.collection.immutable.Map в java
  • Фильтр Искра DataFrame в строке содержит
  • Как передать дополнительные параметры UDF в SparkSql?
  • Как вызвать метод Scala Object с использованием отражения?
  • Совместимость шаблонов с несколькими типами Scala
  • Что такое урожайность Скалы?
  • Interesting Posts

    Остановить Mac OS от повторного создания файлов ~ / Pictures, ~ / Music или ~ / Public

    Как изменить время ожидания тайм-аута запроса nodejs?

    Использование удаленного рабочего стола для подключения к другим компьютерам за одним IP-адресом

    Как определить версию Windows из мертвой установки из Linux, имея доступ только к своей файловой системе?

    Уменьшить RAID, удалив диск?

    Диалоговое изображение с четким фоном (не затемнено)

    Как получить общий объем оперативной памяти устройства?

    Код необходимости создания пула соединений в java

    Как создать .NET DateTime из ISO 8601

    Вычисление размера каталога с помощью Python?

    Я не могу подключить DMG, получив ошибку «без монтируемых файловых систем»

    ASP.NET MVC 4 Пользовательский авторизованный атрибут с кодами разрешений (без ролей)

    Owin Self host & ASP .Net MVC

    Вложенные хранимые процедуры, содержащие шаблон TRY CATCH ROLLBACK?

    Ошибка десериализации Xml в Object – xmlns = ” не ожидалось

    Давайте будем гением компьютера.