python - sparkcontext - pyspark EOFError después de llamar al mapa
spark python example (2)
¿Puedes por favor intentar hacer un mapa después de convertir el marco de datos en rdd? Está aplicando la función de mapa en un marco de datos y luego de nuevo creando un marco de datos a partir de eso. La sintaxis sería como
df.rdd.map().toDF()
Por favor, déjeme saber si funciona. Gracias.
Soy nuevo en spark & pyspark.
Estoy leyendo un pequeño archivo csv (~ 40k) en un marco de datos.
from pyspark.sql import functions as F
df = sqlContext.read.format(''com.databricks.spark.csv'').options(header=''true'', inferschema=''true'').load(''/tmp/sm.csv'')
df = df.withColumn(''verified'', F.when(df[''verified''] == ''Y'', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante frecuencia
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
raise EOFError
EOFError
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
Una vez que se haya generado EOFError, no lo volveré a ver hasta que haga algo que requiera interactuar con el servidor de chispa.
Cuando llamo a df2.count () muestra que el indicador [Etapa xxx], que es lo que quiero decir con eso, va al servidor de chispa. Cualquier cosa que se dispare que parezca que finalmente termine dándole a EOFError nuevamente cuando hago algo con df2.
No parece que suceda con df (vs. df2), así que parece que debe ser algo que sucede con la línea df.map ().
Creo que estás ejecutando Spark 2.x y superior. Debajo del código debe crear su marco de datos desde csv:
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
entonces puedes tener el siguiente código:
df = df.withColumn(''verified'', F.when(df[''verified''] == ''Y'', 1).otherwise(0))
y luego puedes crear df2 sin Fila y toDF ()
Avíseme si esto funciona o si está utilizando Spark 1.6 ... gracias.