c# system.reactive

c# - Observable.Generate with TimeSpan selector parece perder memoria



system.reactive (1)

Esto me parece un error, o al menos un comportamiento desordenado / indeseable en la implementación de programación "recursiva" de DefaultScheduler (no es realmente recursivo, estoy hablando de la sobrecarga que pasa en el programador a una acción programada para que pueda programar una continuación).

Los elementos desechables que está viendo construir se crean mediante la llamada al método DefaultScheduler.Schedule (línea 71 aquí: https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs ).

Hay un par de razones por las que otros intentos aquí para detectar esto fallaron. En primer lugar, los elementos desechables ESTÁN finalmente eliminados, pero solo cuando Generate OnCompletes u OnErrors , en cuyo punto System.Reactive.AnonymousSafeObserver<T> devuelto por Generate cuando se suscriba, se limpia.

En segundo lugar, si usa un TimeSpan corto (recuerde que la resolución mínima del temporizador .NET es de 15 ms), entonces Rx optimizará el uso del temporizador y llamará a QueueUserWorkItem sin usar un temporizador, por lo que estos desechables nunca se crearán.

Si profundiza en la implementación de Generate ( https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs ) puede ver que pasa el IDisposable devuelto por la llamada inicial a la Programación y se lo pasa al observador que lo cuelga hasta el error / finalización. Eso evita que toda la cadena resultante de llamadas recursivas sea cobrable, y significa que si necesita cancelar, o cuando se realiza la limpieza, solo se eliminarán todas las acciones programadas desechables.

Puede ver el mismo efecto en el código a continuación, que usa el DefaultScheduler directamente: la referencia a cancel en la última línea es suficiente para causar la fuga. Asegúrate de usar una compilación de lanzamiento, de lo contrario el compilador mantendrá la espera hasta que el método finalice independientemente.

// ensure you are using a release build of this code ManualResetEvent mre = new ManualResetEvent(); IDisposable cancel; int maxCount = 20; TimeSpan timeSpan = TimeSpan.FromSeconds(1); Func<IScheduler, int, IDisposable> recurse = null; recurse = (self, state) => { Console.WriteLine(state); if (state == maxCount) { mre.Set(); return Disposable.Empty; } return self.Schedule(state + 1, timeSpan, recurse); }; cancel = Scheduler.Default.Schedule(1, timeSpan, recurse); mre.WaitOne(); // uncomment the following line, and you''ll get the same leak // leave it commented, and cancel reference is GC''d early and there''s no leak // if(cancel == null) Console.WriteLine("Hang on to cancel");

Usé la API dotMemory de Jetbrains para extraer volcados de memoria y sacar conclusiones aquí. He eliminado el código anterior de esas llamadas a la API, pero aquí hay una idea completa si tiene ese producto, y podrá ver el impacto de Descomenta la línea final con bastante claridad: https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c Alternativamente, puedes usar la API de perfiles de MS, ¡que no he encontrado en el conjunto de funciones de mi cerebro en este momento!

Estoy investigando el uso de Observable.Generate para crear una secuencia de resultados muestreados a intervalos usando los ejemplos del sitio web msdn como punto de partida.

El siguiente código SIN un selector de TimeSpan no presenta una pérdida de memoria:

IObservable<string> obs = Observable.Generate(initialState: 1, condition: x => x < 1000, iterate: x => x + 1, resultSelector: x => x.ToString()); obs.Subscribe(x => Console.WriteLine(x));

Sin embargo, el siguiente código CON un selector de TimeSpan exhibe una pérdida de memoria:

TimeSpan timeSpan = TimeSpan.FromSeconds(1); IObservable<string> obs = Observable.Generate(initialState: 1, condition: x => x < 1000, iterate: x => x + 1, resultSelector: x => x.ToString(), timeSelector: x => timeSpan); obs.Subscribe(x => Console.WriteLine(x));

Por ejemplo, esta aplicación de juguete mostrará rápidamente la pérdida de memoria utilizando el Perfilador de memoria que se envía con la comunidad VS 2015:

using System; using System.Reactive.Linq; namespace Sample { public class Program { static void Main() { IObservable<string> obs = Observable.Generate(1, x => x < 1000*1000, x => x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500)); obs.Subscribe(x => { /*Do nothing but simply run the observable*/ }); Console.ReadLine(); } } }

La pérdida de memoria es una colección creciente de:

System.Reactive.Disposables StableCompositeDisposable.Binary System.Reactive.Disposables SingleAssignmentDisposable

¿Estoy usando esta API incorrectamente? ¿Debo esperar que la memoria crezca o es un error con Reactivo?