ver una tablas tabla script para mostrar existe ejemplo datos crear consola codigo mysql job-queue

tablas - ¿Cuál es la mejor forma de implementar una tabla de cola de mensajes en mysql?



script mysql ejemplo (6)

Probablemente es la décima vez que estoy implementando algo como esto, y nunca he estado 100% contento con las soluciones que se me ocurrieron.

La razón por la que usar la tabla mysql en lugar de un sistema de mensajería "apropiado" es atractivo es principalmente porque la mayoría de las aplicaciones ya usan alguna base de datos relacional para otras cosas (que tiende a ser mysql para la mayoría de las cosas que he estado haciendo), mientras que usa un sistema de mensajes Además, las bases de datos relacionales tienen propiedades ACID muy fuertes, mientras que los sistemas de mensajería a menudo no.

La primera idea es usar:

create table jobs( id auto_increment not null primary key, message text not null, process_id varbinary(255) null default null, key jobs_key(process_id) );

Y luego enqueue se ve así:

insert into jobs(message) values(''blah blah'');

Y dequeue se ve así:

begin; select * from jobs where process_id is null order by id asc limit 1; update jobs set process_id = ? where id = ?; -- whatever i just got commit; -- return (id, message) to application, cleanup after done

La mesa y la cola se ven bien, pero dequeue me molesta un poco. ¿Qué tan probable es revertir? O para ser bloqueado? ¿Qué teclas debería usar para hacerlo O (1) -ish?

¿O hay alguna solución mejor que lo que estoy haciendo?


Brian Aker habló sobre un motor de cola hace un tiempo. Se ha hablado también de una sintaxis SELECT table FROM DELETE .

Si no le preocupa el rendimiento, siempre puede usar SELECT GET_LOCK () como mutex. Por ejemplo:

SELECT GET_LOCK(''READQUEUE''); SELECT * FROM jobs; DELETE FROM JOBS WHERE ID = ?; SELECT RELEASE_LOCK(''READQUEUE'');

Y si quieres ser realmente elegante, envuélvela en un procedimiento almacenado.


He creado algunos sistemas de cola de mensajes y no estoy seguro del tipo de mensaje al que te refieres, pero en el caso de la eliminación (¿es una palabra?) Hice lo mismo que tú has hecho . Su método parece simple, limpio y sólido. No es que mi trabajo sea el mejor, pero se ha comprobado que es muy efectivo para monitorear en gran cantidad en muchos sitios. (registro de errores, campañas masivas de mercadeo por correo electrónico, avisos de redes sociales)

Mi voto: ¡no se preocupe!


Sugeriría usar Quartz.NET

Tiene proveedores para SQL Server, Oracle, MySql, SQLite y Firebird.


Su dequeue podría ser más conciso. En lugar de confiar en la reversión de la transacción, puede hacerlo en una declaración atómica sin una transacción explícita:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Luego puede retirar trabajos con (corchetes [] significa opcional, dependiendo de sus detalles):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];


Aquí hay una solución que utilicé, trabajando sin el process_id del hilo actual, o bloqueando la tabla.

SELECT * from jobs ORDER BY ID ASC LIMIT 0,1;

Obtenga el resultado en una matriz $ row y ejecute:

DELETE from jobs WHERE ID=$row[''ID''];

A continuación, obtenga las filas afectadas (mysql_affected_rows). Si hay filas afectadas, procese el trabajo en la matriz $ row. Si hay 0 filas afectadas, significa que algún otro proceso ya está procesando el trabajo seleccionado. Repita los pasos anteriores hasta que no haya filas.

He probado esto con una tabla de "trabajos" que tiene 100k filas, y generando 20 procesos concurrentes que hacen lo anterior. No hubo condiciones de carrera. Puede modificar las consultas anteriores para actualizar una fila con un indicador de procesamiento y eliminar la fila una vez que la haya procesado:

while(time()-$startTime<$timeout) { SELECT * from jobs WHERE processing is NULL ORDER BY ID ASC LIMIT 0,1; if (count($row)==0) break; UPDATE jobs set processing=1 WHERE ID=$row[''ID'']; if (mysql_affected_rows==0) continue; //process your job here DELETE from jobs WHERE ID=$row[''ID'']; }

No hace falta decir que debe usar una cola de mensajes adecuada (ActiveMQ, RabbitMQ, etc.) para este tipo de trabajo. Sin embargo, tuvimos que recurrir a esta solución, ya que nuestro anfitrión rompe con frecuencia las cosas al actualizar el software, por lo que cuanto menos se rompa, mejor.


Este hilo tiene información de diseño que debería poder mapearse.

Citar:

Esto es lo que he usado exitosamente en el pasado:

Esquema de tabla MsgQueue

Identidad MsgId - NO NULO
MsgTypeCode varchar (20) - NOT NULL
SourceCode varchar (20) - proceso de inserción del mensaje - NULLable
State char (1) - ''N''ew si está en cola,'' A ''(ctivo) si está procesando,'' C ''completado, predeterminado'' N ''- NOT NULL
CreateTime datetime - default GETDATE () - NOT NULL
Msg varchar (255) - NULLable

Sus tipos de mensajes son lo que usted esperaría: mensajes que se ajustan a un contrato entre la (s) inserción (es) del proceso y la (s) lectura (es) del proceso, estructurados con XML u otra representación de la representación (JSON sería útil en algunos casos para ejemplo).

Entonces, los procesos de 0 a n se pueden insertar, y los procesos de 0 a n pueden leer y procesar los mensajes. Cada proceso de lectura generalmente maneja un tipo de mensaje único. Se pueden ejecutar varias instancias de un tipo de proceso para equilibrar la carga.

El lector saca un mensaje y cambia el estado a "A" mientras trabaja en él. Cuando termina, cambia el estado a "C" ompleto. Puede eliminar el mensaje o no dependiendo de si desea mantener el seguimiento de auditoría. Los mensajes de estado = ''N'' se extraen en orden MsgType / Timestamp, por lo que hay un índice en MsgType + State + CreateTime.

Variaciones:
Estado para "E" rror.
Columna para el código de proceso de Reader.
Indicaciones de tiempo para transiciones de estado.

Esto ha proporcionado un mecanismo agradable, escalable, visible y simple para hacer una serie de cosas como las que está describiendo. Si tiene una comprensión básica de las bases de datos, es bastante infalible y extensible. Nunca ha habido un problema con las reversiones de bloqueos, etc. debido a las transacciones de transición de estado atómico.