Перезаписывать конкретные разделы в методе записи данных в режиме искривления

Я хочу перезаписать конкретные разделы, а не все в искровом режиме. Я пытаюсь выполнить следующую команду:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4') 

где df – это фреймворк данных, содержащий инкрементные данные, подлежащие перезаписыванию.

hdfs-base-path содержит core data.

Когда я пытаюсь выполнить вышеуказанную команду, она удаляет все разделы и вставляет те, которые присутствуют в df по пути hdfs.

Мое требование состоит в том, чтобы перезаписать только те разделы, которые присутствуют в df по указанному пути hdfs. Может кто-нибудь, пожалуйста, помогите мне в этом?

Это распространенная проблема. Единственным решением с Spark до 2.0 является запись непосредственно в каталог разделов, например,

 df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value") 

Если вы используете Spark до 2.0, вам необходимо остановить Spark от испускания файлов метаданных (потому что они сломают автоматическое обнаружение разделов), используя:

 sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

Если вы используете Spark до 1.6.2, вам также необходимо удалить файл _SUCCESS в /root/path/to/data/partition_col=value или его присутствие приведет к разрыву автоматического обнаружения разделов. (Я настоятельно рекомендую использовать 1.6.2 или новее).

Вы можете получить еще несколько подробностей о том, как управлять большими секционированными таблицами из моей беседы Spark Summit о Bulletproof Jobs .

В заключение! Это теперь функция Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236

Чтобы использовать его, вам необходимо установить для параметра spark.sql.sources.partitionOverwriteMode значение dynamic, dataset необходимо разделить, а режим записи перезаписать . Пример:

 spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.mode("overwrite").insertInto("partitioned_table") 

Я рекомендую сделать перераспределение на основе столбца раздела перед написанием, так что вы не получите 400 файлов в папке.

До Spark 2.3.0 лучшим решением было бы запустить SQL-инструкции для удаления этих разделов, а затем записать их с помощью добавления режима.

Использование Spark 1.6 …

HiveContext может значительно упростить этот процесс. Ключ состоит в том, что вы должны сначала создать таблицу в Hive, используя оператор CREATE EXTERNAL TABLE с определенным разделением. Например:

 # Hive SQL CREATE EXTERNAL TABLE test (name STRING) PARTITIONED BY (age INT) STORED AS PARQUET LOCATION 'hdfs:///tmp/tables/test' 

Отсюда, скажем, у вас есть Dataframe с новыми записями в нем для определенного раздела (или нескольких разделов). Вы можете использовать оператор HiveContext SQL для выполнения INSERT OVERWRITE OVERWRITE с использованием этого Dataframe, который перезапишет таблицу только для разделов, содержащихся в Dataframe:

 # PySpark hiveContext = HiveContext(sc) update_dataframe.registerTempTable('update_dataframe') hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age) SELECT name, age FROM update_dataframe""") 

Примечание: update_dataframe в этом примере имеет схему, которая соответствует схеме целевой test таблицы.

Одной простой ошибкой сделать этот подход является пропустить шаг CREATE EXTERNAL TABLE в Hive и просто сделать таблицу с использованием методов записи API Dataframe. Для таблиц, основанных на Паркетах, таблица не будет определена надлежащим образом для поддержки функции INSERT OVERWRITE... PARTITION .

Надеюсь это поможет.

Если вы используете DataFrame, возможно, вы хотите использовать таблицу Hive над данными. В этом случае вам нужен только метод вызова

 df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name) 

Он перезапишет разделы, которые содержит DataFrame.

Нет необходимости указывать формат (orc), потому что Spark будет использовать формат таблицы Hive.

Он отлично работает в версии Spark 1.6

Я попытался подходить к подходу, чтобы перезаписать конкретный раздел в таблице HIVE.

 ### load Data and check records raw_df = spark.table("test.original") raw_df.count() lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925 ### Check data in few partitions. sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag") print "Number of records: ", sample.count() sample.show() ### Back-up the partitions before deletion raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite") ### UDF : To delete particular partition. def delete_part(table, part): qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")" spark.sql(qry) ### Delete partitions part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct() part_list = part_df.rdd.map(lambda x : x[0]).collect() table = "test.original" for p in part_list: delete_part(table, p) ### Do the required Changes to the columns in partitions df = spark.table("test.original_bkp") newdf = df.withColumn("c_preferred_cust_flag", lit("Y")) newdf.select("c_customer_sk", "c_preferred_cust_flag").show() ### Write the Partitions back to Original table newdf.write.insertInto("test.original") ### Verify data in Original table orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show() Hope it helps. Regards, Neeraj 

Вы могли бы сделать что-то подобное, чтобы сделать работу реентерабельной (идемпотент): (попробовал это на искру 2.2)

 # drop the partition drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition) print drop_query spark.sql(drop_query) # delete directory dbutils.fs.rm(,recurse=True) # Load the partition df.write\ .partitionBy("partition_col")\ .saveAsTable(table_name, format = "parquet", mode = "append", path = ) 

Я предлагаю вам выполнить очистку, а затем писать новые разделы в режиме Append :

 import scala.sys.process._ def deletePath(path: String): Unit = { s"hdfs dfs -rm -r -skipTrash $path".! } df.select(partitionColumn).distinct.collect().foreach(p => { val partition = p.getAs[String](partitionColumn) deletePath(s"$path/$partitionColumn=$partition") }) df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path) 

Это приведет к удалению только новых разделов. После записи данных выполните эту команду, если вам нужно обновить метастор:

 sparkSession.sql(s"MSCK REPAIR TABLE $db.$table") 

Примечание: deletePath предполагает, что команда hfds доступна в вашей системе.

  • Как выбрать первую строку каждой группы?
  • Как найти размер вспышки RDD / Dataframe?
  • Spark Sql UDF со сложным входным параметром
  • Как передать дополнительные параметры UDF в SparkSql?
  • Как определить настраиваемую функцию агрегации для суммирования столбца векторов?
  • Как закрепить два (или более) DataFrame в Spark
  • Вывести несколько столбцов из одного столбца в Spark DataFrame
  • Как импортировать несколько файлов csv в одной загрузке?
  • Вызывается: java.lang.NullPointerException в org.apache.spark.sql.Dataset
  • Каковы возможные причины для получения TimeoutException: фьючерсы, истекающие после при работе с Spark
  • Давайте будем гением компьютера.