En utilisant Parallel Linq Extensions pour unir deux séquences, comment peut-on obtenir les résultats les plus rapides en premier?

Disons que j’ai deux séquences renvoyant des nombres entiers de 1 à 5.

Le premier retourne très rapidement 1, 2 et 3, mais 4 et 5 prennent 200 ms chacun.

public static IEnumerable FastFirst() { for (int i = 1; i  3) Thread.Sleep(200); yield return i; } } 

Le second retourne 1, 2 et 3 avec un délai de 200 ms, mais 4 et 5 sont retournés rapidement.

 public static IEnumerable SlowFirst() { for (int i = 1; i < 6; i++) { if (i < 4) Thread.Sleep(200); yield return i; } } 

L’union de ces deux séquences ne me donne que les chiffres 1 à 5.

 FastFirst().Union(SlowFirst()); 

Je ne peux pas garantir laquelle des deux méthodes a des retards à quel moment, donc l’ordre d’exécution ne peut pas me garantir une solution. Par conséquent, je voudrais paralléliser l’union, afin de minimiser le délai (artificiel) dans mon exemple.

Un scénario réel: j’ai un cache qui renvoie certaines entités et une source de données qui renvoie toutes les entités. J’aimerais pouvoir renvoyer un iterator à partir d’une méthode qui parallélise en interne la demande au cache et à la source de données afin que les résultats mis en cache soient générés aussi rapidement que possible.

Note 1: Je réalise que cela gaspille toujours des cycles de processeur; Je ne demande pas comment puis-je empêcher les séquences d’itérer sur leurs éléments lents, comment puis-je les unir le plus rapidement possible.

Mise à jour 1: j’ai adapté la réponse géniale d’achitaka-san pour accepter plusieurs producteurs et utiliser ContinueWhenAll pour définir CompleteAdding de BlockingCollection une seule fois. Je viens de le mettre ici car il serait perdu à cause du manque de formatage des commentaires. Tout autre commentaire serait génial!

 public static IEnumerable SelectAsync( params IEnumerable[] producer) { var resultsQueue = new BlockingCollection(); var taskList = new HashSet(); foreach (var result in producer) { taskList.Add( Task.Factory.StartNew( () => { foreach (var product in result) { resultsQueue.Add(product); } })); } Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding()); return resultsQueue.GetConsumingEnumerable(); } 

Regarde ça. La première méthode renvoie simplement tout dans l’ordre des résultats. La seconde vérifie l’unicité. Si vous les enchaînez, vous obtiendrez le résultat souhaité, je pense.

 public static class Class1 { public static IEnumerable SelectAsync( IEnumerable producer1, IEnumerable producer2, int capacity) { var resultsQueue = new BlockingCollection(capacity); var producer1Done = false; var producer2Done = false; Task.Factory.StartNew(() => { foreach (var product in producer1) { resultsQueue.Add(product); } producer1Done = true; if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); } }); Task.Factory.StartNew(() => { foreach (var product in producer2) { resultsQueue.Add(product); } producer2Done = true; if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); } }); return resultsQueue.GetConsumingEnumerable(); } public static IEnumerable SelectAsyncUnique(this IEnumerable source) { HashSet knownResults = new HashSet(); foreach (TResult result in source) { if (knownResults.Contains(result)) {continue;} knownResults.Add(result); yield return result; } } } 

Le cache serait presque instantané par rapport à l’extraction de la firebase database. Vous pouvez donc commencer par lire le cache et renvoyer ces éléments, puis la lire à partir de la firebase database et renvoyer les éléments, à l’exception de ceux trouvés dans le cache.

Si vous essayez de paralléliser cela, vous appendez beaucoup de complexité mais vous obtiendrez un gain assez modeste.

Modifier:

S’il n’y a pas de différence prévisible dans la vitesse des sources, vous pouvez les exécuter dans des threads et utiliser un ensemble de hachage synchronisé pour garder une trace des éléments que vous avez déjà obtenus, mettre les nouveaux éléments dans une queue et laisser le thread principal en lecture. de la queue:

 public static IEnumerable GetParallel(Func getKey, params IEnumerable[] sources) { HashSet found = new HashSet(); List queue = new List(); object sync = new object(); int alive = 0; object aliveSync = new object(); foreach (IEnumerable source in sources) { lock (aliveSync) { alive++; } new Thread(s => { foreach (TItem item in s as IEnumerable) { TKey key = getKey(item); lock (sync) { if (found.Add(key)) { queue.Add(item); } } } lock (aliveSync) { alive--; } }).Start(source); } while (true) { lock (sync) { if (queue.Count > 0) { foreach (TItem item in queue) { yield return item; } queue.Clear(); } } lock (aliveSync) { if (alive == 0) break; } Thread.Sleep(100); } } 

Flux de test:

 public static IEnumerable SlowRandomFeed(Random rnd) { int[] values = new int[100]; for (int i = 0; i < 100; i++) { int pos = rnd.Next(i + 1); values[i] = i; int temp = values[pos]; values[pos] = values[i]; values[i] = temp; } foreach (int value in values) { yield return value; Thread.Sleep(rnd.Next(200)); } } 

Tester:

 Random rnd = new Random(); foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) { Console.Write("{0:0000 }", item); }