delphi queue delphi-xe delphi-xe2

delphi - TThreadedQueue no es capaz de mĂșltiples consumidores?



delphi-xe delphi-xe2 (5)

Bueno, es difícil estar seguro sin muchas pruebas, pero ciertamente parece que esto es un error, ya sea en TThreadedQueue o en TMonitor. De cualquier forma está en el RTL y no en tu código. Debería presentar esto como un informe de control de calidad y usar su ejemplo anterior como el código "cómo reproducir".

Intentando usar TThreadedQueue (Generics.Collections) en un esquema de consumidor múltiple de un solo productor. (Delphi-XE). La idea es insertar objetos en una cola y dejar que varios hilos de trabajo agoten la cola.

Sin embargo, no funciona como se esperaba. Cuando dos o más subprocesos de trabajo llaman a PopItem, las violaciones de acceso se producen desde TThreadedQueue.

Si la llamada a PopItem está serializada con una sección crítica, todo está bien.

Seguramente TThreadedQueue debería ser capaz de manejar múltiples consumidores, entonces ¿me estoy perdiendo algo o es esto un error en TThreadedQueue?

Aquí hay un ejemplo simple para producir el error.

program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in ''../../../FastMM4/FastMM4.pas'', Windows, Messages, Classes, SysUtils, SyncObjs, Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue<TObject>; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + '' '' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn(''Exception ->'' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging(''QueuePop worker''); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= ''<Thread id :'' +IntToStr( Self.threadId) + ''>''; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue<TObject>; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn(''Creating worker threads ...''); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn(''Init done. Pushing items ...''); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'''')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName, '': '', E.Message); ReadLn; end; end; end.

Actualización : el error en TMonitor que causó la falla de TThreadedQueue está solucionado en Delphi XE2.

Actualización 2 : la prueba anterior hizo hincapié en la cola en el estado vacío. Darian Miller descubrió que al hacer hincapié en la cola en estado completo, aún podría reproducir el error en XE2. El error una vez más está en el TMonitor. Consulte su respuesta a continuación para obtener más información. Y también un enlace al QC101114.

Actualización 3 : con la actualización 4 de Delphi-XE2, se anunció una solución para TMonitor que solucionaría los problemas en TThreadedQueue . Mis pruebas hasta el momento ya no pueden reproducir ningún error en TThreadedQueue . Probado subprocesos de productor único / consumidor múltiple cuando la cola está vacía y llena. También probado múltiples productores / consumidores múltiples. Varié los hilos del lector y los hilos del escritor del 1 al 100 sin ningún problema. Pero conociendo la historia, me atrevo a otros a romper TMonitor .


Busqué la clase TThreadedQueue pero no parece tenerla en mi D2009. No voy a matarme exactamente por esto: el soporte de subprocesos de Delphi siempre ha sido erróneo ... errm ... ''no óptimo'' y sospecho que TThreadedQueue no es diferente :)

¿Por qué usar genéricos para objetos de PC (Productor / Consumidor)? Un simple descendiente de TObjectQueue funcionará bien, ha estado usando esto durante décadas, funciona bien con múltiples productores / consumidores:

unit MinimalSemaphorePCqueue; { Absolutely minimal P-C queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count ''push'' will always succeed unless memory runs out, then you''re stuft anyway. ''pop'' has a timeout parameter as well as the address of where any received object is to be put. ''pop'' returns immediately with ''true'' if there is an object on the queue available for it. ''pop'' blocks the caller if the queue is empty and the timeout is not 0. ''pop'' returns false if the timeout is exceeded before an object is available from the queue. ''pop'' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called ''pop'' and are blocked because the queue is empty, a single ''push'' will make only one of the waiting threads ready. Methods to push/pop from the queue A ''semaHandle'' property that can be used in a ''waitForMultipleObjects'' call. When the handle is signaled, the ''peek'' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a ''pop'' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end.


No creo que TThreadedQueue supuestamente sea compatible con múltiples consumidores. Es un FIFO, según el archivo de ayuda. Tengo la impresión de que hay un hilo que empuja y otro (¡solo uno!) Explotando.


Su ejemplo parece funcionar bien bajo XE2, pero si llenamos su cola, falla con AV en un PushItem. (Probado en XE2 Update1)

Para reproducir, simplemente aumente la creación de tareas de 100 a 1100 (la profundidad de la cola se estableció en 1024)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''''));

Esto muere para mí todo el tiempo en Windows 7. Inicialmente probé un empujón continuo para probarlo, y falló en el ciclo 30 ... luego en el ciclo 16 ... luego en el 65, por lo que a diferentes intervalos, pero falló sistemáticamente en algunos punto.

iLoop := 0; while iLoop < 1000 do begin Inc(iLoop); WriteLn(''Loop: '' + IntToStr(iLoop)); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'''')); end;