python sqlalchemy denormalization

python - ¿Se pueden usar los eventos de SQLAlchemy para actualizar un caché de datos desnormalizado?



denormalization (2)

Puede hacer esto con aggregated columnas aggregated SQLAlchemy-Utils: http://sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html

Por razones de rendimiento, tengo una base de datos desnormalizada donde algunas tablas contienen datos que se han agregado de muchas filas en otras tablas. Me gustaría mantener esta caché de datos desnormalizada mediante el uso de eventos de SQLAlchemy . Como ejemplo, supongamos que estaba escribiendo software de foro y quería que cada Thread tuviera una columna que rastreara el recuento de palabras combinadas de todos los comentarios en el subproceso para mostrar de manera eficiente esa información:

class Thread(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) title = Column(UnicodeText(), nullable=False) word_count = Column(Integer, nullable=False, default=0) class Comment(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) thread_id = Column(UUID, ForeignKey(''thread.id'', ondelete=''CASCADE''), nullable=False) thread = relationship(''Thread'', backref=''comments'') message = Column(UnicodeText(), nullable=False) @property def word_count(self): return len(self.message.split())

Por lo tanto, cada vez que se inserta un comentario (en aras de la simplicidad, digamos que los comentarios nunca se editan o eliminan), queremos actualizar el atributo word_count en el objeto Thread asociado. Entonces me gustaría hacer algo como

def after_insert(mapper, connection, target): thread = target.thread thread.word_count = sum(c.word_count for c in thread.comments) print "updated cached word count to", thread.word_count event.listen(Comment, "after_insert", after_insert)

Entonces, cuando inserto un Comment , puedo ver el evento activando y veo que ha calculado correctamente el conteo de palabras, pero ese cambio no se guarda en la fila de Thread en la base de datos. No veo ninguna advertencia sobre otras tablas actualizadas en la documentación after_insert , aunque veo algunas advertencias en algunas de las otras, como after_delete .

Entonces, ¿hay una forma compatible de hacer esto con los eventos de SQLAlchemy? Ya estoy usando eventos SQLAlchemy para muchas otras cosas, por lo que me gustaría hacer todo de esa manera en lugar de tener que escribir desencadenadores de base de datos.


el evento after_insert () es una forma de hacerlo, y es posible que observe que se pasa un objeto de Connection SQLAlchemy, en lugar de una Session como es el caso con otros eventos relacionados con descarga. Los eventos de descarga a nivel de mapeador están destinados a ser utilizados normalmente para invocar SQL directamente en la Connection dada:

@event.listens_for(Comment, "after_insert") def after_insert(mapper, connection, target): thread_table = Thread.__table__ thread = target.thread connection.execute( thread_table.update(). where(thread_table.c.id==thread.id). values(word_count=sum(c.word_count for c in thread.comments)) ) print "updated cached word count to", thread.word_count

Lo que es notable aquí es que invocar una instrucción UPDATE directamente también es mucho más eficiente que ejecutar ese cambio de atributo nuevamente a través del proceso de la unidad de trabajo.

Sin embargo, un evento como after_insert () no es realmente necesario aquí, ya que conocemos el valor de "word_count" antes de que el flujo ocurra. De hecho, lo sabemos porque los objetos Comment y Thread están asociados entre sí, y podríamos mantener a Thread.word_count completamente nuevo en la memoria en todo momento usando eventos de atributo:

def _word_count(msg): return len(msg.split()) @event.listens_for(Comment.message, "set") def set(target, value, oldvalue, initiator): if target.thread is not None: target.thread.word_count += (_word_count(value) - _word_count(oldvalue)) @event.listens_for(Comment.thread, "set") def set(target, value, oldvalue, initiator): # the new Thread, if any if value is not None: value.word_count += _word_count(target.message) # the old Thread, if any if oldvalue is not None: oldvalue.word_count -= _word_count(target.message)

La gran ventaja de este método es que tampoco hay necesidad de iterar a través de thread.comments, que para una colección descargada significa que se emite otro SELECT.

otro método más es hacerlo en before_flush (). A continuación se muestra una versión rápida y sucia, que se puede refinar para analizar más detenidamente lo que ha cambiado a fin de determinar si el word_count debe actualizarse o no:

@event.listens_for(Session, "before_flush") def before_flush(session, flush_context, instances): for obj in session.new | session.dirty: if isinstance(obj, Thread): obj.word_count = sum(c.word_count for c in obj.comments) elif isinstance(obj, Comment): obj.thread.word_count = sum(c.word_count for c in obj.comments)

Me gustaría ir con el método de evento de atributo ya que es el más eficiente y actualizado.