sqlcontext spark read createdataframe create python unit-testing apache-spark pyspark

read - sqlcontext spark python



¿Cómo puedo probar los programas de PySpark? (5)

Mi enfoque actual de prueba de unidad de Java / Spark funciona (detallado here ) al crear una instancia de SparkContext usando pruebas de unidades "locales" y en ejecución usando JUnit.

El código debe estar organizado para hacer E / S en una función y luego llamar a otra con múltiples RDD.

Esto funciona genial Tengo una transformación de datos altamente probada escrita en Java + Spark.

¿Puedo hacer lo mismo con Python?

¿Cómo podría ejecutar las pruebas de unidad Spark con Python?


Aquí hay una solución con Pytest si está utilizando Spark 2.x y SparkSession . También estoy importando un paquete de terceros.

import logging import pytest from pyspark.sql import SparkSession def quiet_py4j(): """Suppress spark logging for the test context.""" logger = logging.getLogger(''py4j'') logger.setLevel(logging.WARN) @pytest.fixture(scope="session") def spark_session(request): """Fixture for creating a spark context.""" spark = (SparkSession .builder .master(''local[2]'') .config(''spark.jars.packages'', ''com.databricks:spark-avro_2.11:3.0.1'') .appName(''pytest-pyspark-local-testing'') .enableHiveSupport() .getOrCreate()) request.addfinalizer(lambda: spark.stop()) quiet_py4j() return spark def test_my_app(spark_session): ...

Tenga en cuenta que si utilizo Python 3, tuve que especificar eso como una variable de entorno PYSPARK_PYTHON:

import os import sys IS_PY2 = sys.version_info < (3,) if not IS_PY2: os.environ[''PYSPARK_PYTHON''] = ''python3''

De lo contrario, obtiene el error:

Excepción: Python en worker tiene una versión 2.7 diferente a la del controlador 3.5, PySpark no se puede ejecutar con diferentes versiones menores. Compruebe que las variables de entorno PYSPARK_PYTHON y PYSPARK_DRIVER_PYTHON estén configuradas correctamente.


Asumiendo que tiene pyspark instalado, puede usar la clase a continuación para unitTest it in unittest :

import unittest import pyspark class PySparkTestCase(unittest.TestCase): @classmethod def setUpClass(cls): conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing") cls.sc = pyspark.SparkContext(conf=conf) @classmethod def tearDownClass(cls): cls.sc.stop()

Ejemplo:

class SimpleTestCase(PySparkTestCase): def test_basic(self): test_input = [ '' hello spark '', '' hello again spark spark'' ] input_rdd = self.sc.parallelize(test_input, 1) from operator import add results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect() self.assertEqual(results, [(''hello'', 2), (''spark'', 3), (''again'', 1)])


Hace un tiempo también me enfrenté al mismo problema y después de leer varios artículos, foros y algunas respuestas de , terminé escribiendo un pequeño complemento para pytest: pytest-spark

Ya lo estoy usando durante algunos meses y el flujo de trabajo general se ve bien en Linux:

  1. Instale Apache Spark (configure JVM + descomprima la distribución de Spark en algún directorio)
  2. Instalar "pytest" + plugin "pytest-spark"
  3. Cree "pytest.ini" en su directorio de proyecto y especifique la ubicación de Spark allí.
  4. Ejecute sus pruebas por Pytest como de costumbre.
  5. Opcionalmente, puede utilizar el accesorio "spark_context" en las pruebas que proporciona el complemento: intenta minimizar los registros de Spark en la salida.

Utilizo pytest , que permite accesorios de prueba para que pueda instanciar un contexto de pyspark e inyectarlo en todas las pruebas que lo requieran. Algo a lo largo de las líneas de

@pytest.fixture(scope="session", params=[pytest.mark.spark_local(''local''), pytest.mark.spark_yarn(''yarn'')]) def spark_context(request): if request.param == ''local'': conf = (SparkConf() .setMaster("local[2]") .setAppName("pytest-pyspark-local-testing") ) elif request.param == ''yarn'': conf = (SparkConf() .setMaster("yarn-client") .setAppName("pytest-pyspark-yarn-testing") .set("spark.executor.memory", "1g") .set("spark.executor.instances", 2) ) request.addfinalizer(lambda: sc.stop()) sc = SparkContext(conf=conf) return sc def my_test_that_requires_sc(spark_context): assert spark_context.textFile(''/path/to/a/file'').count() == 10

Luego puede ejecutar las pruebas en modo local llamando a py.test -m spark_local o en YARN con py.test -m spark_yarn . Esto funcionó bastante bien para mí.


Yo recomendaría usar py.test también. py.test facilita la creación de accesorios de prueba SparkContext reutilizables y su uso para escribir funciones de prueba concisas. También puede especializar accesorios (para crear un StreamingContext por ejemplo) y usar uno o más de ellos en sus pruebas.

Escribí una publicación de blog en Medium sobre este tema:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Aquí hay un fragmento de la publicación:

pytestmark = pytest.mark.usefixtures("spark_context") def test_do_word_counts(spark_context): """ test word couting Args: spark_context: test fixture SparkContext """ test_input = [ '' hello spark '', '' hello again spark spark'' ] input_rdd = spark_context.parallelize(test_input, 1) results = wordcount.do_word_counts(input_rdd) expected_results = {''hello'':2, ''spark'':3, ''again'':1} assert results == expected_results