Метод writeframe Spark для записи многих небольших файлов
У меня довольно простая работа, покрывающая файлы журналов на паркет. Он обрабатывает 1,1 Тбайт данных (размещается в 64 МБ – 128 МБ файлов – размер нашего блока составляет 128 МБ), что составляет около 12 тыс. Файлов.
Работа работает следующим образом:
val events = spark.sparkContext .textFile(s"$stream/$sourcetype") .map(_.split(" \\|\\| ").toList) .collect{case List(date, y, "Event") => MyEvent(date, y, "Event")} .toDF() df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
Он собирает события с общей схемой, преобразуется в DataFrame, а затем записывается как паркет.
- Как загружать загруженные файлы в Play! 2 с помощью Scala?
- Scala underscore - ОШИБКА: отсутствует тип параметра для расширенной функции
- Что такое продолжения Скалы и зачем их использовать?
- Когда и почему следует использовать аппликативные функторы в Scala
- Преобразование вложенных classов case в вложенные Карты с использованием Shapeless
Проблема, с которой я сталкиваюсь, заключается в том, что это может привести к небольшому взрыву ввода-вывода в кластере HDFS, поскольку он пытается создать так много крошечных файлов.
В идеале я хочу создать только несколько паркетных файлов в разделе «дата».
Какой был бы лучший способ контролировать это? Используется ли «coalesce ()»?
Как это повлияет на количество файлов, созданных в данном разделе? Это зависит от количества исполнителей, которые я работаю в Spark? (в настоящее время установлено 100).
- Запись на несколько выходов с помощью ключа Spark - одно искровое задание
- Для чего нужны classы типов в Scala?
- Хороший вариант использования Akka
- Почему «разделение» на пустую строку возвращает непустой массив?
- Apache Spark: map vs mapPartitions?
- Scala "<-" для понимания
- Многострочный литерал функции в качестве аргументов в Scala
- Валидация против дизъюнкции
вы должны DataFrame
свой DataFrame
в соответствии с разделением DataFrameWriter
Попробуй это:
df .repartition($"date") .write.mode(SaveMode.Append) .partitionBy("date") .parquet(s"$path")
Самое простое решение – заменить ваше фактическое разбиение на:
df .repartition(to_date($"date")) .write.mode(SaveMode.Append) .parquet(s"$path")
Вы также можете использовать более точное разбиение на разделы для вашего DataFrame
то есть на день и, возможно, на час часового диапазона. и тогда вы можете быть менее точными для писателя. Это зависит от объема данных.
Вы можете уменьшить энтропию, DataFrame
и предложение write with partition by.
Я столкнулся с той же проблемой, и я мог использовать coalesce
решив мою проблему.
df .coalesce(3) // number of parts/files .write.mode(SaveMode.Append) .parquet(s"$path")
Для получения дополнительной информации об использовании coalesce
или repartition
вы можете обратиться к следующей искре: объединить или переделать