oracle plsql oracle11g oracle-aq

oracle - dbms_aq.dequeue_array, el primer mensaje se devuelve dos veces



plsql oracle11g (1)

Introducción

Estoy enfrentando un comportamiento muy extraño en mi Oracle SQL Server (exactamente: Oracle Database 11g Enterprise Edition Versión 11.2.0.4.0 - Producción de 64 bits) cuando uso los métodos de Oracle Advanced Queuing.

El problema

El error es que encola los mensajes X , pero dequeue_array devuelve mensajes X + 1 , con el primer mensaje duplicado (como lo ve el MessageId).

Reproducir:

Pude escribir algunos PoC simples para reproducir el error. Este código es bastante directo, el material de enqueue / dequeue es Oracle AQ estándar. El código realiza los siguientes pasos dos veces (ejecuciones de prueba):

  • Purgar la tabla de cola
  • Enqueue X mensajes
  • Dequeue todos los mensajes usando la llamada dbms_aq.dequeue_array
  • Verifique cuántos mensajes se han quitado de la cola

Al ejecutar el POC en una conexión nueva, la primera ejecución se realiza correctamente sin un error, pero todas las siguientes ejecuciones fallan. Después de eso, cuando utilice la misma conexión, cada vez que ejecute el script fallará en ambas ejecuciones de prueba.

Lo que intenté hasta ahora:

  • Ejecutar este script en una conexión "nueva": solo falla la primera ejecución
  • Cualquier ejecución adicional de la secuencia de comandos en la misma conexión: todas las ejecuciones fallan
  • Al usar / crear una nueva cola: solo falla la primera ejecución
  • Cuando se usa "delete from <queue_table>" en lugar de dbms_aqadm.purge_queue_table (): todo está bien
  • Cuando se usa dequeue "normal" uno a uno: todo está bien

Conclusión:

No puedo explicar este comportamiento ni encontrar un error en mi código. Por favor, échale un vistazo, debe ser ejecutable directamente en tu cliente sql favorito (probado con PL / SQL Developer).

Si necesita más información o tiene problemas para hacer que el PoC funcione, solo pregunte, verifico este hilo regularmente. Intenté hacer que el PoC sea lo más legible posible, incluido un resultado detallado sobre lo que está sucediendo.

Código:

declare C_QueueName constant varchar2(32767) := ''TEST_QUEUE''; C_QueueTable constant varchar2(32767) := ''TEST_Q_TABLE''; C_MsgCount constant pls_integer := 1; C_TestRuns constant pls_integer := 2; C_DequeueArraySize constant pls_integer := 10; /* * Create the queue and the queue table used for theses tests */ procedure CreateQueueIfMissing is L_Present pls_integer; begin dbms_output.put_line(''START CreateQueueIfMissing''); execute immediate ''select count(*) from USER_OBJECTS where OBJECT_NAME = '''''' || C_QueueName || '''''' and OBJECT_TYPE = ''''QUEUE'''''' into L_Present; if L_Present = 1 then dbms_output.put_line(''Skipping queue creation, already present.''); dbms_output.put_line(''END CreateQueueIfMissing''); return; end if; dbms_output.put_line('' Creating queue table '' || C_QueueTable); DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable ,storage_clause => ''LOGGING NOCACHE NOPARALLEL MONITORING'' ,sort_list => ''priority,enq_time'' ,multiple_consumers => false ,queue_payload_type => ''SYS.AQ$_JMS_BYTES_MESSAGE'' ,comment => ''Queue for messages''); dbms_output.put_line('' Creating queue '' || C_QueueName); DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName ,queue_table => C_QueueTable ,max_retries => 8640 ,retry_delay => 30 ,comment => ''Queue for messages''); dbms_output.put_line('' Starting queue '' || C_QueueName); DBMS_AQADM.START_QUEUE(queue_name => C_QueueName); dbms_output.put_line(''END CreateQueueIfMissing''); end CreateQueueIfMissing; -- ================================================================================================ /* * This procedure is the root of all evil. * The error only occurs when using the purge_queue_tables procedure. * When using a normal "delete from <queue_table>" then everything is just fine. */ procedure CleanQueueTable is L_PurgeOptions dbms_aqadm.aq$_purge_options_t; L_Count pls_integer; begin dbms_output.put_line(''START CleanQueueTable''); execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table BEFORE purge: '' || L_Count); dbms_aqadm.purge_queue_table(queue_table => C_QueueTable ,purge_condition => null ,purge_options => L_PurgeOptions); execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table AFTER purge: '' || L_Count); dbms_output.put_line(''END CleanQueueTable''); end CleanQueueTable; -- ================================================================================================ /* * Enqueue the configured count of messages on the queue */ procedure EnqueueMessages is L_BodyId pls_integer; L_Msg sys.aq$_jms_bytes_message; L_MsgId raw(16); L_Count pls_integer; L_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T; L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T; begin dbms_output.put_line(''START EnqueueMessages''); execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table BEFORE enqueue: '' || L_Count); for i in 1 .. C_MsgCount loop dbms_output.put_line('' Construct #'' || i); L_Msg := sys.aq$_jms_bytes_message.construct; -- set the JMS header L_Msg.set_type(''JmsBytesMessage''); L_Msg.set_userid(1); L_Msg.set_appid(''test''); L_Msg.set_groupid(''cs''); L_Msg.set_groupseq(1); -- set JMS message content L_BodyId := L_Msg.clear_body(-1); L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw(''<test>Lorem Ipsum</test>''))); L_Msg.flush(L_BodyId); L_Msg.clean(L_BodyId); dbms_output.put_line('' Enqueue #'' || i); DBMS_AQ.ENQUEUE (queue_name => C_QueueName ,enqueue_options => L_EnqueueOptions ,message_properties => L_MessageProperties ,payload => L_Msg ,msgid => L_MsgId); end loop; execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table AFTER enqueue: '' || L_Count); dbms_output.put_line(''END EnqueueMessages''); end EnqueueMessages; -- ================================================================================================ /* * Dequeues messages using dequeue_array from the configured queue. */ procedure DequeueMessages is L_DequeueOptions dbms_aq.dequeue_options_t; L_MsgPropArr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t(); L_PayloadArr sys.aq$_jms_bytes_messages; L_MsgIdArr dbms_aq.msgid_array_t; L_MsgCnt pls_integer := 0; L_Count pls_integer; begin dbms_output.put_line(''START DequeueMessages''); execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table BEFORE dequeue: '' || L_Count); L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName ,dequeue_options => L_DequeueOptions ,array_size => C_DequeueArraySize ,message_properties_array => L_MsgPropArr ,payload_array => L_PayloadArr ,msgid_array => L_MsgIdArr); execute immediate ''select count(*) from '' || C_QueueTable into L_Count; dbms_output.put_line('' Messages in queue table AFTER dequeue: '' || L_Count); dbms_output.put_line('' Expected: '' || C_MsgCount || '', Received: '' || L_MsgCnt); if C_MsgCount != L_MsgCnt then dbms_output.put_line('' *****************************************''); dbms_output.put_line('' TOO MANY ITEMS DEQUEUED?!?''); dbms_output.put_line('' *****************************************''); for i in 1 .. L_MsgCnt loop dbms_output.put_line('' #'' || i || '' MsdId='' || L_MsgIdArr(i)); end loop; end if; dbms_output.put_line(''END DequeueMessages''); end DequeueMessages; -- ================================================================================================ /* * This is the testcase */ procedure RunTestCase is begin CreateQueueIfMissing; for i in 1 .. C_TestRuns loop dbms_output.put_line(null); dbms_output.put_line(''=========== START test run #'' || i || ''===========''); CleanQueueTable; EnqueueMessages; DequeueMessages; end loop; end; -- ================================================================================================ begin RunTestCase; end;

Ejemplo de salida:

START CreateQueueIfMissing Skipping queue creation, already present. END CreateQueueIfMissing =========== START test run #1=========== START CleanQueueTable Messages in queue table BEFORE purge: 0 Messages in queue table AFTER purge: 0 END CleanQueueTable START EnqueueMessages Messages in queue table BEFORE enqueue: 0 Construct #1 Enqueue #1 Messages in queue table AFTER enqueue: 1 END EnqueueMessages START DequeueMessages Messages in queue table BEFORE dequeue: 1 Messages in queue table AFTER dequeue: 0 Expected: 1, Received: 1 END DequeueMessages =========== START test run #2=========== START CleanQueueTable Messages in queue table BEFORE purge: 0 Messages in queue table AFTER purge: 0 END CleanQueueTable START EnqueueMessages Messages in queue table BEFORE enqueue: 0 Construct #1 Enqueue #1 Messages in queue table AFTER enqueue: 1 END EnqueueMessages START DequeueMessages Messages in queue table BEFORE dequeue: 1 Messages in queue table AFTER dequeue: 0 Expected: 1, Received: 2 ***************************************** TOO MANY ITEMS DEQUEUED?!? ***************************************** #1 MsdId=2949A0FF2EE456A7E0540010E0467A30 #2 MsdId=2949A0FF2EE456A7E0540010E0467A30 END DequeueMessages


Esto se parece al error 20659700 . Hay un poco más de información en el documento 2002148.1 .

Usted (o su DBA) debe presentar una solicitud de servicio para confirmarlo y ver si hay un parche disponible para su plataforma.