procedimientos linea funciones español desde datos consultas con comandos cero bases aprender administración sql postgresql queue producer-consumer

linea - Cola de trabajos como tabla SQL con múltiples consumidores(PostgreSQL)



linea de comandos postgresql (7)

¿Qué hay de solo seleccionar?

SELECT * FROM table WHERE status = ''QUEUED'' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

Tengo un problema típico productor-consumidor:

Múltiples aplicaciones productoras escriben solicitudes de trabajo en una tabla de trabajos en una base de datos PostgreSQL.

Las solicitudes de trabajo tienen un campo de estado que comienza contiene QUEUED en la creación.

Hay varias aplicaciones para el consumidor que son notificadas por una regla cuando un productor inserta un nuevo registro:

CREATE OR REPLACE RULE "jobrecord.added" AS ON INSERT TO jobrecord DO NOTIFY "jobrecordAdded";

Tratarán de reservar un nuevo registro estableciendo su estado en RESERVADO. Por supuesto, solo en el consumidor debería tener éxito. Todos los demás consumidores no deben poder reservar el mismo registro. En su lugar, deberían reservar otros registros con state = QUEUED.

Ejemplo: algunos productores agregaron los siguientes registros a la tabla jobrecord :

id state owner payload ------------------------ 1 QUEUED null <data> 2 QUEUED null <data> 3 QUEUED null <data> 4 QUEUED null <data>

Ahora, dos consumidores A , B quieren procesarlos. Empiezan a correr al mismo tiempo. Uno debe reservar la identificación 1, la otra debe reservar la identificación 2, luego la primera que termina debe reservar la identificación 3 y así sucesivamente.

En un mundo puramente multiproceso, usaría un mutex para controlar el acceso a la cola de trabajos, pero los consumidores son procesos diferentes que pueden ejecutarse en diferentes máquinas. Solo acceden a la misma base de datos, por lo que toda la sincronización debe realizarse a través de la base de datos.

Leí una gran cantidad de documentación sobre el acceso concurrente y el bloqueo en PostgreSQL, por ejemplo, http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Seleccione la fila desbloqueada en PostgreSQL PostgreSQL y bloqueo

De estos temas, aprendí que la siguiente instrucción SQL debería hacer lo que necesito:

UPDATE jobrecord SET owner= :owner, state = :reserved WHERE id = ( SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 ) RETURNING id; // will only return an id when they reserved it successfully

Lamentablemente, cuando ejecuto esto en múltiples procesos de consumo, en aproximadamente el 50% del tiempo, aún reservan el mismo registro, procesándolo y sobrescribiendo los cambios del otro.

¿Qué me estoy perdiendo? ¿Cómo debo escribir la declaración SQL para que varios consumidores no reserven el mismo registro?


Bien, aquí está la solución que está funcionando para mí, basada en el enlace de jordani. Como algunos de mis problemas estaban relacionados con el funcionamiento de Qt-SQL, incluí el código Qt:

QSqlDatabase db = GetDatabase(); db.transaction(); QSqlQuery lockQuery(db); bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; "); QSqlQuery query(db); query.prepare( "UPDATE jobrecord " " SET /"owner/"= :owner, state = :reserved " " WHERE id = ( " " SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 " " ) RETURNING id;" ); query.bindValue(":owner", pid); query.bindValue(":reserved", JobRESERVED); query.bindValue(":queued", JobQUEUED); bool result = query.exec();

Para verificar, si varios consumidores procesan el mismo trabajo, agregué una regla y una tabla de registro:

CREATE TABLE serverjobrecord_log ( serverjobrecord_id integer, oldowner text, newowner text ) WITH ( OIDS=FALSE ); CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord WHERE old.owner IS NOT NULL AND new.state = 1 DO INSERT INTO jobrecord_log (id, oldowner, newowner) VALUES (new.id, old.owner, new.owner);

Sin el LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; declaración, la tabla de registro se llena ocasionalmente con entradas, donde un consumidor ha sobrescrito los valores de otro, pero utilizando la instrucción LOCK, la tabla de registro permanece vacía :-)


Echa un vistazo a PgQ lugar de reinventar la rueda.




No es necesario hacer un bloqueo de tabla completa para esto: /.

Un bloqueo de fila creado con for update funciona bien.

Consulte https://gist.github.com/mackross/a49b72ad8d24f7cefc32 para conocer el cambio que realicé en la respuesta de apinstein y verifique que todavía funciona.

El código final es

update tx_test_queue set status=''running'' where job_id in ( select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1 for update ) returning job_id;


Yo uso postgres para una cola FIFO también. Originalmente usé ACCESS EXCLUSIVE, que produce resultados correctos en alta concurrencia, pero tiene el desafortunado efecto de ser mutuamente excluyente con pg_dump, que adquiere un bloqueo ACCESS SHARE durante su ejecución. Esto hace que mi función next () se bloquee durante mucho tiempo (la duración del pg_dump). Esto no era aceptable, ya que somos una tienda 24x7 y a los clientes no les gustaba el tiempo muerto en la cola en el medio de la noche.

Me di cuenta de que debe haber un bloqueo menos restrictivo que aún sería seguro al mismo tiempo y no de bloqueo mientras se ejecuta pg_dump. Mi búsqueda me llevó a esta publicación SO.

Entonces hice algunas investigaciones.

Los siguientes modos son suficientes para una función FIFO de cola FIFO () que actualizará el estado de un trabajo de la cola a la ejecución sin ningún error de concurrencia, y tampoco bloqueará contra pg_dump:

SHARE UPDATE EXCLUSIVE SHARE ROW EXCLUSIVE EXCLUSIVE

Consulta:

begin; lock table tx_test_queue in exclusive mode; update tx_test_queue set status=''running'' where job_id in ( select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1 ) returning job_id; commit;

El resultado se ve como:

UPDATE 1 job_id -------- 98 (1 row)

Aquí hay un script de shell que prueba todos los diferentes modos de bloqueo en alta concurrencia (30).

#!/bin/bash # RESULTS, feel free to repro yourself # # noLock FAIL # accessShare FAIL # rowShare FAIL # rowExclusive FAIL # shareUpdateExclusive SUCCESS # share FAIL+DEADLOCKS # shareRowExclusive SUCCESS # exclusive SUCCESS # accessExclusive SUCCESS, but LOCKS against pg_dump #config strategy="exclusive" db=postgres dbuser=postgres queuecount=100 concurrency=30 # code psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);" # empty queue psql84 -t -U $dbuser $db -c "truncate tx_test_queue;"; echo "Simulating 10 second pg_dump with ACCESS SHARE" psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select ''pg_dump finished...''" & echo "Starting workers..." # queue $queuecount items seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values (''queued'');" #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;" # process $queuecount w/concurrency of $concurrency case $strategy in "noLock") strategySql="update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "accessShare") strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "rowShare") strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "rowExclusive") strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "share") strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "shareRowExclusive") strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "exclusive") strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; "accessExclusive") strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status=''running{}'' where job_id in (select job_id from tx_test_queue where status=''queued'' order by job_id asc limit 1);";; *) echo "Unknown strategy $strategy";; esac echo $strategySql seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql" #psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;" psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;" psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

El código también está aquí si desea editar: https://gist.github.com/1083936

Estoy actualizando mi aplicación para usar el modo EXCLUSIVO ya que es el modo más restrictivo que a) es correcto yb) no está en conflicto con pg_dump. Elegí la más restrictiva ya que parece ser la menos riesgosa en términos de cambiar la aplicación de ACCESS EXCLUSIVE sin ser un gran experto en el bloqueo de postgres.

Me siento bastante cómodo con mi banco de pruebas y con las ideas generales que hay detrás de la respuesta. Espero que compartir esto ayude a resolver este problema para otros.