Как определить разбиение DataFrame?

Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель на DataFrames в Scala, но не вижу, как это сделать.

Одна из таблиц данных, с которыми я работаю, содержит список транзакций, по учетным записям, silimar в следующем примере.

Account Date Type Amount 1001 2014-04-01 Purchase 100.00 1001 2014-04-01 Purchase 50.00 1001 2014-04-05 Purchase 70.00 1001 2014-04-01 Payment -150.00 1002 2014-04-01 Purchase 80.00 1002 2014-04-02 Purchase 22.00 1002 2014-04-04 Payment -120.00 1002 2014-04-04 Purchase 60.00 1003 2014-04-02 Purchase 210.00 1003 2014-04-03 Purchase 15.00 

По крайней мере, изначально большинство вычислений будет происходить между транзакциями внутри учетной записи. Поэтому я хотел бы, чтобы данные были разделены так, чтобы все транзакции для учетной записи находились в одном и том же разделе Spark.

Но я не вижу способа определить это. Класс DataFrame имеет метод под названием «repartition (Int)», где вы можете указать количество создаваемых разделов. Но я не вижу доступных методов для определения пользовательского разделителя для DataFrame, например, для RDD.

Исходные данные хранятся в Parquet. Я видел, что при написании DataFrame в Parquet вы можете указать столбец для разделения, так что предположительно я мог бы рассказать Parquet о разделении его данных по столбцу «Учетная запись». Но могут быть миллионы учетных записей, и если я правильно понимаю Паркет, это создало бы отдельный каталог для каждой учетной записи, так что это не звучало как разумное решение.

Есть ли способ заставить Spark разбить этот DataFrame так, чтобы все данные для учетной записи находились в одном разделе?

Искра> = 2,3,0

SPARK-22614 раскрывает разбиение диапазонов.

 val partitionedByRange = df.repartitionByRange(42, $"k") partitionedByRange.explain // == Parsed Logical Plan == // 'RepartitionByExpression ['k ASC NULLS FIRST], 42 // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6] // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- Project [_1#2 AS k#5, _2#3 AS v#6] // +- LocalRelation [_1#2, _2#3] // // == Optimized Logical Plan == // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- LocalRelation [k#5, v#6] // // == Physical Plan == // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42) // +- LocalTableScan [k#5, v#6] 

SPARK-22389 предоставляет разбиение внешнего формата в API-интерфейсе источника данных v2 .

Искры> = 1,6,0

В Spark> = 1.6 можно использовать разбиение по столбцу для запроса и кэширования. См.: SPARK-11410 и SPARK-4849 с использованием метода repartition :

 val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") val partitioned = df.repartition($"k") partitioned.explain // scala> df.repartition($"k").explain(true) // == Parsed Logical Plan == // 'RepartitionByExpression ['k], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Optimized Logical Plan == // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Physical Plan == // TungstenExchange hashpartitioning(k#7,200), None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- Scan PhysicalRDD[_1#5,_2#6] 

В отличие от RDDs Spark Dataset (включая Dataset[Row] aka DataFrame ) не может использовать пользовательский разделитель, как сейчас. Обычно вы можете обратиться к этому, создав столбец искусственного разделения, но он не даст вам такой же гибкости.

Искры <1.6.0:

Одна вещь, которую вы можете сделать, это предварительно разбить входные данные перед созданием DataFrame

 import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.HashPartitioner val schema = StructType(Seq( StructField("x", StringType, false), StructField("y", LongType, false), StructField("z", DoubleType, false) )) val rdd = sc.parallelize(Seq( Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0), Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99) )) val partitioner = new HashPartitioner(5) val partitioned = rdd.map(r => (r.getString(0), r)) .partitionBy(partitioner) .values val df = sqlContext.createDataFrame(partitioned, schema) 

Поскольку DataFrame создания DataFrame из RDD требуется только простая фаза карты, необходимо сохранить существующую структуру разделов *:

 assert(df.rdd.partitions == partitioned.partitions) 

Точно так же вы можете переделать существующий DataFrame :

 sqlContext.createDataFrame( df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values, df.schema ) 

Похоже, это не невозможно. Вопрос остается, если он имеет смысл. Я буду утверждать, что большую часть времени это не делает:

  1. Перераспределение – дорогостоящий процесс. В типичном сценарии большая часть данных должна быть сериализована, перетасована и десериализована. С другой стороны, количество операций, которые могут извлечь выгоду из предварительно разделенных данных, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.

    • присоединяется к некоторым сценариям, но для этого потребуется внутренняя поддержка,
    • window выполняет вызовы с соответствующим разделителем. То же, что и выше, ограничивается одним определением windows. Он уже разделен внутри, хотя, поэтому предварительное разбиение может быть избыточным,
    • простые агрегаты с GROUP BY – можно уменьшить объем памяти временных буферов **, но общая стоимость намного выше. Более или менее эквивалентно groupByKey.mapValues(_.reduce) (текущее поведение) vs reduceByKey (предварительное разбиение). Вряд ли будет полезно на практике.
    • сжатие данных с помощью SqlContext.cacheTable . Поскольку похоже, что используется кодировка длины выполнения, применение OrderedRDDFunctions.repartitionAndSortWithinPartitions может улучшить коэффициент сжатия.
  2. Производительность сильно зависит от распределения ключей. Если он искажен, это приведет к субоптимальному использованию ресурсов. В худшем случае невозможно завершить работу вообще.

  3. Весь смысл использования декларативного API высокого уровня состоит в том, чтобы изолировать себя от деталей реализации низкого уровня. Как уже упоминалось @dwysakowicz и @RomiKuntsman , оптимизация – это работа Оптимизатора Catalyst . Это довольно сложный зверь, и я действительно сомневаюсь, что вы можете легко улучшить это, не погружаясь гораздо глубже в его внутренности.

Связанные понятия

Разделение с источниками JDBC :

Источники данных JDBC поддерживают аргумент predicates . Его можно использовать следующим образом:

 sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props) 

Он создает единый раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не пересекаются, вы увидите дубликаты в результирующей таблице.

Метод partitionBy в DataFrameWriter :

Spark DataFrameWriter предоставляет метод partitionBy который может использоваться для «разделения» данных при записи. Он отделяет данные от записи, используя предоставленный набор столбцов

 val df = Seq( ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6) ).toDF("k", "v") df.write.partitionBy("k").json("/tmp/foo.json") 

Это позволяет предикату нажимать на чтение для запросов на основе ключа:

 val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json") df1.where($"k" === "bar") 

но это не эквивалентно DataFrame.repartition . В частности, такие агрегаты, как:

 val cnts = df1.groupBy($"k").sum() 

по-прежнему потребует TungstenExchange :

 cnts.explain // == Physical Plan == // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93]) // +- TungstenExchange hashpartitioning(k#90,200), None // +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99]) // +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json 

bucketBy в DataFrameWriter (Spark> = 2.0):

bucketBy имеет аналогичные приложения, такие как partitionBy но доступен только для таблиц ( saveAsTable ). Информация о буклете может использоваться для оптимизации соединений:

 // Temporarily disable broadcast joins spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df.write.bucketBy(42, "k").saveAsTable("df1") val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2") df2.write.bucketBy(42, "k").saveAsTable("df2") // == Physical Plan == // *Project [k#41, v#42, v2#47] // +- *SortMergeJoin [k#41], [k#46], Inner // :- *Sort [k#41 ASC NULLS FIRST], false, 0 // : +- *Project [k#41, v#42] // : +- *Filter isnotnull(k#41) // : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct // +- *Sort [k#46 ASC NULLS FIRST], false, 0 // +- *Project [k#46, v2#47] // +- *Filter isnotnull(k#46) // +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct 

* По разметке я имею в виду только распределение данных. partitioned RDD больше не является разделителем. ** Предполагая, что ранняя проекция отсутствует. Если агрегация охватывает только малую часть столбцов, то, вероятно, нет никакой выгоды.

В Spark <1.6 Если вы создаете HiveContext , а не простой старый SqlContext вы можете использовать HiveQL DISTRIBUTE BY colX... (каждый из N редукторов получает неперекрывающиеся диапазоны x) и CLUSTER BY colX... (ярлык для Распределить By и Сортировать By), например;

 df.registerTempTable("partitionMe") hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date") 

Не уверен, как это вписывается в Spark DF api. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь мета-хранилище hive для использования HiveContext)

EDIT: Spark 1.6+ теперь имеет это в собственном API DataFrame

Используйте DataFrame, возвращенный:

 yourDF.orderBy(account) 

Нет явного способа использования partitionBy в DataFrame, только на PairRDD, но когда вы сортируете DataFrame, он будет использовать это в своем LogicalPlan, и это поможет, когда вам нужно делать вычисления для каждой учетной записи.

Я просто наткнулся на ту же самую проблему, с фреймворком данных, который я хочу разделить по аккаунту. Я предполагаю, что когда вы говорите: «хотите, чтобы данные были разделены так, чтобы все транзакции для учетной записи находились в одном и том же разделе Spark», вы хотите, чтобы это было для масштаба и производительности, но ваш код не зависит от него (например, используя mapPartitions () и т. д.), правильно?

Я смог сделать это с помощью RDD. Но я не знаю, приемлемо ли это для вас. Если у вас есть DF, ansible как RDD, вы можете применить repartitionAndSortWithinPartitions для выполнения пользовательского перераспределения данных.

Вот пример, который я использовал:

 class DatePartitioner(partitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { val start_time: Long = key.asInstanceOf[Long] Objects.hash(Array(start_time)) % partitions } override def numPartitions: Int = partitions } myRDD .repartitionAndSortWithinPartitions(new DatePartitioner(24)) .map { v => v._2 } .toDF() .write.mode(SaveMode.Overwrite) 

Поэтому, чтобы начать с какого-то ответа:) – Вы не можете

Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd, а DataFrame не имеет такой вещи, как Partitioner.

Вообще идея DataFrame заключается в том, чтобы обеспечить еще один уровень абстракции, который сам справляется с такими проблемами. Запросы в DataFrame переводятся в логический план, который далее переводится на операции с RDD. Предлагаемое разбиение, вероятно, будет применяться автоматически или, по крайней мере, должно быть.

Если вы не доверяете SparkSQL, что он предоставит какое-то оптимальное задание, вы всегда можете преобразовать DataFrame в RDD [Row], как это предлагается в комментариях.

  • Разделение строки Java удалено пустым значением
  • Убедитесь, что число делится на 3
  • Как разбить строку на массив символов?
  • Как использовать «.» Как разделитель с String.split () в java
  • Давайте будем гением компьютера.