structtype spark read python apache-spark dataframe pyspark spark-dataframe apache-spark-sql

python - read - spark sql java



Spark Dataframe distingue columnas con nombre duplicado (9)

Puede que este no sea el mejor enfoque, pero si desea cambiar el nombre de las columnas duplicadas (después de unirlas), puede hacerlo utilizando esta pequeña función.

def rename_duplicate_columns(dataframe): columns = dataframe.columns duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2])) for index in duplicate_column_indices: columns[index] = columns[index]+''2'' dataframe = dataframe.toDF(*columns) return dataframe

Entonces, como sé en Spark Dataframe, que para varias columnas puede tener el mismo nombre que se muestra en la siguiente instantánea del marco de datos:

[ Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042})) ]

El resultado anterior se crea al unirse con un marco de datos para sí mismo, puede ver que hay 4 columnas con dos a y f .

El problema es que cuando trato de hacer más cálculos con la columna a, no puedo encontrar una manera de seleccionar a , tengo que intentar df[0] y df.select(''a'') , ambos me devolvieron el error de mesaage :

AnalysisException: Reference ''a'' is ambiguous, could be: a#1333L, a#1335L.

¿Hay alguna forma en la API de Spark que pueda distinguir las columnas de los nombres duplicados nuevamente? o tal vez alguna forma de dejarme cambiar los nombres de las columnas?


Así es como podemos unir dos Dataframes en los mismos nombres de columna en PySpark.

df = df1.join(df2, [''col1'',''col2'',''col3''])

Si printSchema() después de esto, puede ver que se han eliminado las columnas duplicadas.


Comencemos con algunos datos:

df1.join(df2, df1[''a''] == df2[''a'']).select(df1[''f'']).show(2) ## +--------------------+ ## | f| ## +--------------------+ ## |(5,[0,1,2,3,4],[0...| ## |(5,[0,1,2,3,4],[0...| ## +--------------------+

Hay algunas maneras de abordar este problema. En primer lugar, puede hacer referencia inequívocamente a las columnas de la tabla secundaria utilizando columnas principales:

from pyspark.sql.functions import col df1_a = df1.alias("df1_a") df2_a = df2.alias("df2_a") df1_a.join(df2_a, col(''df1_a.a'') == col(''df2_a.a'')).select(''df1_a.f'').show(2) ## +--------------------+ ## | f| ## +--------------------+ ## |(5,[0,1,2,3,4],[0...| ## |(5,[0,1,2,3,4],[0...| ## +--------------------+

También puede usar alias de tabla:

df1_r = df1.select(*(col(x).alias(x + ''_df1'') for x in df1.columns)) df2_r = df1.select(*(col(x).alias(x + ''_df2'') for x in df2.columns)) df1_r.join(df2_r, col(''a_df1'') == col(''a_df2'')).select(col(''f_df1'')).show(2) ## +--------------------+ ## | f_df1| ## +--------------------+ ## |(5,[0,1,2,3,4],[0...| ## |(5,[0,1,2,3,4],[0...| ## +--------------------+

Finalmente, puede cambiar el nombre de las columnas mediante programación:

df1.join(df2,[''a''])


Después de profundizar en la API de Spark, descubrí que primero puedo usar un alias para crear un alias para el marco de datos original, luego lo uso con withColumnRenamed para cambiar el nombre manualmente de cada columna en el alias, esto hará la join sin causar la duplicación del nombre de la columna.

Más detalles se pueden consultar a continuación Spark Dataframe API :

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

Sin embargo, creo que esta es solo una solución problemática y me pregunto si hay una mejor manera de responder mi pregunta.


Hay una manera más simple que escribir alias para todas las columnas a las que se une haciendo:

DataFrame:df1 +-------+-----+ | a | f | +-------+-----+ |107831 | ... | |107831 | ... | +-------+-----+ DataFrame:df2 +-------+-----+ | a | f | +-------+-----+ |107831 | ... | |107831 | ... | +-------+-----+

Esto funciona si la clave a la que se está uniendo es la misma en ambas tablas.

Ver https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html


Le recomendaría que cambie los nombres de columna para su join

df1.select(''a as "df1_a", ''f as "df1_f") .join(df2.select(''a as "df2_a", ''f as "df2_f"), ''df1_a === ''df2_a)

El DataFrame resultante tendrá un schema

(df1_a, df1_f, df2_a, df2_f)

Si necesita una solución de Python pura, puede usar selectExpr() lugar de select() que le permite usar el cambio de nombre de estilo SQL:

from pyspark.mllib.linalg import SparseVector from pyspark.sql import Row df1 = sqlContext.createDataFrame([ Row(a=107831, f=SparseVector( 5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=125231, f=SparseVector( 5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})), ]) df2 = sqlContext.createDataFrame([ Row(a=107831, f=SparseVector( 5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=107831, f=SparseVector( 5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), ])


Puede usar el método def drop(col: Column) para descartar la columna duplicada, por ejemplo:

val newDf = df1.join(df2,df1("a")===df2("a")) DataFrame:newDf +-------+-----+-------+-----+ | a | f | a | f | +-------+-----+-------+-----+ |107831 | ... |107831 | ... | |107831 | ... |107831 | ... | +-------+-----+-------+-----+

cuando me uno a df1 con df2, el DataFrame será el siguiente:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))

Ahora, podemos usar el método def drop(col: Column) para colocar la columna duplicada ''a'' o ''f'', de la siguiente manera:

df = df1.join(df2, [''col1'',''col2'',''col3''])


Si tiene un caso de uso más complicado que el descrito en la respuesta de Glennie Helles Sindholt, por ejemplo, tiene otros / pocos nombres de columnas que no son de unión que también son iguales y desea distinguirlos mientras selecciona es mejor usar alias, por ejemplo:

df3 = df1.select("a", "b").alias("left")/ .join(df2.select("a", "b").alias("right"), ["a"])/ .select("left.a", "left.b", "right.b") df3.columns [''a'', ''b'', ''b'']


Suponga que los DataFrames que desea unir son df1 y df2, y los está uniendo en la columna ''a'', entonces tiene 2 métodos

Método 1

df1.join (df2, ''a'', ''left_outer'')

Este es un método impresionante y es muy recomendable.

Método 2

df1.join (df2, df1.a == df2.a, ''left_outer''). drop (df2.a)