tutorial remove query queries delete create python sqlalchemy audit-logging

python - remove - sqlalchemy query



Registro de auditoría SQLAlchemy; cómo manejar las eliminaciones? (0)

Estoy usando una versión modificada del ejemplo de código de versión que viene con SQLAlchemy para registrar una identificación de usuario y la fecha de los cambios. Sin embargo, también quiero modificarlo para que se is_deleted marcando un is_deleted tipo is_deleted lugar de ejecutar un SQL DELETE real. Mi problema es que no estoy seguro de cómo capturar la eliminación y reemplazarla con una actualización.

Esto es lo que tengo hasta ahora:

'''''' http://docs.sqlalchemy.org/en/rel_0_8/orm/examples.html?highlight=versioning#versioned-objects '''''' from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper, scoping from sqlalchemy.orm.session import Session from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime, String, Boolean from sqlalchemy import event from sqlalchemy.orm.properties import RelationshipProperty from datetime import datetime from sqlalchemy.schema import ForeignKey from sqlalchemy.sql.expression import false def col_references_table(col, table): for fk in col.foreign_keys: if fk.references(table): return True return False def _history_mapper(local_mapper): cls = local_mapper.class_ # set the "active_history" flag # on on column-mapped attributes so that the old version # of the info is always loaded (currently sets it on all attributes) for prop in local_mapper.iterate_properties: getattr(local_mapper.class_, prop.key).impl.active_history = True super_mapper = local_mapper.inherits super_history_mapper = getattr(cls, ''__history_mapper__'', None) polymorphic_on = None super_fks = [] if not super_mapper or local_mapper.local_table is not super_mapper.local_table: cols = [] for column in local_mapper.local_table.c: if column.name.startswith(''version_''): continue col = column.copy() col.unique = False if super_mapper and col_references_table(column, super_mapper.local_table): super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0])) cols.append(col) if column is local_mapper.polymorphic_on: polymorphic_on = col if super_mapper: super_fks.append((''version_datetime'', super_history_mapper.base_mapper.local_table.c.version_datetime)) super_fks.append((''version_userid'', super_history_mapper.base_mapper.local_table.c.version_userid)) super_fks.append((''version_deleted'', super_history_mapper.base_mapper.local_table.c.version_deleted)) cols.append(Column(''version_datetime'', DateTime, default=datetime.now, nullable=False, primary_key=True, info={''colanderalchemy'': {''exclude'': True}})) cols.append(Column(''version_userid'', String(60), ForeignKey("user.login"), nullable=True, info={''colanderalchemy'': {''exclude'': True}})) cols.append(Column(''version_deleted'', Boolean, server_default=false(), nullable=False, info={''colanderalchemy'': {''exclude'': True}})) else: cols.append(Column(''version_datetime'', DateTime, default=datetime.now, nullable=False, primary_key=True, info={''colanderalchemy'': {''exclude'': True}})) cols.append(Column(''version_userid'', String(60), ForeignKey("user.login"), nullable=True, info={''colanderalchemy'': {''exclude'': True}})) cols.append(Column(''version_deleted'', Boolean, server_default=false(), nullable=False, info={''colanderalchemy'': {''exclude'': True}})) if super_fks: cols.append(ForeignKeyConstraint(*zip(*super_fks))) table = Table(local_mapper.local_table.name + ''_history'', local_mapper.local_table.metadata, *cols ) else: # single table inheritance. take any additional columns that may have # been added and add them to the history table. for column in local_mapper.local_table.c: if column.key not in super_history_mapper.local_table.c: col = column.copy() col.unique = False super_history_mapper.local_table.append_column(col) table = None if super_history_mapper: bases = (super_history_mapper.class_,) else: bases = local_mapper.base_mapper.class_.__bases__ versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {}) m = mapper( versioned_cls, table, inherits=super_history_mapper, polymorphic_on=polymorphic_on, polymorphic_identity=local_mapper.polymorphic_identity ) cls.__history_mapper__ = m if not super_history_mapper: local_mapper.local_table.append_column( Column(''version_datetime'', DateTime, default=datetime.now, nullable=False, primary_key=False, info={''colanderalchemy'': {''exclude'': True}}) ) local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime) local_mapper.local_table.append_column( Column(''version_userid'', String(60), ForeignKey("user.login"), nullable=True, info={''colanderalchemy'': {''exclude'': True}}) ) local_mapper.add_property("version_userid", local_mapper.local_table.c.version_userid) local_mapper.local_table.append_column( Column(''version_deleted'', Boolean, server_default=false(), nullable=False, info={''colanderalchemy'': {''exclude'': True}}) ) local_mapper.add_property("version_deleted", local_mapper.local_table.c.version_deleted) class Versioned(object): @declared_attr def __mapper_cls__(cls): def map(cls, *arg, **kw): mp = mapper(cls, *arg, **kw) _history_mapper(mp) return mp return map def versioned_objects(iter): for obj in iter: if hasattr(obj, ''__history_mapper__''): yield obj def create_version(obj, session, deleted = False): obj_mapper = object_mapper(obj) history_mapper = obj.__history_mapper__ history_cls = history_mapper.class_ obj_state = attributes.instance_state(obj) attr = {} obj_changed = False for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()): if hm.single: continue for hist_col in hm.local_table.c: if hist_col.key.startswith(''version_''): continue obj_col = om.local_table.c[hist_col.key] # get the value of the # attribute based on the MapperProperty related to the # mapped column. this will allow usage of MapperProperties # that have a different keyname than that of the mapped column. try: prop = obj_mapper.get_property_by_column(obj_col) except UnmappedColumnError: # in the case of single table inheritance, there may be # columns on the mapped table intended for the subclass only. # the "unmapped" status of the subclass column on the # base class is a feature of the declarative module as of sqla 0.5.2. continue # expired object attributes and also deferred cols might not be in the # dict. force it to load no matter what by using getattr(). if prop.key not in obj_state.dict: getattr(obj, prop.key) a, u, d = attributes.get_history(obj, prop.key) if d: attr[hist_col.key] = d[0] obj_changed = True elif u: attr[hist_col.key] = u[0] else: # if the attribute had no value. attr[hist_col.key] = a[0] obj_changed = True if not obj_changed: # not changed, but we have relationships. OK # check those too for prop in obj_mapper.iterate_properties: if isinstance(prop, RelationshipProperty) and / attributes.get_history(obj, prop.key).has_changes(): obj_changed = True break if not obj_changed and not deleted: return attr[''version_datetime''] = obj.version_datetime attr[''version_userid''] = obj.version_userid attr[''version_deleted''] = obj.version_deleted hist = history_cls() for key, value in attr.items(): setattr(hist, key, value) session.add(hist) obj.version_datetime = datetime.now() obj.version_userid = getattr(session, ''userid'', None) obj.version_deleted = deleted def versioned_session(session): @event.listens_for(session, ''before_flush'') def before_flush(session, flush_context, instances): for obj in versioned_objects(session.deleted): create_version(obj, session, deleted = True) for obj in versioned_objects(session.dirty): create_version(obj, session) def add_userid_to_session(userid, session): if isinstance(session, scoping.scoped_session): thread_local_session = session.registry() thread_local_session.userid = userid elif isinstance(session, Session): session.userid = userid else: raise TypeError("Not sure how to add the userid into session of type {}".format(type(session)))

Y así es como lo estoy usando (todas las partes no esenciales se han cortado):

Base = declarative_base() class User(Versioned, Base): __tablename__ = ''user'' login = Column(String(60), primary_key=True, nullable=False) groups = association_proxy(''user_to_groups'', ''group'', creator=lambda group: UserToGroup(group_name=group.name)) def __init__(self, login, groups=None): self.login = login if groups: for group in groups: self.groups.append(group) class Group(Versioned, Base): __tablename__ = ''group'' name = Column(String(100), primary_key=True, nullable=False) description = Column(String(100), nullable=True) users = association_proxy(''group_to_user'', ''user'', creator=lambda user: UserToGroup(user_login=user.login)) def __eq__(self, other): return self.name == other.name class UserToGroup(Versioned, Base): __tablename__ = ''user_to_group'' user_login = Column(String(60), ForeignKey(User.login), primary_key=true) group_name = Column(String(100), ForeignKey(Group.name), primary_key=true) user = relationship(User, backref=backref(''user_to_groups'', cascade=''all, delete-orphan'')) group = relationship(Group, backref=backref(''group_to_user'', cascade=''all, delete-orphan'')) session.configure(bind=engine) add_userid_to_session("test", session.registry()) versioned_session(session) user = session.query(User).filter(User.login==''test'').one() user.groups.remove(Group(name ="g:admin"))

Antes de ejecutar ese código, la base de datos tiene actualmente un usuario llamado ''prueba'' y dos grupos a los que el usuario está conectado llamados ''g: admin'' y ''g: superadmin''.

Lo que hace actualmente es: Copie la entrada user_to_group existente para la asignación ''test'' => ''g: admin'' y cópiela en la tabla de historial. A continuación, elimine la entrada de user_to_group.

Lo que me gustaría hacer es copiar el valor en la tabla de historial y luego actualizar la entrada en user_to_group para que version_deleted establezca en true .

Estoy pensando que la manera de hacerlo es arrebatar la entrada de la sesión. Limitarla (es por eso que cambié el orden del código original) y modificarla para ponerla en session.dirty. Simplemente no estoy seguro de cuál es la forma "más segura" de hacer esto.

Otro problema (que probablemente requerirá otra pregunta) es cómo detectar relaciones que están cubiertas en otra tabla ya que actualmente el sistema hace una copia de la fila ''usuario'' en la tabla de historial y luego actualiza la información de la versión a pesar de que no se realicen cambios reales a la fila.

EDITAR: He decidido hacer las cosas de forma un poco diferente, pero aún tengo un problema ... En vez de tener un indicador "borrado" en las tablas activas, realmente elimino el contenido y grabo otro elemento del historial que indica cuándo ocurrió la eliminación. Si elimino un objeto directamente, esto funciona correctamente. Si elimino un objeto de una relación, no puedo hacerlo correctamente. Se DELETE un DELETE get a la tabla de relaciones para eliminar el enlace, pero parece que no puedo descubrir cómo detectar esa eliminación en el método "create_version".

Por ejemplo, si lo hago:

group = session.query(Group).filter(Group.name==''g:admin'').one() group.users.remove(group.users[0])

Ningún objeto se coloca en session.deleted. Puedo detectar algún tipo de eliminación a través de attributes.get_history(obj, prop.key) , pero parece indicar una eliminación de un objeto UserToGroup de Group (que quiero detectar y registrar un elemento de historial), pero también indica una eliminación de un Group del objeto UserToGroup (que no quiero hacer nada porque el Group real no se está eliminando).