rdds nuevo definicion data crear como apache-spark apache-spark-sql rdd

apache-spark - nuevo - data frame arcgis definicion



Spark especifica condiciones de columna múltiple para unión de marco de datos (7)

A partir de la versión 1.5.0 de Spark (que actualmente no se ha publicado), puede unirse a múltiples columnas de DataFrame. Consulte SPARK-7990: Agregar métodos para facilitar la combinación equitativa en varias claves de combinación .

Pitón

Leads.join( Utm_Master, ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"], "left_outer" )

Scala

La pregunta solicitó una respuesta de Scala, pero no uso Scala. Aquí está mi mejor conjetura ...

Leads.join( Utm_Master, Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"), "left_outer" )

Cómo dar más condiciones de columna al unir dos marcos de datos. Por ejemplo, quiero ejecutar lo siguiente:

val Lead_all = Leads.join(Utm_Master, Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") == Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"), "left")

Quiero unirme solo cuando coincidan estas columnas. Pero la sintaxis anterior no es válida ya que cols solo toma una cadena. Entonces, ¿cómo obtengo lo que quiero?


En Pyspark puede simplemente especificar cada condición por separado:

val Lead_all = Leads.join(Utm_Master, (Leaddetails.LeadSource == Utm_Master.LeadSource) & (Leaddetails.Utm_Source == Utm_Master.Utm_Source) & (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) & (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

Solo asegúrese de usar operadores y paréntesis correctamente.


Hay una columna Spark / API de expresión para este caso:

Leaddetails.join( Utm_Master, Leaddetails("LeadSource") <=> Utm_Master("LeadSource") && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), "left" )

El operador <=> en el ejemplo significa " Prueba de igualdad que es segura para valores nulos ".

La diferencia principal con la prueba de igualdad simple ( === ) es que la primera es segura de usar en caso de que una de las columnas tenga valores nulos.


Las opciones === me dan columnas duplicadas. Así que uso Seq lugar.

val Lead_all = Leads.join(Utm_Master, Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

Por supuesto, esto solo funciona cuando los nombres de las columnas que se unen son los mismos.


Spark SQL admite join en tupla de columnas cuando está entre paréntesis, como

... WHERE (list_of_columns1) = (list_of_columns2)

que es una forma más corta que especificar expresiones iguales (=) para cada par de columnas combinadas por un conjunto de "AND" s.

Por ejemplo:

SELECT a,b,c FROM tab1 t1 WHERE NOT EXISTS ( SELECT 1 FROM t1_except_t2_df e WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c) )

en lugar de

SELECT a,b,c FROM tab1 t1 WHERE NOT EXISTS ( SELECT 1 FROM t1_except_t2_df e WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c )

que es menos legible también, especialmente cuando la lista de columnas es grande y desea tratar NULLs fácilmente.


Una cosa que puedes hacer es usar SQL sin formato:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String) case class Foo(x2: Int, y2: Int, z2: Int, v2: String) val bar = sqlContext.createDataFrame(sc.parallelize( Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") :: Bar(3, 1, 2, "bar") :: Nil)) val foo = sqlContext.createDataFrame(sc.parallelize( Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") :: Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil)) foo.registerTempTable("foo") bar.registerTempTable("bar") sqlContext.sql( "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")


Scala:

Leaddetails.join( Utm_Master, Leaddetails("LeadSource") <=> Utm_Master("LeadSource") && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), "left" )

Para que sea insensible a mayúsculas y minúsculas ,

import org.apache.spark.sql.functions.{lower, upper}

luego solo use lower(value) en la condición del método join.

Ejemplo: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))