Как использовать COGROUP для больших наборов данных

У меня есть два rdd's : val tab_a: RDD[(String, String)] и val tab_b: RDD[(String, String)] Я использую cogroup для таких наборов данных, как:

 val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 

Я использую tab_c cogrouped значения для функции карты, и он отлично подходит для небольших наборов данных, но в случае огромных наборов данных он выбрасывает Out Of Memory exception .

Я попытался преобразовать окончательное значение в RDD, но не повезло с той же ошибкой

 val newcos = spark.sparkContext.parallelize(tab_c) 

1.Как использовать Cogroup для больших наборов данных?

2. Можем ли мы сохранить ценность в группе?

Код

  val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect() var srcmis: Array[String] = new Array[String](0) var destmis: Array[String] = new Array[String](0) var extrainsrc: Array[String] = new Array[String](0) var extraindest: Array[String] = new Array[String](0) var srcs: String = Seq("")(0) var destt: String = Seq("")(0) val updated = cos.map { x => { val key = x._1 val value = x._2 srcs = value._1.mkString(",") destt = value._2.mkString(",") if (srcs.equalsIgnoreCase(destt) == false && destt != "") { srcmis :+= srcs destmis :+= destt } if (srcs == "") { extraindest :+= destt.mkString("") } if (destt == "") { extrainsrc :+= srcs.mkString("") } } } 

Код обновлен:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2) // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)} {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}.. 

ОШИБКА:

  ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

спасибо

Когда вы используете collect() вы в основном говорите искру, чтобы переместить все полученные данные обратно на главный узел, что может легко создать узкое место. Вы больше не используете Spark в этой точке, просто простой массив на одной машине.

Чтобы вызвать вычисление, просто используйте то, что требует данных на каждом узле, поэтому исполнители живут поверх распределенной файловой системы. Например, saveAsTextFile() .

Вот несколько основных примеров.

Помните, что вся цель здесь (то есть, если у вас есть большие данные) заключается в перемещении кода на ваши данные и вычислении там, а не при переносе всех данных в вычисление.

TL; DR Не collect .

Для безопасного выполнения этого кода без дополнительных допущений (в среднем требования для рабочих узлов могут быть значительно меньше), каждый узел (драйвер и каждый исполнитель) потребует памяти, значительно превышающей общие требования к памяти для всех данных.

Если вы запустили его за пределами Spark, вам понадобится только один узел. Поэтому Spark не дает никаких преимуществ здесь.

Однако, если вы пропустите collect.toArray и сделаете некоторые предположения о распределении данных, вы можете запустить его просто отлично.

  • Ошибка с varargs для объектов-функций в Scala?
  • Как преобразовать файл csv в rdd
  • Что такое идентификатор Scala «неявно»?
  • Редактор не содержит основного типа
  • Равномерность classа в Apache Spark
  • Почему «невозможно найти кодировщик для типа, хранящегося в наборе данных» при создании набора данных пользовательского classа case?
  • Два способа карри в Скала; Каков прецедент для каждого?
  • Как определить схему для настраиваемого типа в Spark SQL?
  • Порядок линеаризации в Scala
  • Как добавить метод в Enumeration в Scala?
  • Scala: Список в будущее без учета неудачных фьючерсов
  • Давайте будем гением компьютера.