tutorial spark software example apache-spark dataframe pyspark spark-dataframe apache-spark-sql apache-spark-ml

apache spark - software - spark.ml StringIndexer arroja ''Etiqueta invisible'' en fit()



spark pi example (2)

Está bien, creo que tengo esto. Al menos tengo esto funcionando.

El almacenamiento en caché del marco de datos (incluidas las partes de tren / prueba) resuelve el problema. Eso es lo que encontré en este número de JIRA: https://issues.apache.org/jira/browse/SPARK-12590 .

Por lo tanto, no es un error, solo el hecho de que randomSample podría producir un resultado diferente en el mismo conjunto de datos, pero con particiones diferentes. Y aparentemente, algunas de mis funciones de munging (o Pipeline ) involucran la repartición, por lo tanto, los resultados de la recalculación del conjunto de trenes de su definición pueden ser diferentes.

Lo que aún me interesa es la reproducibilidad: siempre es la fila ''pl-PL'' la que se mezcla en la parte incorrecta del conjunto de datos, es decir, no es un reparto aleatorio. Es determinista, simplemente inconsistente. Me pregunto cómo sucede exactamente.

Estoy preparando un ejemplo de toy spark.ml . Spark version 1.6.0 , que se ejecuta sobre Oracle JDK version 1.8.0_65 , pyspark, ipython notebook.

Primero, apenas tiene nada que ver con Spark, ML, StringIndexer: manejo de etiquetas invisibles . La excepción se produce al ajustar una tubería a un conjunto de datos, no al transformarlo. Y suprimir la excepción podría no ser una solución aquí, ya que, me temo, el conjunto de datos se complica bastante en este caso.

Mi conjunto de datos es de aproximadamente 800 Mb sin comprimir, por lo que puede ser difícil de reproducir (los subconjuntos más pequeños parecen esquivar este problema).

El conjunto de datos se ve así:

+--------------------+-----------+-----+-------+-----+--------------------+ | url| ip| rs| lang|label| txt| +--------------------+-----------+-----+-------+-----+--------------------+ |http://3d-detmold...|217.160.215|378.0| de| 0.0|homwillkommskip c...| | http://3davto.ru/| 188.225.16|891.0| id| 1.0|оформить заказ пе...| | http://404.szm.com/| 85.248.42| 58.0| cs| 0.0|kliknite tu alebo...| | http://404.xls.hu/| 212.52.166|168.0| hu| 0.0|honlapkészítés404...| |http://a--m--a--t...| 66.6.43|462.0| en| 0.0|back top archiv r...| |http://a-wrf.ru/c...| 78.108.80|126.0|unknown| 1.0| | |http://a-wrf.ru/s...| 78.108.80|214.0| ru| 1.0|установк фаркопна...| +--------------------+-----------+-----+-------+-----+--------------------+

El valor que se predice es label . Toda la tubería aplicada a él:

from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF from pyspark.ml.classification import LogisticRegression train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345) pipe_stages = [ StringIndexer(inputCol=''lang'', outputCol=''lang_idx''), OneHotEncoder(inputCol=''lang_idx'', outputCol=''lang_onehot''), Tokenizer(inputCol=''ip'', outputCol=''ip_tokens''), HashingTF(numFeatures=2**10, inputCol=''ip_tokens'', outputCol=''ip_vector''), Tokenizer(inputCol=''txt'', outputCol=''txt_tokens''), HashingTF(numFeatures=2**18, inputCol=''txt_tokens'', outputCol=''txt_vector''), VectorAssembler(inputCols=[''lang_onehot'', ''ip_vector'', ''txt_vector''], outputCol=''features''), LogisticRegression(labelCol=''label'', featuresCol=''features'') ] pipe = Pipeline(stages=pipe_stages) pipemodel = pipe.fit(train)

Y aquí está el stacktrace:

Py4JJavaError: An error occurred while calling o10793.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271) at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159) at org.apache.spark.ml.Predictor.fit(Predictor.scala:90) at org.apache.spark.ml.Predictor.fit(Predictor.scala:71) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Unseen label: pl-PL. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more

La línea más interesante es:

org.apache.spark.SparkException: Unseen label: pl-PL.

No tengo idea de cómo pl-PL que es un valor de la columna lang , podría haberse mezclado en la columna label , que es un float , no una string editada: algunas conclusiones precipitadas, corregidas gracias a @ zero323

Lo examiné más a fondo y descubrí que pl-PL es un valor de la parte de prueba del conjunto de datos, no del entrenamiento. Así que ahora ni siquiera sé dónde buscar al culpable: podría ser fácilmente el código randomSplit , no StringIndexer , y quién sabe qué más.

¿Cómo investigo esto?


Unseen label es un mensaje genérico que no corresponde a una columna específica . El problema más probable es con una etapa siguiente:

StringIndexer(inputCol=''lang'', outputCol=''lang_idx'')

con pl-PL presente en el train("lang") y no presente en la test("lang") .

Puede corregirlo usando setHandleInvalid con skip :

from pyspark.ml.feature import StringIndexer train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"]) test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"]) indexer = StringIndexer(inputCol="v", outputCol="vi") indexer.fit(train).transform(test).show() ## Py4JJavaError: An error occurred while calling o112.showString. ## : org.apache.spark.SparkException: Job aborted due to stage failure: ## ... ## org.apache.spark.SparkException: Unseen label: foobar. indexer.setHandleInvalid("skip").fit(train).transform(test).show() ## +---+---+---+ ## | k| v| vi| ## +---+---+---+ ## | 3|foo|1.0| ## +---+---+---+

o, en las últimas versiones, keep :

indexer.setHandleInvalid("keep").fit(train).transform(test).show() ## +---+------+---+ ## | k| v| vi| ## +---+------+---+ ## | 3| foo|0.0| ## | 4|foobar|2.0| ## +---+------+---+