unitarias pruebas las integracion ejemplo desventajas desarrollo scala unit-testing apache-spark junit apache-spark-sql

scala - las - pruebas unitarias laravel



¿Cómo escribir pruebas unitarias en Spark 2.0+? (5)

Desde Spark 1.6 puede usar SharedSQLContext que Spark usa para sus propias pruebas unitarias:

class YourAppTest extends SharedSQLContext { var app: YourApp = _ protected override def beforeAll(): Unit = { super.beforeAll() app = new YourApp } protected override def afterAll(): Unit = { super.afterAll() } test("Your test") { val df = sqlContext.read.json("examples/src/main/resources/people.json") app.run(df) }

Dado que Spark 2.3 SharedSparkSession está disponible:

class YourAppTest extends SharedSparkSession { var app: YourApp = _ protected override def beforeAll(): Unit = { super.beforeAll() app = new YourApp } protected override def afterAll(): Unit = { super.afterAll() } test("Your test") { df = spark.read.json("examples/src/main/resources/people.json") app.run(df) }

Actualizar:

Dependencia de Maven:

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql</artifactId> <version>SPARK_VERSION</version> <type>test-jar</type> <scope>test</scope> </dependency>

Dependencia SBT:

"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

He estado tratando de encontrar una forma razonable de probar SparkSession con el marco de prueba de JUnit. Si bien parece que hay buenos ejemplos para SparkContext , no pude encontrar la manera de obtener un ejemplo correspondiente que funcione para SparkSession , incluso si se usa en varios lugares internamente en spark-testing-base . Estaría encantado de probar una solución que no utilice la base de prueba de chispas también si no es realmente la manera correcta de ir aquí.

Caso de prueba simple ( proyecto MWE completo con build.sbt ):

import com.holdenkarau.spark.testing.DataFrameSuiteBase import org.junit.Test import org.scalatest.FunSuite import org.apache.spark.sql.SparkSession class SessionTest extends FunSuite with DataFrameSuiteBase { implicit val sparkImpl: SparkSession = spark @Test def simpleLookupTest { val homeDir = System.getProperty("user.home") val training = spark.read.format("libsvm") .load(s"$homeDir//Documents//GitHub//sample_linear_regression_data.txt") println("completed simple lookup test") } }

El resultado de ejecutar esto con JUnit es un NPE en la línea de carga:

java.lang.NullPointerException at SessionTest.simpleLookupTest(SessionTest.scala:16) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Tenga en cuenta que no debería importar que el archivo que se está cargando exista o no; en una SparkSession configurada correctamente, se lanzará un error más sensible .


Gracias por poner esta pregunta pendiente por ahí. Por alguna razón, cuando se trata de Spark, todos quedan tan atrapados en el análisis que olvidan las prácticas geniales de ingeniería de software que surgieron durante los últimos 15 años. Es por eso que nos esforzamos por analizar las pruebas y la integración continua (entre otras cosas, como DevOps) en nuestro curso.

Un rápido aparte en terminología

Una verdadera prueba unitaria significa que tiene un control completo sobre cada componente de la prueba. No puede haber interacción con bases de datos, llamadas REST, sistemas de archivos o incluso el reloj del sistema; todo tiene que ser "duplicado" (por ejemplo, burlado, troquelado, etc.) como Gerard Mezaros lo pone en xUnit Test Patterns . Sé que esto parece una semántica, pero realmente importa. No entender esto es una de las principales razones por las que se ven fallas de prueba intermitentes en la integración continua.

Todavía podemos probar la unidad

Por lo tanto, dado este entendimiento, la unidad de prueba de un RDD es imposible. Sin embargo, todavía hay un lugar para pruebas unitarias cuando se desarrollan análisis.

Considere una operación simple:

rdd.map(foo).map(bar)

Aquí foo y bar son funciones simples. Esos pueden ser probados en una unidad de la manera normal, y deben estar con tantos casos de esquina como puedas reunir. Después de todo, ¿por qué les importa de dónde obtienen sus entradas, ya sea que se trate de un accesorio de prueba o un RDD ?

No olvides la carcasa de chispa

Esto no está probando per se , pero en estas primeras etapas también deberías estar experimentando en el caparazón Spark para descubrir tus transformaciones y especialmente las consecuencias de tu enfoque. Por ejemplo, puede examinar planes de consulta físicos y lógicos, estrategia y preservación de particiones y el estado de sus datos con muchas funciones diferentes, como toDebugString , explain , glom , show , printSchema , etc. Te dejaré explorar esos.

También puede configurar su master en local[2] en el shell Spark y en sus pruebas para identificar cualquier problema que pueda surgir una vez que comience a distribuir el trabajo.

Pruebas de integración con Spark

Ahora por las cosas divertidas.

Para poder probar Spark una vez que se sienta seguro de la calidad de sus funciones auxiliares y de la lógica de transformación RDD / DataFrame , es fundamental hacer algunas cosas (independientemente de la herramienta de compilación y el marco de prueba):

  • Aumenta la memoria JVM.
  • Habilite el horquillado pero deshabilite la ejecución paralela.
  • Use su marco de prueba para acumular sus pruebas de integración de Spark en suites, e inicialice SparkContext antes de todas las pruebas y deténgalo después de todas las pruebas.

Con ScalaTest, puede mezclar BeforeAndAfterAll (que prefiero en general) o BeforeAndAfterEach como @ShankarKoirala para inicializar y destruir artefactos Spark. Sé que este es un lugar razonable para hacer una excepción, pero realmente no me gustan esas var variables que debes usar.

El patrón de préstamo

Otro enfoque es usar el patrón de préstamo .

Por ejemplo (usando ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup { "My analytics" should { "calculate the right thing" in withSparkContext { (sparkContext) => val data = Seq(...) val rdd = sparkContext.parallelize(data) val total = rdd.map(...).filter(...).map(...).reduce(_ + _) total shouldBe 1000 } } } trait SparkContextSetup { def withSparkContext(testMethod: (SparkContext) => Any) { val conf = new SparkConf() .setMaster("local") .setAppName("Spark test") val sparkContext = new SparkContext(conf) try { testMethod(sparkContext) } finally sparkContext.stop() } }

Como puede ver, el patrón de préstamo hace uso de funciones de orden superior para "prestar" el SparkContext a la prueba y luego deshacerse de él una vez que lo haya hecho.

Programación orientada al sufrimiento (gracias, Nathan)

Es totalmente una cuestión de preferencia, pero prefiero usar el Patrón de Préstamo y encargarme todo lo que pueda antes de incorporar otro marco. Además de tratar de mantener el peso ligero, los marcos a veces agregan mucha "magia" que hace que las fallas en las pruebas de depuración sean difíciles de razonar. Así que tomo un enfoque de Programación Orientada al Sufrimiento , donde evito agregar un nuevo marco hasta que el dolor de no tenerlo sea demasiado para soportar. Pero, nuevamente, esto depende de ti.

La mejor opción para ese marco alternativo es, por supuesto spark-testing-base chispas como mencionó @ShankarKoirala. En ese caso, la prueba anterior se vería así:

class MySpec extends WordSpec with Matchers with SharedSparkContext { "My analytics" should { "calculate the right thing" in { val data = Seq(...) val rdd = sc.parallelize(data) val total = rdd.map(...).filter(...).map(...).reduce(_ + _) total shouldBe 1000 } } }

Tenga en cuenta que no tuve que hacer nada para tratar con el SparkContext . SharedSparkContext me dio todo eso, con sc como SparkContext gratis. Personalmente, aunque no aportaría esta dependencia solo para este propósito, dado que el Patrón de Préstamo hace exactamente lo que necesito para eso. Además, con tanta imprevisibilidad que sucede con los sistemas distribuidos, puede ser un verdadero dolor tener que rastrear a través de la magia que ocurre en el código fuente de una biblioteca de terceros cuando las cosas van mal en la integración continua.

Ahora donde brilla realmente la base de pruebas de chispas es con los ayudantes basados ​​en Hadoop como HDFSClusterLike y YARNClusterLike . Mezclar esos rasgos en realidad puede ahorrarle mucho dolor de configuración. Otro lugar donde brilla es con las propiedades y generadores tipo Scalacheck , suponiendo, por supuesto, que comprenda cómo funcionan las pruebas basadas en propiedades y por qué es útil. Pero, una vez más, me abstendré personalmente de usarlo hasta que mis análisis y mis pruebas alcancen ese nivel de sofisticación.

"Solo un Sith trata en absolutos". - Obi-Wan Kenobi

Por supuesto, tampoco tiene que elegir uno u otro. Tal vez podría utilizar el enfoque de patrón de préstamo para la mayoría de sus pruebas y pruebas de chispa, solo para unas pocas pruebas más rigurosas. La elección no es binaria; puedes hacer ambas cosas

Pruebas de integración con Spark Streaming

Finalmente, me gustaría presentar un fragmento de lo que podría parecer una configuración de prueba de integración SparkStreaming con valores en memoria sin la base de prueba de chispas :

val sparkContext: SparkContext = ... val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3")) val rdd: RDD[(String, String)] = sparkContext.parallelize(data) val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]] val streamingContext = new StreamingContext(sparkContext, Seconds(1)) val dStream: InputDStream = streamingContext.queueStream(strings) strings += rdd

Esto es más simple de lo que parece. Realmente solo convierte una secuencia de datos en una cola para alimentar al DStream . La mayor parte es simplemente una configuración repetitiva que funciona con las API de Spark. De todos modos, puede comparar esto con StreamingSuiteBase como se encuentra en spark-testing-base para decidir cuál prefiere.

Esta podría ser mi publicación más larga, así que la dejo aquí. Espero que otros compartan otras ideas para ayudar a mejorar la calidad de nuestros análisis con las mismas prácticas ágiles de ingeniería de software que han mejorado el desarrollo de todas las demás aplicaciones.

Y con disculpas por el complemento descarado, puede consultar nuestro curso de Análisis con Apache Spark , donde abordamos muchas de estas ideas y más. Esperamos tener una versión en línea pronto.


Me gusta crear un rasgo SparkSessionTestWrapper que se pueda combinar para probar clases. El enfoque de Shankar funciona, pero es prohibitivamente lento para suites de prueba con múltiples archivos.

import org.apache.spark.sql.SparkSession trait SparkSessionTestWrapper { lazy val spark: SparkSession = { SparkSession.builder().master("local").appName("spark session").getOrCreate() } }

El rasgo se puede usar de la siguiente manera:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper { import spark.implicits._ describe("#count") { it("returns a count of all the rows in a DataFrame") { val sourceDF = Seq( ("jets"), ("barcelona") ).toDF("team") assert(sourceDF.count === 2) } } }

Compruebe el proyecto spark-spec para ver un ejemplo de la vida real que utiliza el enfoque SparkSessionTestWrapper .


Podría resolver el problema con el código siguiente

dependencia de la chispa-colmena se agrega en pom del proyecto

class DataFrameTest extends FunSuite with DataFrameSuiteBase{ test("test dataframe"){ val sparkSession=spark import sparkSession.implicits._ var df=sparkSession.read.format("csv").load("path/to/csv") //rest of the operations. } }


Puede escribir una prueba simple con FunSuite y BeforeAnd After each como a continuación

class Tests extends FunSuite with BeforeAndAfterEach { var sparkSession : SparkSession = _ override def beforeEach() { sparkSession = SparkSession.builder().appName("udf testings") .master("local") .config("", "") .getOrCreate() } test("your test name here"){ //your unit test assert here like below assert("True".toLowerCase == "true") } override def afterEach() { sparkSession.stop() } }

No necesita crear una función en la prueba, simplemente puede escribir como

test ("test name") {//implementation and assert}

Holden Karau ha escrito una muy buena prueba de prueba de spark-testing-base

Debes echar un vistazo a continuación, es un ejemplo simple

class TestSharedSparkContext extends FunSuite with SharedSparkContext { val expectedResult = List(("a", 3),("b", 2),("c", 4)) test("Word counts should be equal to expected") { verifyWordCount(Seq("c a a b a c b c c")) } def verifyWordCount(seq: Seq[String]): Unit = { assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList) } }

¡Espero que esto ayude!