c# system.reactive

c# - Implementando IObservable<T> desde cero



system.reactive (5)

Las Extensiones reactivas vienen con muchos métodos auxiliares para convertir eventos existentes y operaciones asincrónicas en observables, pero ¿cómo implementaría un IObservable <T> desde cero?

IEnumerable tiene la encantadora palabra clave yield para que sea muy simple de implementar.

¿Cuál es la forma correcta de implementar IObservable <T>?

¿Debo preocuparme por la seguridad de los hilos?

Sé que hay soporte para recibir una devolución de llamada en un contexto de sincronización específico, pero, ¿es esto algo de lo que yo, como autor IObservable <T> debo preocuparme o esto de alguna manera está incorporado?

actualizar:

Aquí está mi versión C # de la solución F # de Brian

using System; using System.Linq; using Microsoft.FSharp.Collections; namespace Jesperll { class Observable<T> : IObservable<T>, IDisposable where T : EventArgs { private FSharpMap<int, IObserver<T>> subscribers = FSharpMap<int, IObserver<T>>.Empty; private readonly object thisLock = new object(); private int key; private bool isDisposed; public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { if (disposing && !isDisposed) { OnCompleted(); isDisposed = true; } } protected void OnNext(T value) { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnNext(value); } } protected void OnError(Exception exception) { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } if (exception == null) { throw new ArgumentNullException("exception"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnError(exception); } } protected void OnCompleted() { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnCompleted(); } } public IDisposable Subscribe(IObserver<T> observer) { if (observer == null) { throw new ArgumentNullException("observer"); } lock (thisLock) { int k = key++; subscribers = subscribers.Add(k, observer); return new AnonymousDisposable(() => { lock (thisLock) { subscribers = subscribers.Remove(k); } }); } } } class AnonymousDisposable : IDisposable { Action dispose; public AnonymousDisposable(Action dispose) { this.dispose = dispose; } public void Dispose() { dispose(); } } }

editar: no arroje ObjectDisposedException si Dispose se llama dos veces


  1. Rompe el reflector abierto y echa un vistazo.

  2. Vea algunos videos de C9: this muestra cómo puede ''derivar'' el Seleccionar ''combinador''

  3. El secreto es crear clases de AnonymousObservable, AnonymousObserver y AnonymousDisposable, (que son solo una solución para el hecho de que no se pueden crear instancias de interfaces). No contienen implementación, ya que lo transfieres con acciones y funciones.

Por ejemplo:

public class AnonymousObservable<T> : IObservable<T> { private Func<IObserver<T>, IDisposable> _subscribe; public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; } public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); } }

Dejaré que te hagas el resto ... es un muy buen ejercicio de comprensión.

Aquí hay un pequeño hilo interesante que crece con preguntas relacionadas.


Honestamente, no estoy seguro de qué tan "correcto" es todo esto, pero si se siente bastante bien en base a mi experiencia hasta el momento. Es código F #, pero con suerte obtienes una idea del sabor. Le permite ''actualizar'' un objeto de origen, al que puede llamar Siguiente / Completado / Error, y administra las suscripciones e intenta Assert cuando el origen o los clientes hacen cosas malas.

type ObservableSource<''T>() = // '' let protect f = let mutable ok = false try f() ok <- true finally Debug.Assert(ok, "IObserver methods must not throw!") // TODO crash? let mutable key = 0 // Why a Map and not a Dictionary? Someone''s OnNext() may unsubscribe, so we need threadsafe ''snapshots'' of subscribers to Seq.iter over let mutable subscriptions = Map.empty : Map<int,IObserver<''T>> // '' let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x))) let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted())) let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e))) let thisLock = new obj() let obs = { new IObservable<''T> with // '' member this.Subscribe(o) = let k = lock thisLock (fun () -> let k = key key <- key + 1 subscriptions <- subscriptions.Add(k, o) k) { new IDisposable with member this.Dispose() = lock thisLock (fun () -> subscriptions <- subscriptions.Remove(k)) } } let mutable finished = false // The methods below are not thread-safe; the source ought not call these methods concurrently member this.Next(x) = Debug.Assert(not finished, "IObserver is already finished") next x member this.Completed() = Debug.Assert(not finished, "IObserver is already finished") finished <- true completed() member this.Error(e) = Debug.Assert(not finished, "IObserver is already finished") finished <- true error e // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads member this.Value = obs

Estaré interesado en cualquier idea sobre lo que está bien o mal aquí; Todavía no he tenido la oportunidad de ver todas las novedades de Devlabs de Rx ...

Mis propias experiencias sugieren que:

  • Aquellos que se suscriben a observables nunca deben arrojar de las suscripciones. No hay nada razonable que un observable pueda hacer cuando lanza un suscriptor. (Esto es similar a los eventos). Lo más probable es que la excepción simplemente suba a un controlador catch-all de nivel superior o bloquee la aplicación.
  • Las fuentes probablemente deberían ser "lógicamente de un único hilo". Creo que puede ser más difícil escribir clientes que puedan reaccionar a llamadas concurrentes OnNext; incluso si cada llamada individual proviene de un hilo diferente, es útil evitar llamadas concurrentes.
  • Definitivamente es útil tener una clase base / ayudante que imponga algunos ''contratos''.

Tengo mucha curiosidad si las personas pueden mostrar consejos más concretos en este sentido.


La documentación oficial desaprueba a los usuarios que implementan IObservable. En cambio, se espera que los usuarios usen el método de fábrica Observable.Create

Cuando sea posible, implemente nuevos operadores al componer operadores existentes. De lo contrario, implemente operadores personalizados utilizando Observable.Create

Ocurre que Observable.Create es un contenedor trivial alrededor de la clase interna de Reactive AnonymousObservable :

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) { if (subscribe == null) { throw new ArgumentNullException("subscribe"); } return new AnonymousObservable<TSource>(subscribe); }

No sé por qué no hicieron pública su implementación, pero bueno, lo que sea.


Sí, la palabra clave yield es encantadora; tal vez habrá algo similar para IObservable (OfT)? [Editar: En la charla PDC ''09 de Eric Meijer dice "sí, mira este espacio" con un rendimiento declarativo para generar observables.]

Para algo cercano (en lugar de enrollar el suyo), consulte la parte inferior de la wiki " (todavía no) 101 muestras de Rx ", donde el equipo sugiere el uso de la clase sujeto (T) como un "backend" para implementar un IObservable ( A menudo). Aquí está su ejemplo:

public class Order { private DateTime? _paidDate; private readonly Subject<Order> _paidSubj = new Subject<Order>(); public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } } public void MarkPaid(DateTime paidDate) { _paidDate = paidDate; _paidSubj.OnNext(this); // Raise PAID event } } private static void Main() { var order = new Order(); order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe order.MarkPaid(DateTime.Now); }


solo un comentario con respecto a esta implementación:

después de la introducción de colecciones simultáneas en .net fw 4, probablemente sea mejor usar ConcurrentDictioary en lugar de un diccionario simple.

ahorra bloqueos de manejo en la colección.

adi.