Как работает функция Distinct () в Spark?

Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct (). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?

.distinct (), безусловно, выполняет перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.

val hashPart = new HashPartitioner() val myRDDPreStep =  val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) myRDD.checkpoint println(myRDD.toDebugString) 

который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняется StorageLevel.MEMORY_AND_DISK_SER и checkpointed), возвращает:

 (2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] | CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated] 

Обратите внимание, что могут быть более эффективные способы получения различий, которые include в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным образом, а разделы не слишком перекошены.

См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? и Apache Spark: Какова эквивалентная реализация RDD.groupByKey () с использованием RDD.aggregateByKey ()?

Документы API для RDD.distinct () предоставляют только одно предложение:

«Верните новый RDD, содержащий отдельные элементы в этом RDD».

Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.

Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:

A. call groupByKey() для преобразования {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} в {(k1,[v11,v12]), (k2,[v21,v22])} ; или

B. вычеркивайте либо ключи, либо значения, вызывая keys() или values() за которыми следуют distinct()

На момент написания этой статьи (июнь 2015 г.) в UC Berkeley + EdX был запущен бесплатный онлайн-курс « Введение в большие данные» и Apache Spark, который обеспечит практическую работу с этими функциями.

Justin Pihony is right.Distinct использует метод hashCode и equals для этого определения. Его возвращают отдельные элементы (объекты)

 val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 

отчетливый

 rdd.distinct.collect().foreach(println) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21) 

Если вы хотите применить отдельный ключ. В этом случае лучше уменьшить вариант

ReduceBy

  val reduceRDD= rdd.map(tup => (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) reduceRDD.collect().foreach(println) 

Вывод:-

 (2,20) (1,20) (3,21) 

distinct использует hashCode и equals метод объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct будут работать против всего объекта Tuple2 . Как указал Павел, вы можете вызывать keys или values а затем distinct . Или вы можете написать свои собственные значения через aggregateByKey , который сохранит сопряжение ключей. Или, если вам нужны разные ключи, вы можете использовать обычный aggregate

Похоже, что distinct будут избавлены от (ключа, значения) дубликатов.

В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD , но после distinct() дубликаты удаляются.

 scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at :22 scala> myRDD.collect().foreach(println _) (1,20) (1,21) (1,20) (2,20) (2,22) (2,20) (3,21) (3,22) scala> myRDD.distinct.collect().foreach(println _) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21) 
Interesting Posts

Как изменить цвет строки состояния для соответствия приложению в Lollipop?

Базовая клиентская библиотека на основе Boost.ASIO (например, libcurl)

Сохранять пропорцию графиков с использованием grid.arrange

Насколько хорошо поддерживается Objective-C ++?

Firefox помнит пароль для одного сайта

Excel: Мне нужна формула, которая будет считать месяцы между двумя датами. Если это 1-15, то это будет полмесяца. Есть идеи?

Можно ли наследовать реализацию из контейнеров STL, а не делегировать?

Написание компилятора на своем родном языке

Обеспечивает ли NAT безопасность?

Ember-data embedded записывает текущее состояние?

Как различаются операторы сравнения с равенством равенства (== double equals) и идентичности (=== triple equals)?

Static vs функции / переменные classа в classах Swift?

Шифрование каталога пользователя в Windows 7

Selenium: загрузите файл в Internet Explorer в указанную папку без прямой ссылки, без форм Windows, без AutoIt или Robot

Чтобы составить оглавление для многих .odt-файлов

Давайте будем гением компьютера.