python apache-spark apache-spark-sql pyspark

python - ¿Cómo podemos unir dos marcos de datos SQL Spark usando un criterio "LIKE" de SQL-esque?



apache-spark apache-spark-sql (1)

Estamos utilizando las bibliotecas PySpark que interactúan con Spark 1.3.1.

Tenemos dos marcos de datos, documents_df := {document_id, document_text} y keywords_df := {keyword} . Nos gustaría unirnos a los dos marcos de datos y devolver un marco de datos resultante con pares {document_id, keyword} , utilizando el criterio de que la palabra clave_df.keyword aparezca en la cadena document_df.document_text.

En PostgreSQL, por ejemplo, podríamos lograr esto usando una cláusula ON del formulario:

document_df.document_text ilike ''%'' || keyword_df.keyword || ''%''

Sin embargo, en PySpark, no puedo obtener ninguna forma de sintaxis de combinación para trabajar. ¿Alguien ha logrado algo como esto antes?

Atentamente,

Será


Es posible de dos maneras diferentes, pero en general no se recomienda. Primero creemos un dato ficticio:

from pyspark.sql import Row document_row = Row("document_id", "document_text") keyword_row = Row("keyword") documents_df = sc.parallelize([ document_row(1L, "apache spark is the best"), document_row(2L, "erlang rocks"), document_row(3L, "but haskell is better") ]).toDF() keywords_df = sc.parallelize([ keyword_row("erlang"), keyword_row("haskell"), keyword_row("spark") ]).toDF()

  1. UDF de colmena

    documents_df.registerTempTable("documents") keywords_df.registerTempTable("keywords") query = """SELECT document_id, keyword FROM documents JOIN keywords ON document_text LIKE CONCAT(''%'', keyword, ''%'')""" like_with_hive_udf = sqlContext.sql(query) like_with_hive_udf.show() ## +-----------+-------+ ## |document_id|keyword| ## +-----------+-------+ ## | 1| spark| ## | 2| erlang| ## | 3|haskell| ## +-----------+-------+

  2. Python UDF

    from pyspark.sql.functions import udf, col from pyspark.sql.types import BooleanType # Of you can replace `in` with a regular expression contains = udf(lambda s, q: q in s, BooleanType()) like_with_python_udf = (documents_df.join(keywords_df) .where(contains(col("document_text"), col("keyword"))) .select(col("document_id"), col("keyword"))) like_with_python_udf.show() ## +-----------+-------+ ## |document_id|keyword| ## +-----------+-------+ ## | 1| spark| ## | 2| erlang| ## | 3|haskell| ## +-----------+-------+

¿Por qué no recomendado? Porque en ambos casos requiere un producto cartesiano:

like_with_hive_udf.explain() ## TungstenProject [document_id#2L,keyword#4] ## Filter document_text#3 LIKE concat(%,keyword#4,%) ## CartesianProduct ## Scan PhysicalRDD[document_id#2L,document_text#3] ## Scan PhysicalRDD[keyword#4] like_with_python_udf.explain() ## TungstenProject [document_id#2L,keyword#4] ## Filter pythonUDF#13 ## !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3,keyword#4), ... ## CartesianProduct ## Scan PhysicalRDD[document_id#2L,document_text#3] ## Scan PhysicalRDD[keyword#4]

Hay otras formas de lograr un efecto similar sin un cartesiano completo.

  1. Únase en un documento tokenizado: útil si la lista de palabras clave es demasiado grande para ser manejada en la memoria de una sola máquina

    from pyspark.ml.feature import Tokenizer from pyspark.sql.functions import explode tokenizer = Tokenizer(inputCol="document_text", outputCol="words") tokenized = (tokenizer.transform(documents_df) .select(col("document_id"), explode(col("words")).alias("token"))) like_with_tokenizer = (tokenized .join(keywords_df, col("token") == col("keyword")) .drop("token")) like_with_tokenizer.show() ## +-----------+-------+ ## |document_id|keyword| ## +-----------+-------+ ## | 3|haskell| ## | 1| spark| ## | 2| erlang| ## +-----------+-------+

    Esto requiere barajar pero no cartesiano:

    like_with_tokenizer.explain() ## TungstenProject [document_id#2L,keyword#4] ## SortMergeJoin [token#29], [keyword#4] ## TungstenSort [token#29 ASC], false, 0 ## TungstenExchange hashpartitioning(token#29) ## TungstenProject [document_id#2L,token#29] ## !Generate explode(words#27), true, false, [document_id#2L, ... ## ConvertToSafe ## TungstenProject [document_id#2L,UDF(document_text#3) AS words#27] ## Scan PhysicalRDD[document_id#2L,document_text#3] ## TungstenSort [keyword#4 ASC], false, 0 ## TungstenExchange hashpartitioning(keyword#4) ## ConvertToUnsafe ## Scan PhysicalRDD[keyword#4]

  2. Python UDF y variable de difusión: si la lista de palabras clave es relativamente pequeña

    from pyspark.sql.types import ArrayType, StringType keywords = sc.broadcast(set( keywords_df.map(lambda row: row[0]).collect())) bd_contains = udf( lambda s: list(set(s.split()) & keywords.value), ArrayType(StringType())) like_with_bd = (documents_df.select( col("document_id"), explode(bd_contains(col("document_text"))).alias("keyword"))) like_with_bd.show() ## +-----------+-------+ ## |document_id|keyword| ## +-----------+-------+ ## | 1| spark| ## | 2| erlang| ## | 3|haskell| ## +-----------+-------+

    No requiere barajar ni cartesiano, pero aún debe transferir la variable de difusión a cada nodo de trabajo.

    like_with_bd.explain() ## TungstenProject [document_id#2L,keyword#46] ## !Generate explode(pythonUDF#47), true, false, ... ## ConvertToSafe ## TungstenProject [document_id#2L,pythonUDF#47] ## !BatchPythonEvaluation PythonUDF#<lambda>(document_text#3), ... ## Scan PhysicalRDD[document_id#2L,document_text#3]

  3. Desde Spark 1.6.0 puede marcar un pequeño marco de datos usando sql.functions.broadcast para obtener un efecto similar al anterior sin usar UDF y variables de transmisión explícitas. Reutilizando datos tokenizados:

    from pyspark.sql.functions import broadcast like_with_tokenizer_and_bd = (broadcast(tokenized) .join(keywords_df, col("token") == col("keyword")) .drop("token")) like_with_tokenizer.explain() ## TungstenProject [document_id#3L,keyword#5] ## BroadcastHashJoin [token#10], [keyword#5], BuildLeft ## TungstenProject [document_id#3L,token#10] ## !Generate explode(words#8), true, false, ... ## ConvertToSafe ## TungstenProject [document_id#3L,UDF(document_text#4) AS words#8] ## Scan PhysicalRDD[document_id#3L,document_text#4] ## ConvertToUnsafe ## Scan PhysicalRDD[keyword#5]

Relacionado :

  • Para una coincidencia aproximada, consulte Coincidencia de cadenas eficiente en Apache Spark .