Utilisation de IObservable au lieu d’événements

J’ai récemment lu sur IObservable. Jusqu’à présent, j’ai examiné diverses questions relatives à SO et regardé une vidéo sur ce qu’elles peuvent faire. Je pense que tout le mécanisme “push” est shiny, mais j’essaie toujours de comprendre ce que tout fait exactement. De mes lectures, je suppose qu’en un sens un IObservable est quelque chose qui peut être «surveillé», et IObservers sont les «observateurs».

Alors maintenant, je vais essayer d’implémenter cela dans mon application. Il y a quelques points que j’aimerais bien comprendre avant de commencer. J’ai déjà constaté que IObservable était l’opposé de IEnumerable. Cependant, je ne peux pas vraiment voir d’endroits que je puisse intégrer à mon application dans mon cas particulier.

Actuellement, je me sers beaucoup des événements, à tel point que je peux voir que la «plomberie» commence à devenir ingérable. Je pense que IObservable peut m’aider ici.

Considérez le design suivant, qui constitue mon enveloppe autour de mes E / S dans mon application (pour info, je dois généralement traiter des chaînes):

J’ai une interface de base appelée IDataIO :

 public interface IDataIO { event OnDataReceived; event OnTimeout: event OnTransmit; } 

Actuellement, trois classes implémentent cette interface. Chacune de ces classes utilise des appels de méthode Async, introduisant un type de traitement multithread:

 public class SerialIO : IDataIO; public class UdpIO : IDataIO; public class TcpIO : IDataIO; 

Il existe une seule instance de chacune de ces classes intégrée à ma dernière classe, appelée IO (qui implémente également IDataIO – conformément à mon modèle de stratégie):

 public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; public TcpIO Tcp; } 

J’ai utilisé le modèle de stratégie pour encapsuler ces trois classes, de sorte que lors du passage entre les différentes instances IDataIO lors de l’exécution, il soit “invisible” pour l’utilisateur final. Comme vous pouvez l’imaginer, cela a conduit à un peu de «plomberie événementielle» en arrière-plan.

Alors, comment puis-je utiliser la notification “push” ici dans mon cas? Au lieu de m’abonner à des événements (DataReceived, etc.), j’aimerais simplement transmettre les données à toute personne intéressée. Je ne sais pas trop par où commencer. J’essaie encore de jouer avec les idées / classes génériques de Subject et ses différentes incarnations (ReplaySubject / AsynSubject / BehaviourSubject). Est-ce que quelqu’un pourrait m’éclairer sur celui-ci (peut-être en référence à ma conception)? Ou est-ce tout simplement pas un IObservable idéal pour IObservable ?

PS N’hésitez pas à corriger mes “malentendus” 🙂

Les observables sont parfaits pour représenter des stream de données. Ainsi, votre événement DataReceived serait modélisé selon le modèle observable, quelque chose comme IObservable ou IObservable . Vous bénéficiez également des avantages supplémentaires de OnError et OnComplete qui sont pratiques.

En termes de mise en œuvre, il est difficile de dire pour votre scénario exact mais nous utilisons souvent Subject comme source sous-jacente et appelons OnNext pour OnNext des données. Peut-être quelque chose comme

 // Using a subject is probably the easiest way to push data to an Observable // It wraps up both IObservable and IObserver so you almost never use IObserver directly private readonly Subject subject = new Subject(); private void OnPort_DataReceived(object sender, EventArgs e) { // This pushes the data to the IObserver, which is probably just a wrapper // around your subscribe delegate is you're using the Rx extensions this.subject.OnNext(port.Data); // pseudo code } 

Vous pouvez ensuite exposer le sujet via une propriété:

 public IObservable DataObservable { get { return this.subject; } // Or this.subject.AsObservable(); } 

Vous pouvez remplacer votre événement DataReceived sur IDataIO par un IObservable et demander à chaque classe de stratégie de gérer ses données de la manière dont elle a besoin et de les transmettre au Subject .

De l’autre côté, quiconque s’abonne à l’Observable est alors capable de le gérer comme un événement (en utilisant simplement une Action ) ou d’effectuer un travail vraiment utile sur le stream avec Select , Where , Buffer , etc.

 private IDataIO dataIo = new ... private void SubscribeToData() { dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); } private void On16Bytes(IList bytes) { // do stuff } 

ReplaySubject / ConnectableObservable s sont parfaits lorsque vous savez que votre abonné arrivera en retard à la fête mais doit tout de même se tenir au courant de tous les événements. La source met en cache tout ce qui est poussé et rejoue tout pour chaque abonné. Vous seul pouvez dire si c’est le comportement dont vous avez réellement besoin (mais soyez prudent, car tout ce qui se cache mettra en cache l’utilisation de votre mémoire, bien évidemment).

Quand j’en ai appris sur Rx, j’ai trouvé que la http://leecampbell.blogspot.co.uk/ série de blogs sur Rx était très informative pour comprendre la théorie (les publications sont un peu dépassées et les API ont changé, alors méfiez-vous de cela. )

C’est certainement un cas idéal pour les observables. La classe d’ IO verra probablement le plus d’améliorations. Pour commencer, changeons l’interface pour utiliser des observables et voyons à quel point la classe de combinaison devient simple.

 public interface IDataIO { //you will have to fill in the types here. Either the event args //the events provide now or byte[] or something relevant would be good. IObservable DataReceived; IObservable Timeout; IObservable Transmit; } public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; public TcpIO Tcp; public IObservable DataReceived { get { return Observable.Merge(Serial.DataReceived, Udp.DataReceived, Tcp.DataReceived); } } //similarly for other two observables } 

NOTE LATÉRALE: Vous avez peut-être remarqué que j’ai modifié les noms des membres de l’interface. Dans .NET, les événements sont généralement nommés et les fonctions qui les déclenchent sont appelés On .

Pour les classes producsortingces, vous avez quelques options qui dépendent des sources réelles. Supposons que vous utilisiez la classe .NET SerialPort dans SerialIO et que DataReceived renvoie un IObservable . Étant donné que SerialPort a déjà un événement pour les données reçues, vous pouvez l’utiliser directement pour rendre l’observable dont vous avez besoin.

 public class SerialIO : IDataIO { private SerialPort _port; public IObservable DataRecived { get { return Observable.FromEventPattern( h => _port.DataReceived += h, h => _port.DataReceived -= h) .Where(ep => ep.EventArgs.EventType == SerialData.Chars) .Select(ep => { byte[] buffer = new byte[_port.BytesToRead]; _port.Read(buffer, 0, buffer.Length); return buffer; }); } } } 

Dans les cas où vous n’avez pas de source d’événement existante, vous devrez peut-être utiliser un sujet comme suggéré par RichK. Sa réponse couvre assez bien ce mode d’utilisation, je ne vais donc pas le reproduire ici.

Vous n’avez pas montré comment vous utilisiez cette interface, mais selon le cas d’utilisation, il serait peut-être plus judicieux que d’autres fonctions sur ces classes renvoient IObservable mêmes IObservable et suppriment complètement ces “événements”. Avec un modèle asynchrone basé sur des événements, vous devez avoir des événements distincts de la fonction que vous appelez pour déclencher le travail, mais avec des observables, vous pouvez les renvoyer depuis la fonction pour indiquer plus clairement ce à quoi vous vous abonnez. Cette approche permet également aux observables renvoyés de chaque appel d’envoyer des messages OnError et OnCompleted pour signaler la fin d’une opération. D’après votre utilisation d’une classe combinée, je ne m’attends pas à ce que cela soit utile dans ce cas particulier, mais c’est quelque chose à garder à l’esprit.

Utilisation de IObservable au lieu d’événements

si seulement intéressé par la propriété, le paquet nuget rxx a:

 IObservable obs=Observable2.FromPropertyChangedPattern(() => obj.Name) 

(avec beaucoup d’autres méthodes)


ou si l’événement empêche les modifications de propriété / cherche à éviter la mise en oeuvre de INotifyPropertyChanged

 class ObserveEvent_Simple { public static event EventHandler SimpleEvent; static void Main() { IObservable eventAsObservable = Observable.FromEventPattern( ev => SimpleEvent += ev, ev => SimpleEvent -= ev); } } 

semblable à u / Gideon Engelberth à partir de http://rxwiki.wikidot.com/101samples#toc6

couvert par https://rehansaeed.com/reactive-extensions-part2-wrapping-events/


Cet article codeproject est également consacré à la conversion d’événements en événements réactifs.

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

et traite également des souscriptions faibles