vuelve seguir que puedo pueden porque poner pasa los intentarlo hashtags guardaron funcionan cuantos con como cambios c# system.reactive

c# - pueden - porque no puedo seguir hashtag en instagram



Con Rx, ¿cómo ignoro el valor de todos excepto el último cuando mi método de suscripción se está ejecutando? (9)

Al usar las extensiones reactivas , quiero ignorar los mensajes que provienen de mi flujo de eventos que ocurren mientras mi método de Subscribe se está ejecutando. Es decir, a veces me lleva más tiempo procesar un mensaje que el tiempo transcurrido entre cada mensaje, por lo que quiero soltar los mensajes que no tengo tiempo de procesar.

Sin embargo, cuando mi método Subscribe se complete, si llegara algún mensaje, quiero procesar el último. Por lo tanto, siempre proceso el mensaje más reciente.

Entonces, si tengo un código que hace:

messages.OnNext(100); messages.OnNext(1); messages.OnNext(2);

y si asumimos que el ''100'' tarda mucho tiempo en procesarse. Luego quiero que se procese el ''2'' cuando se complete el ''100''. El ''1'' debe ignorarse porque fue reemplazado por el ''2'' mientras que el ''100'' aún estaba siendo procesado.

Aquí hay un ejemplo del resultado que quiero usar una tarea en segundo plano y Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); Task.Factory.StartNew(() => { foreach(var n in messages.Latest()) { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.WriteLine(n); } });

Sin embargo, Latest () es una llamada de bloqueo y prefiero no tener un hilo sentado esperando el siguiente valor como este (a veces habrá espacios muy largos entre los mensajes).

También puedo obtener el resultado que quiero usando un BroadcastBlock de TPL Dataflow, así:

var buffer = new BroadcastBlock<long>(n => n); Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n)); buffer.AsObservable() .Subscribe(n => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.WriteLine(n); });

pero parece que debería ser posible directamente en Rx. ¿Cuál es la mejor manera de hacerlo?


Acabo de terminar (y ya completamente revisado) mi propia solución al problema, que planeo usar en producción.

A menos que el programador utilice el subproceso actual, las llamadas a OnNext , OnCompleted , OnError desde la fuente deben regresar inmediatamente; si el observador está ocupado con notificaciones anteriores, ingresa en una cola con un tamaño máximo que se puede especificar, desde donde se le notificará siempre que se haya procesado la notificación anterior. Si la cola se llena, los elementos menos recientes se descartan. Por lo tanto, un tamaño máximo de cola de 0 ignora todos los elementos que llegan mientras el observador está ocupado; un tamaño de 1 siempre permitirá observar el último artículo; un tamaño hasta int.MaxValue mantiene ocupado al consumidor hasta que alcance al productor.

Si el programador admite ejecución prolongada (es decir, le proporciona un hilo propio), programo un ciclo para notificar al observador; de lo contrario, uso la programación recursiva.

Aquí está el código. Cualquier comentario es apreciado.

partial class MoreObservables { /// <summary> /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process. /// </summary> /// <param name="source">The source sequence.</param> /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param> /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param> /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception> /// <remarks> /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready. /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any. /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>. /// </remarks> public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null) { if (source == null) throw new ArgumentNullException(nameof(source)); if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); if (scheduler == null) scheduler = Scheduler.Default; return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer)); } private static class LatestImpl<TSource> { public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); var longrunningScheduler = scheduler.AsLongRunning(); if (longrunningScheduler != null) return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer); return new RecursiveSubscription(source, maxQueueSize, scheduler, observer); } #region Subscriptions /// <summary> /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop. /// </summary> private sealed class LoopSubscription : IDisposable { private enum State { Idle, // nothing to notify Head, // next notification is in _head Queue, // next notifications are in _queue, followed by _completion Disposed, // disposed } private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); private readonly IObserver<TSource> _observer; private State _state; private TSource _head; // item in front of the queue private IQueue _queue; // queued items private Notification<TSource> _completion; // completion notification public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer) { _observer = observer; _queue = Queue.Create(maxQueueSize); scheduler.ScheduleLongRunning(_ => Loop()); _subscription.Disposable = source.Subscribe( OnNext, error => OnCompletion(Notification.CreateOnError<TSource>(error)), () => OnCompletion(Notification.CreateOnCompleted<TSource>())); } private void OnNext(TSource value) { lock (_subscription) { switch (_state) { case State.Idle: _head = value; _state = State.Head; Monitor.Pulse(_subscription); break; case State.Head: case State.Queue: if (_completion != null) return; try { _queue.Enqueue(value); } catch (Exception error) // probably OutOfMemoryException { _completion = Notification.CreateOnError<TSource>(error); _subscription.Dispose(); } break; } } } private void OnCompletion(Notification<TSource> completion) { lock (_subscription) { switch (_state) { case State.Idle: _completion = completion; _state = State.Queue; Monitor.Pulse(_subscription); _subscription.Dispose(); break; case State.Head: case State.Queue: if (_completion != null) return; _completion = completion; _subscription.Dispose(); break; } } } public void Dispose() { lock (_subscription) { if (_state == State.Disposed) return; _head = default(TSource); _queue = null; _completion = null; _state = State.Disposed; Monitor.Pulse(_subscription); _subscription.Dispose(); } } private void Loop() { try { while (true) // overall loop for all notifications { // next notification to emit Notification<TSource> completion; TSource next; // iff completion == null lock (_subscription) { while (true) { while (_state == State.Idle) Monitor.Wait(_subscription); if (_state == State.Head) { completion = null; next = _head; _head = default(TSource); _state = State.Queue; break; } if (_state == State.Queue) { if (!_queue.IsEmpty) { completion = null; next = _queue.Dequeue(); // assumption: this never throws break; } if (_completion != null) { completion = _completion; next = default(TSource); break; } _state = State.Idle; continue; } Debug.Assert(_state == State.Disposed); return; } } if (completion != null) { completion.Accept(_observer); return; } _observer.OnNext(next); } } finally { Dispose(); } } } /// <summary> /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively. /// </summary> private sealed class RecursiveSubscription : IDisposable { private enum State { Idle, // nothing to notify Scheduled, // emitter scheduled or executing Disposed, // disposed } private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable(); private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action private readonly IScheduler _scheduler; private readonly IObserver<TSource> _observer; private State _state; private IQueue _queue; // queued items private Notification<TSource> _completion; // completion notification public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer) { _scheduler = scheduler; _observer = observer; _queue = Queue.Create(maxQueueSize); _subscription.Disposable = source.Subscribe( OnNext, error => OnCompletion(Notification.CreateOnError<TSource>(error)), () => OnCompletion(Notification.CreateOnCompleted<TSource>())); } private void OnNext(TSource value) { lock (_subscription) { switch (_state) { case State.Idle: _emitter.Disposable = _scheduler.Schedule(value, EmitNext); _state = State.Scheduled; break; case State.Scheduled: if (_completion != null) return; try { _queue.Enqueue(value); } catch (Exception error) // probably OutOfMemoryException { _completion = Notification.CreateOnError<TSource>(error); _subscription.Dispose(); } break; } } } private void OnCompletion(Notification<TSource> completion) { lock (_subscription) { switch (_state) { case State.Idle: _completion = completion; _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion)); _state = State.Scheduled; _subscription.Dispose(); break; case State.Scheduled: if (_completion != null) return; _completion = completion; _subscription.Dispose(); break; } } } public void Dispose() { lock (_subscription) { if (_state == State.Disposed) return; _emitter.Dispose(); _queue = null; _completion = null; _state = State.Disposed; _subscription.Dispose(); } } private void EmitNext(TSource value, Action<TSource> self) { try { _observer.OnNext(value); } catch { Dispose(); return; } lock (_subscription) { if (_state == State.Disposed) return; Debug.Assert(_state == State.Scheduled); if (!_queue.IsEmpty) self(_queue.Dequeue()); else if (_completion != null) _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion)); else _state = State.Idle; } } private void EmitCompletion(Notification<TSource> completion) { try { completion.Accept(_observer); } finally { Dispose(); } } } #endregion #region IQueue /// <summary> /// FIFO queue that discards least recent items if size limit is reached. /// </summary> private interface IQueue { bool IsEmpty { get; } void Enqueue(TSource item); TSource Dequeue(); } /// <summary> /// <see cref="IQueue"/> implementations. /// </summary> private static class Queue { public static IQueue Create(int maxSize) { switch (maxSize) { case 0: return Zero.Instance; case 1: return new One(); default: return new Many(maxSize); } } private sealed class Zero : IQueue { // ReSharper disable once StaticMemberInGenericType public static Zero Instance { get; } = new Zero(); private Zero() { } public bool IsEmpty => true; public void Enqueue(TSource item) { } public TSource Dequeue() { throw new InvalidOperationException(); } } private sealed class One : IQueue { private TSource _item; public bool IsEmpty { get; private set; } = true; public void Enqueue(TSource item) { _item = item; IsEmpty = false; } public TSource Dequeue() { if (IsEmpty) throw new InvalidOperationException(); var item = _item; _item = default(TSource); IsEmpty = true; return item; } } private sealed class Many : IQueue { private readonly int _maxSize, _initialSize; private int _deq, _enq; // indices of deque and enqueu positions private TSource[] _buffer; public Many(int maxSize) { if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize)); _maxSize = maxSize; if (maxSize == int.MaxValue) _initialSize = 4; else { // choose an initial size that won''t get us too close to maxSize when doubling _initialSize = maxSize; while (_initialSize >= 7) _initialSize = (_initialSize + 1) / 2; } } public bool IsEmpty { get; private set; } = true; public void Enqueue(TSource item) { if (IsEmpty) { if (_buffer == null) _buffer = new TSource[_initialSize]; _buffer[0] = item; _deq = 0; _enq = 1; IsEmpty = false; return; } if (_deq == _enq) // full { if (_buffer.Length == _maxSize) // overwrite least recent { _buffer[_enq] = item; if (++_enq == _buffer.Length) _enq = 0; _deq = _enq; return; } // increse buffer size var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length; var newBuffer = new TSource[newSize]; var count = _buffer.Length - _deq; Array.Copy(_buffer, _deq, newBuffer, 0, count); Array.Copy(_buffer, 0, newBuffer, count, _deq); _deq = 0; _enq = _buffer.Length; _buffer = newBuffer; } _buffer[_enq] = item; if (++_enq == _buffer.Length) _enq = 0; } public TSource Dequeue() { if (IsEmpty) throw new InvalidOperationException(); var result = ReadAndClear(ref _buffer[_deq]); if (++_deq == _buffer.Length) _deq = 0; if (_deq == _enq) { IsEmpty = true; if (_buffer.Length > _initialSize) _buffer = null; } return result; } private static TSource ReadAndClear(ref TSource item) { var result = item; item = default(TSource); return result; } } } #endregion } }


Aquí hay un intento de usar "solo" Rx. El temporizador y el suscriptor se mantienen independientes al observar en el grupo de temas y he utilizado un tema para proporcionar comentarios sobre cómo completar la tarea.

No creo que esta sea una solución simple, pero espero que pueda darte ideas para mejorar.

messages. Buffer(() => feedback). Select(l => l.LastOrDefault()). ObserveOn(Scheduler.ThreadPool). Subscribe(n => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.WriteLine(n); feedback.OnNext(Unit.Default); }); feedback.OnNext(Unit.Default);

Hay un pequeño problema: el búfer se cierra primero cuando está vacío, por lo que genera el valor predeterminado. Probablemente puedas resolverlo haciendo los comentarios después del primer mensaje.

Aquí está como una función de extensión:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) { var feedback = new Subject<Unit>(); var sub = source. Buffer(() => feedback). ObserveOn(Scheduler.ThreadPool). Subscribe(l => { action(l.LastOrDefault()); feedback.OnNext(Unit.Default); }); feedback.OnNext(Unit.Default); return sub; }

Y el uso:

messages.SubscribeWithoutOverlap(n => { Thread.Sleep(1000); Console.WriteLine(n); });


Aquí hay un método que es similar al de Dave pero en su lugar usa Sample (que es más apropiado que un buffer). He incluido un método de extensión similar al que agregué a la respuesta de Dave.

La extensión:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action) { var sampler = new Subject<Unit>(); var sub = source. Sample(sampler). ObserveOn(Scheduler.ThreadPool). Subscribe(l => { action(l); sampler.OnNext(Unit.Default); }); // start sampling when we have a first value source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default)); return sub; }

Tenga en cuenta que es más simple, y no hay un búfer ''vacío'' que se dispara. El primer elemento que se envía a la acción en realidad proviene de la transmisión en sí.

El uso es sencillo:

messages.SubscribeWithoutOverlap(n => { Console.WriteLine("start: " + n); Thread.Sleep(500); Console.WriteLine("end: " + n); }); messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

Y resultados:

source: 0 start: 0 source: 1 source: 2 source: 3 source: 4 source: 5 end: 0 start: 5 source: 6 source: 7 source: 8 source: 9 source: 10 end: 5 start: 10 source: 11 source: 12 source: 13 source: 14 source: 15 end: 10


Aquí hay una implementación basada en Task , con semántica de cancelación, que no usa un tema. Calling dispose permite que la acción suscrita cancele el procesamiento, si así lo desea.

public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action) { var cancellation = new CancellationDisposable(); var token = cancellation.Token; Task task = null; return new CompositeDisposable( cancellation, observable.Subscribe(value => { if (task == null || task.IsCompleted) task = Task.Factory.StartNew(() => action(value, token), token); }) ); }

Aquí hay una prueba simple:

Observable.Interval(TimeSpan.FromMilliseconds(150)) .SampleSubscribe((v, ct) => { //cbeck for cancellation, do work for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++) Thread.Sleep(100); Console.WriteLine(v); });

La salida:

0 7 14 21 28 35


Con Rx 2.0 RC puede usar Chunkify para obtener un IEnumerable de listas, cada una de las cuales contiene lo que se observó desde el último MoveNext.

Luego puede usar ToObservable para convertir eso a un IObservable y solo prestar atención a la última entrada en cada lista que no esté vacía.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100)); messages.Chunkify() .ToObservable(Scheduler.TaskPool) .Where(list => list.Any()) .Select(list => list.Last()) .Subscribe(n => { Thread.Sleep(TimeSpan.FromMilliseconds(250)); Console.WriteLine(n); });


Gracias a Lee Campbell (de Intro To Rx fame), ahora tengo una solución de trabajo con este método de extensión:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler) { return Observable.Create<T>(observer => { Notification<T> outsideNotification = null; var gate = new object(); bool active = false; var cancelable = new MultipleAssignmentDisposable(); var disposable = source.Materialize().Subscribe(thisNotification => { bool alreadyActive; lock (gate) { alreadyActive = active; active = true; outsideNotification = thisNotification; } if (!alreadyActive) { cancelable.Disposable = scheduler.Schedule(self => { Notification<T> localNotification = null; lock (gate) { localNotification = outsideNotification; outsideNotification = null; } localNotification.Accept(observer); bool hasPendingNotification = false; lock (gate) { hasPendingNotification = active = (outsideNotification != null); } if (hasPendingNotification) { self(); } }); } }); return new CompositeDisposable(disposable, cancelable); }); }


He escrito una publicación en el blog sobre esto con una solución que usa CAS en lugar de bloqueos y evita la recursión. El código está debajo, pero puede encontrar una explicación completa aquí: http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>( this IObservable<TSource> source, IScheduler scheduler) { return Observable.Create<TSource>(observer => { Notification<TSource> pendingNotification = null; var cancelable = new MultipleAssignmentDisposable(); var sourceSubscription = source.Materialize() .Subscribe(notification => { var previousNotification = Interlocked.Exchange( ref pendingNotification, notification); if (previousNotification != null) return; cancelable.Disposable = scheduler.Schedule(() => { var notificationToSend = Interlocked.Exchange( ref pendingNotification, null); notificationToSend.Accept(observer); }); }); return new CompositeDisposable(sourceSubscription, cancelable); }); }


Otra solución más

Esto no es bonito, porque mezcla Task y Observable , por lo que no es realmente comprobable con ReactiveTest (aunque, para ser sincero, tampoco estoy seguro de cómo implementar un suscriptor "lento" con ReactiveTest ).

public static IObservable<T> ShedLoad<T>(this IObservable<T> source) { return Observable.Create<T>(observer => { Task task = Task.FromResult(0); return source.Subscribe(t => { if(task.IsCompleted) task = Task.Run(() => observer.OnNext(t)); else Debug.WriteLine("Skip, task not finished"); }, observer.OnError, observer.OnCompleted); }); }

Supongo que podría haber una condición de carrera allí, pero en mi opinión, si estamos en la etapa en la que estamos abandonando las cosas porque está yendo demasiado rápido, no me importa abandonar demasiadas o muy pocas. Ah, y cada OnNext se llama (potencialmente) en un hilo diferente (supongo que podría poner una Synchronize en la parte posterior de la Create ).

Admito que no pude conseguir que la extensión share funcionara correctamente (la FromEventPattern(MouseMove) a FromEventPattern(MouseMove) y luego me suscribí con una Suscripción deliberadamente lenta, y extrañamente dejaba pasar ráfagas de eventos, en lugar de uno a la vez)


Un ejemplo usando Observable.Switch. También maneja el caso cuando completa la tarea pero no hay nada en la cola.

using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive { public static class RXX { public static IDisposable SubscribeWithoutOverlap<T> ( this IObservable<T> source , Action<T> action , IScheduler scheduler = null) { var sampler = new Subject<Unit>(); scheduler = scheduler ?? Scheduler.Default; var p = source.Publish(); var connection = p.Connect(); var subscription = sampler.Select(x=>p.Take(1)) .Switch() .ObserveOn(scheduler) .Subscribe(l => { action(l); sampler.OnNext(Unit.Default); }); sampler.OnNext(Unit.Default); return new CompositeDisposable(connection, subscription); } } }