tutorial spark sources receive example español ejemplo data aws apache-spark dataframe pyspark spark-dataframe apache-spark-sql pyspark-sql

sources - Cómo hacer buenos ejemplos reproducibles de Apache Spark



spark streaming example (4)

He pasado una buena cantidad de tiempo leyendo algunas preguntas con las pyspark y spark-dataframe y muy a menudo encuentro que los carteles no proporcionan suficiente información para comprender realmente su pregunta. Normalmente comento pidiéndoles que publiquen un MCVE pero a veces hacer que muestren algunos datos de entrada / salida de muestra es como tirar de los dientes. Por ejemplo: vea los comentarios sobre esta pregunta .

Quizás parte del problema es que las personas simplemente no saben cómo crear fácilmente un MCVE para marcos de datos de chispa. Creo que sería útil tener una versión de marco de datos de chispa de esta pregunta de pandas como una guía que pueda vincularse.

Entonces, ¿cómo se puede crear un buen ejemplo reproducible?


La optimización del rendimiento

Si la pregunta está relacionada con el ajuste del rendimiento, incluya la siguiente información.

Plan de ejecución

Es mejor incluir un plan de ejecución extendido . En Python:

df.explain(True)

En Scala:

df.explain(true)

o plan de ejecución extendido con estadísticas . En Python:

print(df._jdf.queryExecution().stringWithStats())

en Scala:

df.queryExecution.stringWithStats

Información de modo y clúster

  • mode - local , client , `cluster.
  • Administrador de clústeres (si corresponde): ninguno (modo local), independiente, YARN, Mesos, Kubernetes.
  • Información básica de configuración (número de núcleos, memoria del ejecutor).

Información de tiempo

lento es relativo, especialmente cuando transfiere aplicaciones no distribuidas o espera baja latencia. Los tiempos exactos para diferentes tareas y etapas se pueden recuperar de los jobs de Spark UI ( sc.uiWebUrl ) o Spark REST UI.

Use nombres estandarizados para contextos

El uso de nombres establecidos para cada contexto nos permite reproducir rápidamente el problema.

  • sc : para SparkContext .
  • sqlContext : para SQLContext .
  • spark - para SparkSession .

Proporcionar información de tipo ( Scala )

La potente inferencia de tipos es una de las características más útiles de Scala, pero dificulta el análisis del código fuera de contexto. Incluso si el tipo es obvio por el contexto, es mejor anotar las variables. Preferir

val lines: RDD[String] = sc.textFile("path") val words: RDD[String] = lines.flatMap(_.split(" "))

terminado

val lines = sc.textFile("path") val words = lines.flatMap(_.split(" "))

Las herramientas de uso común pueden ayudarlo a:

  • spark-shell / concha Scala

    uso :t

    scala> val rdd = sc.textFile("README.md") rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> :t rdd org.apache.spark.rdd.RDD[String]

  • Idea Inteligente

    Use Alt + =


Proporcione datos de muestra pequeños, que puedan recrearse fácilmente.

Como mínimo, los carteles deben proporcionar un par de filas y columnas en su marco de datos y código que se puedan usar para crearlo fácilmente. Por fácil, me refiero a cortar y pegar. Hazlo lo más pequeño posible para demostrar tu problema.

Tengo el siguiente marco de datos:

+-----+---+-----+----------+ |index| X|label| date| +-----+---+-----+----------+ | 1| 1| A|2017-01-01| | 2| 3| B|2017-01-02| | 3| 5| A|2017-01-03| | 4| 7| B|2017-01-04| +-----+---+-----+----------+

que se puede crear con este código:

df = sqlCtx.createDataFrame( [ (1, 1, ''A'', ''2017-01-01''), (2, 3, ''B'', ''2017-01-02''), (3, 5, ''A'', ''2017-01-03''), (4, 7, ''B'', ''2017-01-04'') ], (''index'', ''X'', ''label'', ''date'') )

Mostrar la salida deseada.

Haga su pregunta específica y muéstrenos el resultado deseado.

¿Cómo puedo crear una nueva columna ''is_divisible'' que tenga el valor ''yes'' si el día del mes de la ''date'' más 7 días es divisible por el valor en la columna ''X'' , y ''no'' contrario?

Salida deseada:

+-----+---+-----+----------+------------+ |index| X|label| date|is_divisible| +-----+---+-----+----------+------------+ | 1| 1| A|2017-01-01| yes| | 2| 3| B|2017-01-02| yes| | 3| 5| A|2017-01-03| yes| | 4| 7| B|2017-01-04| no| +-----+---+-----+----------+------------+

Explica cómo obtener tu salida.

Explique, con gran detalle, cómo obtiene el resultado deseado. Ayuda a mostrar un ejemplo de cálculo.

Por ejemplo, en la fila 1, la X = 1 y la fecha = 2017-01-01. Agregar 7 días hasta la fecha produce 2017-01-08. El día del mes es 8 y dado que 8 es divisible por 1, la respuesta es ''sí''.

Del mismo modo, para la última fila X = 7 y la fecha = 2017-01-04. Agregar 7 a la fecha produce 11 como el día del mes. Como 11% 7 no es 0, la respuesta es ''no''.

Comparte tu código existente.

Muéstrenos lo que ha hecho o probado, incluido todo * del código, incluso si no funciona. Díganos dónde se está atascando y si recibe un error, incluya el mensaje de error.

(* Puede omitir el código para crear el contexto de chispa, pero debe incluir todas las importaciones).

Sé cómo agregar una nueva columna que es la date más 7 días, pero tengo problemas para obtener el día del mes como un número entero.

from pyspark.sql import functions as f df.withColumn("next_week", f.date_add("date", 7))

Incluya versiones, importaciones y use resaltado de sintaxis

  • Detalles completos en esta respuesta escrita por desertnaut .

Para publicaciones de ajuste de rendimiento, incluya el plan de ejecución

  • Detalles completos en esta respuesta escrita por user8371915 .
  • Ayuda a usar nombres estandarizados para contextos.

Análisis de archivos de salida de chispa

  • MaxU proporcionó un código útil en esta respuesta para ayudar a analizar los archivos de salida de Spark en un DataFrame.

Otras notas.

  • Asegúrese de leer primero cómo preguntar y MCVE .
  • Lea las otras respuestas a esta pregunta, que están vinculadas anteriormente.
  • Tener un buen título descriptivo.
  • Ser cortés. Las personas en SO son voluntarias, así que pregunte amablemente.

Buena pregunta y respuesta; algunas sugerencias adicionales:

Incluye tu versión de Spark

Spark todavía está evolucionando, aunque no tan rápido como en los días de 1.x. Siempre es (pero especialmente si está utilizando una versión algo anterior) una buena idea para incluir su versión de trabajo. Personalmente, siempre comienzo mis respuestas con:

spark.version # u''2.2.0''

o

sc.version # u''2.2.0''

Incluir tu versión de Python tampoco es una mala idea.

Incluye todas tus importaciones

Si su pregunta no es estrictamente sobre Spark SQL y los marcos de datos, por ejemplo, si tiene la intención de usar su marco de datos en alguna operación de aprendizaje automático, sea explícito acerca de sus importaciones; vea esta pregunta , donde las importaciones se agregaron en el OP solo después de un intercambio extenso en el (ahora eliminado) comentarios (y resultó que estas importaciones incorrectas fueron la causa principal del problema).

¿Por qué es esto necesario? Porque, por ejemplo, este LDA

from pyspark.mllib.clustering import LDA

es diferente de este LDA:

from pyspark.ml.clustering import LDA

el primero proviene de la antigua API basada en RDD (anteriormente Spark MLlib), mientras que la segunda proviene de la nueva API basada en marcos de datos (Spark ML).

Incluir resaltado de código

OK, confesaré que esto es subjetivo: creo que las preguntas de PySpark no deben etiquetarse como python de manera predeterminada ; El problema es que la etiqueta de python proporciona resaltado de código automáticamente (y creo que esta es una razón principal para aquellos que la usan para preguntas de PySpark). De todos modos, si está de acuerdo, y aún desea un código bonito y resaltado, simplemente incluya la directiva de rebajas correspondiente:

<!-- language-all: lang-python -->

en algún lugar de tu publicación, antes de tu primer fragmento de código.

[ACTUALIZACIÓN: meta..com/questions/362624/… el resaltado automático de sintaxis para las etiquetas pyspark y sparkr - los votos positivos son bienvenidos]


Esta pequeña función auxiliar podría ayudar a analizar los archivos de salida de Spark en DataFrame:

PySpark:

from pyspark.sql.functions import * def read_spark_output(file_path): step1 = spark.read / .option("header","true") / .option("inferSchema","true") / .option("delimiter","|") / .option("parserLib","UNIVOCITY") / .option("ignoreLeadingWhiteSpace","true") / .option("ignoreTrailingWhiteSpace","true") / .option("comment","+") / .csv("file://{}".format(file_path)) # select not-null columns step2 = t.select([c for c in t.columns if not c.startswith("_")]) # deal with ''null'' string in column return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])

Scala:

// read Spark Output Fixed width table: def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = { val step1 = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", "|") .option("parserLib", "UNIVOCITY") .option("ignoreLeadingWhiteSpace", "true") .option("ignoreTrailingWhiteSpace", "true") .option("comment", "+") .csv(filePath) val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*) val columns = step2.columns columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c)))) }

Uso:

df = read_spark_output("file:///tmp/spark.out")

PD: para pyspark , eqNullSafe está disponible en spark 2.3 .