python - software - Codificar y ensamblar múltiples características en PySpark
map reduce spark (1)
Chispa> = 2.3
Dado que Spark 2.3 OneHotEncoder
está en desuso a favor de OneHotEncoderEstimator
. Si usa una versión reciente, modifique el código del encoder
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
Chispa <2.3
Bueno, puedes escribir un UDF pero ¿por qué lo harías? Ya hay bastantes herramientas diseñadas para manejar esta categoría de tareas:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector
row = Row("gender", "foo", "bar")
df = sc.parallelize([
row("0", 3.0, DenseVector([0, 2.1, 1.0])),
row("1", 1.0, DenseVector([0, 1.1, 1.0])),
row("1", -1.0, DenseVector([0, 3.4, 0.0])),
row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
En primer lugar StringIndexer
.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()
## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## | 0| 3.0| 0.0|
## | 1| 1.0| 1.0|
## | 1|-1.0| 1.0|
## | 0|-3.0| 0.0|
## +------+----+--------------+
Siguiente OneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()
## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## | 0| 3.0| 0.0|(1,[0],[1.0])|
## | 1| 1.0| 1.0| (1,[],[])|
## | 1|-1.0| 1.0| (1,[],[])|
## | 0|-3.0| 0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["gender_vector", "bar", "foo"], outputCol="features")
encoded_df_with_indexed_bar = (vector_indexer
.fit(encoded_df)
.transform(encoded_df))
final_df = assembler.transform(encoded_df)
Si la bar
contiene variables categóricas, puede usar VectorIndexer
para configurar los metadatos requeridos:
from pyspark.ml.feature import VectorIndexer
vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
Pero no es el caso aquí.
Finalmente puedes envolver todo eso usando tuberías:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
Podría decirse que es un enfoque mucho más robusto y limpio que escribir todo desde cero. Hay algunas advertencias, especialmente cuando necesita una codificación consistente entre diferentes conjuntos de datos. Puede leer más en la documentación oficial de StringIndexer
y VectorIndexer
.
Otra forma de obtener una salida comparable es RFormula
which :
RFormula
produce una columna vectorial de características y una columna de cadena doble o cadena de etiqueta. Al igual que cuando se usan las fórmulas en R para la regresión lineal, las columnas de entrada de cadena se codificarán por una sola vez y las columnas numéricas se convertirán a dobles. Si la columna de la etiqueta es de tipo cadena, primero se transformará para duplicarse conStringIndexer
. Si la columna de etiqueta no existe en el marco de datos, la columna de etiqueta de salida se creará a partir de la variable de respuesta especificada en la fórmula.
from pyspark.ml.feature import RFormula
rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)
Como puede ver, es mucho más conciso, pero más difícil de componer no permite mucha personalización. Sin embargo, el resultado para una tubería simple como esta será idéntico:
final_df_rf.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
final_df.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
Respecto a tus preguntas:
Hacer un UDF con una funcionalidad similar que pueda usar en una consulta Spark SQL (o de alguna otra forma, supongo)
Es solo un UDF como cualquier otro. Asegúrate de usar tipos compatibles y, además, todo debería funcionar bien.
tomar el RDD resultante del mapa descrito anteriormente y agregarlo como una nueva columna al marco de datos user_data?
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)
Nota :
Para Spark 1.x, reemplace pyspark.ml.linalg
con pyspark.mllib.linalg
.
Tengo una clase de Python que estoy usando para cargar y procesar algunos datos en Spark. Entre las varias cosas que debo hacer, estoy generando una lista de variables ficticias derivadas de varias columnas en un marco de datos de Spark. Mi problema es que no estoy seguro de cómo definir correctamente una función definida por el usuario para lograr lo que necesito.
Actualmente tengo un método que, cuando se asigna sobre el RDD del marco de datos subyacente, resuelve la mitad del problema (recuerde que este es un método en una clase más grande de data_processor
):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {''gender'':[''1'',''2''], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I''m looking at, and grabs the relevant features from a config file
if table == ''users'':
iter_over = self.config.dyadic_features_to_include
elif table == ''activty'':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for col in iter_over:
column_value = str(row[col]).lower()
cats = categories[col]
result += [1 if column_value and cat==column_value else 0 for cat in cats]
return result
return _build_feature_arr
Esencialmente, lo que esto hace es, para el marco de datos especificado, toma los valores de las variables categóricas para las columnas especificadas y devuelve una lista de los valores de estas nuevas variables ficticias. Eso significa el siguiente código:
data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr(''users''))
devuelve algo como:
In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 0],
[1, 0, 1, 0, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[0, 1, 1, 0, 0, 0],
[1, 0, 1, 1, 0, 0],
[1, 0, 0, 1, 0, 0],
[1, 0, 0, 0, 0, 1]]
Esto es exactamente lo que quiero en términos de generar la lista de variables ficticias que quiero, pero aquí está mi pregunta: ¿Cómo puedo (a) hacer un UDF con una funcionalidad similar que pueda usar en una consulta Spark SQL (o de alguna otra forma)? , Supongo), o (b) tomar el RDD resultante del mapa descrito anteriormente y agregarlo como una nueva columna al marco de datos de datos de usuario?
De cualquier manera, lo que necesito hacer es generar un nuevo marco de datos que contenga las columnas de user_data, junto con una nueva columna (llamémosle feature_array
) que contenga la salida de la función anterior (o algo funcionalmente equivalente).