Traitement de la queue TPL

Je travaille actuellement sur un projet et j’ai besoin de mettre en queue des travaux pour le traitement, voici l’exigence:

  1. Les travaux doivent être traités un à la fois
  2. Un article en queue doit pouvoir être attendu

Donc, je veux quelque chose qui ressemble à:

Task QueueJob(params here) { /// Queue the job and somehow return a waitable task that will wait until the queued job has been executed and return the result. } 

J’ai essayé de créer une tâche en arrière-plan qui extrait simplement des éléments d’une queue et traite le travail, mais la difficulté est de passer d’une tâche en arrière-plan à la méthode.

Si nécessaire, je pourrais simplement demander un rappel d’achèvement dans la méthode QueueJob, mais ce serait formidable si je pouvais obtenir une tâche transparente qui vous permettrait d’attendre le traitement pour être traité (même s’il y a des travaux). avant dans la queue).

Func ne prend aucun paramètre et renvoie une valeur de type T. Les travaux sont exécutés un par un et vous pouvez attendre la tâche renvoyée pour obtenir le résultat.

 public class TaskQueue { private Queue InnerTaskQueue; private bool IsJobRunning; public void Start() { Task.Factory.StartNew(() => { while (true) { if (InnerTaskQueue.Count > 0 && !IsJobRunning) { var task = InnerTaskQueue.Dequeue() task.Start(); IsJobRunning = true; task.ContinueWith(t => IsJobRunning = false); } else { Thread.Sleep(1000); } } } } public Task QueueJob(Func job) { var task = new Task(() => job()); InnerTaskQueue.Enqueue(task); return task; } } 

Vous pouvez trouver TaskCompletionSource , il peut être utilisé pour créer une Task qui se termine exactement quand vous le souhaitez. Si vous le combinez avec BlockingCollection , vous obtiendrez votre queue:

 class JobProcessor : IDisposable { private readonly Func m_transform; // or a custom type instead of Tuple private readonly BlockingCollection>> m_queue = new BlockingCollection>>(); public JobProcessor(Func transform) { m_transform = transform; Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning); } private void ProcessQueue() { Tuple> tuple; while (m_queue.TryTake(out tuple, Timeout.Infinite)) { var input = tuple.Item1; var tcs = tuple.Item2; try { tcs.SetResult(m_transform(input)); } catch (Exception ex) { tcs.SetException(ex); } } } public Task QueueJob(TInput input) { var tcs = new TaskCompletionSource(); m_queue.Add(Tuple.Create(input, tcs)); return tcs.Task; } public void Dispose() { m_queue.CompleteAdding(); } } 

J’irais pour quelque chose comme ça:

 class TaskProcessor { // TODO: Error handling! readonly BlockingCollection> blockingCollection = new BlockingCollection>(new ConcurrentQueue>()); public Task AddTask(Func work) { var task = new Task(work); blockingCollection.Add(task); return task; // give the task back to the caller so they can wait on it } public void CompleteAddingTasks() { blockingCollection.CompleteAdding(); } public TaskProcessor() { ProcessQueue(); } void ProcessQueue() { Task task; while (blockingCollection.TryTake(out task)) { task.Start(); task.Wait(); // ensure this task finishes before we start a new one... } } } 

Selon le type d’application qui l’utilise, vous pouvez désactiver BlockingCollection / ConcurrentQueue pour quelque chose de plus simple (par exemple, une simple queue). Vous pouvez également ajuster la signature de la méthode “AddTask” en fonction du type de méthodes / parameters que vous allez mettre en queue …