Средняя скорость Apache Spark

У меня есть огромный файл в HDFS с точками данных Time Series (цены на акции Yahoo).

Я хочу найти скользящее среднее временного ряда, как я могу начать писать задачу Apache Spark для этого.

Вы можете использовать функцию скольжения из MLLIB, которая, вероятно, делает то же самое, что и ответ Дэниела. Вам придется сортировать данные по времени, прежде чем использовать функцию скольжения.

import org.apache.spark.mllib.rdd.RDDFunctions._ sc.parallelize(1 to 100, 10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect() 

Скользящее среднее – это сложная проблема для Spark и любой распределенной системы. Когда данные распределяются по нескольким машинам, будут временные windows, которые пересекают разделы. Мы должны дублировать данные в начале разделов, так что вычисление скользящей средней для каждого раздела дает полный охват.

Вот как это сделать в Spark. Примеры данных:

 val ts = sc.parallelize(0 to 100, 10) val window = 3 

Простой разделитель, который помещает каждую строку в раздел, который мы указываем ключом:

 class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = key.asInstanceOf[Int] } 

Создайте данные с первым window - 1 строка скопирована в предыдущий раздел:

 val partitioned = ts.mapPartitionsWithIndex((i, p) => { val overlap = p.take(window - 1).toArray val spill = overlap.iterator.map((i - 1, _)) val keep = (overlap.iterator ++ p).map((i, _)) if (i == 0) keep else keep ++ spill }).partitionBy(new StraightPartitioner(ts.partitions.length)).values 

Просто вычислите скользящее среднее на каждом разделе:

 val movingAverage = partitioned.mapPartitions(p => { val sorted = p.toSeq.sorted val olds = sorted.iterator val news = sorted.iterator var sum = news.take(window - 1).sum (olds zip news).map({ case (o, n) => { sum += n val v = sum sum -= o v }}) }) 

Из-за повторяющихся сегментов это не будет иметь пробелов в охвате.

 scala> movingAverage.collect.sameElements(3 to 297 by 3) res0: Boolean = true 

Spark 1.4 ввел функции окон , что означает, что вы можете сделать скользящее среднее следующим образом: отрегулируйте оконные операции с rowsBetween :

 val schema = Seq("id", "cykle", "value") val data = Seq( (1, 1, 1), (1, 2, 11), (1, 3, 1), (1, 4, 11), (1, 5, 1), (1, 6, 11), (2, 1, 1), (2, 2, 11), (2, 3, 1), (2, 4, 11), (2, 5, 1), (2, 6, 11) ) val dft = sc.parallelize(data).toDF(schema: _*) dft.select('*).show // PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (5) val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2) val x = dft.select($"id",$"cykle",avg($"value").over(w)) x.show 

Выход (в цепелине):

 schema: Seq[String] = List(id, cykle, value) data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11)) dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int] +---+-----+-----+ | id|cykle|value| +---+-----+-----+ | 1| 1| 1| | 1| 2| 11| | 1| 3| 1| | 1| 4| 11| | 1| 5| 1| | 1| 6| 11| | 2| 1| 1| | 2| 2| 11| | 2| 3| 1| | 2| 4| 11| | 2| 5| 1| | 2| 6| 11| +---+-----+-----+ w: org.apache.spark.sql.expressions.WindowSpec = [email protected] x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double] +---+-----+-------------------------------------------------------------------------+ | id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING| +---+-----+-------------------------------------------------------------------------+ | 1| 1| 4.333333333333333| | 1| 2| 6.0| | 1| 3| 5.0| | 1| 4| 7.0| | 1| 5| 6.0| | 1| 6| 7.666666666666667| | 2| 1| 4.333333333333333| | 2| 2| 6.0| | 2| 3| 5.0| | 2| 4| 7.0| | 2| 5| 6.0| | 2| 6| 7.666666666666667| +---+-----+————————————————————————————————————+ 
Давайте будем гением компьютера.