Как сохранить пользовательские объекты в наборе данных?

В соответствии с Представляя Spark Datasets :

Поскольку мы с нетерпением ждем Spark 2.0, мы планируем некоторые интересные улучшения для наборов данных, в частности: … Пользовательские кодеры – в то время как мы в настоящее время создаем автокодировщики для самых разных типов, мы хотели бы открыть API для пользовательских объектов.

и попытки сохранить пользовательский тип в Dataset приводят к следующей ошибке, например:

Не удалось найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы продуктов (classы case) поддерживаются при импорте sqlContext.implicits._ Поддержка сериализации других типов будет добавлена ​​в будущих версиях

или:

Java.lang.UnsupportedOperationException: нет кодера для ….

Существуют ли какие-либо существующие обходные пути?


Обратите внимание, что этот вопрос существует только как точка входа для ответа сообщества Wiki. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.

Обновить

Этот ответ по-прежнему остается актуальным и информативным, хотя теперь ситуация улучшается с 2.2 / 2.3, что добавляет поддержку встроенного энкодера для Set , Seq , Map , Date , Timestamp и BigDecimal . Если вы придерживаетесь того, чтобы создавать типы только с classами classов и обычными типами Scala, вы должны быть в порядке с неявным в SQLImplicits .


К сожалению, практически ничего не было добавлено, чтобы помочь в этом. Поиск @since 2.0.0 в SQLImplicits.scala или SQLImplicits.scala находит вещи в основном для примитивных типов (и некоторой настройки classов case). Итак, первое, что можно сказать: в настоящее время нет реальной хорошей поддержки кодировщиков пользовательского classа . Из-за этого следует, что некоторые из трюков, которые делают так же хорошо, как мы можем когда-либо надеяться, учитывая то, что у нас есть в настоящее время. Как предварительный отказ от ответственности: это не сработает отлично, и я сделаю все возможное, чтобы все ограничения были ясными и заранее.

В чем именно проблема

Когда вы хотите создать dataset, Spark «требует кодировщика (для преобразования объекта JVM типа T в и из внутреннего представления Spark SQL), который обычно создается автоматически через implicits из SparkSession или может быть явно создан путем вызова статического методы на Encoders “(взято из документов на createDataset ). Кодер примет форму Encoder[T] где T – тип, который вы кодируете. Первое предложение – добавить import spark.implicits._ (который дает вам эти неявные кодеры), а второе предложение – явно передать неявный кодер, используя этот набор связанных функций, связанных с кодировщиком.

Кодировщик не доступен для обычных classов, поэтому

 import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

даст вам следующую неявную связанную ошибку времени компиляции:

Не удалось найти кодировщик для типа, хранящегося в наборе данных. Примитивные типы (Int, String и т. Д.) И типы продуктов (classы case) поддерживаются при импорте sqlContext.implicits._ Поддержка сериализации других типов будет добавлена ​​в будущих версиях

Однако, если вы обертываете любой тип, который вы использовали только для получения вышеуказанной ошибки в каком-то classе, который расширяет Product , ошибка смутно задерживается во время выполнения, поэтому

 import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3)))) 

Компилирует просто отлично, но не работает во время выполнения с

java.lang.UnsupportedOperationException: Нет кодера для MyObj

Причина этого в том, что кодеры Spark, созданные с помощью implicits, фактически выполняются только во время выполнения (через scala relfection). В этом случае все проверки Spark во время компиляции заключаются в том, что внешний class расширяет Product (все classы classов) и реализует только во время выполнения, что он все еще не знает, что делать с MyObj (та же проблема возникает, если я попытался для создания Dataset[(Int,MyObj)] – Spark ждет, пока среда выполнения не будет MyObj на MyObj ). Это основные проблемы, которые остро нуждаются в исправлении:

  • некоторые classы, которые расширяют компиляцию Product несмотря на то, что всегда рушились во время выполнения и
  • нет способа передать в пользовательские кодеры для вложенных типов (у меня нет возможности кормить Spark кодер для только MyObj , чтобы он тогда знал, как кодировать Wrap[MyObj] или (Int,MyObj) ).

Просто используйте kryo

Решение, которое все предлагают, – использовать кодировщик kryo .

 import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

Тем не менее, это довольно утомительно. Особенно, если ваш код манипулирует множеством наборов данных, объединяется, группируется и т. Д. В итоге вы получаете множество дополнительных имплицитов. Итак, почему бы просто не сделать неявный, который делает это все автоматически?

 import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) 

И теперь кажется, что я могу делать почти все, что захочу (пример ниже не будет работать в spark-shell где spark.implicits._ автоматически импортируется)

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom! 

Или почти. Проблема в том, что использование kryo приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. Для map , filter , foreach это достаточно, но для таких операций, как join , Spark действительно нуждается в их разделении на столбцы. Проверяя схему для d2 или d3 , вы видите, что есть только один двоичный столбец:

 d2.printSchema // root // |-- value: binary (nullable = true) 

Частичное решение для кортежей

Таким образом, используя магию implicits в Scala (более подробно в 6.26.3 Overloading Resolution ), я могу сделать серию имплицитов, которые сделают как можно более хорошую работу, по крайней мере для кортежей, и будут хорошо работать с существующими implicits:

 import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these 

Затем, вооружившись этими имплицитами, я могу сделать свой пример выше работы, хотя и с некоторым переименованием столбцов

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") 

Я еще не понял, как получить ожидаемые имена кортежей ( _1 , _2 , …) по умолчанию без их переименования – если кто-то хочет поиграть с этим, здесь вводится имя "value" и здесь обычно добавляются имена кортежей. Однако ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:

 d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) 

Итак, вкратце, это обходное решение:

  • позволяет нам получить отдельные столбцы для кортежей (так что мы снова можем присоединиться к кортежам, yay!)
  • мы снова можем просто полагаться на имплициты (так что не нужно проходить мимо kryo по всему месту)
  • почти полностью обратно совместим с import spark.implicits._ (с некоторым переименованием)
  • не позволяет нам присоединяться к сериализованным двоичным столбцам kyro , не говоря уже о полях, которые могут иметь
  • имеет неприятный побочный эффект переименования некоторых столбцов кортежа в «значение» (при необходимости это можно отменить, преобразовывая .toDF , указывая имена новых столбцов и преобразовывая обратно в dataset), и имена схем, похоже, сохраняются через соединения, где они больше всего нужны).

Частичное решение для classов в целом

Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение кортежа выше, у меня есть догадка, что неявное решение для преобразования из другого ответа будет немного менее болезненным, так как вы можете преобразовать более сложные classы в кортежи. Затем, создав dataset, вы, вероятно, переименуете столбцы, используя подход dataframe. Если все идет хорошо, это действительно улучшается, так как теперь я могу выполнять объединения в полях моих classов. Если бы я просто использовал один плоский двоичный сериализатор kryo который бы не был возможен.

Вот пример, который выполняет немного всего: у меня есть class MyObj который имеет поля типов Int , java.util.UUID и Set[String] . Первый заботится о себе. Второй, хотя я мог бы сериализовать использование kryo был бы более полезен, если бы он хранился как String (поскольку UUID – это обычно то, к чему я хочу присоединиться). Третий действительно просто принадлежит двоичному столбцу.

 class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3) 

Теперь я могу создать dataset с хорошей схемой, используя эту технику:

 val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded] 

И схема показывает мне столбцы с правильными именами и двумя первыми, с чем я могу присоединиться.

 d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true) 
  1. Использование общих кодеров.

    На данный момент доступны два общих kryo и javaSerialization где последний явно описывается как:

    крайне неэффективны и должны использоваться только в качестве последнего средства.

    Предполагая следующий class

     class Bar(i: Int) { override def toString = s"bar $i" def bar = i } 

    вы можете использовать эти кодеры, добавив неявный кодер:

     object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] } 

    которые могут использоваться вместе следующим образом:

     object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } } 

    Он хранит объекты как binary столбец, поэтому при преобразовании в DataFrame вы получаете следующую схему:

     root |-- value: binary (nullable = true) 

    Также возможно кодировать кортежи, используя kryo encoder для определенного поля:

     val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary] 

    Обратите внимание, что мы не зависим от неявных кодеров здесь, но передаем кодировщик явно, поэтому это, скорее всего, не будет работать с методом toDS .

  2. Использование неявных преобразований:

    Обеспечьте неявные преобразования между представлением, которое может быть закодировано и пользовательским classом, например:

     object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } } 

Связанные вопросы:

  • Как создать кодер для конструктора типа Option, например Option [Int]?

Кодеры работают более или менее одинаково в Spark2.0 . И Kryo по-прежнему является рекомендуемым выбором serialization .

Вы можете посмотреть следующий пример с искровой оболочкой

 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. - scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. - scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. 

До сих пор не было appropriate encoders в настоящем объеме, поэтому наши люди не были закодированы как binary значения. Но это изменится, если мы предоставим некоторые implicit кодировщики с использованием сериализации Kryo .

 // Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old. 

В случае classа Java Bean это может быть полезно

 import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass]) 

Теперь вы можете просто прочитать dataFrame как пользовательский DataFrame

 dataFrame.as[MyClass] 

Это создаст собственный кодировщик classов, а не двоичный.

Вы можете использовать UDTRegistration, а затем Case Classes, Tuples и т. Д. … все правильно работает с вашим Пользовательским типом!

Предположим, вы хотите использовать пользовательское Enum:

 trait CustomEnum { def value:String } case object Foo extends CustomEnum { val value = "F" } case object Bar extends CustomEnum { val value = "B" } object CustomEnum { def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get } 

Зарегистрируйте его так:

 // First define a UDT class for it: class CustomEnumUDT extends UserDefinedType[CustomEnum] { override def sqlType: DataType = org.apache.spark.sql.types.StringType override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // Note that this will be a UTF8String type override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString) override def userClass: Class[CustomEnum] = classOf[CustomEnum] } // Then Register the UDT Class! // NOTE: you have to put this file into the org.apache.spark package! UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName) 

Тогда ИСПОЛЬЗУЙТЕ ЭТО!

 case class UsingCustomEnum(id:Int, en:CustomEnum) val seq = Seq( UsingCustomEnum(1, Foo), UsingCustomEnum(2, Bar), UsingCustomEnum(3, Foo) ).toDS() seq.filter(_.en == Foo).show() println(seq.collect()) 

Предположим, вы хотите использовать Полиморфную запись:

 trait CustomPoly case class FooPoly(id:Int) extends CustomPoly case class BarPoly(value:String, secondValue:Long) extends CustomPoly 

… и использовать его так:

 case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Вы можете написать пользовательский UDT, который кодирует все в байты (я использую сериализацию java здесь, но, вероятно, лучше использовать контекст Spark’s Kryo).

Сначала определите class UDT:

 class CustomPolyUDT extends UserDefinedType[CustomPoly] { val kryo = new Kryo() override def sqlType: DataType = org.apache.spark.sql.types.BinaryType override def serialize(obj: CustomPoly): Any = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(obj) bos.toByteArray } override def deserialize(datum: Any): CustomPoly = { val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]]) val ois = new ObjectInputStream(bis) val obj = ois.readObject() obj.asInstanceOf[CustomPoly] } override def userClass: Class[CustomPoly] = classOf[CustomPoly] } 

Затем зарегистрируйте его:

 // NOTE: The file you do this in has to be inside of the org.apache.spark package! UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName) 

Тогда вы можете использовать его!

 // As shown above: case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Мои примеры будут на Java, но я не думаю, что это трудно адаптироваться к Scala.

Я довольно успешно конвертировал RDD в Dataset используя spark.createDataset и Encoders.bean, пока Fruit – это простой Java-компонент .

Шаг 1. Создайте простой Java-компонент.

 public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out } 

Я бы придерживался classов с примитивными типами и String в качестве полей, прежде чем люди DataBricks усилят свои кодеры. Если у вас есть class с вложенным объектом, создайте еще один простой Java Bean со всеми его полями, чтобы вы могли использовать преобразования RDD для сопоставления сложного типа с более простым. Конечно, это небольшая дополнительная работа, но я думаю, что это очень поможет в производительности, связанной с плоской схемой.

Шаг 2: Получите ваш dataset из RDD

 SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD fruitJavaRDD = jsc.parallelize(fruitList); RDD fruitRDD = fruitJavaRDD.rdd(); Encoder fruitBean = Encoders.bean(Fruit.class); Dataset fruitDataset = spark.createDataset(rdd, bean); 

И вуаля! Намочите, промойте, повторите.

Для тех, кто может в моей ситуации, я тоже здесь отвечу.

Чтобы быть конкретным,

  1. Я читал «Set typed data» из SQLContext. Таким образом, исходный формат данных – DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Затем преобразуйте его в RDD, используя rdd.map () с mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Результат:

    (1,Set(1))

  • В чем преимущество использования абстрактных classов вместо признаков?
  • Класс classа для отображения в Scala
  • Как записать файл в Scala?
  • Почему искровая shell не работает с NullPointerException?
  • Преобразовать список Scala в кортеж?
  • Какие хорошие право-ассоциативные методы в Scala?
  • Каковы возможные причины для получения TimeoutException: фьючерсы, истекающие после при работе с Spark
  • Итерация над compilationами Java в Scala
  • «Ошибка: тип несоответствия» в Spark с теми же найденными и требуемыми типами данных
  • По умолчанию для отсутствующих свойств в игре 2 формата JSON
  • Play 2.4: Форма: не удалось найти неявное значение для сообщений параметров: play.api.i18n.Messages
  • Interesting Posts

    Обновление Microsoft (MSU) не находит обновлений для других продуктов Microsoft (например, Office) после обновления MSU

    Линейный gradleиент CSS3

    Как обрабатывать бесконечный цикл, вызванный недопустимым вводом (InputMismatchException) с помощью Scanner

    Как отлаживать службы Windows в Visual Studio?

    Как получить покрытие кода с помощью Android Studio?

    Моя скорость интернета составляет 100 Мбит / с, но скорость загрузки составляет всего 250 Кбит / с. Почему?

    Почему recursion должна быть предпочтительнее итерации?

    Как загрузить весь (активный) форум phpbb?

    матричная транспозиция в clojure

    Удаленная отладка с эмулятором Android

    Как изменить HOMEDRIVE HOMEPATH и HOMESHARE в Windows XP?

    Разница между getAttribute () и getParameter ()

    Как добавить дополнительный столбец в массив NumPy

    Запросы зависают при использовании mongoose.createConnection () vs mongoose.connect ()

    Почему индексы массива равны нулю на большинстве языков программирования?

    Давайте будем гением компьютера.