File d’attente de tâches asynchrones avec limitation qui prend en charge le multi-threading

Je dois implémenter une bibliothèque pour demander l’API vk.com. Le problème est que l’API ne prend en charge que 3 requêtes par seconde. Je voudrais avoir l’API asynchrone.

Important: L’ API doit prendre en charge l’access sécurisé à partir de plusieurs threads.

Mon idée est d’implémenter une classe appelée throttler qui n’autorise pas plus de 3 requêtes / seconde et retarde les autres requêtes.

L’interface est la suivante:

public interface IThrottler : IDisposable { Task Throttle(Func<Task> task); } 

L’utilisation est comme

 var audio = await throttler.Throttle(() => api.MyAudio()); var messages = await throttler.Throttle(() => api.ReadMessages()); var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId)); /// Here should be delay because 3 requests executed var photo = await throttler.Throttle(() => api.MyPhoto()); 

Comment implémenter throttler?

Actuellement, je l’ai implémenté en tant que queue qui est traitée par un thread d’arrière-plan.

 public Task Throttle(Func<Task> task) { /// TaskRequest has method Run() to run task /// TaskRequest uses TaskCompletionSource to provide new task /// which is resolved when queue processed til this element. var request = new TaskRequest(task); requestQueue.Enqueue(request); return request.ResultTask; } 

Il s’agit du code de la boucle du fil d’arrière-plan qui traite la file:

 private void ProcessQueue(object state) { while (true) { IRequest request; while (requestQueue.TryDequeue(out request)) { /// Delay method calculates actual delay value and calls Thread.Sleep() Delay(); request.Run(); } } } 

Est-il possible d’implémenter ceci sans thread d’arrière-plan?

Nous allons donc commencer par une solution à un problème plus simple, celle de créer une queue qui traite jusqu’à N tâches simultanément, plutôt que de limiter à N tâches démarrées par seconde, et de poursuivre sur cette base:

 public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public TaskQueue(int concurrentRequests) { semaphore = new SemaphoreSlim(concurrentRequests); } public async Task Enqueue(Func> taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } } 

Nous utiliserons également les méthodes d’assistance suivantes pour faire correspondre le résultat d’un TaskCompletionSource à une tâche `:

 public static void Match(this TaskCompletionSource tcs, Task task) { task.ContinueWith(t => { switch (t.Status) { case TaskStatus.Canceled: tcs.SetCanceled(); break; case TaskStatus.Faulted: tcs.SetException(t.Exception.InnerExceptions); break; case TaskStatus.RanToCompletion: tcs.SetResult(t.Result); break; } }); } public static void Match(this TaskCompletionSource tcs, Task task) { Match(tcs, task.ContinueWith(t => default(T))); } 

Maintenant, pour notre solution réelle, ce que nous pouvons faire, c’est que chaque fois que nous devons effectuer une opération TaskCompletionSource , nous créons un TaskCompletionSource , puis nous allons dans notre TaskQueue et ajoutons un élément qui démarre la tâche, fait correspondre le TCS à son résultat, n’attend pas et retarde ensuite la file d’attente pendant 1 seconde. La queue des tâches n’autorisera pas une tâche à démarrer tant qu’il n’y aura plus N tâches lancées au cours de la dernière seconde, le résultat de l’opération étant identique à celui de la Task création:

 public class Throttler { private TaskQueue queue; public Throttler(int requestsPerSecond) { queue = new TaskQueue(requestsPerSecond); } public Task Enqueue(Func> taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource(); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } public Task Enqueue(Func taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource(); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } } 

Voici une solution qui utilise un chronomètre :

 public class Throttler : IThrottler { private readonly Stopwatch m_Stopwatch; private int m_NumberOfRequestsInLastSecond; private readonly int m_MaxNumberOfRequestsPerSecond; public Throttler(int max_number_of_requests_per_second) { m_MaxNumberOfRequestsPerSecond = max_number_of_requests_per_second; m_Stopwatch = Stopwatch.StartNew(); } public async Task Throttle(Func> task) { var elapsed = m_Stopwatch.Elapsed; if (elapsed > TimeSpan.FromSeconds(1)) { m_NumberOfRequestsInLastSecond = 1; m_Stopwatch.Restart(); return await task(); } if (m_NumberOfRequestsInLastSecond >= m_MaxNumberOfRequestsPerSecond) { TimeSpan time_to_wait = TimeSpan.FromSeconds(1) - elapsed; await Task.Delay(time_to_wait); m_NumberOfRequestsInLastSecond = 1; m_Stopwatch.Restart(); return await task(); } m_NumberOfRequestsInLastSecond++; return await task(); } } 

Voici comment ce code peut être testé:

 class Program { static void Main(ssortingng[] args) { DoIt(); Console.ReadLine(); } static async Task DoIt() { Func> func = async () => { await Task.Delay(100); return 1; }; Throttler throttler = new Throttler(3); for (int i = 0; i < 10; i++) { var result = await throttler.Throttle(func); Console.WriteLine(DateTime.Now); } } } 

Vous pouvez l’utiliser comme générique

 public TaskThrottle(int maxTasksToRunInParallel) { _semaphore = new SemaphoreSlim(maxTasksToRunInParallel); } public void TaskThrottler(IEnumerable> tasks, int timeoutInMilliseconds, CancellationToken cancellationToken = default(CancellationToken)) where T : class { // Get Tasks as List var taskList = tasks as IList> ?? tasks.ToList(); var postTasks = new List>(); // When the first task completed, it will flag taskList.ForEach(x => { postTasks.Add(x.ContinueWith(y => _semaphore.Release(), cancellationToken)); }); taskList.ForEach(x => { // Wait for open slot _semaphore.Wait(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); x.Start(); }); Task.WaitAll(taskList.ToArray(), cancellationToken); } 

Edit: cette solution fonctionne, mais ne l’utilisez que s’il est acceptable de traiter toutes les requêtes en série (dans un thread). Sinon, utilisez la solution acceptée comme réponse.

Eh bien, merci à Best, dans .NET, de gérer la file d’attente de tâches sur un thread séparé (unique)

Ma question est presque redondante, à l’exception de l’ajout d’un délai avant l’exécution, ce qui est en réalité simple.

Le principal assistant ici est la classe SemaphoreSlim qui permet de limiter le degré de parallélisme.

Donc, commencez par créer un sémaphore:

 // Semaphore allows run 1 thread concurrently. private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); 

Et, la version finale de l’accélérateur ressemble à

 public async Task Throttle(Func> task) { await semaphore.WaitAsync(); try { await delaySource.Delay(); return await task(); } finally { semaphore.Release(); } } 

La source de retard est également assez simple:

 private class TaskDelaySource { private readonly int maxTasks; private readonly TimeSpan inInterval; private readonly Queue ticks = new Queue(); public TaskDelaySource(int maxTasks, TimeSpan inInterval) { this.maxTasks = maxTasks; this.inInterval = inInterval; } public async Task Delay() { // We will measure time of last maxTasks tasks. while (ticks.Count > maxTasks) ticks.Dequeue(); if (ticks.Any()) { var now = DateTime.UtcNow.Ticks; var lastTick = ticks.First(); // Calculate interval between last maxTasks task and current time var intervalSinceLastTask = TimeSpan.FromTicks(now - lastTick); if (intervalSinceLastTask < inInterval) await Task.Delay((int)(inInterval - intervalSinceLastTask).TotalMilliseconds); } ticks.Enqueue(DateTime.UtcNow.Ticks); } }