structtype spark read examples crear python apache-spark dataframe pyspark apache-spark-sql

read - python spark dataframe



Cuente el nĂºmero de entradas que no son NaN en cada columna del marco de datos Spark con Pyspark (2)

Comencemos con una información ficticia:

from pyspark.sql import Row row = Row("v", "x", "y", "z") df = sc.parallelize([ row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0), row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN")) ]).toDF() ## +----+----+---+---+ ## | v| x| y| z| ## +----+----+---+---+ ## | 0.0| 1| 2|3.0| ## |null| 3| 4|5.0| ## |null|null| 6|7.0| ## | NaN| 8| 9|NaN| ## +----+----+---+---+

Todo lo que necesitas es una agregación simple:

from pyspark.sql.functions import col, count, isnan, lit, sum def count_not_null(c, nan_as_null=False): """Use conversion between boolean and integer - False -> 0 - True -> 1 """ pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True)) return sum(pred.cast("integer")).alias(c) df.agg(*[count_not_null(c) for c in df.columns]).show() ## +---+---+---+---+ ## | v| x| y| z| ## +---+---+---+---+ ## | 2| 3| 4| 4| ## +---+---+---+---+

o si quieres tratar NaN un NULL :

df.agg(*[count_not_null(c, True) for c in df.columns]).show() ## +---+---+---+---+ ## | v| x| y| z| ## +---+---+---+---+ ## | 1| 3| 4| 3| ## +---+---+---+---

También puede aprovechar la semántica NULL SQL para lograr el mismo resultado sin crear una función personalizada:

df.agg(*[ count(c).alias(c) # vertical (column-wise) operations in SQL ignore NULLs for c in df.columns ]).show() ## +---+---+---+ ## | x| y| z| ## +---+---+---+ ## | 1| 2| 3| ## +---+---+---+

pero esto no funcionará con NaNs .

Si prefieres fracciones:

exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns] df.agg(*exprs).show() ## +------------------+------------------+---+ ## | x| y| z| ## +------------------+------------------+---+ ## |0.3333333333333333|0.6666666666666666|1.0| ## +------------------+------------------+---+

o

# COUNT(*) is equivalent to COUNT(1) so NULLs won''t be an issue df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show() ## +------------------+------------------+---+ ## | x| y| z| ## +------------------+------------------+---+ ## |0.3333333333333333|0.6666666666666666|1.0| ## +------------------+------------------+---+

Equivalente Scala:

import org.apache.spark.sql.Column import org.apache.spark.sql.functions.{col, isnan, sum} type JDouble = java.lang.Double val df = Seq[(JDouble, JDouble, JDouble, JDouble)]( (0.0, 1, 2, 3.0), (null, 3, 4, 5.0), (null, null, 6, 7.0), (java.lang.Double.NaN, 8, 9, java.lang.Double.NaN) ).toDF() def count_not_null(c: Column, nanAsNull: Boolean = false) = { val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true)) sum(pred.cast("integer")) } df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show // +---+---+---+---+ // | _1| _2| _3| _4| // +---+---+---+---+ // | 2| 3| 4| 4| // +---+---+---+---+ df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show // +---+---+---+---+ // | _1| _2| _3| _4| // +---+---+---+---+ // | 1| 3| 4| 3| // +---+---+---+---+

Tengo un conjunto de datos muy grande que se carga en Hive. Consiste en aproximadamente 1.9 millones de filas y 1450 columnas. Necesito determinar la "cobertura" de cada una de las columnas, es decir, la fracción de filas que tienen valores no NaN para cada columna.

Aquí está mi código:

from pyspark import SparkContext from pyspark.sql import HiveContext import string as string sc = SparkContext(appName="compute_coverages") ## Create the context sqlContext = HiveContext(sc) df = sqlContext.sql("select * from data_table") nrows_tot = df.count() covgs=sc.parallelize(df.columns) .map(lambda x: str(x)) .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))

Al probar esto en el shell pyspark, si luego hago covgs.take (10), devuelve una pila de errores bastante grande. Dice que hay un problema al guardar en el archivo /usr/lib64/python2.6/pickle.py . Esta es la parte final del error:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace: py4j.Py4JException: Method __getnewargs__([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) at py4j.Gateway.invoke(Gateway.java:252) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)

Si hay una mejor manera de lograr esto que la que estoy intentando, estoy abierto a sugerencias. Sin embargo, no puedo usar pandas, ya que actualmente no está disponible en el clúster en el que trabajo y no tengo derechos para instalarlo.


Puede usar isNotNull() :

df.where(df[YOUR_COLUMN].isNotNull()).select(YOUR_COLUMN).show()