read - sqlcontext spark python
¿Cómo puedo probar los programas de PySpark? (5)
Mi enfoque actual de prueba de unidad de Java / Spark funciona (detallado here ) al crear una instancia de SparkContext usando pruebas de unidades "locales" y en ejecución usando JUnit.
El código debe estar organizado para hacer E / S en una función y luego llamar a otra con múltiples RDD.
Esto funciona genial Tengo una transformación de datos altamente probada escrita en Java + Spark.
¿Puedo hacer lo mismo con Python?
¿Cómo podría ejecutar las pruebas de unidad Spark con Python?
Aquí hay una solución con Pytest si está utilizando Spark 2.x y SparkSession
. También estoy importando un paquete de terceros.
import logging
import pytest
from pyspark.sql import SparkSession
def quiet_py4j():
"""Suppress spark logging for the test context."""
logger = logging.getLogger(''py4j'')
logger.setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_session(request):
"""Fixture for creating a spark context."""
spark = (SparkSession
.builder
.master(''local[2]'')
.config(''spark.jars.packages'', ''com.databricks:spark-avro_2.11:3.0.1'')
.appName(''pytest-pyspark-local-testing'')
.enableHiveSupport()
.getOrCreate())
request.addfinalizer(lambda: spark.stop())
quiet_py4j()
return spark
def test_my_app(spark_session):
...
Tenga en cuenta que si utilizo Python 3, tuve que especificar eso como una variable de entorno PYSPARK_PYTHON:
import os
import sys
IS_PY2 = sys.version_info < (3,)
if not IS_PY2:
os.environ[''PYSPARK_PYTHON''] = ''python3''
De lo contrario, obtiene el error:
Excepción: Python en worker tiene una versión 2.7 diferente a la del controlador 3.5, PySpark no se puede ejecutar con diferentes versiones menores. Compruebe que las variables de entorno PYSPARK_PYTHON y PYSPARK_DRIVER_PYTHON estén configuradas correctamente.
Asumiendo que tiene pyspark
instalado, puede usar la clase a continuación para unitTest it in unittest
:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
Ejemplo:
class SimpleTestCase(PySparkTestCase):
def test_basic(self):
test_input = [
'' hello spark '',
'' hello again spark spark''
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [(''hello'', 2), (''spark'', 3), (''again'', 1)])
Hace un tiempo también me enfrenté al mismo problema y después de leer varios artículos, foros y algunas respuestas de , terminé escribiendo un pequeño complemento para pytest: pytest-spark
Ya lo estoy usando durante algunos meses y el flujo de trabajo general se ve bien en Linux:
- Instale Apache Spark (configure JVM + descomprima la distribución de Spark en algún directorio)
- Instalar "pytest" + plugin "pytest-spark"
- Cree "pytest.ini" en su directorio de proyecto y especifique la ubicación de Spark allí.
- Ejecute sus pruebas por Pytest como de costumbre.
- Opcionalmente, puede utilizar el accesorio "spark_context" en las pruebas que proporciona el complemento: intenta minimizar los registros de Spark en la salida.
Utilizo pytest
, que permite accesorios de prueba para que pueda instanciar un contexto de pyspark e inyectarlo en todas las pruebas que lo requieran. Algo a lo largo de las líneas de
@pytest.fixture(scope="session",
params=[pytest.mark.spark_local(''local''),
pytest.mark.spark_yarn(''yarn'')])
def spark_context(request):
if request.param == ''local'':
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("pytest-pyspark-local-testing")
)
elif request.param == ''yarn'':
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("pytest-pyspark-yarn-testing")
.set("spark.executor.memory", "1g")
.set("spark.executor.instances", 2)
)
request.addfinalizer(lambda: sc.stop())
sc = SparkContext(conf=conf)
return sc
def my_test_that_requires_sc(spark_context):
assert spark_context.textFile(''/path/to/a/file'').count() == 10
Luego puede ejecutar las pruebas en modo local llamando a py.test -m spark_local
o en YARN con py.test -m spark_yarn
. Esto funcionó bastante bien para mí.
Yo recomendaría usar py.test también. py.test facilita la creación de accesorios de prueba SparkContext reutilizables y su uso para escribir funciones de prueba concisas. También puede especializar accesorios (para crear un StreamingContext por ejemplo) y usar uno o más de ellos en sus pruebas.
Escribí una publicación de blog en Medium sobre este tema:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
Aquí hay un fragmento de la publicación:
pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
""" test word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
'' hello spark '',
'' hello again spark spark''
]
input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)
expected_results = {''hello'':2, ''spark'':3, ''again'':1}
assert results == expected_results