python - software - org apache spark examples
¿Cómo convierto una columna de matriz(es decir, lista) a Vector (2)
Versión corta de la pregunta!
Considere el siguiente fragmento (suponiendo que la
spark
ya esté configurada en alguna
SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Observe que el campo de temperaturas es una lista de flotadores.
Me gustaría convertir estas listas de flotantes al tipo de
Vector
MLlib, y me gustaría que esta conversión se expresara utilizando la API básica de
DataFrame
lugar de pasar por RDD (lo cual es ineficiente porque envía todos los datos de la JVM a Python, el procesamiento se realiza en Python, no obtenemos los beneficios del optimizador Catalyst de Spark, yada yada).
¿Cómo hago esto?
Específicamente:
- ¿Hay alguna manera de hacer que un elenco directo funcione? Consulte a continuación para obtener más detalles (y un intento fallido de una solución alternativa). O, ¿hay alguna otra operación que tenga el efecto que estaba buscando?
- ¿Cuál es más eficiente de las dos soluciones alternativas que sugiero a continuación (UDF vs explotar / reensamblar los elementos en la lista)? ¿O hay otras alternativas casi correctas que son mejores que cualquiera de ellas?
Un reparto directo no funciona
Esto es lo que esperaría que fuera la solución "adecuada". Quiero convertir el tipo de una columna de un tipo a otro, por lo que debería usar un yeso. Como contexto, permíteme recordarte la forma normal de enviarlo a otro tipo:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Ahora, por ejemplo,
df_with_strings.collect()[0]["temperatures"][1]
es
''-7.0''
.
Pero si lanzo un ml Vector, entonces las cosas no salen tan bien:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Esto da un error:
pyspark.sql.utils.AnalysisException: "cannot resolve ''CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)'' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
''Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
¡Ay! Alguna idea de cómo solucionar este problema?
Posibles alternativas
Alternativa 1: Usar
VectorAssembler
Hay un
Transformer
que parece casi ideal para este trabajo: el
VectorAssembler
.
Toma una o más columnas y las concatena en un solo vector.
Desafortunadamente, solo toma columnas
Vector
y
Float
, no columnas
Array
, por lo que lo siguiente no funciona:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Da este error:
pyspark.sql.utils.IllegalArgumentException: ''Data type ArrayType(DoubleType,true) is not supported.''
El mejor trabajo en el que puedo pensar es explotar la lista en varias columnas y luego usar el
VectorAssembler
para recopilarlas nuevamente:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Parece que sería ideal, excepto que
TEMPERATURE_COUNT
sea más de 100, y a veces más de 1000. (Otro problema es que el código sería más complicado si no conoce el tamaño de la matriz de antemano, aunque eso es no es el caso de mis datos.) ¿Spark realmente genera un conjunto de datos intermedio con tantas columnas, o simplemente considera que es un paso intermedio por el que los elementos individuales pasan de forma transitoria (o de hecho optimiza este paso de distancia por completo cuando ve que el único uso de estas columnas es ensamblarlo en un vector)?
Alternativa 2: use un UDF
Una alternativa bastante simple es usar un UDF para hacer la conversión. Esto me permite expresar directamente lo que quiero hacer en una línea de código, y no requiere hacer un conjunto de datos con un número loco de columnas. Pero todos esos datos deben intercambiarse entre Python y la JVM, y cada número individual debe ser manejado por Python (que es notoriamente lento para iterar sobre elementos de datos individuales). Así es como se ve:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Observaciones ignorables
Las secciones restantes de esta pregunta son algunas cosas adicionales que se me ocurrieron al tratar de encontrar una respuesta. Es probable que la mayoría de las personas que lo lean puedan omitir.
No es una solución: use
Vector
para comenzar
En este ejemplo trivial, es posible crear los datos usando el tipo de vector para empezar, pero, por supuesto, mis datos no son realmente una lista de Python que estoy paralelizando, sino que se están leyendo desde una fuente de datos. Pero para el registro, así es como se vería:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Solución ineficiente: use
map()
Una posibilidad es usar el método RDD
map()
para transformar la lista en un
Vector
.
Esto es similar a la idea de UDF, excepto que es aún peor porque se incurre en el costo de serialización, etc. para todos los campos en cada fila, no solo en el que se está operando.
Para el registro, así es como se vería esa solución:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Intento fallido de una solución alternativa para el reparto
En la desesperación, noté que
Vector
está representado internamente por una estructura con cuatro campos, pero el uso de un molde tradicional de ese tipo de estructura tampoco funciona.
Aquí hay una ilustración (donde construí la estructura usando un udf pero el udf no es la parte importante):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Esto da el error:
pyspark.sql.utils.AnalysisException: "cannot resolve ''CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)'' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
''Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
Personalmente, iría con Python UDF y no me molestaría con nada más:
-
Vectors
no son tipos SQL nativos, por lo que habrá una sobrecarga de rendimiento de una forma u otra. En particular, este proceso requiere dos pasos donde los datos se convierten primero de tipo externo a fila , y luego de fila a representación interna utilizandoRowEncoder
genérico . - Cualquier canalización ML descendente será mucho más costosa que una simple conversión. Además, requiere un proceso opuesto al descrito anteriormente
Pero si realmente quieres otras opciones aquí estás:
-
Scala UDF con Python wrapper:
Instale sbt siguiendo las instrucciones en el sitio del proyecto.
Cree el paquete Scala con la siguiente estructura:
. ├── build.sbt └── udfs.scala
Edite
build.sbt
(ajústelo para reflejar la versión de Scala y Spark):scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0", "org.apache.spark" %% "spark-mllib" % "2.1.0" )
Editar
udfs.scala
:package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }
Paquete:
sbt package
e incluir (o equivalente dependiendo de Scala vers:
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
como argumento para
--driver-class-path
al iniciar shell / enviar la aplicación.En PySpark, defina un contenedor:
from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Prueba:
with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
+--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)
-
Volcar datos a un formato JSON que refleje el esquema
DenseVector
y volver a leerlo:from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
+--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+
with_parsed_vector.printSchema()
root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)
Tuve el mismo problema que tú y lo hice de esta manera. Esta forma incluye la transformación RDD, por lo que no es crítico para el rendimiento, pero funciona.
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=[''city'',''temperatures''])
new_df
el resultado es,
DataFrame[city: string, temperatures: vector]