Как определить разбиение DataFrame?
Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель на DataFrames в Scala, но не вижу, как это сделать.
Одна из таблиц данных, с которыми я работаю, содержит список транзакций, по учетным записям, silimar в следующем примере.
Account Date Type Amount 1001 2014-04-01 Purchase 100.00 1001 2014-04-01 Purchase 50.00 1001 2014-04-05 Purchase 70.00 1001 2014-04-01 Payment -150.00 1002 2014-04-01 Purchase 80.00 1002 2014-04-02 Purchase 22.00 1002 2014-04-04 Payment -120.00 1002 2014-04-04 Purchase 60.00 1003 2014-04-02 Purchase 210.00 1003 2014-04-03 Purchase 15.00
По крайней мере, изначально большинство вычислений будет происходить между транзакциями внутри учетной записи. Поэтому я хотел бы, чтобы данные были разделены так, чтобы все транзакции для учетной записи находились в одном и том же разделе Spark.
- Разбор (разделение) строки в C ++ с использованием разделителя строк (стандартный C ++)
- Отдел без использования '/'
- Математическая дивизия в Свифте
- Метод split () в Java не работает с точкой (.)
- R разделить числовой вектор в позиции
Но я не вижу способа определить это. Класс DataFrame имеет метод под названием «repartition (Int)», где вы можете указать количество создаваемых разделов. Но я не вижу доступных методов для определения пользовательского разделителя для DataFrame, например, для RDD.
Исходные данные хранятся в Parquet. Я видел, что при написании DataFrame в Parquet вы можете указать столбец для разделения, так что предположительно я мог бы рассказать Parquet о разделении его данных по столбцу «Учетная запись». Но могут быть миллионы учетных записей, и если я правильно понимаю Паркет, это создало бы отдельный каталог для каждой учетной записи, так что это не звучало как разумное решение.
Есть ли способ заставить Spark разбить этот DataFrame так, чтобы все данные для учетной записи находились в одном разделе?
- Разделите число на 3 без использования операторов *, /, +, -,%
- Разделить строку с точкой в качестве разделителя
- Схема разбиения по умолчанию в Spark
- Как разбить строку, разделенную запятой?
- Java разделяет строку на массив
- Как заставить мой раскол работать только на одной реальной линии и быть способным пропустить цитируемые части строки?
- Как разбить одну строку на несколько строк, разделенных хотя бы одним пространством в оболочке bash?
- Java: String split (): Я хочу, чтобы он включал пустые строки в конце
Искра> = 2,3,0
SPARK-22614 раскрывает разбиение диапазонов.
val partitionedByRange = df.repartitionByRange(42, $"k") partitionedByRange.explain // == Parsed Logical Plan == // 'RepartitionByExpression ['k ASC NULLS FIRST], 42 // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6] // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- Project [_1#2 AS k#5, _2#3 AS v#6] // +- LocalRelation [_1#2, _2#3] // // == Optimized Logical Plan == // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 // +- LocalRelation [k#5, v#6] // // == Physical Plan == // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42) // +- LocalTableScan [k#5, v#6]
SPARK-22389 предоставляет разбиение внешнего формата в API-интерфейсе источника данных v2 .
Искры> = 1,6,0
В Spark> = 1.6 можно использовать разбиение по столбцу для запроса и кэширования. См.: SPARK-11410 и SPARK-4849 с использованием метода repartition
:
val df = Seq( ("A", 1), ("B", 2), ("A", 3), ("C", 1) ).toDF("k", "v") val partitioned = df.repartition($"k") partitioned.explain // scala> df.repartition($"k").explain(true) // == Parsed Logical Plan == // 'RepartitionByExpression ['k], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Analyzed Logical Plan == // k: string, v: int // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Optimized Logical Plan == // RepartitionByExpression [k#7], None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at :27 // // == Physical Plan == // TungstenExchange hashpartitioning(k#7,200), None // +- Project [_1#5 AS k#7,_2#6 AS v#8] // +- Scan PhysicalRDD[_1#5,_2#6]
В отличие от RDDs
Spark Dataset
(включая Dataset[Row]
aka DataFrame
) не может использовать пользовательский разделитель, как сейчас. Обычно вы можете обратиться к этому, создав столбец искусственного разделения, но он не даст вам такой же гибкости.
Искры <1.6.0:
Одна вещь, которую вы можете сделать, это предварительно разбить входные данные перед созданием DataFrame
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.HashPartitioner val schema = StructType(Seq( StructField("x", StringType, false), StructField("y", LongType, false), StructField("z", DoubleType, false) )) val rdd = sc.parallelize(Seq( Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0), Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99) )) val partitioner = new HashPartitioner(5) val partitioned = rdd.map(r => (r.getString(0), r)) .partitionBy(partitioner) .values val df = sqlContext.createDataFrame(partitioned, schema)
Поскольку DataFrame
создания DataFrame
из RDD
требуется только простая фаза карты, необходимо сохранить существующую структуру разделов *:
assert(df.rdd.partitions == partitioned.partitions)
Точно так же вы можете переделать существующий DataFrame
:
sqlContext.createDataFrame( df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values, df.schema )
Похоже, это не невозможно. Вопрос остается, если он имеет смысл. Я буду утверждать, что большую часть времени это не делает:
-
Перераспределение – дорогостоящий процесс. В типичном сценарии большая часть данных должна быть сериализована, перетасована и десериализована. С другой стороны, количество операций, которые могут извлечь выгоду из предварительно разделенных данных, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.
- присоединяется к некоторым сценариям, но для этого потребуется внутренняя поддержка,
- window выполняет вызовы с соответствующим разделителем. То же, что и выше, ограничивается одним определением windows. Он уже разделен внутри, хотя, поэтому предварительное разбиение может быть избыточным,
- простые агрегаты с
GROUP BY
– можно уменьшить объем памяти временных буферов **, но общая стоимость намного выше. Более или менее эквивалентноgroupByKey.mapValues(_.reduce)
(текущее поведение) vsreduceByKey
(предварительное разбиение). Вряд ли будет полезно на практике. - сжатие данных с помощью
SqlContext.cacheTable
. Поскольку похоже, что используется кодировка длины выполнения, применениеOrderedRDDFunctions.repartitionAndSortWithinPartitions
может улучшить коэффициент сжатия.
-
Производительность сильно зависит от распределения ключей. Если он искажен, это приведет к субоптимальному использованию ресурсов. В худшем случае невозможно завершить работу вообще.
- Весь смысл использования декларативного API высокого уровня состоит в том, чтобы изолировать себя от деталей реализации низкого уровня. Как уже упоминалось @dwysakowicz и @RomiKuntsman , оптимизация – это работа Оптимизатора Catalyst . Это довольно сложный зверь, и я действительно сомневаюсь, что вы можете легко улучшить это, не погружаясь гораздо глубже в его внутренности.
Связанные понятия
Разделение с источниками JDBC :
Источники данных JDBC поддерживают аргумент predicates
. Его можно использовать следующим образом:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Он создает единый раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не пересекаются, вы увидите дубликаты в результирующей таблице.
Метод partitionBy
в DataFrameWriter
:
Spark DataFrameWriter
предоставляет метод partitionBy
который может использоваться для «разделения» данных при записи. Он отделяет данные от записи, используя предоставленный набор столбцов
val df = Seq( ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6) ).toDF("k", "v") df.write.partitionBy("k").json("/tmp/foo.json")
Это позволяет предикату нажимать на чтение для запросов на основе ключа:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json") df1.where($"k" === "bar")
но это не эквивалентно DataFrame.repartition
. В частности, такие агрегаты, как:
val cnts = df1.groupBy($"k").sum()
по-прежнему потребует TungstenExchange
:
cnts.explain // == Physical Plan == // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93]) // +- TungstenExchange hashpartitioning(k#90,200), None // +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99]) // +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
в DataFrameWriter
(Spark> = 2.0):
bucketBy
имеет аналогичные приложения, такие как partitionBy
но доступен только для таблиц ( saveAsTable
). Информация о буклете может использоваться для оптимизации соединений:
// Temporarily disable broadcast joins spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df.write.bucketBy(42, "k").saveAsTable("df1") val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2") df2.write.bucketBy(42, "k").saveAsTable("df2") // == Physical Plan == // *Project [k#41, v#42, v2#47] // +- *SortMergeJoin [k#41], [k#46], Inner // :- *Sort [k#41 ASC NULLS FIRST], false, 0 // : +- *Project [k#41, v#42] // : +- *Filter isnotnull(k#41) // : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct // +- *Sort [k#46 ASC NULLS FIRST], false, 0 // +- *Project [k#46, v2#47] // +- *Filter isnotnull(k#46) // +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct
* По разметке я имею в виду только распределение данных. partitioned
RDD больше не является разделителем. ** Предполагая, что ранняя проекция отсутствует. Если агрегация охватывает только малую часть столбцов, то, вероятно, нет никакой выгоды.
В Spark <1.6 Если вы создаете HiveContext
, а не простой старый SqlContext
вы можете использовать HiveQL DISTRIBUTE BY colX...
(каждый из N редукторов получает неперекрывающиеся диапазоны x) и CLUSTER BY colX...
(ярлык для Распределить By и Сортировать By), например;
df.registerTempTable("partitionMe") hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
Не уверен, как это вписывается в Spark DF api. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь мета-хранилище hive для использования HiveContext)
EDIT: Spark 1.6+ теперь имеет это в собственном API DataFrame
Используйте DataFrame, возвращенный:
yourDF.orderBy(account)
Нет явного способа использования partitionBy в DataFrame, только на PairRDD, но когда вы сортируете DataFrame, он будет использовать это в своем LogicalPlan, и это поможет, когда вам нужно делать вычисления для каждой учетной записи.
Я просто наткнулся на ту же самую проблему, с фреймворком данных, который я хочу разделить по аккаунту. Я предполагаю, что когда вы говорите: «хотите, чтобы данные были разделены так, чтобы все транзакции для учетной записи находились в одном и том же разделе Spark», вы хотите, чтобы это было для масштаба и производительности, но ваш код не зависит от него (например, используя mapPartitions () и т. д.), правильно?
Я смог сделать это с помощью RDD. Но я не знаю, приемлемо ли это для вас. Если у вас есть DF, ansible как RDD, вы можете применить repartitionAndSortWithinPartitions
для выполнения пользовательского перераспределения данных.
Вот пример, который я использовал:
class DatePartitioner(partitions: Int) extends Partitioner { override def getPartition(key: Any): Int = { val start_time: Long = key.asInstanceOf[Long] Objects.hash(Array(start_time)) % partitions } override def numPartitions: Int = partitions } myRDD .repartitionAndSortWithinPartitions(new DatePartitioner(24)) .map { v => v._2 } .toDF() .write.mode(SaveMode.Overwrite)
Поэтому, чтобы начать с какого-то ответа:) – Вы не можете
Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd, а DataFrame не имеет такой вещи, как Partitioner.
Вообще идея DataFrame заключается в том, чтобы обеспечить еще один уровень абстракции, который сам справляется с такими проблемами. Запросы в DataFrame переводятся в логический план, который далее переводится на операции с RDD. Предлагаемое разбиение, вероятно, будет применяться автоматически или, по крайней мере, должно быть.
Если вы не доверяете SparkSQL, что он предоставит какое-то оптимальное задание, вы всегда можете преобразовать DataFrame в RDD [Row], как это предлагается в комментариях.