tutorial spark software org examples example python apache-spark pyspark apache-spark-sql apache-spark-ml

python - software - org apache spark examples



¿Cómo convierto una columna de matriz(es decir, lista) a Vector (2)

Versión corta de la pregunta!

Considere el siguiente fragmento (suponiendo que la spark ya esté configurada en alguna SparkSession ):

from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data)

Observe que el campo de temperaturas es una lista de flotadores. Me gustaría convertir estas listas de flotantes al tipo de Vector MLlib, y me gustaría que esta conversión se expresara utilizando la API básica de DataFrame lugar de pasar por RDD (lo cual es ineficiente porque envía todos los datos de la JVM a Python, el procesamiento se realiza en Python, no obtenemos los beneficios del optimizador Catalyst de Spark, yada yada). ¿Cómo hago esto? Específicamente:

  1. ¿Hay alguna manera de hacer que un elenco directo funcione? Consulte a continuación para obtener más detalles (y un intento fallido de una solución alternativa). O, ¿hay alguna otra operación que tenga el efecto que estaba buscando?
  2. ¿Cuál es más eficiente de las dos soluciones alternativas que sugiero a continuación (UDF vs explotar / reensamblar los elementos en la lista)? ¿O hay otras alternativas casi correctas que son mejores que cualquiera de ellas?

Un reparto directo no funciona

Esto es lo que esperaría que fuera la solución "adecuada". Quiero convertir el tipo de una columna de un tipo a otro, por lo que debería usar un yeso. Como contexto, permíteme recordarte la forma normal de enviarlo a otro tipo:

from pyspark.sql import types df_with_strings = df.select( df["city"], df["temperatures"].cast(types.ArrayType(types.StringType()))), )

Ahora, por ejemplo, df_with_strings.collect()[0]["temperatures"][1] es ''-7.0'' . Pero si lanzo un ml Vector, entonces las cosas no salen tan bien:

from pyspark.ml.linalg import VectorUDT df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Esto da un error:

pyspark.sql.utils.AnalysisException: "cannot resolve ''CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)'' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; ''Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)] +- LogicalRDD [city#0, temperatures#1] "

¡Ay! Alguna idea de cómo solucionar este problema?

Posibles alternativas

Alternativa 1: Usar VectorAssembler

Hay un Transformer que parece casi ideal para este trabajo: el VectorAssembler . Toma una o más columnas y las concatena en un solo vector. Desafortunadamente, solo toma columnas Vector y Float , no columnas Array , por lo que lo siguiente no funciona:

from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector") df_fail = assembler.transform(df)

Da este error:

pyspark.sql.utils.IllegalArgumentException: ''Data type ArrayType(DoubleType,true) is not supported.''

El mejor trabajo en el que puedo pensar es explotar la lista en varias columnas y luego usar el VectorAssembler para recopilarlas nuevamente:

from pyspark.ml.feature import VectorAssembler TEMPERATURE_COUNT = 3 assembler_exploded = VectorAssembler( inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], outputCol="temperature_vector" ) df_exploded = df.select( df["city"], *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)] ) converted_df = assembler_exploded.transform(df_exploded) final_df = converted_df.select("city", "temperature_vector")

Parece que sería ideal, excepto que TEMPERATURE_COUNT sea ​​más de 100, y a veces más de 1000. (Otro problema es que el código sería más complicado si no conoce el tamaño de la matriz de antemano, aunque eso es no es el caso de mis datos.) ¿Spark realmente genera un conjunto de datos intermedio con tantas columnas, o simplemente considera que es un paso intermedio por el que los elementos individuales pasan de forma transitoria (o de hecho optimiza este paso de distancia por completo cuando ve que el único uso de estas columnas es ensamblarlo en un vector)?

Alternativa 2: use un UDF

Una alternativa bastante simple es usar un UDF para hacer la conversión. Esto me permite expresar directamente lo que quiero hacer en una línea de código, y no requiere hacer un conjunto de datos con un número loco de columnas. Pero todos esos datos deben intercambiarse entre Python y la JVM, y cada número individual debe ser manejado por Python (que es notoriamente lento para iterar sobre elementos de datos individuales). Así es como se ve:

from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT()) df_with_vectors = df.select( df["city"], list_to_vector_udf(df["temperatures"]).alias("temperatures") )

Observaciones ignorables

Las secciones restantes de esta pregunta son algunas cosas adicionales que se me ocurrieron al tratar de encontrar una respuesta. Es probable que la mayoría de las personas que lo lean puedan omitir.

No es una solución: use Vector para comenzar

En este ejemplo trivial, es posible crear los datos usando el tipo de vector para empezar, pero, por supuesto, mis datos no son realmente una lista de Python que estoy paralelizando, sino que se están leyendo desde una fuente de datos. Pero para el registro, así es como se vería:

from pyspark.ml.linalg import Vectors from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])), Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])), ] df = spark.createDataFrame(source_data)

Solución ineficiente: use map()

Una posibilidad es usar el método RDD map() para transformar la lista en un Vector . Esto es similar a la idea de UDF, excepto que es aún peor porque se incurre en el costo de serialización, etc. para todos los campos en cada fila, no solo en el que se está operando. Para el registro, así es como se vería esa solución:

df_with_vectors = df.rdd.map(lambda row: Row( city=row["city"], temperatures=Vectors.dense(row["temperatures"]) )).toDF()

Intento fallido de una solución alternativa para el reparto

En la desesperación, noté que Vector está representado internamente por una estructura con cuatro campos, pero el uso de un molde tradicional de ese tipo de estructura tampoco funciona. Aquí hay una ilustración (donde construí la estructura usando un udf pero el udf no es la parte importante):

from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType()) df_almost_vector = df.select( df["city"], list_to_almost_vector_udf(df["temperatures"]).alias("temperatures") ) df_with_vectors = df_almost_vector.select( df_almost_vector["city"], df_almost_vector["temperatures"].cast(VectorUDT()) )

Esto da el error:

pyspark.sql.utils.AnalysisException: "cannot resolve ''CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)'' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; ''Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)] +- Project [city#0, <lambda>(temperatures#1) AS temperatures#5] +- LogicalRDD [city#0, temperatures#1] "


Personalmente, iría con Python UDF y no me molestaría con nada más:

  • Vectors no son tipos SQL nativos, por lo que habrá una sobrecarga de rendimiento de una forma u otra. En particular, este proceso requiere dos pasos donde los datos se convierten primero de tipo externo a fila , y luego de fila a representación interna utilizando RowEncoder genérico .
  • Cualquier canalización ML descendente será mucho más costosa que una simple conversión. Además, requiere un proceso opuesto al descrito anteriormente

Pero si realmente quieres otras opciones aquí estás:

  • Scala UDF con Python wrapper:

    Instale sbt siguiendo las instrucciones en el sitio del proyecto.

    Cree el paquete Scala con la siguiente estructura:

    . ├── build.sbt └── udfs.scala

    Edite build.sbt (ajústelo para reflejar la versión de Scala y Spark):

    scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % "2.1.0", "org.apache.spark" %% "spark-mllib" % "2.1.0" )

    Editar udfs.scala :

    package com.example.spark.udfs import org.apache.spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) }

    Paquete:

    sbt package

    e incluir (o equivalente dependiendo de Scala vers:

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    como argumento para --driver-class-path al iniciar shell / enviar la aplicación.

    En PySpark, defina un contenedor:

    from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    Prueba:

    with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()

    +--------+------------------+----------------+ | city| temperatures| vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema()

    root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- vector: vector (nullable = true)

  • Volcar datos a un formato JSON que refleje el esquema DenseVector y volver a leerlo:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()

    +--------+------------------+----------------+ | city| temperatures| parsed_vector| +--------+------------------+----------------+ | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+

    with_parsed_vector.printSchema()

    root |-- city: string (nullable = true) |-- temperatures: array (nullable = true) | |-- element: double (containsNull = true) |-- parsed_vector: vector (nullable = true)


Tuve el mismo problema que tú y lo hice de esta manera. Esta forma incluye la transformación RDD, por lo que no es crítico para el rendimiento, pero funciona.

from pyspark.sql import Row from pyspark.ml.linalg import Vectors source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data) city_rdd = df.rdd.map(lambda row:row[0]) temp_rdd = df.rdd.map(lambda row:row[1]) new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=[''city'',''temperatures'']) new_df

el resultado es,

DataFrame[city: string, temperatures: vector]