Как разбить фрейм данных на dataframes с одинаковыми значениями столбцов?

Используя Scala, как я могу разделить dataFrame на несколько dataFrame (будь то массив или коллекция) с одинаковым значением столбца. Например, я хочу разбить следующий DataFrame:

ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL 

чтобы:

dataset 1

 ID Rate State 1 24 AL 4 34 AL 

dataset 2

 ID Rate State 2 35 MN 5 78 MN 

dataset 3

 ID Rate State 3 46 FL 6 99 FL 

Вы можете собирать уникальные значения состояния и просто отображать результирующий массив:

 val states = df.select("State").distinct.collect.flatMap(_.toSeq) val byStateArray = states.map(state => df.where($"State" <=> state)) 

или для отображения:

 val byStateMap = states .map(state => (state -> df.where($"State" <=> state))) .toMap 

То же самое в Python:

 from itertools import chain from pyspark.sql.functions import col states = chain(*df.select("state").distinct().collect()) # PySpark 2.3 and later # In 2.2 and before col("state") == state) # should give the same outcome, ignoring NULLs # if NULLs are important # (lit(state).isNull() & col("state").isNull()) | (col("state") == state) df_by_state = {state: df.where(col("state").eqNullSafe(state)) for state in states} 

Очевидная проблема заключается в том, что для каждого уровня требуется полная проверка данных, поэтому это дорогостоящая операция. Если вы ищете способ просто разделить выход, см. Также. Как разделить RDD на два или более RDD?

В частности, вы можете написать Dataset разделенный на интересующий столбец:

 val path: String = ??? df.write.partitionBy("State").parquet(path) 

и, если необходимо, при необходимости отчитаться:

 // Depend on partition prunning for { state <- states } yield spark.read.parquet(path).where($"State" === state) // or explicitly read the partition for { state <- states } yield spark.read.parquet(s"$path/State=$state") 

В зависимости от размера данных, количества уровней расщепления, хранения и уровня персистентности ввода он может быть быстрее или медленнее, чем несколько фильтров.

Это очень просто (если версия искры – 2), если вы делаете dataframe временной таблицей.

 df1.createOrReplaceTempView("df1") 

И теперь вы можете делать запросы,

 var df2 = spark.sql("select * from df1 where state = 'FL'") var df3 = spark.sql("select * from df1 where state = 'MN'") var df4 = spark.sql("select * from df1 where state = 'AL'") 

Теперь вы получили df2, df3, df4. Если вы хотите иметь их в качестве списка, вы можете использовать,

 df2.collect() df3.collect() 

или даже функцию map / filter. См. https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes.

ясень

  • Что такое идентификатор Scala «неявно»?
  • Загружать файл Scala в интерпретатор для использования функций?
  • Разница между Seq и списком в Scala
  • Действительные символы идентификатора в Scala
  • Scala String vs java.lang.String - вывод типа
  • Настройте SparkContext с помощью sparkConf.set (..) при использовании искровой оболочки
  • Scala's '::', как это работает?
  • Использовать функциональные комбинаторы на Scala Tuples?
  • Синтаксис сахара: _ * для лечения Seq как параметры метода
  • как определить, имеет ли блок данных искровой диаграммы столбец
  • Как определить пользовательский разделитель для Spark RDD с одинаковым размером раздела, где каждый раздел имеет одинаковое количество элементов?
  • Давайте будем гением компьютера.