Как работает функция Distinct () в Spark?
Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct (). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?
.distinct (), безусловно, выполняет перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.
val hashPart = new HashPartitioner() val myRDDPreStep = val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) myRDD.checkpoint println(myRDD.toDebugString)
который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняется StorageLevel.MEMORY_AND_DISK_SER и checkpointed), возвращает:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] | CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
Обратите внимание, что могут быть более эффективные способы получения различий, которые include в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным образом, а разделы не слишком перекошены.
См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? и Apache Spark: Какова эквивалентная реализация RDD.groupByKey () с использованием RDD.aggregateByKey ()?
Документы API для RDD.distinct () предоставляют только одно предложение:
«Верните новый RDD, содержащий отдельные элементы в этом RDD».
Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.
Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:
A. call groupByKey()
для преобразования {(k1,v11),(k1,v12),(k2,v21),(k2,v22)}
в {(k1,[v11,v12]), (k2,[v21,v22])}
; или
B. вычеркивайте либо ключи, либо значения, вызывая keys()
или values()
за которыми следуют distinct()
На момент написания этой статьи (июнь 2015 г.) в UC Berkeley + EdX был запущен бесплатный онлайн-курс « Введение в большие данные» и Apache Spark, который обеспечит практическую работу с этими функциями.
Justin Pihony is right.Distinct использует метод hashCode и equals для этого определения. Его возвращают отдельные элементы (объекты)
val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22)))
отчетливый
rdd.distinct.collect().foreach(println) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21)
Если вы хотите применить отдельный ключ. В этом случае лучше уменьшить вариант
ReduceBy
val reduceRDD= rdd.map(tup => (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) reduceRDD.collect().foreach(println)
Вывод:-
(2,20) (1,20) (3,21)
distinct
использует hashCode
и equals
метод объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct
будут работать против всего объекта Tuple2
. Как указал Павел, вы можете вызывать keys
или values
а затем distinct
. Или вы можете написать свои собственные значения через aggregateByKey
, который сохранит сопряжение ключей. Или, если вам нужны разные ключи, вы можете использовать обычный aggregate
Похоже, что distinct
будут избавлены от (ключа, значения) дубликатов.
В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD
, но после distinct()
дубликаты удаляются.
scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at :22 scala> myRDD.collect().foreach(println _) (1,20) (1,21) (1,20) (2,20) (2,22) (2,20) (3,21) (3,22) scala> myRDD.distinct.collect().foreach(println _) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21)