Concatenate two PySpark dataframes (6)

I am trying to concatenate two PySpark dataframes, where some columns only appear in one of them:

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

df_2 = sqlContext.range(11, 20)
+---+
| id|
+---+
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

Now I want to generate a third dataframe. I would like something like pandas concat:

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

# do some concatenation here, how?
df_concat.show()
+---+--------------------+--------------------+------------+
| id|             uniform|              normal|    normal_2|
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714|        None|
|  1|  0.8642043127063618|  0.3900018344856156|        None|
|  2|  0.8292577771850476|  1.8077401259195247|        None|
|  3|   0.198558705368724| -0.4270585782850261|        None|
|  4|0.012661361966674889|   0.702634599720141|        None|
|  5|  0.8535692890157796|-0.42355804115129153|        None|
|  6|  0.3723296190171911|  1.3789648582622995|        None|
|  7|  0.9529794127670571| 0.16238718777444605|        None|
|  8|  0.9746632635918108| 0.02448061333761742|        None|
|  9|   0.513622008243935|  0.7626741803250845|        None|
| 11|  0.3221262660507942|                None|       0.123|
| 12|  0.4030672316912547|                None|     0.12323|
| 13|  0.9690555459609131|                None|       0.123|
| 14|0.011913836266515876|                None|     0.18923|
| 15|  0.9359607054250594|                None|     0.99123|
| 16| 0.45680471157575453|                None|       0.123|
| 17|  0.6411908952297819|                None|     1.123|
| 18|  0.5669232696934479|                None|     0.10023|
| 19|   0.513622008243935|                None| 0.916332123|
+---+--------------------+--------------------+------------+

Is that possible?


Here is one way to do it, in case it is still useful: I ran it in the pyspark shell with Python 2.7.12, and my Spark installation was version 2.0.1.

P.S.: I assume you meant to use different seeds for df_1 and df_2, and the code below reflects that.

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))

# Keep whichever side of the outer join actually has a value for "uniform"
def get_uniform(df1_uniform, df2_uniform):
    if df1_uniform:
        return df1_uniform
    if df2_uniform:
        return df2_uniform

u_get_uniform = F.udf(get_uniform, FloatType())

df_3 = df_1.join(df_2, on="id", how="outer").select(
    "id",
    u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"),
    "normal",
    "normal_2",
).orderBy(F.col("id"))

Here are the outputs I get:

df_1.show()
+---+-------------------+--------------------+
| id|            uniform|              normal|
+---+-------------------+--------------------+
|  0|0.41371264720975787|  0.5888539012978773|
|  1| 0.7311719281896606|  0.8645537008427937|
|  2| 0.1982919638208397| 0.06157382353970104|
|  3|0.12714181165849525|  0.3623040918178586|
|  4| 0.7604318153406678|-0.49575204523675975|
|  5|0.12030715258495939|  1.0854146699817222|
|  6|0.12131363910425985| -0.5284523629183004|
|  7|0.44292918521277047| -0.4798519469521663|
|  8| 0.8898784253886249| -0.8820294772950535|
|  9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+

df_2.show()
+---+-------------------+--------------------+
| id|            uniform|            normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525|  0.3623040918178586|
| 13|0.12030715258495939|  1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+

df_3.show()
+---+-----------+--------------------+--------------------+
| id|    uniform|              normal|            normal_2|
+---+-----------+--------------------+--------------------+
|  0| 0.41371265|  0.5888539012978773|                null|
|  1|  0.7311719|  0.8645537008427937|                null|
|  2| 0.19829196| 0.06157382353970104|                null|
|  3| 0.12714182|  0.3623040918178586|                null|
|  4|  0.7604318|-0.49575204523675975|                null|
|  5|0.120307155|  1.0854146699817222|                null|
|  6| 0.12131364| -0.5284523629183004|                null|
|  7| 0.44292918| -0.4798519469521663|                null|
|  8| 0.88987845| -0.8820294772950535|                null|
|  9|0.036507078| -2.1591956435415334|                null|
| 11| 0.19829196|                null| 0.06157382353970104|
| 12| 0.12714182|                null|  0.3623040918178586|
| 13|0.120307155|                null|  1.0854146699817222|
| 14| 0.12131364|                null| -0.5284523629183004|
| 15| 0.44292918|                null| -0.4798519469521663|
| 16| 0.88987845|                null| -0.8820294772950535|
| 17| 0.27310732|                null|-0.15116027592854422|
| 18|  0.7784518|                null| -0.3785563841011868|
| 19| 0.43776396|                null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+
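As a side note, the FloatType cast in the UDF is why the uniform values in df_3 above are truncated (0.41371265 instead of 0.41371264720975787). A minimal sketch of the same merge using F.coalesce instead of a UDF, assuming the same df_1 and df_2 as above (the uniform_2 rename is introduced here only to avoid the ambiguous column name after the join):

import pyspark.sql.functions as F

# Rename df_2's "uniform" before the join so the two columns can be told apart,
# then keep whichever side is non-null; no UDF and no FloatType cast needed.
df_3 = (df_1.join(df_2.withColumnRenamed("uniform", "uniform_2"), on="id", how="outer")
            .select("id",
                    F.coalesce("uniform", "uniform_2").alias("uniform"),
                    "normal", "normal_2")
            .orderBy("id"))
df_3.show()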


This should do it for you...

from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand, lit, coalesce, col
import pyspark.sql.functions as F

df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")
df_1.show()
df_2.show()

# Outer join on id, coalesce the two id columns, and carry over the rest of df_1's columns
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer") \
    .select([coalesce(df_1.id, df_2.id).alias("id")] +
            [col("df_1." + c) for c in df_1.columns if c != "id"]) \
    .sort("id")
df_3.show()


The answers above are very elegant. I wrote this function a long time ago, when I was also struggling to concatenate two dataframes with distinct columns.

Suppose you have dataframes sdf1 and sdf2:

from pyspark.sql import functions as F
from pyspark.sql.types import *

def unequal_union_sdf(sdf1, sdf2):
    # (name, type) pairs present in one dataframe but not the other
    s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
    s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)

    # Add each missing column to the other side as a typed null
    for i, j in s_df2_schema.difference(s_df1_schema):
        sdf1 = sdf1.withColumn(i, F.lit(None).cast(j))
    for i, j in s_df1_schema.difference(s_df2_schema):
        sdf2 = sdf2.withColumn(i, F.lit(None).cast(j))

    # Union on the now-common set of columns, in the same order
    common_schema_colnames = sdf1.columns
    sdk = sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
    return sdk

sdf_concat = unequal_union_sdf(sdf1, sdf2)


Maybe you can try creating the missing columns and then calling union (unionAll for Spark 1.6 or lower):

from pyspark.sql.functions import lit

cols = ['id', 'uniform', 'normal', 'normal_2']
df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
result = df_1_new.union(df_2_new)
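If you happen to be on Spark 3.1 or newer (well beyond the versions used in this thread, so treat this as an assumption), unionByName can fill in the missing columns for you:

# unionByName matches columns by name; allowMissingColumns (Spark 3.1+) fills
# columns that exist on only one side with nulls
df_concat = df_1.unionByName(df_2, allowMissingColumns=True)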


Maybe you want to concatenate more than two dataframes. I ran into this problem and solved it using a conversion through pandas DataFrames.

Suppose you have 3 Spark DataFrames that you want to concatenate.

The code is as follows:

import pandas as pd

list_dfs = []
list_dfs_ = []

df = spark.read.json('path_to_your_jsonfile.json', multiLine=True)
df2 = spark.read.json('path_to_your_jsonfile2.json', multiLine=True)
df3 = spark.read.json('path_to_your_jsonfile3.json', multiLine=True)

list_dfs.extend([df, df2, df3])

# Convert each Spark dataframe to pandas, then concatenate them all at once
for df in list_dfs:
    df = df.select([column for column in df.columns]).toPandas()
    list_dfs_.append(df)

list_dfs.clear()

df_ = sqlContext.createDataFrame(pd.concat(list_dfs_))
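If the dataframes already share the same columns (or have been padded with nulls as in the earlier answers), the round-trip through pandas is not strictly needed and keeps everything on the driver. A minimal sketch that chains the unions on the Spark side instead, assuming the same df, df2 and df3 and Spark 2.3+ for unionByName:

from functools import reduce

# Fold the list into a single dataframe with successive unions by column name
list_dfs = [df, df2, df3]
df_ = reduce(lambda a, b: a.unionByName(b), list_dfs)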


df_concat = df_1.union(df_2)

The dataframes may need to have identical columns, in which case you can use withColumn() to create normal and normal_2.
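A minimal sketch of that idea, essentially the same as the union answer above, with the null columns cast to double so both schemas line up exactly:

from pyspark.sql.functions import lit

# Add the column that is missing on each side as a typed null, then union
cols = ["id", "uniform", "normal", "normal_2"]
df_1_new = df_1.withColumn("normal_2", lit(None).cast("double")).select(cols)
df_2_new = df_2.withColumn("normal", lit(None).cast("double")).select(cols)
df_concat = df_1_new.union(df_2_new)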