apache-spark dataframe pyspark apache-spark-sql apache-spark-ml

apache spark - ¿Cómo acceder al elemento de una columna VectorUDT en un Spark DataFrame?



apache-spark pyspark (2)

Tengo un marco de datos df con una columna VectorUDT llamada features . ¿Cómo obtengo un elemento de la columna, digamos primer elemento?

He intentado hacer lo siguiente

from pyspark.sql.functions import udf first_elem_udf = udf(lambda row: row.values[0]) df.select(first_elem_udf(df.features)).show()

pero obtengo un net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype) . El mismo error si hago first_elem_udf = first_elem_udf(lambda row: row.toArray()[0]) lugar.

También intenté explode() pero recibo un error porque requiere un tipo de matriz o mapa.

Esta debería ser una operación común, creo.


Convertir salida a float :

from pyspark.sql.types import DoubleType from pyspark.sql.functions import lit, udf def ith_(v, i): try: return float(v[i]) except ValueError: return None ith = udf(ith_, DoubleType())

Ejemplo de uso:

from pyspark.ml.linalg import Vectors df = sc.parallelize([ (1, Vectors.dense([1, 2, 3])), (2, Vectors.sparse(3, [1], [9])) ]).toDF(["id", "features"]) df.select(ith("features", lit(1))).show() ## +-----------------+ ## |ith_(features, 1)| ## +-----------------+ ## | 2.0| ## | 9.0| ## +-----------------+

Explicación:

Los valores de salida deben ser reserializados a objetos Java equivalentes. Si desea acceder a los values (tenga cuidado con SparseVectors ) debe usar item método del item :

v.values.item(0)

que devuelven escalares estándar de Python. Del mismo modo, si desea acceder a todos los valores como una estructura densa:

v.toArray().tolist()


Si prefiere usar spark.sql, puede usar la siguiente función personalizada ''to_array'' para convertir el vector a arrary. Entonces puedes manipularlo como una matriz.

from pyspark.sql.types import ArrayType, DoubleType def to_array_(v): return v.toArray().tolist() from pyspark.sql import SQLContext sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None) sqlContext.udf.register("to_array",to_array_, ArrayType(DoubleType()))

ejemplo

from pyspark.ml.linalg import Vectors df = sc.parallelize([ (1, Vectors.dense([1, 2, 3])), (2, Vectors.sparse(3, [1], [9])) ]).toDF(["id", "features"]) df.createOrReplaceTempView("tb") spark.sql("""select * , to_array(features)[1] Second from tb """).toPandas()

salida

id features Second 0 1 [1.0, 2.0, 3.0] 2.0 1 2 (0.0, 9.0, 0.0) 9.0