python - ¿Por qué los adbapi de Twisted no recuperan los datos de los unitesthests?
unit-testing sqlite3 (2)
De acuerdo, resulta que esto es un poco complicado. Ejecutar las pruebas de forma aislada (tal como se publicó en esta pregunta) hace que el error solo ocurra de manera rara. Sin embargo, cuando se ejecuta en el contexto de un conjunto de pruebas completo, falla casi el 100% del tiempo.
yield task.deferLater(reactor, .00001, lambda: None)
después de escribir en el db y antes de leer desde el db, y esto resuelve el problema.
A partir de ahí, sospeché que podría tratarse de una condición de carrera derivada del grupo de conexiones y de la tolerancia a concurrencia limitada de sqlite. Intenté establecer los parámetros cb_min
y cb_max
en ConnectionPool
en 1
, y esto también resolvió el problema.
En resumen: parece que sqlite no funciona muy bien con conexiones múltiples, y que la solución adecuada es evitar la concurrencia en la medida de lo posible.
Visión de conjunto
Contexto
Estoy escribiendo pruebas unitarias para alguna lógica de orden superior que depende de escribir en una base de datos SQLite3. Para esto estoy usando twisted.trial.unittest
y twisted.enterprise.adbapi.ConnectionPool
.
Planteamiento del problema
Puedo crear una base de datos sqlite3 persistente y almacenar datos allí. Al usar sqlitebrowser , puedo verificar que los datos se hayan conservado como se esperaba.
El problema es que las llamadas a teaConnectionPool.run*
(por ejemplo: runQuery
) devuelven un conjunto vacío de resultados, pero solo cuando se llaman desde un TestCase
.
Notas y detalles significativos
El problema que estoy experimentando ocurre solo dentro del marco de trial
de Twisted. Mi primer intento de depuración fue sacar el código de la base de datos de la prueba unitaria y colocarlo en una secuencia de prueba / depuración independiente. Dicha secuencia de comandos funciona como se esperaba, mientras que el código de prueba de unidad no lo hace (ver ejemplos a continuación).
Caso 1: prueba de unidad que se porta mal
init.sql
Esta es la secuencia de comandos utilizada para inicializar la base de datos. No hay errores (aparentes) derivados de este archivo.
CREATE TABLE ajxp_changes ( seq INTEGER PRIMARY KEY AUTOINCREMENT, node_id NUMERIC, type TEXT, source TEXT, target TEXT, deleted_md5 TEXT );
CREATE TABLE ajxp_index ( node_id INTEGER PRIMARY KEY AUTOINCREMENT, node_path TEXT, bytesize NUMERIC, md5 TEXT, mtime NUMERIC, stat_result BLOB);
CREATE TABLE ajxp_last_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT, location TEXT, source TEXT, target TEXT );
CREATE TABLE ajxp_node_status ("node_id" INTEGER PRIMARY KEY NOT NULL , "status" TEXT NOT NULL DEFAULT ''NEW'', "detail" TEXT);
CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, type text, message text, source text, target text, action text, status text, date text);
CREATE TRIGGER LOG_DELETE AFTER DELETE ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type,deleted_md5) VALUES (old.node_id, old.node_path, "NULL", "delete", old.md5); END;
CREATE TRIGGER LOG_INSERT AFTER INSERT ON ajxp_index BEGIN INSERT INTO ajxp_changes (node_id,source,target,type) VALUES (new.node_id, "NULL", new.node_path, "create"); END;
CREATE TRIGGER LOG_UPDATE_CONTENT AFTER UPDATE ON "ajxp_index" FOR EACH ROW BEGIN INSERT INTO "ajxp_changes" (node_id,source,target,type) VALUES (new.node_id, old.node_path, new.node_path, CASE WHEN old.node_path = new.node_path THEN "content" ELSE "path" END);END;
CREATE TRIGGER STATUS_DELETE AFTER DELETE ON "ajxp_index" BEGIN DELETE FROM ajxp_node_status WHERE node_id=old.node_id; END;
CREATE TRIGGER STATUS_INSERT AFTER INSERT ON "ajxp_index" BEGIN INSERT INTO ajxp_node_status (node_id) VALUES (new.node_id); END;
CREATE INDEX changes_node_id ON ajxp_changes( node_id );
CREATE INDEX changes_type ON ajxp_changes( type );
CREATE INDEX changes_node_source ON ajxp_changes( source );
CREATE INDEX index_node_id ON ajxp_index( node_id );
CREATE INDEX index_node_path ON ajxp_index( node_path );
CREATE INDEX index_bytesize ON ajxp_index( bytesize );
CREATE INDEX index_md5 ON ajxp_index( md5 );
CREATE INDEX node_status_status ON ajxp_node_status( status );
test_sqlite.py
Esta es la clase de prueba de unidad que falla inesperadamente. TestStateManagement.test_db_clean
passes, indicó que las tablas se crearon correctamente. TestStateManagement.test_inode_create
falla, informando que se recuperaron cero resultados.
import os.path as osp
from twisted.internet import defer
from twisted.enterprise import adbapi
import sqlengine # see below
class TestStateManagement(TestCase):
def setUp(self):
self.meta = mkdtemp()
self.db = adbapi.ConnectionPool(
"sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
)
self.stateman = sqlengine.StateManager(self.db)
with open("init.sql") as f:
script = f.read()
self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)
def tearDown(self):
self.db.close()
del self.db
del self.stateman
del self.d
rmtree(self.meta)
@defer.inlineCallbacks
def test_db_clean(self):
"""Canary test to ensure that the db is initialized in a blank state"""
yield self.d # wait for db to be initialized
q = "SELECT name FROM sqlite_master WHERE type=''table'' AND name=?;"
for table in ("ajxp_index", "ajxp_changes"):
res = yield self.db.runQuery(q, (table,))
self.assertTrue(
len(res) == 1,
"table {0} does not exist".format(table)
)
@defer.inlineCallbacks
def test_inode_create_file(self):
yield self.d
path = osp.join(self.ws, "test.txt")
with open(path, "wt") as f:
pass
inode = mk_dummy_inode(path)
yield self.stateman.create(inode, directory=False)
entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
emsg = "got {0} results, expected 1. Are canary tests failing?"
lentry = len(entry)
self.assertTrue(lentry == 1, emsg.format(lentry))
sqlengine.py
Estos son los artefactos que se prueban mediante las pruebas unitarias anteriores.
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
class StateManager:
"""Manages the SQLite database''s state, ensuring that it reflects the state
of the filesystem.
"""
log = Logger()
def __init__(self, db):
self._db = db
def create(self, inode, directory=False):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return self._db.runOperation(directive, params)
Caso 2: error desaparece fuera de twisted.trial
#! /usr/bin/env python
import os.path as osp
from tempfile import mkdtemp
from twisted.enterprise import adbapi
from twisted.internet.task import react
from twisted.internet.defer import inlineCallbacks
INIT_FILE = "example.sql"
def values_as_tuple(d, *param):
"""Return the values for each key in `param` as a tuple"""
return tuple(map(d.get, param))
def create(db, inode):
params = values_as_tuple(
inode, "node_path", "bytesize", "md5", "mtime", "stat_result"
)
directive = (
"INSERT INTO ajxp_index (node_path,bytesize,md5,mtime,stat_result) "
"VALUES (?,?,?,?,?);"
)
return db.runOperation(directive, params)
def init_database(db):
with open(INIT_FILE) as f:
script = f.read()
return db.runInteraction(lambda c, s: c.executescript(s), script)
@react
@inlineCallbacks
def main(reactor):
meta = mkdtemp()
db = adbapi.ConnectionPool(
"sqlite3", osp.join(meta, "db.sqlite"), check_same_thread=False,
)
yield init_database(db)
# Let''s make sure the tables were created as expected and that we''re
# starting from a blank slate
res = yield db.runQuery("SELECT * FROM ajxp_index LIMIT 1")
assert not res, "database is not empty [ajxp_index]"
res = yield db.runQuery("SELECT * FROM ajxp_changes LIMIT 1")
assert not res, "database is not empty [ajxp_changes]"
# The details of this are not important. Suffice to say they (should)
# conform to the DB schema for ajxp_index.
test_data = {
"node_path": "/this/is/some/arbitrary/path.ext",
"bytesize": 0,
"mtime": 179273.0,
"stat_result": b"this simulates a blob of raw binary data",
"md5": "d41d8cd98f00b204e9800998ecf8427e", # arbitrary
}
# store the test data in the ajxp_index table
yield create(db, test_data)
# test if the entry exists in the db
entry = yield db.runQuery("SELECT * FROM ajxp_index")
assert len(entry) == 1, "got {0} results, expected 1".format(len(entry))
print("OK")
Observaciones finales
De nuevo, al verificar con sqlitebrowser, parece que los datos se escriben en db.sqlite
, por lo que parece un problema de recuperación . Desde aquí, estoy algo perplejo ... ¿Alguna idea?
EDITAR
Este código producirá un inode
que se puede usar para probar.
def mk_dummy_inode(path, isdir=False):
return {
"node_path": path,
"bytesize": osp.getsize(path),
"mtime": osp.getmtime(path),
"stat_result": dumps(stat(path), protocol=4),
"md5": "directory" if isdir else "d41d8cd98f00b204e9800998ecf8427e",
}
Si setUp
un vistazo a tu función setUp
, estás devolviendo self.db.runInteraction(...)
, que devuelve un diferido. Como habrás notado, supones que espera a que termine el diferido. Sin embargo, este no es el caso y es una trampa de la que la mayoría es víctima (yo incluido). Seré honesto con usted, para situaciones como esta, especialmente para pruebas unitarias, solo ejecuto el código síncrono fuera de la clase TestCase
para inicializar la base de datos. Por ejemplo:
def init_db():
import sqlite3
conn = sqlite3.connect(''db.sqlite'')
c = conn.cursor()
with open("init.sql") as f:
c.executescript(f.read())
init_db() # call outside test case
class TestStateManagement(TestCase):
"""
My test cases
"""
Alternativamente, podría decorar la configuración y yield runOperation(...)
pero algo me dice que no funcionaría ... En cualquier caso, es sorprendente que no se hayan yield runOperation(...)
errores.
PD
He estado observando esta pregunta por un tiempo y ha estado en la parte posterior de mi cabeza durante días. Una posible razón para esto finalmente se me ocurrió a casi la 1am. Sin embargo, estoy demasiado cansado / perezoso para probar esto: D, pero es una corazonada bastante buena. Me gustaría felicitarte por tu nivel de detalle en esta pregunta.