Как использовать COGROUP для больших наборов данных

У меня есть два rdd's : val tab_a: RDD[(String, String)] и val tab_b: RDD[(String, String)] Я использую cogroup для таких наборов данных, как:

 val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 

Я использую tab_c cogrouped значения для функции карты, и он отлично подходит для небольших наборов данных, но в случае огромных наборов данных он выбрасывает Out Of Memory exception .

Я попытался преобразовать окончательное значение в RDD, но не повезло с той же ошибкой

 val newcos = spark.sparkContext.parallelize(tab_c) 

1.Как использовать Cogroup для больших наборов данных?

2. Можем ли мы сохранить ценность в группе?

Код

  val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect() var srcmis: Array[String] = new Array[String](0) var destmis: Array[String] = new Array[String](0) var extrainsrc: Array[String] = new Array[String](0) var extraindest: Array[String] = new Array[String](0) var srcs: String = Seq("")(0) var destt: String = Seq("")(0) val updated = cos.map { x => { val key = x._1 val value = x._2 srcs = value._1.mkString(",") destt = value._2.mkString(",") if (srcs.equalsIgnoreCase(destt) == false && destt != "") { srcmis :+= srcs destmis :+= destt } if (srcs == "") { extraindest :+= destt.mkString("") } if (destt == "") { extrainsrc :+= srcs.mkString("") } } } 

Код обновлен:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2) // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)} {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}.. 

ОШИБКА:

  ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

спасибо

2 Solutions collect form web for “Как использовать COGROUP для больших наборов данных”

Когда вы используете collect() вы в основном говорите искру, чтобы переместить все полученные данные обратно на главный узел, что может легко создать узкое место. Вы больше не используете Spark в этой точке, просто простой массив на одной машине.

Чтобы вызвать вычисление, просто используйте то, что требует данных на каждом узле, поэтому исполнители живут поверх распределенной файловой системы. Например, saveAsTextFile() .

Вот несколько основных примеров.

Помните, что вся цель здесь (то есть, если у вас есть большие данные) заключается в перемещении кода на ваши данные и вычислении там, а не при переносе всех данных в вычисление.

TL; DR Не collect .

Для безопасного выполнения этого кода без дополнительных допущений (в среднем требования для рабочих узлов могут быть значительно меньше), каждый узел (драйвер и каждый исполнитель) потребует памяти, значительно превышающей общие требования к памяти для всех данных.

Если вы запустили его за пределами Spark, вам понадобится только один узел. Поэтому Spark не дает никаких преимуществ здесь.

Однако, если вы пропустите collect.toArray и сделаете некоторые предположения о распределении данных, вы можете запустить его просто отлично.

  • В чем преимущество использования абстрактных classов вместо признаков?
  • Какова (скрытая) стоимость ленивого значения Scala?
  • Что такое запечатанная черта?
  • Скорость компиляции Java и скорость компиляции Scala
  • Что означает «#» в Scala?
  • Какие хорошие право-ассоциативные методы в Scala?
  • Печать массива в Scala
  • Синтаксис сахара: _ * для лечения Seq как параметры метода
  • В чем разница между def foo = {} и def foo () = {} в Scala?
  • Как заставить DataFrame оценивать в Spark
  • Итерация над compilationами Java в Scala
  • Interesting Posts

    Как изменить каталог данных MySQL?

    Просмотры в отдельных assemblyх в ASP.NET MVC

    Как заставить логин для администратора при использовании подключения к удаленному рабочему столу?

    Тест для равенства с плавающей точкой. (FE_FLOATING_POINT_EQUALITY)

    Windows 7 и Vista UAC – программатически запрашивает повышение в C #

    В чем разница между int и Integer в Java и C #?

    Почему эти цифры не равны?

    Десять внешних жестких дисков через USB-концентратор

    Android, как использовать Environment.getExternalStorageDirectory ()

    Угловой ng-view / routing не работает в PhoneGap

    определять интервалы последовательных целых последовательностей

    Как идентифицировать ключевое слово emacs для чего-то в графическом интерфейсе?

    Экран стал розовым

    Ошибка WCF “Максимальное количество элементов, которые могут быть сериализованы или десериализованы в графе объектов, -« 65536 »

    Каков размер этого видеокарты?

    Давайте будем гением компьютера.