oracle - dbms_aq.dequeue_array, el primer mensaje se devuelve dos veces
plsql oracle11g (1)
Introducción
Estoy enfrentando un comportamiento muy extraño en mi Oracle SQL Server (exactamente: Oracle Database 11g Enterprise Edition Versión 11.2.0.4.0 - Producción de 64 bits) cuando uso los métodos de Oracle Advanced Queuing.
El problema
El error es que encola los mensajes X , pero dequeue_array devuelve mensajes X + 1 , con el primer mensaje duplicado (como lo ve el MessageId).
Reproducir:
Pude escribir algunos PoC simples para reproducir el error. Este código es bastante directo, el material de enqueue / dequeue es Oracle AQ estándar. El código realiza los siguientes pasos dos veces (ejecuciones de prueba):
- Purgar la tabla de cola
- Enqueue X mensajes
- Dequeue todos los mensajes usando la llamada dbms_aq.dequeue_array
- Verifique cuántos mensajes se han quitado de la cola
Al ejecutar el POC en una conexión nueva, la primera ejecución se realiza correctamente sin un error, pero todas las siguientes ejecuciones fallan. Después de eso, cuando utilice la misma conexión, cada vez que ejecute el script fallará en ambas ejecuciones de prueba.
Lo que intenté hasta ahora:
- Ejecutar este script en una conexión "nueva": solo falla la primera ejecución
- Cualquier ejecución adicional de la secuencia de comandos en la misma conexión: todas las ejecuciones fallan
- Al usar / crear una nueva cola: solo falla la primera ejecución
- Cuando se usa "delete from <queue_table>" en lugar de dbms_aqadm.purge_queue_table (): todo está bien
- Cuando se usa dequeue "normal" uno a uno: todo está bien
Conclusión:
No puedo explicar este comportamiento ni encontrar un error en mi código. Por favor, échale un vistazo, debe ser ejecutable directamente en tu cliente sql favorito (probado con PL / SQL Developer).
Si necesita más información o tiene problemas para hacer que el PoC funcione, solo pregunte, verifico este hilo regularmente. Intenté hacer que el PoC sea lo más legible posible, incluido un resultado detallado sobre lo que está sucediendo.
Código:
declare
C_QueueName constant varchar2(32767) := ''TEST_QUEUE'';
C_QueueTable constant varchar2(32767) := ''TEST_Q_TABLE'';
C_MsgCount constant pls_integer := 1;
C_TestRuns constant pls_integer := 2;
C_DequeueArraySize constant pls_integer := 10;
/*
* Create the queue and the queue table used for theses tests
*/
procedure CreateQueueIfMissing is
L_Present pls_integer;
begin
dbms_output.put_line(''START CreateQueueIfMissing'');
execute immediate ''select count(*) from USER_OBJECTS where OBJECT_NAME = '''''' || C_QueueName || '''''' and OBJECT_TYPE = ''''QUEUE'''''' into L_Present;
if L_Present = 1 then
dbms_output.put_line(''Skipping queue creation, already present.'');
dbms_output.put_line(''END CreateQueueIfMissing'');
return;
end if;
dbms_output.put_line('' Creating queue table '' || C_QueueTable);
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
,storage_clause => ''LOGGING NOCACHE NOPARALLEL MONITORING''
,sort_list => ''priority,enq_time''
,multiple_consumers => false
,queue_payload_type => ''SYS.AQ$_JMS_BYTES_MESSAGE''
,comment => ''Queue for messages'');
dbms_output.put_line('' Creating queue '' || C_QueueName);
DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
,queue_table => C_QueueTable
,max_retries => 8640
,retry_delay => 30
,comment => ''Queue for messages'');
dbms_output.put_line('' Starting queue '' || C_QueueName);
DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
dbms_output.put_line(''END CreateQueueIfMissing'');
end CreateQueueIfMissing;
-- ================================================================================================
/*
* This procedure is the root of all evil.
* The error only occurs when using the purge_queue_tables procedure.
* When using a normal "delete from <queue_table>" then everything is just fine.
*/
procedure CleanQueueTable is
L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
L_Count pls_integer;
begin
dbms_output.put_line(''START CleanQueueTable'');
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table BEFORE purge: '' || L_Count);
dbms_aqadm.purge_queue_table(queue_table => C_QueueTable
,purge_condition => null
,purge_options => L_PurgeOptions);
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table AFTER purge: '' || L_Count);
dbms_output.put_line(''END CleanQueueTable'');
end CleanQueueTable;
-- ================================================================================================
/*
* Enqueue the configured count of messages on the queue
*/
procedure EnqueueMessages is
L_BodyId pls_integer;
L_Msg sys.aq$_jms_bytes_message;
L_MsgId raw(16);
L_Count pls_integer;
L_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
begin
dbms_output.put_line(''START EnqueueMessages'');
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table BEFORE enqueue: '' || L_Count);
for i in 1 .. C_MsgCount
loop
dbms_output.put_line('' Construct #'' || i);
L_Msg := sys.aq$_jms_bytes_message.construct;
-- set the JMS header
L_Msg.set_type(''JmsBytesMessage'');
L_Msg.set_userid(1);
L_Msg.set_appid(''test'');
L_Msg.set_groupid(''cs'');
L_Msg.set_groupseq(1);
-- set JMS message content
L_BodyId := L_Msg.clear_body(-1);
L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw(''<test>Lorem Ipsum</test>'')));
L_Msg.flush(L_BodyId);
L_Msg.clean(L_BodyId);
dbms_output.put_line('' Enqueue #'' || i);
DBMS_AQ.ENQUEUE (queue_name => C_QueueName
,enqueue_options => L_EnqueueOptions
,message_properties => L_MessageProperties
,payload => L_Msg
,msgid => L_MsgId);
end loop;
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table AFTER enqueue: '' || L_Count);
dbms_output.put_line(''END EnqueueMessages'');
end EnqueueMessages;
-- ================================================================================================
/*
* Dequeues messages using dequeue_array from the configured queue.
*/
procedure DequeueMessages is
L_DequeueOptions dbms_aq.dequeue_options_t;
L_MsgPropArr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
L_PayloadArr sys.aq$_jms_bytes_messages;
L_MsgIdArr dbms_aq.msgid_array_t;
L_MsgCnt pls_integer := 0;
L_Count pls_integer;
begin
dbms_output.put_line(''START DequeueMessages'');
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table BEFORE dequeue: '' || L_Count);
L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
,dequeue_options => L_DequeueOptions
,array_size => C_DequeueArraySize
,message_properties_array => L_MsgPropArr
,payload_array => L_PayloadArr
,msgid_array => L_MsgIdArr);
execute immediate ''select count(*) from '' || C_QueueTable into L_Count;
dbms_output.put_line('' Messages in queue table AFTER dequeue: '' || L_Count);
dbms_output.put_line('' Expected: '' || C_MsgCount || '', Received: '' || L_MsgCnt);
if C_MsgCount != L_MsgCnt then
dbms_output.put_line('' *****************************************'');
dbms_output.put_line('' TOO MANY ITEMS DEQUEUED?!?'');
dbms_output.put_line('' *****************************************'');
for i in 1 .. L_MsgCnt
loop
dbms_output.put_line('' #'' || i || '' MsdId='' || L_MsgIdArr(i));
end loop;
end if;
dbms_output.put_line(''END DequeueMessages'');
end DequeueMessages;
-- ================================================================================================
/*
* This is the testcase
*/
procedure RunTestCase is
begin
CreateQueueIfMissing;
for i in 1 .. C_TestRuns
loop
dbms_output.put_line(null);
dbms_output.put_line(''=========== START test run #'' || i || ''==========='');
CleanQueueTable;
EnqueueMessages;
DequeueMessages;
end loop;
end;
-- ================================================================================================
begin
RunTestCase;
end;
Ejemplo de salida:
START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing
=========== START test run #1===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 1
END DequeueMessages
=========== START test run #2===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 2
*****************************************
TOO MANY ITEMS DEQUEUED?!?
*****************************************
#1 MsdId=2949A0FF2EE456A7E0540010E0467A30
#2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages
Esto se parece al error 20659700 . Hay un poco más de información en el documento 2002148.1 .
Usted (o su DBA) debe presentar una solicitud de servicio para confirmarlo y ver si hay un parche disponible para su plataforma.