как сделать saveAsTextFile НЕ разделить вывод на несколько файлов?

При использовании Scala в Spark всякий раз, когда я saveAsTextFile результаты с помощью saveAsTextFile , он, похоже, разбивает вывод на несколько частей. Я просто передаю ему параметр (путь).

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 
  1. Соответствует ли количество выходов количеству используемых им редукторов?
  2. Означает ли это, что выход сжат?
  3. Я знаю, что я могу объединить вывод вместе с помощью bash, но есть ли возможность сохранить вывод в одном текстовом файле без разделения? Я просмотрел документы API, но об этом мало что говорит.

Причина, по которой он сохраняет его, поскольку несколько файлов – это то, что вычисление распределяется. Если выход достаточно мал, чтобы вы считали, что можете поместить его на одну машину, вы можете закончить свою программу с помощью

 val arr = year.collect() 

И затем сохраните результирующий массив в виде файла. Еще один способ – использовать пользовательский разделитель, partitionBy и сделать так, чтобы все переходило на один раздел, но это не рекомендуется, потому что вы не получите никакой распараллеливания.

Если вам нужен файл для сохранения с помощью saveAsTextFile вы можете использовать coalesce(1,true).saveAsTextFile() . Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использовать repartition(1) который является всего лишь оболочкой для coalesce с аргументом shuffle, установленным в true. Посмотрев через источник RDD.scala, я понял, что большинство из этого материала вы должны посмотреть.

Вы можете вызвать coalesce(1) а затем saveAsTextFile() – но это может быть плохой идеей, если у вас много данных. Отдельные файлы на расщепление генерируются так же, как и в Hadoop, чтобы отдельные отпечатки и редукторы записывались в разные файлы. Наличие одного выходного файла является хорошей идеей, если у вас очень мало данных, и в этом случае вы могли бы собрать () также, как сказал @aaronman.

Для тех, кто работает с большим набором данных :

  • rdd.collect() этом случае нельзя использовать rdd.collect() поскольку он будет собирать все данные в виде Array в драйвере, что является самым простым способом выхода из памяти.

  • rdd.coalesce(1).saveAsTextFile() также не следует использовать, так как параллелизм rdd.coalesce(1).saveAsTextFile() этапов будет потерян для выполнения на одном узле, где будут храниться данные.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() – лучший простой вариант, так как он будет поддерживать параллельную обработку восходящих задач, а затем выполнять только перетасовку на один узел ( rdd.repartition(1).saveAsTextFile() точный синоним).

  • rdd.saveAsSingleTextFile() как предусмотрено ниже, позволяет хранить rdd в одном файле с определенным именем , сохраняя при этом свойства параллельности rdd.coalesce(1, shuffle = true).saveAsTextFile() .

Что может быть неудобно с rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") состоит в том, что он фактически создает файл, путь которого – path/to/file.txt/part-00000 и не path/to/file.txt .

Следующее решение: rdd.saveAsSingleTextFile("path/to/file.txt") фактически создаст файл, путь которого – path/to/file.txt :

 package com.whatever.package import org.apache.spark.rdd.RDD import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.CompressionCodec object SparkHelper { // This is an implicit class so that saveAsSingleTextFile can be attached to // SparkContext and be called like this: sc.saveAsSingleTextFile implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal { def saveAsSingleTextFile(path: String): Unit = saveAsSingleTextFileInternal(path, None) def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = saveAsSingleTextFileInternal(path, Some(codec)) private def saveAsSingleTextFileInternal( path: String, codec: Option[Class[_ <: CompressionCodec]] ): Unit = { // The interface with hdfs: val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration) // Classic saveAsTextFile in a temporary folder: hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already codec match { case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec) case None => rdd.saveAsTextFile(s"$path.tmp") } // Merge the folder of resulting part-xxxxx into one file: hdfs.delete(new Path(path), true) // to make sure it's not there already FileUtil.copyMerge( hdfs, new Path(s"$path.tmp"), hdfs, new Path(path), true, rdd.sparkContext.hadoopConfiguration, null ) hdfs.delete(new Path(s"$path.tmp"), true) } } } 

которые могут быть использованы следующим образом:

 import com.whatever.package.SparkHelper.RDDExtensions rdd.saveAsSingleTextFile("path/to/file.txt") // Or if the produced file is to be compressed: import org.apache.hadoop.io.compress.GzipCodec rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec]) 

Этот fragment сначала хранит rdd с rdd.saveAsTextFile("path/to/file.txt") во временном path/to/file.txt.tmp папке path/to/file.txt.tmp как если бы мы не хотели хранить данные в одном файле (который сохраняет обработка параллельных задач восходящего streamа).

И только тогда, используя файловую систему apoop , мы переходим к слиянию ( FileUtil.copyMerge() ) из разных выходных файлов для создания нашего конечного выходного path/to/file.txt одиночного файла path/to/file.txt .

Как уже упоминалось, вы можете собирать или объединять свой dataset, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут работать на вашем наборе данных параллельно. Я предпочитаю, чтобы он создавал сто файлов в выходном каталоге HDFS, а затем использовал hadoop fs -getmerge /hdfs/dir /local/file.txt чтобы извлечь результаты в один файл в локальной файловой системе. Это имеет наибольший смысл, когда ваш вывод является относительно небольшим отчетом, конечно.

Вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную так или иначе, например, как вы упомянули, с вызовом сценария bash.

Я также хочу упомянуть, что в документации четко указано, что пользователи должны быть осторожны при вызове coalesce с реальным небольшим количеством разделов. это может привести к тому, что восходящие разделы наследуют это количество разделов.

Я бы не рекомендовал использовать coalesce (1), если это действительно не требуется.

В Spark 1.6.1 формат показан ниже. Он создает один выходной файл. Лучше всего использовать его, если выход достаточно мал, чтобы обрабатывать. В основном, что он делает, так это то, что он возвращает новое RDD, которое сводится к разделам numPartitions. Если вы делаете радикальное объединение, например, numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1)

 pair_result.coalesce(1).saveAsTextFile("/app/data/") 

Вы можете вызвать repartition() и следовать следующим образом:

 val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) var repartitioned = year.repartition(1) repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

введите описание изображения здесь

Вот мой ответ для вывода одного файла. Я просто добавил coalesce(1)

 val year = sc.textFile("apat63_99.txt") .map(_.split(",")(1)) .flatMap(_.split(",")) .map((_,1)) .reduceByKey((_+_)).map(_.swap) year.saveAsTextFile("year") 

Код:

 year.coalesce(1).saveAsTextFile("year") 
  • Scala spark, listbuffer пуст
  • Оператор Scala @
  • Декартово произведение двух списков
  • Нет Json-сериализатора как JsObject для типа play.api.libs.json.JsObject
  • В чем разница между JavaConverters и JavaConversions в Scala?
  • Что такое «контекст» в Scala?
  • Вывод типа не выполняется Set, сделанный с .toSet?
  • Как я могу прочитать большой файл CSV с classом Scala Stream?
  • Задача не сериализуема: java.io.NotSerializableException при вызове функции закрытие только для classов не объектов
  • Ошибка кодирования при попытке сопоставить строку dataframe с обновленной строкой
  • Объяснение метода складки искры RDD
  • Давайте будем гением компьютера.