spark.ml StringIndexer бросает «Невидимый ярлык» на fit ()
Я spark.ml
пример spark.ml
. Spark version 1.6.0
, работающая поверх Oracle JDK version 1.8.0_65
, pyspark, ipython notebook.
Во-первых, он вряд ли имеет какое-либо отношение к Spark, ML, StringIndexer: обработка невидимых ярлыков . Исключение возникает при установке конвейера в dataset, а не на его преобразование. И подавление исключения, возможно, не является решением здесь, так как, боюсь, dataset в этом случае становится бесполезным.
Мой dataset составляет около 800 Мб без сжатия, поэтому его трудно воспроизвести (меньшие подмножества, похоже, уклоняются от этой проблемы).
- Оптимизация соединения DataFrame - Broadcast Hash Join
- Как повысить производительность для медленных заданий Spark с использованием соединения DataFrame и JDBC?
- Запрос Spark SQL DataFrame со сложными типами
- Определение UDF, который принимает массив объектов в Spark DataFrame?
- Как определить настраиваемую функцию агрегации для суммирования столбца векторов?
Набор данных выглядит так:
+--------------------+-----------+-----+-------+-----+--------------------+ | url| ip| rs| lang|label| txt| +--------------------+-----------+-----+-------+-----+--------------------+ |http://3d-detmold...|217.160.215|378.0| de| 0.0|homwillkommskip c...| | http://3davto.ru/| 188.225.16|891.0| id| 1.0|оформить заказ пе...| | http://404.szm.com/| 85.248.42| 58.0| cs| 0.0|kliknite tu alebo...| | http://404.xls.hu/| 212.52.166|168.0| hu| 0.0|honlapkészítés404...| |http://a--m--a--t...| 66.6.43|462.0| en| 0.0|back top archiv r...| |http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0| | |http://a-wrf.ru/s...| 78.108.80|214.0| ru| 1.0|установк фаркопна...| +--------------------+-----------+-----+-------+-----+--------------------+
Предсказываемое значение – это label
. К нему обратился весь трубопровод:
from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF from pyspark.ml.classification import LogisticRegression train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) pipe_stages = [ StringIndexer(inputCol='lang', outputCol='lang_idx'), OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'), Tokenizer(inputCol='ip', outputCol='ip_tokens'), HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'), Tokenizer(inputCol='txt', outputCol='txt_tokens'), HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'), VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'), LogisticRegression(labelCol='label', featuresCol='features') ] pipe = Pipeline(stages=pipe_stages) pipemodel = pipe.fit(train)
И вот стек:
Py4JJavaError: An error occurred while calling o10793.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more
Самая интересная строка:
org.apache.spark.SparkException: Unseen label: pl-PL.
Не знаю, как отредактированной: некоторые поспешные высказывания, исправленные благодаря @ zero323 pl-PL
который является значением из столбца lang
мог быть замешан в столбце label
, который является float
, а не string
Я посмотрел дальше и обнаружил, что pl-PL
– это ценность из тестовой части набора данных, а не для обучения. Так что теперь я даже не знаю, где искать виновника: это может быть легко randomSplit
код, а не StringIndexer
, и кто знает, что еще.
Как я могу это расследовать?
- Вызывается: java.lang.NullPointerException в org.apache.spark.sql.Dataset
- Spark / Scala: форвардная заливка с последним наблюдением
- Как импортировать несколько файлов csv в одной загрузке?
- Неверные поля Scark DataFrame
- Spark sql как взорваться без потери нулевых значений
- Как получить доступ к элементу столбца VectorUDT в Spark DataFrame?
- Как запросить столбцы данных JSON, используя Spark DataFrames?
- Вывести несколько столбцов из одного столбца в Spark DataFrame
Unseen label
– это общее сообщение, которое не соответствует определенному столбцу . Скорее всего, проблема заключается в следующем этапе:
StringIndexer(inputCol='lang', outputCol='lang_idx')
с pl-PL
присутствующим в train("lang")
и не присутствующим в test("lang")
.
Вы можете исправить его с помощью setHandleInvalid
с skip
:
from pyspark.ml.feature import StringIndexer train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) indexer = StringIndexer(inputCol="v", outputCol="vi") indexer.fit(train).transform(test).show() ## Py4JJavaError: An error occurred while calling o112.showString. ## : org.apache.spark.SparkException: Job aborted due to stage failure: ## ... ## org.apache.spark.SparkException: Unseen label: foobar. indexer.setHandleInvalid("skip").fit(train).transform(test).show() ## +---+---+---+ ## | k| v| vi| ## +---+---+---+ ## | 3|foo|1.0| ## +---+---+---+
Хорошо, я думаю, что получил это. По крайней мере, я получил эту работу.
Кэширование данных (в том числе поезда / тестовые партии) решает проблему. Вот что я нашел в этой проблеме JIRA: https://issues.apache.org/jira/browse/SPARK-12590 .
Таким образом, это не ошибка, а только тот факт, что randomSample
может дать другой результат на одном и том же, но по-разному разделенном наборе данных. И, по-видимому, некоторые из моих функций munging (или Pipeline
) связаны с переделкой, поэтому результаты пересчета поезда могут быть расходятся.
Меня все еще интересует – это воспроизводимость: это всегда строка pl-PL, которая смешивается в неправильной части набора данных, т. Е. Это не случайный передел. Это детерминированный, просто непоследовательный. Интересно, как именно это происходит.