Как избежать дублирования столбцов после присоединения?

У меня есть два кадра данных со следующими столбцами:

df1.columns // Array(ts, id, X1, X2) 

а также

 df2.columns // Array(ts, id, Y1, Y2) 

После того, как я

 val df_combined = df1.join(df2, Seq(ts,id)) 

Я получаю следующие столбцы: Array(ts, id, X1, X2, ts, id, Y1, Y2) . Я мог ожидать, что общие столбцы будут удалены. Есть ли что-то, что необходимо сделать?

Простой ответ (из часто задаваемых вопросов по Databricks ) заключается в том, чтобы выполнить объединение, в котором объединенные столбцы выражаются в виде массива строк (или одной строки) вместо предиката.

Ниже приведен пример, связанный с часто задаваемыми вопросами по Databricks, но с двумя столбцами соединения, чтобы ответить на вопрос оригинального плаката.

Вот левый блок данных:

 val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10)) val left = llist.toDF("firstname","lastname","date","duration") left.show() /* +---------+--------+----------+--------+ |firstname|lastname| date|duration| +---------+--------+----------+--------+ | bob| b|2015-01-13| 4| | alice| a|2015-04-23| 10| +---------+--------+----------+--------+ */ 

Вот правильный фреймворк данных:

 val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload") right.show() /* +---------+--------+------+ |firstname|lastname|upload| +---------+--------+------+ | alice| a| 100| | bob| b| 23| +---------+--------+------+ */ 

Вот некорректное решение, где столбцы соединения определяются как предикат left("firstname")===right("firstname") && left("lastname")===right("lastname") .

Неправильный результат состоит в том, что столбцы firstname и lastname дублируются в объединенном фрейме данных:

 left.join(right, left("firstname")===right("firstname") && left("lastname")===right("lastname")).show /* +---------+--------+----------+--------+---------+--------+------+ |firstname|lastname| date|duration|firstname|lastname|upload| +---------+--------+----------+--------+---------+--------+------+ | bob| b|2015-01-13| 4| bob| b| 23| | alice| a|2015-04-23| 10| alice| a| 100| +---------+--------+----------+--------+---------+--------+------+ */ 

Правильное решение состоит в том, чтобы определить столбцы объединения как массив строк Seq("firstname", "lastname") . В кадре выходной информации нет дублированных столбцов:

 left.join(right, Seq("firstname", "lastname")).show /* +---------+--------+----------+--------+------+ |firstname|lastname| date|duration|upload| +---------+--------+----------+--------+------+ | bob| b|2015-01-13| 4| 23| | alice| a|2015-04-23| 10| 100| +---------+--------+----------+--------+------+ */ 

Это ожидаемое поведение. Метод DataFrame.join эквивалентен SQL-соединению, подобному этому

 SELECT * FROM a JOIN b ON joinExprs 

Если вы хотите игнорировать повторяющиеся столбцы, просто отпустите их или выберите интересующие столбцы. Если вы хотите устранить неоднозначность, вы можете использовать их с помощью родительских DataFrames :

 val a: DataFrame = ??? val b: DataFrame = ??? val joinExprs: Column = ??? a.join(b, joinExprs).select(a("id"), b("foo")) // drop equivalent a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo")) 

или использовать псевдонимы:

 // As for now aliases don't work with drop a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo") 

Для equi-joins существует специальный синтаксис ярлыков, который принимает либо последовательность строк :

 val usingColumns: Seq[String] = ??? a.join(b, usingColumns) 

или как одна строка

 val usingColumn: String = ??? a.join(b, usingColumn) 

которые содержат только одну копию столбцов, используемых в состоянии соединения.

Я застрял в этом некоторое время, и только недавно я придумал решение, что довольно легко.

Скажем, что a

 scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala") a: org.apache.spark.sql.DataFrame = [key: string, vala: int] scala> a.show +---+----+ |key|vala| +---+----+ | a| 1| | b| 2| +---+----+ and scala> val b = Seq(("a", 1)).toDF("key", "valb") b: org.apache.spark.sql.DataFrame = [key: string, valb: int] scala> b.show +---+----+ |key|valb| +---+----+ | a| 1| +---+----+ 

и я могу сделать это, чтобы выбрать только значение в dataframe a:

 scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show +---+----+ |key|vala| +---+----+ | a| 1| | b| 2| +---+----+ 

Вы можете просто использовать это

 df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN") 

Здесь TYPE-OF-JOIN может быть

  • оставил
  • правильно
  • внутренний
  • fullouter

Например, у меня есть два таких кадра данных:

 // df1 word count1 w1 10 w2 15 w3 20 // df2 word count2 w1 100 w2 150 w5 200 

Если вы делаете полноценное соединение, результат выглядит следующим образом:

 df1.join(df2, Seq("word"),"fullouter").show() word count1 count2 w1 10 100 w2 15 150 w3 20 null w5 null 200 

Это нормальное поведение SQL, что я делаю для этого:

  • Отбрасывать или переименовывать исходные столбцы
  • Присоединитесь
  • Drop переименовал столбец, если он

Здесь я заменяю столбец «fullname»:

Некоторый код в Java:

 this .sqlContext .read() .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day)) .drop("fullname") .registerTempTable("data_original"); this .sqlContext .read() .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day)) .registerTempTable("data_v2"); this .sqlContext .sql(etlQuery) .repartition(1) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); 

Где запрос:

 SELECT d.*, concat_ws('_', product_name, product_module, name) AS fullname FROM {table_source} d LEFT OUTER JOIN {table_updates} u ON u.id = d.id 

Это то, что вы можете сделать только с Spark, я верю (падение столбца из списка), очень полезно!

  • Как рассчитать наилучшее числоOfPartitions для объединения?
  • Как преобразовать файл csv в rdd
  • Полностью настроенные windows JavaFX?
  • Действительные символы идентификатора в Scala
  • Как подавить информацию и сообщения о успехе в sbt?
  • Что подразумевается под типами, зависящими от Scala?
  • Преобразование вложенных classов case в вложенные Карты с использованием Shapeless
  • Как преобразовать A ] в B ], если A и B являются монадами?
  • Работайте на соседних элементах в RDD в Spark
  • Два способа определения функций в Scala. В чем разница?
  • Как выйти из цикла в Scala?
  • Interesting Posts

    Как создать динамические свойства в C #?

    Как я могу производить многоточечную линейную интерполяцию?

    Переопределение SaveChanges и установка ModifiedDate, но как установить ModifiedBy?

    Laravel – Получить последнюю запись каждого типа UID

    Установите файлы содержимого для «копирования локального: всегда» в пакете nuget

    Как я могу получить список смонтированного внешнего хранилища Android-устройства

    Differnce между blockize и bytesize в файловых системах linux / unix

    Как вернуть 2 значения из метода Java?

    Как удалить материал, напечатанный на консоль System.out.println ()?

    Как извлечь изображение из файла PDF

    Как заставить BackgroundWorker возвращать объект

    Согласование файлов с различными расширениями с использованием цикла

    Знак C ++ как устаревший

    Можно возобновить загрузку с сетевого диска, если он отключился?

    Щебетать сетку Bootstrap 3, изменение точки останова и удаление прокладки

    Давайте будем гением компьютера.