Метод writeframe Spark для записи многих небольших файлов

У меня довольно простая работа, покрывающая файлы журналов на паркет. Он обрабатывает 1,1 Тбайт данных (размещается в 64 МБ – 128 МБ файлов – размер нашего блока составляет 128 МБ), что составляет около 12 тыс. Файлов.

Работа работает следующим образом:

val events = spark.sparkContext .textFile(s"$stream/$sourcetype") .map(_.split(" \\|\\| ").toList) .collect{case List(date, y, "Event") => MyEvent(date, y, "Event")} .toDF() df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path") 

Он собирает события с общей схемой, преобразуется в DataFrame, а затем записывается как паркет.

Проблема, с которой я сталкиваюсь, заключается в том, что это может привести к небольшому взрыву ввода-вывода в кластере HDFS, поскольку он пытается создать так много крошечных файлов.

В идеале я хочу создать только несколько паркетных файлов в разделе «дата».

Какой был бы лучший способ контролировать это? Используется ли «coalesce ()»?

Как это повлияет на количество файлов, созданных в данном разделе? Это зависит от количества исполнителей, которые я работаю в Spark? (в настоящее время установлено 100).

вы должны DataFrame свой DataFrame в соответствии с разделением DataFrameWriter

Попробуй это:

 df .repartition($"date") .write.mode(SaveMode.Append) .partitionBy("date") .parquet(s"$path") 

Самое простое решение – заменить ваше фактическое разбиение на:

 df .repartition(to_date($"date")) .write.mode(SaveMode.Append) .parquet(s"$path") 

Вы также можете использовать более точное разбиение на разделы для вашего DataFrame то есть на день и, возможно, на час часового диапазона. и тогда вы можете быть менее точными для писателя. Это зависит от объема данных.

Вы можете уменьшить энтропию, DataFrame и предложение write with partition by.

Я столкнулся с той же проблемой, и я мог использовать coalesce решив мою проблему.

 df .coalesce(3) // number of parts/files .write.mode(SaveMode.Append) .parquet(s"$path") 

Для получения дополнительной информации об использовании coalesce или repartition вы можете обратиться к следующей искре: объединить или переделать

  • В чем смысл classа Option ?
  • Как я могу устранить ошибку в Scala между методами с vararg и без
  • Scala: Почему mapValues ​​создает представление и есть ли стабильные альтернативы?
  • В чем разница между classом и classом classа Scala?
  • В чем разница между JavaConverters и JavaConversions в Scala?
  • Скала «для понимания» с фьючерсами
  • Как найти максимальное значение в паре RDD?
  • Итерация над запечатанной чертой в Скала?
  • Как преобразовать A ] в B ], если A и B являются монадами?
  • Поиск параметров типа через reflection в Scala 2.10?
  • Что такое урожайность Скалы?
  • Давайте будем гением компьютера.