Récursif / fan-out dans les extensions réactives

J’essaie de reconstituer un pipeline Rx qui fonctionne comme suit:

  1. J’ai écrit une fonction qui prend un IObservable me fournissant des profils contenant des informations sur une entreprise
  2. J’interroge diverses sources de données pour trouver des profils d’entreprise potentiellement liés, tous en parallèle. Je le fusionne dans un seul IObservable de profils d’entreprise.
  3. Lorsque je récupère ces profils potentiellement liés, je les compare aux profils que j’ai déjà observés et, s’ils ont une pertinence> 80% et ne correspondent pas aux profils que j’ai déjà observés, je les considère comme des correspondances.
  4. Je souhaite réintroduire les sociétés correspondantes dans l’étape 1 afin de pouvoir rechercher des données associées à ces nouveaux profils correspondants.

Je lance le processus avec quelques bons profils connus.

Finalement, il n’y a plus de profils correspondants qui n’ont pas encore été vus, et le processus se termine.

J’ai du mal à programmer cela. Si j’utilise un object pour permettre à la fin du pipeline d’envoyer ses profils au début du stream de travail, personne ne va appeler OnCompleted et je ne découvre jamais que le processus est terminé. Si je développe plutôt cela avec la récursivité, il semble que je finisse toujours par avoir un débordement de stack puisque j’essaie d’appeler une fonction avec sa propre valeur de retour.

Quelqu’un peut-il m’aider avec la manière dont je peux accomplir cette tâche de manière à pouvoir déterminer que le processus est terminé?

On dirait que vous voulez un stream de données comme ceci:

seed profiles --> source --> get related --> output ^ | | v -<--- transform <----- 

Cela ressemble à un cas où la résolution du problème général est aussi facile ou plus simple que le problème spécifique. Je vous propose donc une fonction générique de "feedback" qui devrait vous donner les éléments de base dont vous avez besoin:

edit: fonction fixe à compléter

 IObservable Feedback(this IObservable seed, Func> produce, Func> feed) { return Observable.Create( obs => { var ret = new CompositeDisposable(); Action partComplete = d => { ret.Remove(d); if (ret.Count == 0) obs.OnCompleted(); }; Action, Action> ssub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action, Action> rsub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action recurse = null; recurse = s => { rsub(produce(s), r => { obs.OnNext(r); ssub(feed(r), recurse); }); }; ssub(seed, recurse); return ret; }); } 

Dans votre cas, T et TResult semblent être identiques, ainsi, feed sera la fonction identité. produce seront les fonctions utilisées pour mettre en œuvre les étapes 2 et 3.

Quelques exemples de code pour lesquels j'ai testé cette fonction:

 void Main() { var seed = new int[] { 1, 2, 3, 4, 5, 6 }; var found = new HashSet(); var mults = seed.ToObservable() .Feedback(i => { return Observable.Range(0, 5) .Select(r => r * i) .TakeWhile(v => v < 100) .Where(v => found.Add(v)); }, i => Observable.Return(i)); using (var disp = mults.Dump()) { Console.WriteLine("Press any key to stop"); Console.ReadKey(); } Console.WriteLine("Press any key to exit"); Console.ReadKey(); } static IDisposable Dump(this IObservable source) { return source.Subscribe(item => Console.WriteLine(item), ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToSsortingng()), () => Console.WriteLine("Dump completed")); }