Оптимизация соединения DataFrame – Broadcast Hash Join

Я пытаюсь эффективно объединить два DataFrames, один из которых большой, а второй немного меньше.

Есть ли способ избежать всех этих перетасовки? Я не могу установить autoBroadCastJoinThreshold , потому что он поддерживает только целые числа – и таблица, которую я пытаюсь транслировать, немного больше целого числа байтов.

Есть ли способ заставить трансляцию игнорировать эту переменную?

Broadcast Hash Joins (аналогично объединению карты или объединению карт в Mapreduce):

В SparkSQL вы можете видеть тип соединения, выполняемого вызовом queryExecution.executedPlan . Как и в случае с Core Spark, если одна из таблиц намного меньше другой, вам может понадобиться хеш-соединение трансляции. Вы можете намекнуть на Spark SQL, что данный DF должен быть передан для соединения, вызывая метод broadcast на DataFrame перед его присоединением

Пример: largedataframe.join(broadcast(smalldataframe), "key")

в условиях DWH, где большой размер может быть как фактом
smalldataframe может быть как размер

Как описано в моих любимых книгах (HPS). см. ниже, чтобы иметь лучшее понимание. введите описание изображения здесь

Примечание. SparkContext broadcast осуществляется из import org.apache.spark.sql.functions.broadcast не из SparkContext

Spark также автоматически использует spark.sql.conf.autoBroadcastJoinThreshold чтобы определить, должна ли отображаться таблица.

Совет: см. Метод DataFrame.explain ()

 def explain(): Unit Prints the physical plan to the console for debugging purposes. 

Есть ли способ заставить трансляцию игнорировать эту переменную?

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

Вы можете намекнуть, что кадр данных будет транслироваться с помощью left.join(broadcast(right), ...)

Это текущее ограничение искры, см. SPARK-6235 . Предел 2GB также применяется для широковещательных переменных.

Вы уверены, что нет другого хорошего способа сделать это, например, разное разбиение?

В противном случае вы можете взломать его, вручную создав несколько широковещательных переменных, каждый из которых составляет <2 ГБ.

Установка spark.sql.autoBroadcastJoinThreshold = -1 полностью отключит широковещательную передачу. См. Другие параметры конфигурации в руководстве Spark SQL, DataFrames и Datasets .

Я нашел этот код для Broadcast Join в Spark 2.11 версии 2.0.0.

 import org.apache.spark.sql.functions.broadcast val employeesDF = employeesRDD.toDF val departmentsDF = departmentsRDD.toDF // materializing the department data val tmpDepartments = broadcast(departmentsDF.as("departments")) import context.implicits._ employeesDF.join(broadcast(tmpDepartments), $"depId" === $"id", // join by employees.depID == departments.id "inner").show() 

Вот ссылка на приведенный выше код Henning Kropp Blog, Broadcast Join with Spark

  • Spark Sql UDF со сложным входным параметром
  • Как выбрать первую строку каждой группы?
  • Определение UDF, который принимает массив объектов в Spark DataFrame?
  • Как закрепить два (или более) DataFrame в Spark
  • Spark / Scala: форвардная заливка с последним наблюдением
  • Как найти размер вспышки RDD / Dataframe?
  • spark.ml StringIndexer бросает «Невидимый ярлык» на fit ()
  • Блок данных фильтра Pyspark по столбцам другого блока данных
  • Spark sql как взорваться без потери нулевых значений
  • Перезаписывать конкретные разделы в методе записи данных в режиме искривления
  • Interesting Posts

    Установленный установщик Inno Setup не показывает страницу «Выбрать место назначения» на некоторых системах

    Удаление идентификатора fragmentа из URL-адресов AngularJS (символ #)

    Как прокрутить UITableView в определенную позицию

    Множественный выбор в пользовательском ListView с CAB

    Как установить Python OpenCV через Conda?

    Задайте разные части поля формы, чтобы иметь разные шрифты, используя iTextSharp

    Разделение жесткого диска Dell XPS 15 L502X

    Как вы можете получить неудачный вход в Windows 10

    Очистить элемент из автозаполнения адресной строки Chrome

    Записи чтения / записи в C ++

    Как перечислить все свойства зависимостей управления?

    Понимание событий и обработчиков событий в C #

    Cmake не может найти библиотеку, используя “link_directories”

    Почему NoClassDefFoundError вызвано сбоем инициализации статического поля?

    Сохранять / сохранять / восстанавливать позицию прокрутки при возврате в ListView

    Давайте будем гением компьютера.