python - read - spark sql java
Spark Dataframe distingue columnas con nombre duplicado (9)
Puede que este no sea el mejor enfoque, pero si desea cambiar el nombre de las columnas duplicadas (después de unirlas), puede hacerlo utilizando esta pequeña función.
def rename_duplicate_columns(dataframe): columns = dataframe.columns duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2])) for index in duplicate_column_indices: columns[index] = columns[index]+''2'' dataframe = dataframe.toDF(*columns) return dataframe
Entonces, como sé en Spark Dataframe, que para varias columnas puede tener el mismo nombre que se muestra en la siguiente instantánea del marco de datos:
[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]
El resultado anterior se crea al unirse con un marco de datos para sí mismo, puede ver que hay
4
columnas con dos
a
y
f
.
El problema es que cuando trato de hacer más cálculos con la columna a, no puedo encontrar una manera de seleccionar
a
, tengo que intentar
df[0]
y
df.select(''a'')
, ambos me devolvieron el error de mesaage :
AnalysisException: Reference ''a'' is ambiguous, could be: a#1333L, a#1335L.
¿Hay alguna forma en la API de Spark que pueda distinguir las columnas de los nombres duplicados nuevamente? o tal vez alguna forma de dejarme cambiar los nombres de las columnas?
Así es como podemos unir dos Dataframes en los mismos nombres de columna en PySpark.
df = df1.join(df2, [''col1'',''col2'',''col3''])
Si
printSchema()
después de esto, puede ver que se han eliminado las columnas duplicadas.
Comencemos con algunos datos:
df1.join(df2, df1[''a''] == df2[''a'']).select(df1[''f'']).show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Hay algunas maneras de abordar este problema. En primer lugar, puede hacer referencia inequívocamente a las columnas de la tabla secundaria utilizando columnas principales:
from pyspark.sql.functions import col
df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")
df1_a.join(df2_a, col(''df1_a.a'') == col(''df2_a.a'')).select(''df1_a.f'').show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
También puede usar alias de tabla:
df1_r = df1.select(*(col(x).alias(x + ''_df1'') for x in df1.columns))
df2_r = df1.select(*(col(x).alias(x + ''_df2'') for x in df2.columns))
df1_r.join(df2_r, col(''a_df1'') == col(''a_df2'')).select(col(''f_df1'')).show(2)
## +--------------------+
## | f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Finalmente, puede cambiar el nombre de las columnas mediante programación:
df1.join(df2,[''a''])
Después de profundizar en la API de Spark, descubrí que primero puedo usar un
alias
para crear un alias para el marco de datos original, luego lo uso con
withColumnRenamed
para cambiar el nombre manualmente de cada columna en el alias, esto hará la
join
sin causar la duplicación del nombre de la columna.
Más detalles se pueden consultar a continuación Spark Dataframe API :
pyspark.sql.DataFrame.withColumnRenamed
Sin embargo, creo que esta es solo una solución problemática y me pregunto si hay una mejor manera de responder mi pregunta.
Hay una manera más simple que escribir alias para todas las columnas a las que se une haciendo:
DataFrame:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
DataFrame:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
Esto funciona si la clave a la que se está uniendo es la misma en ambas tablas.
Ver https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
Le recomendaría que cambie los nombres de columna para su
join
df1.select(''a as "df1_a", ''f as "df1_f")
.join(df2.select(''a as "df2_a", ''f as "df2_f"), ''df1_a === ''df2_a)
El
DataFrame
resultante tendrá un
schema
(df1_a, df1_f, df2_a, df2_f)
Si necesita una solución de Python pura, puede usar
selectExpr()
lugar de
select()
que le permite usar el cambio de nombre de estilo SQL:
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
df1 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=125231, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])
df2 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])
Puede usar el método
def drop(col: Column)
para descartar la columna duplicada, por ejemplo:
val newDf = df1.join(df2,df1("a")===df2("a"))
DataFrame:newDf
+-------+-----+-------+-----+
| a | f | a | f |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+
cuando me uno a df1 con df2, el DataFrame será el siguiente:
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
Ahora, podemos usar el método
def drop(col: Column)
para colocar la columna duplicada ''a'' o ''f'', de la siguiente manera:
df = df1.join(df2, [''col1'',''col2'',''col3''])
Si tiene un caso de uso más complicado que el descrito en la respuesta de Glennie Helles Sindholt, por ejemplo, tiene otros / pocos nombres de columnas que no son de unión que también son iguales y desea distinguirlos mientras selecciona es mejor usar alias, por ejemplo:
df3 = df1.select("a", "b").alias("left")/ .join(df2.select("a", "b").alias("right"), ["a"])/ .select("left.a", "left.b", "right.b") df3.columns [''a'', ''b'', ''b'']
Suponga que los DataFrames que desea unir son df1 y df2, y los está uniendo en la columna ''a'', entonces tiene 2 métodos
Método 1
df1.join (df2, ''a'', ''left_outer'')
Este es un método impresionante y es muy recomendable.
Método 2
df1.join (df2, df1.a == df2.a, ''left_outer''). drop (df2.a)