python apache-spark pyspark apache-spark-1.6

python - Serialización PySpark EOFError



apache-spark apache-spark-1.6 (2)

Estoy leyendo un CSV como Spark DataFrame y estoy realizando operaciones de aprendizaje automático en él. Sigo recibiendo un EOFError de serialización de Python, ¿alguna idea de por qué? Pensé que podría tratarse de un problema de memoria, es decir, un archivo que excede la RAM disponible, pero reducir drásticamente el tamaño del DataFrame no impidió el error EOF.

Código de juguete y error a continuación.

#set spark context conf = SparkConf().setMaster("local").setAppName("MyApp") sc = SparkContext(conf = conf) sqlContext = SQLContext(sc) #read in 500mb csv as DataFrame df = sqlContext.read.format(''com.databricks.spark.csv'').options(header=''true'', inferschema=''true'').load(''myfile.csv'') #get dataframe into machine learning format r_formula = RFormula(formula = "outcome ~ .") mldf = r_formula.fit(df).transform(df) #fit random forest model rf = RandomForestClassifier(numTrees = 3, maxDepth = 2) model = rf.fit(mldf) result = model.transform(mldf).head()

Ejecutar el código anterior con spark-submit en un solo nodo emite repetidamente el siguiente error, incluso si el tamaño del DataFrame se reduce antes de ajustar el modelo (por ejemplo, tinydf = df.sample(False, 0.00001) :

Traceback (most recent call last): File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/worker.py", line 136, in main if read_int(infile) == SpecialLengths.END_OF_STREAM: File "/home/hduser/spark1.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int raise EOFError EOFError


¿Ha revisado para ver en qué parte del código está surgiendo el error EOError?

Mi conjetura sería que vendrá cuando intentes definir df con, ya que ese es el único lugar en tu código en el que el archivo está intentando ser leído.

df = sqlContext.read.format(''com.databricks.spark.csv'').options(header=''true'', inferschema=''true'').load(''myfile.csv'')

En cada punto después de esta línea, su código está trabajando con la variable df , no con el archivo en sí, por lo que parece probable que esta línea esté generando el error.

Una forma sencilla de probar si este es el caso sería comentar el resto de su código y / o colocar una línea como esta justo después de la línea de arriba.

print(len(df))

Otra forma sería usar un bucle de try , como:

try: df = sqlContext.read.format(''com.databricks.spark.csv'').options(header=''true'', inferschema=''true'').load(''myfile.csv'') except: print(''Didn''t load file into df!'')

Si resulta que esa línea es la que genera el error EOFError, en primer lugar nunca obtendrás los marcos de datos, por lo que intentar reducirlos no hará una diferencia.

Si esa es la línea que genera el error, se nos ocurren dos posibilidades:

1) Su código está llamando a uno o ambos archivos .csv anteriormente y no lo está cerrando antes de esta línea. Si es así, simplemente ciérrelo sobre su código aquí.

2) Hay algo mal con los archivos .csv. Intente cargarlos fuera de este código, y vea si puede guardarlos en la memoria correctamente en primer lugar, utilizando algo como csv.reader, y manipúlelos de la forma que usted espera.


El error parece ocurrir en la función pySpark read_int. Código para lo cual es el siguiente del sitio de chispa :

def read_int(stream): length = stream.read(4) if not length: raise EOFError return struct.unpack("!i", length)[0]

Esto significaría que al leer 4 bytes del flujo, si se leen 0 bytes, se genera un error EOF. Los documentos de python están here .