asynchronous f#

asynchronous - ¿Async.StartChild tiene una pérdida de memoria?



f# (1)

Creo que está en lo cierto: parece haber una pérdida de memoria en la implementación de StartChild .

Hice un poco de perfiles (siguiendo un fantástico tutorial de Dave Thomas ) y el lanzamiento de código abierto de F # y creo que incluso sé cómo solucionarlo. Si observa la implementación de StartChild , registra un controlador con el token de cancelación actual del flujo de trabajo:

let _reg = ct.Register( (fun _ -> match !ctsRef with | null -> () | otherwise -> otherwise.Cancel()), null)

Los objetos que permanecen vivos en el montón son instancias de esta función registrada. Pueden _reg.Dispose() registro llamando a _reg.Dispose() , pero eso nunca sucede en el código fuente de F #. Intenté agregar _reg.Dispose() a las funciones a las que se llama cuando se completa el async:

(fun res -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Ok res, reuseThread=true)) (fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Error err,reuseThread=true)) (fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Canceled err,reuseThread=true))

... y basado en mis experimentos, esto soluciona el problema. Por lo tanto, si desea una solución alternativa, probablemente pueda copiar todo el código requerido de control.fs y agregarlo como una solución.

Enviaré un informe de error al equipo de F # con un enlace a su pregunta. Si encuentra algo más, puede contactarlos enviando informes de errores a fsbugs en microsoft dot com .

Cuando ejecuto la siguiente prueba (construida con F # 2.0) obtengo la excepción OutOfMemoryException. Se tarda unos 5 minutos en llegar a la excepción en mi sistema (i7-920 6gb ram si se estaba ejecutando como proceso x86), pero en cualquier caso podemos ver cómo está creciendo la memoria en el administrador de tareas.

module start_child_test open System open System.Diagnostics open System.Threading open System.Threading.Tasks let cnt = ref 0 let sw = Stopwatch.StartNew() Async.RunSynchronously(async{ while true do let! x = Async.StartChild(async{ if (Interlocked.Increment(cnt) % 100000) = 0 then if sw.ElapsedMilliseconds > 0L then printfn "ops per sec = %d" (100000L*1000L / sw.ElapsedMilliseconds) else printfn "ops per sec = INF" sw.Restart() GC.Collect() }) do! x }) printfn "done...."

No veo nada malo con este código, y no veo ninguna razón para el crecimiento de la memoria. Hice una implementación alternativa para asegurarme de que mis argumentos son válidos:

module start_child_fix open System open System.Collections open System.Collections.Generic open System.Threading open System.Threading.Tasks type IAsyncCallbacks<''T> = interface abstract member OnSuccess: result:''T -> unit abstract member OnError: error:Exception -> unit abstract member OnCancel: error:OperationCanceledException -> unit end type internal AsyncResult<''T> = | Succeeded of ''T | Failed of Exception | Canceled of OperationCanceledException type internal AsyncGate<''T> = | Completed of AsyncResult<''T> | Subscribed of IAsyncCallbacks<''T> | Started | Notified type Async with static member StartChildEx (comp:Async<''TRes>) = async{ let! ct = Async.CancellationToken let gate = ref AsyncGate.Started let CompleteWith(result:AsyncResult<''T>, callbacks:IAsyncCallbacks<''T>) = if Interlocked.Exchange(gate, Notified) <> Notified then match result with | Succeeded v -> callbacks.OnSuccess(v) | Failed e -> callbacks.OnError(e) | Canceled e -> callbacks.OnCancel(e) let ProcessResults (result:AsyncResult<''TRes>) = let t = Interlocked.CompareExchange<AsyncGate<''TRes>>(gate, AsyncGate.Completed(result), AsyncGate.Started) match t with | Subscribed callbacks -> CompleteWith(result, callbacks) | _ -> () let Subscribe (success, error, cancel) = let callbacks = { new IAsyncCallbacks<''TRes> with member this.OnSuccess v = success v member this.OnError e = error e member this.OnCancel e = cancel e } let t = Interlocked.CompareExchange<AsyncGate<''TRes>>(gate, AsyncGate.Subscribed(callbacks), AsyncGate.Started) match t with | AsyncGate.Completed result -> CompleteWith(result, callbacks) | _ -> () Async.StartWithContinuations( computation = comp, continuation = (fun v -> ProcessResults(AsyncResult.Succeeded(v))), exceptionContinuation = (fun e -> ProcessResults(AsyncResult.Failed(e))), cancellationContinuation = (fun e -> ProcessResults(AsyncResult.Canceled(e))), cancellationToken = ct ) return Async.FromContinuations( fun (success, error, cancel) -> Subscribe(success, error, cancel) ) }

Para esta prueba funciona bien sin ningún consumo de memoria considerable. Desafortunadamente, no tengo mucha experiencia en F # y tengo dudas si echo de menos algunas cosas. En caso de que sea un error, ¿cómo puedo reportarlo al equipo de F #?