BrokeredMessage automatiquement mis au rebut après avoir appelé OnMessage ()

J’essaie de mettre en queue les éléments d’un Azure Service Bus afin de pouvoir les traiter en bloc. Je suis conscient du fait que Azure Service Bus a un ReceiveBatch (), mais cela semble problématique pour les raisons suivantes:

  • Je ne peux obtenir qu’un maximum de 256 messages à la fois et même cela peut alors être aléatoire en fonction de la taille du message.
  • Même si je jette un œil pour voir combien de messages sont en attente, je ne sais pas combien d’appels RequestBatch à faire, car je ne sais pas combien de messages chaque appel me rendra. Étant donné que les messages continueront à arriver, je ne peux pas continuer à faire des demandes jusqu’à ce qu’elles soient vides, car elles ne le seront jamais.

J’ai décidé d’utiliser simplement le programme d’écoute de messages, qui est moins cher que de perdre des coups d’œil et qui me donnera plus de contrôle.

En gros, j’essaie de laisser un nombre défini de messages s’accumuler, puis de les traiter simultanément. J’utilise une timer pour forcer un délai, mais je dois pouvoir faire la queue pour mes articles lorsqu’ils arrivent.

Sur la base de mon besoin de timer, il semblait que la collection de blocage n’était pas une bonne option et j’essaie donc d’utiliser ConcurrentBag.

var batchingQueue = new ConcurrentBag(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); while (true) { var sw = WaitableStopwatch.StartNew(); BrokeredMessage msg; while (batchingQueue.TryTake(out msg)) // <== Object is already disposed { ...do this until I have a thousand ready to be written to DB in batch Console.WriteLine("Completing message"); msg.Complete(); // <== ERRORS HERE } sw.Wait(MINIMUM_DELAY); } 

Cependant, dès que j’accède au message en dehors du pipeline OnMessage, il montre BrokeredMessage comme étant déjà supprimé.

Je pense que cela doit être un comportement automatique d’OnMessage et je ne vois aucun moyen de faire autre chose avec le message que de le traiter tout de suite, ce que je ne veux pas faire.

C’est incroyablement facile à faire avec BlockingCollection .

 var batchingQueue = new BlockingCollection(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); 

Et votre fil de consommation:

 foreach (var msg in batchingQueue.GetConsumingEnumerable()) { Console.WriteLine("Completing message"); msg.Complete(); } 

GetConsumingEnumerable renvoie un iterator qui utilise les éléments de la queue jusqu’à ce que la propriété IsCompleted soit définie et que la queue soit vide. Si la queue est vide mais que la valeur IsCompleted est IsCompleted False , l’attente suivante est IsCompleted pour l’élément non occupé.

Pour annuler le thread consommateur (c’est-à-dire arrêter le programme), vous arrêtez d’append des éléments à la queue et l’appel du thread principal batchingQueue.CompleteAdding . Le consommateur videra la queue, verra que la propriété IsCompleted la IsCompleted True et quittera.

L’utilisation de BlockingCollection ici est meilleure que ConcurrentBag ou ConcurrentQueue , car l’interface BlockingCollection est plus facile à utiliser. En particulier, l’utilisation de GetConsumingEnumerable vous GetConsumingEnumerable d’avoir à vous soucier de la vérification du nombre ou de l’attente (boucles d’interrogation). Ça fonctionne.

Notez également que ConcurrentBag a un comportement de suppression plutôt étrange. En particulier, l’ordre dans lequel les éléments sont supprimés varie en fonction du thread qui le supprime. Le fil qui a créé le sac supprime les éléments dans un ordre différent de celui des autres fils. Voir Utilisation de la collection ConcurrentBag pour plus de détails.

Vous n’avez pas dit pourquoi vous voulez regrouper les éléments en entrée. À moins d’une raison primordiale en termes de performances, il ne semble pas particulièrement judicieux de compliquer votre code avec cette logique de traitement par lots.


Si vous souhaitez effectuer des écritures par lots dans la firebase database, je vous suggère d’utiliser une simple List pour mettre les éléments en mémoire tampon. Si vous devez traiter les éléments avant qu’ils ne soient écrits dans la firebase database, utilisez la technique présentée ci-dessus pour les traiter. Ensuite, en écrivant directement dans la firebase database, ajoutez l’élément à une liste. Lorsque la liste contient 1 000 éléments, ou qu’un laps de temps donné s’est écoulé, allouez une nouvelle liste et démarrez une tâche pour écrire l’ancienne liste dans la firebase database. Comme ça:

 // at class scope // Flush every 5 minutes. private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5); private const int MaxBufferItems = 1000; // Create a timer for the buffer flush. System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite); // A lock for the list. Unless you're getting hundreds of thousands // of items per second, this will not be a performance problem. object _listLock = new Object(); List _recordBuffer = new List(); 

Ensuite, chez votre consommateur:

 foreach (var msg in batchingQueue.GetConsumingEnumerable()) { // process the message Console.WriteLine("Completing message"); msg.Complete(); lock (_listLock) { _recordBuffer.Add(msg); if (_recordBuffer.Count >= MaxBufferItems) { // Stop the timer _flushTimer.Change(Timeout.Infinite, Timeout.Infinite); // Save the old list and allocate a new one var myList = _recordBuffer; _recordBuffer = new List(); // Start a task to write to the database Task.Factory.StartNew(() => FlushBuffer(myList)); // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite); } } } private void TimedFlush() { bool lockTaken = false; List myList = null; try { if (Monitor.TryEnter(_listLock, 0, out lockTaken)) { // Save the old list and allocate a new one myList = _recordBuffer; _recordBuffer = new List(); } } finally { if (lockTaken) { Monitor.Exit(_listLock); } } if (myList != null) { FlushBuffer(myList); } // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite); } 

L’idée ici est que vous récupérez l’ancienne liste, allouez une nouvelle liste afin que le traitement puisse continuer, puis écrivez les éléments de l’ancienne liste dans la firebase database. Le verrou est là pour empêcher le chronomètre et le compteur d’enregistrements de se chevaucher. Sans le verrou, les choses sembleraient probablement bien fonctionner pendant un certain temps, et vous auriez alors des collisions étranges à des moments imprévisibles.

J’aime cette conception, car elle élimine les interrogations du consommateur. La seule chose que je n’aime pas, c’est que le consommateur doit connaître le minuteur (c.-à-d. Qu’il doit s’arrêter puis le redémarrer). Avec un peu plus de reflection, je pourrais éliminer cette exigence. Mais cela fonctionne bien comme il est écrit.

Passer à OnMessageAsync a résolu le problème pour moi

 _queueClient.OnMessageAsync(async receivedMessage => 

J’ai contacté Microsoft à propos du problème de la suppression de BrokeredMessage sur MSDN . Voici la réponse:

Très règle de base et je ne suis pas sûr si cela est documenté. Le message reçu doit être traité pendant la durée de vie de la fonction de rappel. Dans votre cas, les messages seront supprimés à la fin du rappel async. C’est pourquoi vos tentatives complètes échouent avec ObjectDisposedException dans un autre thread.

Je ne vois pas vraiment comment la mise en queue des messages pour un traitement ultérieur peut améliorer le débit. Cela appenda certainement plus de fardeau au client. Essayez de traiter le message dans le rappel asynchrone, cela devrait être assez performant.

Dans mon cas, cela signifie que je ne peux pas utiliser le ServiceBus comme je le voulais et je dois repenser à la façon dont je voulais que les choses fonctionnent. Bugger.

J’ai eu le même problème quand j’ai commencé à travailler avec le service Azure Service Bus.

J’ai trouvé cette méthode OnMessage toujours disposer object BrokedMessage. La démarche proposée par Jim Mischel ne m’a pas aidé (mais c’était très intéressant à lire – merci!).

Après une enquête, j’ai trouvé que toute l’approche est fausse. Laissez-moi vous expliquer la bonne façon de faire ce que vous voulez.

  1. Utilisez la méthode BrokedMessage.Complete () uniquement dans le gestionnaire de méthode OnMessage.
  2. Si vous devez traiter un message en dehors de cette méthode, vous devez utiliser la méthode QueueClient.Complete (Guid lockToken). “LockToken” est la propriété de l’object BrokeredMessage.

Exemple:

  var messageOptions = new OnMessageOptions { AutoComplete = false, AutoRenewTimeout = TimeSpan.FromMinutes( 5 ), MaxConcurrentCalls = 1 }; var buffer = new Dictionary(); // get message from queue myQueueClient.OnMessage( m => buffer.Add(key: m.GetBody(), value: m.LockToken), messageOptions // this option says to ServiceBus to "froze" message in he queue until we process it ); foreach(var item in buffer){ try { Console.WriteLine($"Process item: {item.Key}"); myQueueClient.Complete(item.Value);// you can also use method CompleteBatch(...) to improve performance } catch{ // "unfroze" message in ServiceBus. Message would be delivered to other listener myQueueClient.Defer(item.Value); } }