tutorial sparkcontext spark guide example español downloads python apache-spark pyspark

python - sparkcontext - pyspark EOFError después de llamar al mapa



spark python example (2)

¿Puedes por favor intentar hacer un mapa después de convertir el marco de datos en rdd? Está aplicando la función de mapa en un marco de datos y luego de nuevo creando un marco de datos a partir de eso. La sintaxis sería como

df.rdd.map().toDF()

Por favor, déjeme saber si funciona. Gracias.

Soy nuevo en spark & ​​pyspark.

Estoy leyendo un pequeño archivo csv (~ 40k) en un marco de datos.

from pyspark.sql import functions as F df = sqlContext.read.format(''com.databricks.spark.csv'').options(header=''true'', inferschema=''true'').load(''/tmp/sm.csv'') df = df.withColumn(''verified'', F.when(df[''verified''] == ''Y'', 1).otherwise(0)) df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()

Me sale un error extraño que no ocurre todas las veces, pero ocurre con bastante frecuencia

>>> df2.show(1) +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row >>> df2.count() 41999 >>> df2.show(1) +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row >>> df2.count() 41999 >>> df2.show(1) Traceback (most recent call last): File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main if read_int(infile) == SpecialLengths.END_OF_STREAM: File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int raise EOFError EOFError +--------------------+---------+ | features| label| +--------------------+---------+ |[0.0,0.0,0.0,0.0,...|4700734.0| +--------------------+---------+ only showing top 1 row

Una vez que se haya generado EOFError, no lo volveré a ver hasta que haga algo que requiera interactuar con el servidor de chispa.

Cuando llamo a df2.count () muestra que el indicador [Etapa xxx], que es lo que quiero decir con eso, va al servidor de chispa. Cualquier cosa que se dispare que parezca que finalmente termine dándole a EOFError nuevamente cuando hago algo con df2.

No parece que suceda con df (vs. df2), así que parece que debe ser algo que sucede con la línea df.map ().


Creo que estás ejecutando Spark 2.x y superior. Debajo del código debe crear su marco de datos desde csv:

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

entonces puedes tener el siguiente código:

df = df.withColumn(''verified'', F.when(df[''verified''] == ''Y'', 1).otherwise(0))

y luego puedes crear df2 sin Fila y toDF ()

Avíseme si esto funciona o si está utilizando Spark 1.6 ... gracias.