Écrire une méthode d’extension «RetryAfter» Rx

Dans le livre IntroToRx, l’auteur suggère d’écrire une nouvelle tentative “intelligente” pour les E / S, qui réessaie une demande E / S, comme une demande réseau, après un certain temps.

Voici le paragraphe exact:

Une méthode d’extension utile à append à votre propre bibliothèque pourrait être une méthode “Back Off and Retry”. Les équipes avec lesquelles j’ai travaillé ont trouvé une telle fonctionnalité utile lors de l’exécution d’E / S, en particulier de requêtes réseau. Le concept est d’essayer, et en cas d’échec, attendre pendant une période donnée et réessayer. Votre version de cette méthode peut prendre en compte le type d’exception que vous souhaitez réessayer, ainsi que le nombre maximal de réessais. Vous voudrez peut-être même prolonger la période d’attente pour être moins agressif à chaque nouvelle tentative.

Malheureusement, je n’arrive pas à comprendre comment écrire cette méthode. 🙁

La clé de cette implémentation d’une tentative de retour en arrière est l’ observable différé . Un observable différé n’exécutera son usine que si quelqu’un s’y abonne. Et il invoquera l’usine pour chaque abonnement, ce qui en fait l’idéal pour notre scénario de nouvelle tentative.

Supposons que nous ayons une méthode qui déclenche une requête réseau.

public IObservable SomeApiMethod() { ... } 

Pour les besoins de ce petit extrait, définissons le différé comme source

 var source = Observable.Defer(() => SomeApiMethod()); 

Chaque fois que quelqu’un s’abonne à la source, il appelle SomeApiMethod et lance une nouvelle requête Web. La façon naïve de le réessayer chaque fois que cela échouerait serait d’utiliser l’opérateur Réessayer intégré.

 source.Retry(4) 

Cela ne serait cependant pas très agréable pour l’API et ce n’est pas ce que vous demandez. Nous devons retarder le lancement des demandes entre chaque tentative. Une façon de faire est d’ utiliser un abonnement différé .

 Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4) 

Ce n’est pas idéal car cela va append du retard même à la première demande, corrigeons ça.

 int attempt = 0; Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1))) }) .Retry(4) .Select(response => ...) 

S’arrêter juste une seconde n’est pas une très bonne méthode de relance, changeons donc cette constante pour en faire une fonction qui reçoit le décompte de relance et renvoie un délai approprié. Le recul exponentiel est assez facile à mettre en œuvre.

 Func strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) 

Nous avons presque terminé, il nous faut simplement append un moyen de spécifier les exceptions pour lesquelles nous devrions réessayer. Ajoutons une fonction renvoyée par une exception, qu’il soit logique d’essayer ou non, nous l’appellerons retryOnError.

Maintenant, nous devons écrire un code qui fait peur, mais supportez-moi.

 Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1))) .Select(item => new Tuple(true, item, null)) .Catch, Exception>(e => retryOnError(e) ? Observable.Throw>(e) : Observable.Return(new Tuple(false, null, e))); }) .Retry(retryCount) .SelectMany(t => t.Item1 ? Observable.Return(t.Item2) : Observable.Throw(t.Item3)) 

Tous ces crochets sont là pour créer une exception pour laquelle nous ne devrions pas essayer de nouveau après la .Retry() . Nous avons fait de l’observable interne un IObservable> où le premier bool indique si nous avons une réponse ou une exception. Si retryOnError indique que nous devrions réessayer pour une exception particulière, l’observable interne sera lancée et sera reprise par la tentative. SelectMany ouvre simplement notre tuple et permet à l’observable résultant d’être de IObservable .

Voir mon résumé avec la source complète et les tests pour la version finale. Avoir cet opérateur nous permet d’écrire notre code de nouvelle tentative assez succinctement

 Observable.Defer(() => SomApiMethod()) .RetryWithBackoffStrategy( retryCount: 4, retryOnError: e => e is ApiRetryWebException ) 

Je simplifie peut-être la situation, mais si nous examinons la mise en œuvre de Retry, il s’agit simplement d’un Observable.Catch sur un nombre infini d’observables:

 private static IEnumerable RepeatInfinite(T value) { while (true) yield return value; } public virtual IObservable Retry(IObservable source) { return Observable.Catch(QueryLanguage.RepeatInfinite(source)); } 

Donc, si nous adoptons cette approche, nous pouvons simplement append un délai après le premier rendement.

 private static IEnumerable> RepeateInfinite (IObservable source, TimeSpan dueTime) { // Don't delay the first time yield return source; while (true) yield return source.DelaySubscription(dueTime); } public static IObservable RetryAfterDelay(this IObservable source, TimeSpan dueTime) { return RepeateInfinite(source, dueTime).Catch(); } 

Une surcharge qui intercepte une exception spécifique avec un nombre de tentatives peut être encore plus concise:

 public static IObservable RetryAfterDelay(this IObservable source, TimeSpan dueTime, int count) where TException : Exception { return source.Catch(exception => { if (count <= 0) { return Observable.Throw(exception); } return source.DelaySubscription(dueTime).RetryAfterDelay(dueTime, --count); }); } 

Notez que la surcharge utilise ici la récursivité. Lors des premières comparutions, il semblerait qu’une exception StackOverflowException soit possible si le nombre comptait quelque chose comme Int32.MaxValue. Toutefois, DelaySubscription utilise un planificateur pour exécuter l’action de souscription, de sorte que le dépassement de capacité de la stack ne serait pas possible (c’est-à-dire en utilisant “le trampoline”). Je suppose que ce n’est pas vraiment évident en regardant le code cependant. Nous pourrions forcer un débordement de stack en définissant explicitement le planificateur de la surcharge DelaySubscription sur Scheduler.Immediate, puis en passant TimeSpan.Zero et Int32.MaxValue. Nous pourrions passer à un ordonnanceur non immédiat pour exprimer notre intention un peu plus explicitement, par exemple:

 return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay(dueTime, --count); 

UPDATE: ajout d’une surcharge pour intégrer un planificateur spécifique.

 public static IObservable RetryAfterDelay( this IObservable source, TimeSpan retryDelay, int retryCount, IScheduler scheduler) where TException : Exception { return source.Catch( ex => { if (retryCount <= 0) { return Observable.Throw(ex); } return source.DelaySubscription(retryDelay, scheduler) .RetryAfterDelay(retryDelay, --retryCount, scheduler); }); } 

Voici celui que j’utilise:

 public static IObservable DelayedRetry(this IObservable src, TimeSpan delay) { Contract.Requires(src != null); Contract.Ensures(Contract.Result>() != null); if (delay == TimeSpan.Zero) return src.Retry(); return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry()); } 

Basé sur la réponse de Markus, j’ai écrit ce qui suit:

 public static class ObservableExtensions { private static IObservable BackOffAndRetry( this IObservable source, Func strategy, Func retryOnError, int attempt) { return Observable .Defer(() => { var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt); var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay); return s .Catch(e => { if (retryOnError(attempt, e)) { return source.BackOffAndRetry(strategy, retryOnError, attempt + 1); } return Observable.Throw(e); }); }); } public static IObservable BackOffAndRetry( this IObservable source, Func strategy, Func retryOnError) { return source.BackOffAndRetry(strategy, retryOnError, 0); } } 

Je l’aime plus parce que

  • il ne modifie pas les attempts mais utilise la récursivité.
  • Il n’utilise pas de resortinges mais transmet le nombre de tentatives de retryOnError

Voici une autre implémentation légèrement différente que j’ai conçue en étudiant comment Rxx le fait. C’est donc en grande partie une version simplifiée de l’approche de Rxx.

La signature est légèrement différente de la version de Markus. Vous spécifiez un type d’exception sur lequel réessayer, et la stratégie de délai prend l’exception et le nombre de tentatives, de sorte que vous pouvez avoir des délais plus longs pour chaque tentative successive, etc.

Je ne peux pas garantir que c’est la preuve d’un bogue, ou la meilleure approche, mais cela semble fonctionner.

 public static IObservable RetryWithDelay(this IObservable source, Func delayFactory, IScheduler scheduler = null) where TException : Exception { return Observable.Create(observer => { scheduler = scheduler ?? Scheduler.CurrentThread; var disposable = new SerialDisposable(); int retryCount = 0; var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero, self => { var subscription = source.Subscribe( observer.OnNext, ex => { var typedException = ex as TException; if (typedException != null) { var retryDelay = delayFactory(typedException, ++retryCount); self(retryDelay); } else { observer.OnError(ex); } }, observer.OnCompleted); disposable.Disposable = subscription; }); return new CompositeDisposable(scheduleDisposable, disposable); }); } 

Voici celui que je suis venu avec.

Vous ne souhaitez pas concaténer les éléments des tentatives individuelles dans une séquence, mais émettre la séquence source dans son ensemble à chaque tentative; l’opérateur renvoie donc un IObservable> . Si cela n’est pas souhaité, vous pouvez simplement replacer Switch() dans une séquence.

(Arrière-plan: dans mon cas d’utilisation, la source est une séquence très chaude, dans laquelle je GroupByUntil un élément qui ferme le groupe. Si cet élément est perdu entre deux tentatives, le groupe n’est jamais fermé, ce qui entraîne une fuite de mémoire. Séquence de séquences permet de grouper uniquement sur les séquences internes (ou la gestion des exceptions ou …).)

 ///  /// Repeats  in individual windows, with  time in between. ///  public static IObservable> RetryAfter(this IObservable source, TimeSpan interval, IScheduler scheduler = null) { if (scheduler == null) scheduler = Scheduler.Default; return Observable.Create>(observer => { return scheduler.Schedule(self => { observer.OnNext(Observable.Create(innerObserver => { return source.Subscribe( innerObserver.OnNext, ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); }, () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); }); })); }); }); }