Запись на несколько выходов с помощью ключа Spark – одно искровое задание

Как вы можете писать на несколько выходов в зависимости от ключа, используя Spark в одном задании.

Связано: пишите на несколько выходов с помощью ключа Scalding Hadoop, один MapReduce Job

Например

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption) 

будет гарантировать, что cat prefix/1

 a b 

и cat prefix/2 будут

 c 

Ответ

Для точного ответа с полным импортом, сутенером и кодеком сжатия, см. Https://stackoverflow.com/a/46118044/1586965

Если вы используете Spark 1.4+, это стало намного проще, благодаря API DataFrame . (DataFrames были введены в Spark 1.3, но разделBy partitionBy() , который нам нужен, был введен в 1.4 .)

Если вы начинаете с RDD, сначала вам нужно будет преобразовать его в DataFrame:

 val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name") 

В Python этот же код:

 people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"]) 

Когда у вас есть DataFrame, запись на несколько выходов на основе конкретного ключа проста. Более того – и это красота API DataFrame – код почти одинаковый для Python, Scala, Java и R:

 people_df.write.partitionBy("number").text("people") 

И вы можете легко использовать другие форматы вывода, если хотите:

 people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet") 

В каждом из этих примеров Spark создаст подкаталог для каждого из ключей, на которые мы разбивали DataFrame:

 people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh 

Я бы сделал это так, как это масштабируется

 import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } } 

Просто увидел аналогичный ответ выше, но на самом деле нам не нужны настроенные разделы. MultipleTextOutputFormat создаст файл для каждого ключа. Это нормально, что несколько записей с теми же ключами попадают в один раздел.

новый HashPartitioner (num), где num – номер раздела, который вы хотите. В случае, если у вас есть большое количество разных ключей, вы можете установить число в большое. В этом случае каждый раздел не будет открывать слишком много обработчиков файлов hdfs.

Если у вас потенциально много значений для данного ключа, я думаю, что масштабируемое решение состоит в том, чтобы выписать один файл за ключ для каждого раздела. К сожалению, в Spark нет встроенной поддержки, но мы можем что-то взбить.

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) <- it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) } 

(Замените PrintWriter на ваш выбор работы с распределенной файловой системой.)

Это делает один проход по RDD и не выполняет тасование. Он дает вам один каталог на ключ, с несколькими файлами внутри каждого.

Это включает в себя кодек по требованию, необходимый импорт и сутенер в соответствии с запросом.

 import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Одно тонкое отличие от OP состоит в том, что он будет префикс = именам каталогов. Например

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Даст:

 prefix/key=1/part-00000 prefix/key=2/part-00000 

где prefix/my_number=1/part-00000 будет содержать строки a и b , а prefix/my_number=2/part-00000 будет содержать строку c .

А также

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo") 

Даст:

 prefix/foo=1/part-00000 prefix/foo=2/part-00000 

Должно быть понятно, как отредактировать parquet .

Наконец, ниже приведен пример Dataset , который, возможно, более приятен, чем использование Tuples.

 implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } } 

У меня есть аналогичная потребность и нашел способ. Но у него есть один недостаток (что не является проблемой для моего случая): вам необходимо переразделить данные с одним разделом на выходной файл.

Для разделения таким образом обычно требуется заранее знать, сколько файлов будет выдано задаче и найти функцию, которая будет отображать каждую клавишу в каждый раздел.

Сначала давайте создадим наш class на основе MultipleTextOutputFormat:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } } 

С этим classом Spark получит ключ от раздела (первый / последний, я думаю) и назову файл с помощью этого ключа, поэтому неплохо смешивать несколько ключей в одном разделе.

Для вашего примера вам понадобится пользовательский разделитель. Это сделает работу:

 import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } } 

Теперь давайте все вместе:

 val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]]) 

Это будет генерировать 3 файла под префиксом (с именами 1, 2 и 7), обрабатывая все за один проход.

Как вы можете видеть, вам нужно знать свои ключи, чтобы иметь возможность использовать это решение.

Для меня это было проще, потому что мне нужен был один выходной файл для каждого хеша ключа, и количество файлов находилось под моим контролем, поэтому я мог использовать запас HashPartitioner, чтобы сделать трюк.

saveAsText () и saveAsHadoop (…) реализованы на основе данных RDD, в частности по методу: PairRDD.saveAsHadoopDataset, который берет данные из PairRdd, где он выполняется. Я вижу два возможных варианта. Если ваши данные относительно невелики по размеру, вы можете сэкономить некоторое время реализации, объединив RDD, создав новый RDD из каждой коллекции и используя этот RDD для записи данных. Что-то вроде этого:

 val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k} 

Обратите внимание, что это не будет работать для больших наборов данных. B / c материализация iteratorа при v.toSeq может не соответствовать памяти.

Другой вариант, который я вижу, и на самом деле тот, который я бы рекомендовал в этом случае, заключается в следующем: сверните свой собственный, напрямую вызвав hasoop / hdfs api.

Вот обсуждение, которое я начал при исследовании этого вопроса: Как создать RDD из другого RDD?

Я нуждался в том же самом в Java. Публикация моего перевода ответа Scala от Zhang Zhan на пользователей Spark Java API:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected String generateFileNameForKeyValue(A key, B value, String name) { return key.toString(); } } public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf() .setAppName("Split Job") .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"}; sc.parallelize(Arrays.asList(strings)) // The first character of the string is the key .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s)) .saveAsHadoopFile("output/", String.class, String.class, RDDMultipleTextOutputFormat.class); sc.stop(); } } 

У меня был аналогичный случай использования, когда я разбил входной файл на Hadoop HDFS на несколько файлов на основе ключа (1 файл на ключ). Вот мой код scala для искры

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:String, records:Iterable[String]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line <- records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2)) 

Я сгруппировал записи на основе ключа. Значения для каждого ключа записываются в отдельный файл.

хорошие новости для пользователя python в случае, если у вас много столбцов, и вы хотите сохранить все остальные столбцы, не разбитые на разделы в формате csv, которые не удастся, если вы используете «текстовый» метод, как предложение Ника Чаммаса.

 people_df.write.partitionBy("number").text("people") 

сообщение об ошибке «AnalysisException: источник данных u’Text поддерживает только один столбец, и у вас есть 2 столбца .;»

В искровом 2.0.0 (моя тестовая среда – искру 2.0.0 hdp) пакет «com.databricks.spark.csv» теперь интегрирован и позволяет нам сохранять текстовый файл, разделенный только на один столбец, см. Пример blow:

 people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [[email protected] people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [[email protected] people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [[email protected] people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlie 

В моей среде 1.6.1 среды код не выдавал никаких ошибок, однако это только один файл. он не разбивается на две папки.

Надеюсь, это поможет.

У меня был аналогичный вариант использования. Я разрешил его на Java, написав два пользовательских classа, реализующих MultipleTextOutputFormat и RecordWriter .

Моим вводом был JavaPairRDD> и я хотел сохранить его в файле с именем по его ключу со всеми строками, содержащимися в его значении.

Вот код для моей реализации MultipleTextOutputFormat

 class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected String generateFileNameForKeyValue(K key, V value, String name) { return key.toString(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected String generateLeafFileName(String name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected String getInputFileBasedOutputFileName(JobConf job, String name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException { final String myName = this.generateLeafFileName(name); return new MyRecordWriter(this, fs, job, arg3, myName); } } 

Вот код для моей реализации RecordWriter .

 class MyRecordWriter implements RecordWriter { private RDDMultipleTextOutputFormat rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private String myName; TreeMap> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } } 

Большая часть кода точно такая же, как в FileOutputFormat . Единственное отличие состоит в том, что несколько строк

 List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } 

Эти строки позволили мне написать каждую строку моего ввода List в файле. Первый аргумент функции write имеет значение null , чтобы избежать наложения клавиши на каждой строке.

Чтобы закончить, мне нужно только сделать этот вызов, чтобы написать мои файлы

 javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class); 
  • Scala String vs java.lang.String - вывод типа
  • Разделить список на несколько списков с фиксированным числом элементов в java 8
  • В Scala, как я могу подclassифицировать class Java с несколькими конструкторами?
  • Задача не сериализуема: java.io.NotSerializableException при вызове функции закрытие только для classов не объектов
  • Полностью настроенные windows JavaFX?
  • Как оптимизировать для-понимания и петель в Scala?
  • Редактор не содержит основного типа
  • Преобразование вложенных classов case в вложенные Карты с использованием Shapeless
  • В чем разница между JavaConverters и JavaConversions в Scala?
  • Что такое манифест в Скала и когда он вам нужен?
  • По умолчанию для отсутствующих свойств в игре 2 формата JSON
  • Давайте будем гением компьютера.