apache spark - ¿Cómo acceder al elemento de una columna VectorUDT en un Spark DataFrame?
apache-spark pyspark (2)
Tengo un marco de datos
df
con una columna
VectorUDT
llamada
features
.
¿Cómo obtengo un elemento de la columna, digamos primer elemento?
He intentado hacer lo siguiente
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
pero obtengo un
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)
.
El mismo error si hago
first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])
lugar.
También intenté
explode()
pero recibo un error porque requiere un tipo de matriz o mapa.
Esta debería ser una operación común, creo.
Convertir salida a
float
:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
def ith_(v, i):
try:
return float(v[i])
except ValueError:
return None
ith = udf(ith_, DoubleType())
Ejemplo de uso:
from pyspark.ml.linalg import Vectors
df = sc.parallelize([
(1, Vectors.dense([1, 2, 3])),
(2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])
df.select(ith("features", lit(1))).show()
## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## | 2.0|
## | 9.0|
## +-----------------+
Explicación:
Los valores de salida deben ser reserializados a objetos Java equivalentes.
Si desea acceder a los
values
(tenga cuidado con
SparseVectors
) debe usar
item
método del
item
:
v.values.item(0)
que devuelven escalares estándar de Python. Del mismo modo, si desea acceder a todos los valores como una estructura densa:
v.toArray().tolist()
Si prefiere usar spark.sql, puede usar la siguiente función personalizada ''to_array'' para convertir el vector a arrary. Entonces puedes manipularlo como una matriz.
from pyspark.sql.types import ArrayType, DoubleType def to_array_(v): return v.toArray().tolist() from pyspark.sql import SQLContext sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None) sqlContext.udf.register("to_array",to_array_, ArrayType(DoubleType()))
ejemplo
from pyspark.ml.linalg import Vectors df = sc.parallelize([ (1, Vectors.dense([1, 2, 3])), (2, Vectors.sparse(3, [1], [9])) ]).toDF(["id", "features"]) df.createOrReplaceTempView("tb") spark.sql("""select * , to_array(features)[1] Second from tb """).toPandas()
salida
id features Second 0 1 [1.0, 2.0, 3.0] 2.0 1 2 (0.0, 9.0, 0.0) 9.0