J’essaie de reconstituer un pipeline Rx qui fonctionne comme suit:
Je lance le processus avec quelques bons profils connus.
Finalement, il n’y a plus de profils correspondants qui n’ont pas encore été vus, et le processus se termine.
J’ai du mal à programmer cela. Si j’utilise un object pour permettre à la fin du pipeline d’envoyer ses profils au début du stream de travail, personne ne va appeler OnCompleted et je ne découvre jamais que le processus est terminé. Si je développe plutôt cela avec la récursivité, il semble que je finisse toujours par avoir un débordement de stack puisque j’essaie d’appeler une fonction avec sa propre valeur de retour.
Quelqu’un peut-il m’aider avec la manière dont je peux accomplir cette tâche de manière à pouvoir déterminer que le processus est terminé?
On dirait que vous voulez un stream de données comme ceci:
seed profiles --> source --> get related --> output ^ | | v -<--- transform <-----
Cela ressemble à un cas où la résolution du problème général est aussi facile ou plus simple que le problème spécifique. Je vous propose donc une fonction générique de "feedback" qui devrait vous donner les éléments de base dont vous avez besoin:
edit: fonction fixe à compléter
IObservable Feedback(this IObservable seed, Func> produce, Func> feed) { return Observable.Create( obs => { var ret = new CompositeDisposable(); Action partComplete = d => { ret.Remove(d); if (ret.Count == 0) obs.OnCompleted(); }; Action, Action > ssub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action, Action > rsub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action recurse = null; recurse = s => { rsub(produce(s), r => { obs.OnNext(r); ssub(feed(r), recurse); }); }; ssub(seed, recurse); return ret; }); }
Dans votre cas, T
et TResult
semblent être identiques, ainsi, feed
sera la fonction identité. produce
seront les fonctions utilisées pour mettre en œuvre les étapes 2 et 3.
Quelques exemples de code pour lesquels j'ai testé cette fonction:
void Main() { var seed = new int[] { 1, 2, 3, 4, 5, 6 }; var found = new HashSet(); var mults = seed.ToObservable() .Feedback(i => { return Observable.Range(0, 5) .Select(r => r * i) .TakeWhile(v => v < 100) .Where(v => found.Add(v)); }, i => Observable.Return(i)); using (var disp = mults.Dump()) { Console.WriteLine("Press any key to stop"); Console.ReadKey(); } Console.WriteLine("Press any key to exit"); Console.ReadKey(); } static IDisposable Dump(this IObservable source) { return source.Subscribe(item => Console.WriteLine(item), ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToSsortingng()), () => Console.WriteLine("Dump completed")); }