unit-testing - software - pruebas unitarias
Pruebas de tareas de tareas unitarias en AppEngine (4)
Durante mucho tiempo, he estado utilizando colas de tareas en App Engine para programar tareas, tal como se supone que debo hacer.
Pero lo que siempre me he preguntado es ¿cómo se escriben las pruebas para eso? Hasta ahora, simplemente hice pruebas para asegurarme de que no ocurra un error en la API que pone en cola una tarea y luego escribí las pruebas más adecuadas para la API que ejecuta la tarea.
Sin embargo, últimamente he empezado a sentirme un poco insatisfecho con esto y estoy buscando una forma de probar realmente que la tarea correcta se haya agregado a la cola correcta. Con suerte, esto se puede hacer mejor que simplemente desplegando el código y esperando lo mejor.
Estoy usando django-nonrel, si eso tiene alguna relación con la respuesta.
Para recapitular: ¿cómo se puede escribir una prueba unitaria para confirmar que las tareas se han puesto en cola?
Hay un pequeño marco de prueba llamado gaetestbed que puede adaptarse a su necesidad. Para más detalles, consulte: https://github.com/jgeewax/gaetestbed .
Este entorno de prueba funciona en conexión con la nariz, el plugin nose-gae y el paquete WebTest. La combinación de paquetes de python es la mejor manera de probar aplicaciones GAE en lo que a mí respecta.
El uso de GAE Test Bed le permitirá anular una cola de tareas.
Si hereda de FunctionalTestCase
o TaskQueueTestCase
, obtendrá métodos como get_tasks
y assertTasksInQueue
.
También puedes ejecutar las tareas. Cómo hacerlo difiere dependiendo de si usa tareas o diferido.
Para diferidos, tengo un código como este:
from google.appengine.ext import deferred
import base64
# gets the most recent task -- since the task queue is reset between tests,
# this is usually what you want
def get_task(self):
for task in self.get_task_queue_stub().GetTasks(''default''):
return task
# decode and execute the deferred method
def run_deferred(task):
deferred.run(base64.b64decode(task[''body'']))
La ejecución de tareas es similar, pero después de recuperar la tarea, utiliza WebTest (sobre el cual GAE Test Bed se basa) para enviar como solicitud POST a la URL de la tarea con sus parámetros.
Si está utilizando google.appengine.ext.testbed
lugar de GAE Testbed (GAE Testbed ahora está en desuso y se está moviendo a ext.testbed
), puede hacer lo siguiente:
import base64
import unittest2
from google.appengine.ext import deferred
from google.appengine.ext import testbed
class TestTasks(unittest2.TestCase):
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()
self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)
def tearDown(self):
self.testbed.deactivate()
def test_send_contact_request(self):
# Make the request to your app that "defers" something:
response = ...
self.assertEqual(response.status_int, 200)
# Get the task out of the queue
tasks = self.taskqueue_stub.get_filtered_tasks()
self.assertEqual(1, len(tasks))
# Run the task
task = tasks[0]
deferred.run(task.payload)
# Assert that other things happened (ie, if the deferred was sending mail...)
self.assertEqual(...)