tiempo threads threading python3 programacion mismo metodos manejo hilos entre eliminar ejecutar datos con compartir argumentos multithreading macos delphi upload firemonkey

multithreading - threads - Sincronización de carga de archivos multiproceso



python3 multithreading (3)

Puede que este no sea el problema, pero TFileInfo es un registro.

Esto significa que cuando se pasa como un parámetro (no const / var), se copia. Esto puede provocar problemas con cosas como cadenas en el registro que no obtienen recuentos de referencia actualizados cuando se copia el registro.

Una cosa para intentar sería convertirlo en una clase y pasar una instancia como parámetro (es decir, un puntero a los datos en el montón).

Otra cosa a tener en cuenta es compartir Int64 (por ejemplo, sus valores de tamaño) en sistemas de 32 bits con rosca.

Actualizar / leer esto no se hace atómicamente y no tiene ninguna protección específica, por lo que es posible que una lectura del valor se diferencie de 32 bits superior e inferior debido al enhebrado. (por ejemplo, leer los 32 bits superiores, escribir los 32 bits superiores, escribir los 32 bits inferiores, leer los 32 bits inferiores, con las lecturas y escribir en diferentes hilos). Probablemente esto no esté causando los problemas que está viendo y, a menos que esté trabajando con transferencias de archivos de> 4 GB, es poco probable que le cause ningún problema.

Actualmente estoy trabajando en una aplicación cliente / servidor Delphi XE3 para transferir archivos (con los componentes de Indy FTP). La parte del cliente supervisa una carpeta, obtiene una lista de los archivos que contiene, los sube al servidor y elimina los originales. La carga se realiza mediante un hilo separado, que procesa los archivos uno por uno. Los archivos pueden tener un rango de 0 a algunos miles y sus tamaños también varían mucho.

Es una aplicación Firemonkey compilada tanto para OSX como para Windows, así que tuve que usar TThread en lugar de OmniThreadLibrary, que prefería. Mi cliente informa que la aplicación se congela al azar. No pude duplicarlo, pero como no tengo mucha experiencia con TThread, podría haber puesto la condición de punto muerto en alguna parte. Leí bastantes ejemplos, pero aún no estoy seguro sobre algunos de los detalles multithread.

La estructura de la aplicación es simple:
Un temporizador en el hilo principal comprueba la carpeta y obtiene información sobre cada archivo en un registro, que va a un TList genérico. Esta lista contiene información sobre los nombres de los archivos, el tamaño, el progreso, si el archivo está completamente cargado o si se debe volver a intentar. Todo lo que se muestra en una grilla con barras de progreso, etc. A esta lista solo se accede por el hilo principal. Después de eso, los elementos de la lista se envían al hilo llamando al método AddFile (código a continuación). El hilo almacena todos los archivos en una cola segura para subprocesos como esta http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Cuando se carga el archivo, el subproceso de carga notifica al hilo principal con una llamada a Sincronizar.
El hilo principal llama periódicamente al método Uploader.GetProgress para verificar el progreso del archivo actual y mostrarlo. Esta función no es realmente segura para subprocesos, pero ¿podría provocar un punto muerto o solo devolver datos incorrectos?

¿Cuál sería una forma segura y eficiente de hacer la verificación de progreso?

Entonces, ¿este enfoque está bien o me he perdido algo? ¿Cómo harías esto?
Por ejemplo, pensé en hacer un nuevo hilo solo para leer el contenido de la carpeta. Esto significa que el TList que uso debe estar seguro para subprocesos, pero se debe acceder a él todo el tiempo para actualizar la información mostrada en la grilla de la GUI. ¿No sería toda la sincronización solo ralentizaría la GUI?

He publicado el código simplificado a continuación en caso de que alguien quiera verlo. Si no, me gustaría escuchar algunas opiniones sobre lo que debería usar en general. Los objetivos principales son trabajar tanto en OSX como en Windows; para poder mostrar información sobre todos los archivos y el progreso del actual; y ser receptivo independientemente del número y tamaño de los archivos.

Ese es el código del subproceso de carga. He eliminado algunos de ellos para facilitar la lectura:

type TFileStatus = (fsToBeQueued, fsUploaded, fsQueued); TFileInfo = record ID: Integer; Path: String; Size: Int64; UploadedSize: Int64; Status: TFileStatus; end; TUploader = class(TThread) private FTP: TIdFTP; fQueue: TThreadedQueue<TFileInfo>; fCurrentFile: TFileInfo; FUploading: Boolean; procedure ConnectFTP; function UploadFile(aFileInfo: TFileInfo): String; procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); procedure SignalComplete; procedure SignalError(aError: String); protected procedure Execute; override; public property Uploading: Boolean read FUploading; constructor Create; destructor Destroy; override; procedure Terminate; procedure AddFile(const aFileInfo: TFileInfo); function GetProgress: TFileInfo; end; procedure TUploader.AddFile(const aFileInfo: TFileInfo); begin fQueue.Enqueue(aFileInfo); end; procedure TUploader.ConnectFTP; begin ... FTP.Connect; end; constructor TUploader.Create; begin inherited Create(false); FreeOnTerminate := false; fQueue := TThreadedQueue<TFileInfo>.Create; // Create the TIdFTP and set ports and other params ... end; destructor TUploader.Destroy; begin fQueue.Close; fQueue.Free; FTP.Free; inherited; end; // Process the whole queue and inform the main thread of the progress procedure TUploader.Execute; var Temp: TFileInfo; begin try ConnectFTP; except on E: Exception do SignalError(E.Message); end; // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails while fQueue.Peek(fCurrentFile) = wrSignaled do try if UploadFile(fCurrentFile) = '''' then begin fQueue.Dequeue(Temp); // Delete the item from the queue if succesful SignalComplete; end; except on E: Exception do SignalError(E.Message); end; end; // Return the current file''s info to the main thread. Used to update the progress indicators function TUploader.GetProgress: TFileInfo; begin Result := fCurrentFile; end; // Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); begin fCurrentFile.UploadedSize := AWorkCount; end; procedure TUploader.SignalComplete; begin Synchronize( procedure begin frmClientMain.OnCompleteFile(fCurrentFile); end); end; procedure TUploader.SignalError(aError: String); begin try FTP.Disconnect; except end; if fQueue.Closed then Exit; Synchronize( procedure begin frmClientMain.OnUploadError(aError); end); end; // Clear the queue and terminate the thread procedure TUploader.Terminate; begin fQueue.Close; inherited; end; function TUploader.UploadFile(aFileInfo: TFileInfo): String; begin Result := ''Error''; try if not FTP.Connected then ConnectFTP; FUploading := true; FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path)); Result := ''''; finally FUploading := false; end; end;

Y partes del hilo principal que interactúan con el cargador:

...... // Main form fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted) fUploader: TUploader; // The uploader thread fFiles: TList<TFileInfo>; fCurrentFileName: String; // Used to display the progress function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID public procedure OnCompleteFile(aFileInfo: TFileInfo); procedure OnUploadError(aError: String); end; // This is called by the uploader with Synchronize procedure TfrmClientMain.OnUploadError(aError: String); begin // show and log the error end; // This is called by the uploader with Synchronize procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo); var I: Integer; begin I := IndexOfFile(aFileInfo.ID); if (I >= 0) and (I < fFiles.Count) then begin aFileInfo.Status := fsUploaded; aFileInfo.UploadedSize := aFileInfo.Size; FFiles.Items[I] := aFileInfo; Inc(FFilesUploaded); TFile.Delete(aFileInfo.Path); colProgressImg.UpdateCell(I); end; end; procedure TfrmClientMain.ProcessFolder; var NewFiles: TStringDynArray; I, J: Integer; FileInfo: TFileInfo; begin // Remove completed files from the list if it contains more than XX files while FFiles.Count > 1000 do if FFiles[0].Status = fsUploaded then begin Dec(FFilesUploaded); FFiles.Delete(0); end else Break; NewFiles := TDirectory.GetFiles(WatchFolder, ''*.*'',TSearchOption.soAllDirectories); for I := 0 to Length(NewFiles) - 1 do begin FileInfo.ID := FUniqueID; Inc(FUniqueID); FileInfo.Path := NewFiles[I]; FileInfo.Size := GetFileSizeByName(NewFiles[I]); FileInfo.UploadedSize := 0; FileInfo.Status := fsToBeQueued; FFiles.Add(FileInfo); if (I mod 100) = 0 then begin UpdateStatusLabel; grFiles.RowCount := FFiles.Count; Application.ProcessMessages; if fUploader = nil then break; end; end; // Send the new files and resend failed to the uploader thread for I := 0 to FFiles.Count - 1 do if (FFiles[I].Status = fsToBeQueued) then begin if fUploader = nil then Break; FileInfo := FFiles[I]; FileInfo.Status := fsQueued; FFiles[I] := FileInfo; SaveDebug(1, ''Add: '' + ExtractFileName(FFiles[I].Path)); FUploader.AddFile(FFiles[I]); end; end; procedure TfrmClientMain.tmrGUITimer(Sender: TObject); var FileInfo: TFileInfo; I: Integer; begin if (fUploader = nil) or not fUploader.Uploading then Exit; FileInfo := fUploader.GetProgress; I := IndexOfFile(FileInfo.ID); if (I >= 0) and (I < fFiles.Count) then begin fFiles.Items[I] := FileInfo; fCurrentFileName := ExtractFileName(FileInfo.Path); colProgressImg.UpdateCell(I); end; end; function TfrmClientMain.IndexOfFile(aID: Integer): Integer; var I: Integer; begin Result := -1; for I := 0 to FFiles.Count - 1 do if FFiles[I].ID = aID then Exit(I); end;


Los callejones sin salida son definitivamente difíciles de detectar, pero este puede ser el problema. En su código, no vi que haya agregado tiempo de espera a la cola, al vistazo o a la eliminación de la secuencia, lo que significa que tomará el valor predeterminado de Infinite.

El enqueue tiene esta línea, lo que significa que, como cualquier objeto de sincronización, se bloqueará hasta que Enter se complete (bloquee el monitor) o se produzca el tiempo de espera (ya que no tiene un tiempo de espera, esperará por siempre)

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult; ... if not TMonitor.Enter(FQueue, Timeout)

También voy a suponer que implementó PEEK usted mismo basado en Dequeue, solo que en realidad no elimina el artículo.

Parece que implementa su propio tiempo de espera; sin embargo, todavía tiene lo siguiente:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult; ... if not TMonitor.Enter(FQueue, Timeout)

Donde el tiempo de espera es infinito, entonces, si está en el método Peek esperando que se señalice con un tiempo de espera infinito, entonces no puede Enqueue algo de un segundo hilo sin bloquear ese hilo esperando que el método peek se complete en un tiempo de espera infinito

Aquí hay un fragmento del comentario de TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. Enter without a timeout will wait until the lock is obtained. If the procedure returns it can be assumed that the lock was acquired. Enter with a timeout will return a boolean status indicating whether or not the lock was obtained (True) or the attempt timed out prior to acquire the lock (False). Calling Enter with an INFINITE timeout is the same as calling Enter without a timeout.

Dado que la implementación usa Infinite por defecto, y no se proporciona un valor TMonitor.Spinlock, eso bloqueará el hilo hasta que pueda adquirir el objeto FQueue.

Mi sugerencia sería cambiar tu código de la siguiente manera:

// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails while true do case fQueue.Peek(fCurrentFile,10) wrSignaled: try if UploadFile(fCurrentFile) = '''' then begin fQueue.Dequeue(Temp); // Delete the item from the queue if succesful SignalComplete; end; except on E: Exception do SignalError(E.Message); end; wrTimeout: sleep(10); wrIOCompletion, wrAbandoned, wrError: break; end; //case

De esta forma, Peek no mantendrá el bloqueo en FQueue indefinidamente, dejando una ventana para que Enqueue lo adquiera y agregue el archivo del hilo principal (UI).


Esto podría ser una posibilidad remota, pero aquí hay otra posibilidad [la primera respuesta puede ser más probable] (algo que acabo de encontrar, pero que había sabido antes): el uso de Sincronizar puede estar causando el punto muerto. Aquí hay un blog sobre por qué sucede esto: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

El punto pertinente del artículo:

El hilo A llama a Sincronizar (MethodA)

El hilo B llama a Synchronize (MethodB)

Luego, dentro del contexto del hilo principal:

El hilo principal llama a CheckSynchronize () mientras procesa mensajes

CheckSynchronize se implementa para procesar por lotes todas las llamadas en espera (*). Por lo tanto, recoge la cola de las llamadas en espera (que contienen MethodA y MethodB) y las recorre una a una.

MethodA se ejecuta en el contexto del hilo principal. Suponer MethodA llama a ThreadB.WaitFor

WaitFor llama a CheckSynchronize para procesar cualquier llamada en espera para Sincronizar

En teoría, esto debería procesar la Sincronización de ThreadB (Método B), permitiendo que se complete el Subproceso B. Sin embargo, MethodB ya es una posesión de la primera llamada CheckSynchronize, por lo que nunca se llama.

¡PUNTO MUERTO!

El artículo de Embarcadero QC describe el problema con más detalle.

Si bien no veo ninguna llamada a ProcessMessages en el código anterior, o para el caso, un WaitFor que se invocaría durante una Sincronización, podría ser un problema que en el punto en que se llama a una sincronización, otro hilo llame a la sincronización como bien, pero el hilo principal ya se ha sincronizado y está bloqueando.

Esto no hizo clic conmigo al principio, porque tiendo a evitar Sincronizar llamadas como la peste y usualmente diseñar actualizaciones de UI desde subprocesos usando otros métodos como pasar mensajes y listas de seguridad de subprocesos con notificación de mensajes en lugar de sincronizar llamadas.