Как работает функция Distinct () в Spark?

Я новичок в Apache Spark и изучал основные функции. Было небольшое сомнение. Предположим, у меня есть RDD кортежей (ключ, значение) и вы хотите получить из них некоторые уникальные. Я использую функцию distinct (). Мне интересно, на каком основании функция считает, что кортежи как разрозненные.? Он основан на ключах или значениях или обоих?

.distinct (), безусловно, выполняет перетасовку между разделами. Чтобы узнать больше о том, что происходит, запустите .toDebugString на вашем RDD.

val hashPart = new HashPartitioner() val myRDDPreStep =  val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) myRDD.checkpoint println(myRDD.toDebugString) 

который для примера RDD у меня (myRDDPreStep уже hash-partitioned ключом, сохраняется StorageLevel.MEMORY_AND_DISK_SER и checkpointed), возвращает:

 (2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] | CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated] 

Обратите внимание, что могут быть более эффективные способы получения различий, которые include в себя меньшее количество тасований, ОСОБЕННО, если ваш RDD уже разбит на разделы интеллектуальным образом, а разделы не слишком перекошены.

См. Есть ли способ переписать Spark RDD, отличный от использования mapPartitions вместо отдельных? и Apache Spark: Какова эквивалентная реализация RDD.groupByKey () с использованием RDD.aggregateByKey ()?

Документы API для RDD.distinct () предоставляют только одно предложение:

«Верните новый RDD, содержащий отдельные элементы в этом RDD».

Из недавнего опыта я могу сказать вам, что в кортеже-RDD рассматривается кортеж в целом.

Если вам нужны разные ключи или разные значения, то в зависимости от того, что вы хотите выполнить, вы можете:

A. call groupByKey() для преобразования {(k1,v11),(k1,v12),(k2,v21),(k2,v22)} в {(k1,[v11,v12]), (k2,[v21,v22])} ; или

B. вычеркивайте либо ключи, либо значения, вызывая keys() или values() за которыми следуют distinct()

На момент написания этой статьи (июнь 2015 г.) в UC Berkeley + EdX был запущен бесплатный онлайн-курс « Введение в большие данные» и Apache Spark, который обеспечит практическую работу с этими функциями.

Justin Pihony is right.Distinct использует метод hashCode и equals для этого определения. Его возвращают отдельные элементы (объекты)

 val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 

отчетливый

 rdd.distinct.collect().foreach(println) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21) 

Если вы хотите применить отдельный ключ. В этом случае лучше уменьшить вариант

ReduceBy

  val reduceRDD= rdd.map(tup => (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) reduceRDD.collect().foreach(println) 

Вывод:-

 (2,20) (1,20) (3,21) 

distinct использует hashCode и equals метод объектов для этого определения. Наборы построены с механизмами равенства, делегирующими вниз в равенство и положение каждого объекта. Таким образом, distinct будут работать против всего объекта Tuple2 . Как указал Павел, вы можете вызывать keys или values а затем distinct . Или вы можете написать свои собственные значения через aggregateByKey , который сохранит сопряжение ключей. Или, если вам нужны разные ключи, вы можете использовать обычный aggregate

Похоже, что distinct будут избавлены от (ключа, значения) дубликатов.

В приведенном ниже примере (1,20) и (2,20) повторяются дважды в myRDD , но после distinct() дубликаты удаляются.

 scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at :22 scala> myRDD.collect().foreach(println _) (1,20) (1,21) (1,20) (2,20) (2,22) (2,20) (3,21) (3,22) scala> myRDD.distinct.collect().foreach(println _) (2,22) (1,20) (3,22) (2,20) (1,21) (3,21) 
Interesting Posts

Принудительный сценарий bash для выполнения до завершения, когда некоторые элементы могут

IE 8: исправление фонового размера

Общаться между виртуальными машинами во внутренней сети с использованием двух маршрутизаторов

Как создать Android-hashи Facebook Facebook?

Удаление писем от отправителя через x дней?

Как найти продолжительность разницы между двумя датами в java?

Сложная информация о самом развязном параде

Не удалось загрузить class дескриптора модуля: не нашел class «com.google.android.gms.dynamite.descriptors.com.google.firebase.auth.ModuleDescriptor»

Декартово произведение streamов в Java 8 как stream (только с использованием streamов)

Что это за порт / разъем на моем ноутбуке?

Есть ли способ настроить функцию оснастки Windows 7?

C # LINQ to SQL: рефакторинг этого общего метода GetByID

Можно ли смешивать Swift с C ++? Как и файлы Objective – C .mm

Как извлечь числовое значение после определенной строки в предложении в ячейке Excel?

Не удается SSH на второй компьютер внутри моей сети

Давайте будем гением компьютера.