asynchronous - ¿Async.StartChild tiene una pérdida de memoria?
f# (1)
Creo que está en lo cierto: parece haber una pérdida de memoria en la implementación de StartChild
.
Hice un poco de perfiles (siguiendo un fantástico tutorial de Dave Thomas ) y el lanzamiento de código abierto de F # y creo que incluso sé cómo solucionarlo. Si observa la implementación de StartChild
, registra un controlador con el token de cancelación actual del flujo de trabajo:
let _reg = ct.Register(
(fun _ ->
match !ctsRef with
| null -> ()
| otherwise -> otherwise.Cancel()), null)
Los objetos que permanecen vivos en el montón son instancias de esta función registrada. Pueden _reg.Dispose()
registro llamando a _reg.Dispose()
, pero eso nunca sucede en el código fuente de F #. Intenté agregar _reg.Dispose()
a las funciones a las que se llama cuando se completa el async:
(fun res -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Ok res, reuseThread=true))
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Error err,reuseThread=true))
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Canceled err,reuseThread=true))
... y basado en mis experimentos, esto soluciona el problema. Por lo tanto, si desea una solución alternativa, probablemente pueda copiar todo el código requerido de control.fs
y agregarlo como una solución.
Enviaré un informe de error al equipo de F # con un enlace a su pregunta. Si encuentra algo más, puede contactarlos enviando informes de errores a fsbugs
en microsoft
dot com
.
Cuando ejecuto la siguiente prueba (construida con F # 2.0) obtengo la excepción OutOfMemoryException. Se tarda unos 5 minutos en llegar a la excepción en mi sistema (i7-920 6gb ram si se estaba ejecutando como proceso x86), pero en cualquier caso podemos ver cómo está creciendo la memoria en el administrador de tareas.
module start_child_test
open System
open System.Diagnostics
open System.Threading
open System.Threading.Tasks
let cnt = ref 0
let sw = Stopwatch.StartNew()
Async.RunSynchronously(async{
while true do
let! x = Async.StartChild(async{
if (Interlocked.Increment(cnt) % 100000) = 0 then
if sw.ElapsedMilliseconds > 0L then
printfn "ops per sec = %d" (100000L*1000L / sw.ElapsedMilliseconds)
else
printfn "ops per sec = INF"
sw.Restart()
GC.Collect()
})
do! x
})
printfn "done...."
No veo nada malo con este código, y no veo ninguna razón para el crecimiento de la memoria. Hice una implementación alternativa para asegurarme de que mis argumentos son válidos:
module start_child_fix
open System
open System.Collections
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
type IAsyncCallbacks<''T> = interface
abstract member OnSuccess: result:''T -> unit
abstract member OnError: error:Exception -> unit
abstract member OnCancel: error:OperationCanceledException -> unit
end
type internal AsyncResult<''T> =
| Succeeded of ''T
| Failed of Exception
| Canceled of OperationCanceledException
type internal AsyncGate<''T> =
| Completed of AsyncResult<''T>
| Subscribed of IAsyncCallbacks<''T>
| Started
| Notified
type Async with
static member StartChildEx (comp:Async<''TRes>) = async{
let! ct = Async.CancellationToken
let gate = ref AsyncGate.Started
let CompleteWith(result:AsyncResult<''T>, callbacks:IAsyncCallbacks<''T>) =
if Interlocked.Exchange(gate, Notified) <> Notified then
match result with
| Succeeded v -> callbacks.OnSuccess(v)
| Failed e -> callbacks.OnError(e)
| Canceled e -> callbacks.OnCancel(e)
let ProcessResults (result:AsyncResult<''TRes>) =
let t = Interlocked.CompareExchange<AsyncGate<''TRes>>(gate, AsyncGate.Completed(result), AsyncGate.Started)
match t with
| Subscribed callbacks ->
CompleteWith(result, callbacks)
| _ -> ()
let Subscribe (success, error, cancel) =
let callbacks = {
new IAsyncCallbacks<''TRes> with
member this.OnSuccess v = success v
member this.OnError e = error e
member this.OnCancel e = cancel e
}
let t = Interlocked.CompareExchange<AsyncGate<''TRes>>(gate, AsyncGate.Subscribed(callbacks), AsyncGate.Started)
match t with
| AsyncGate.Completed result ->
CompleteWith(result, callbacks)
| _ -> ()
Async.StartWithContinuations(
computation = comp,
continuation = (fun v -> ProcessResults(AsyncResult.Succeeded(v))),
exceptionContinuation = (fun e -> ProcessResults(AsyncResult.Failed(e))),
cancellationContinuation = (fun e -> ProcessResults(AsyncResult.Canceled(e))),
cancellationToken = ct
)
return Async.FromContinuations( fun (success, error, cancel) ->
Subscribe(success, error, cancel)
)
}
Para esta prueba funciona bien sin ningún consumo de memoria considerable. Desafortunadamente, no tengo mucha experiencia en F # y tengo dudas si echo de menos algunas cosas. En caso de que sea un error, ¿cómo puedo reportarlo al equipo de F #?