Relever des événements sur un fil séparé

Je développe un composant qui doit traiter le stream en direct et diffuser les données aux auditeurs de manière assez rapide (avec environ 100 nano précision de second niveau, voire moins que si je peux le faire). Actuellement, je soulève un événement de mon code auquel l’abonné peut s’abonner. Cependant, étant donné que les gestionnaires d’événements en C # s’exécutent sur le même thread qui déclenche l’événement, mon thread qui déclenche l’événement sera bloqué jusqu’à ce que tous les abonnés aient terminé le traitement de l’événement. Je n’ai pas le contrôle sur le code des abonnés, ils peuvent donc éventuellement effectuer des opérations fastidieuses dans le gestionnaire d’événements, ce qui peut bloquer le thread qui diffuse.

Que puis-je faire pour pouvoir diffuser les données à d’autres abonnés tout en pouvant les diffuser assez rapidement?

100 ns est une cible très difficile à atteindre. Je crois qu’il faudra une profonde compréhension de ce que vous faites et de la raison de ce type de performance

Toutefois, l’appel asynchrone des abonnés aux événements est assez facile à résoudre. Il a déjà répondu ici par qui d’autre, Jon Skeet.

foreach (MyDelegate action in multicast.GetInvocationList()) { action.BeginInvoke(...); } 

edit: Je devrais également mentionner que vous devez exécuter un système d’exploitation en temps réel pour donner des garanties de performances ssortingctes à vos utilisateurs.

Il semble que vous cherchiez des tâches. Voici une méthode d’extension que j’ai écrite pour mon travail et qui peut invoquer un événement de manière asynchrone afin que chaque gestionnaire d’événements se trouve sur son propre thread. Je ne peux pas commenter sur sa vitesse car cela n’a jamais été une exigence pour moi.


METTRE À JOUR

Sur la base des commentaires, je l’ai ajusté de sorte qu’une seule tâche soit créée pour appeler tous les abonnés.

 ///  /// Extension method to safely encapsulate asynchronous event calls with checks ///  /// The event to call /// The sender of the event /// The arguments for the event /// The state information that is passed to the callback method ///  /// This method safely calls the each event handler attached to the event. This method uses  to /// asynchronously call invoke without any exception handling. As such, if any of the event handlers throw exceptions the application will /// most likely crash when the task is collected. This is an explicit decision since it is really in the hands of the event handler /// creators to make sure they handle issues that occur do to their code. There isn't really a way for the event raiser to know /// what is going on. ///  [System.Diagnostics.DebuggerStepThrough] public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args ) { // Used to make a temporary copy of the event to avoid possibility of // a race condition if the last subscriber unsubscribes // immediately after the null check and before the event is raised. EventHandler handler = evnt; if (handler != null) { // Manually calling all event handlers so that we could capture and aggregate all the // exceptions that are thrown by any of the event handlers attached to this event. var invocationList = handler.GetInvocationList(); Task.Factory.StartNew(() => { foreach (EventHandler h in invocationList) { // Explicitly not catching any exceptions. While there are several possibilities for handling these // exceptions, such as a callback, the correct place to handle the exception is in the event handler. h.Invoke(sender, args); } }); } } 

Vous pouvez utiliser ces méthodes d’extension simples sur vos gestionnaires d’événements:

 public static void Raise(this EventHandler handler, object sender, T e) where T : EventArgs { if (handler != null) handler(sender, e); } public static void Raise(this EventHandler handler, object sender, EventArgs e) { if (handler != null) handler(sender, e); } public static void RaiseOnDifferentThread(this EventHandler handler, object sender, T e) where T : EventArgs { if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e)); } public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) { if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e)); } public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) { return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken()); } 

Usage:

 public static Test() { myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty); } 

StartNew() est nécessaire pour garantir que StartNew() utilise réellement un thread différent, comme expliqué ici .

Je ne peux pas vous dire si cela répondra de manière fiable à l’exigence de 100ns, mais voici une autre solution permettant de fournir à l’utilisateur final un moyen de vous fournir une ConcurrentQueue que vous voudriez remplir et qu’il pourrait écouter sur un fil séparé.

 class Program { static void Main(ssortingng[] args) { var multicaster = new QueueMulticaster(); var listener1 = new Listener(); //Make a couple of listening Q objects. listener1.Listen(); multicaster.Subscribe(listener1); var listener2 = new Listener(); listener2.Listen(); multicaster.Subscribe(listener2); multicaster.Broadcast(6); //Send a 6 to both concurrent Queues. Console.ReadLine(); } } //The listeners would run on their own thread and poll the Q like crazy. class Listener : IListenToStuff { public ConcurrentQueue StuffQueue { get; set; } public void Listen() { StuffQueue = new ConcurrentQueue(); var t = new Thread(ListenAggressively); t.Start(); } void ListenAggressively() { while (true) { int val; if(StuffQueue.TryDequeue(out val)) Console.WriteLine(val); } } } //Simple class that allows you to subscribe a Queue to a broadcast event. public class QueueMulticaster { readonly List> _subscribers = new List>(); public void Subscribe(IListenToStuff subscriber) { _subscribers.Add(subscriber); } public void Broadcast(T value) { foreach (var listenToStuff in _subscribers) { listenToStuff.StuffQueue.Enqueue(value); } } } public interface IListenToStuff { ConcurrentQueue StuffQueue { get; set; } } 

Étant donné que vous ne pouvez pas retarder le traitement des autres écouteurs, cela signifie plusieurs threads. Avoir des fils d’écoute dédiés sur les écouteurs semble être une approche raisonnable à essayer, et la queue simultanée semble être un mécanisme de diffusion décent. Dans cette implémentation, la scrutation est constante, mais vous pouvez probablement utiliser la signalisation de thread pour réduire la charge du processeur en utilisant quelque chose comme AutoResetEvent .