Multi-threading avec .Net HttpListener

J’ai un auditeur:

listener = new HttpListener(); listener.Prefixes.Add(@"http://+:8077/"); listener.Start(); listenerThread = new Thread(HandleRequests); listenerThread.Start(); 

Et je traite les demandes:

 private void HandleRequests() { while (listener.IsListening) { var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); context.AsyncWaitHandle.WaitOne(); } } private void ListenerCallback(IAsyncResult ar) { var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff } 

J’aimerais écrire void Stop() de telle manière que:

  1. Il bloquera jusqu’à la fin de toutes les requêtes en cours (c’est-à-dire qu’il attendra que tous les threads “fassent des choses”).
  2. Bien qu’il attende les demandes déjà commencées, il ne permettra plus aucune demande (c’est-à-dire, retourne au début de ListenerCallback ).
  3. Après cela, il appellera listener.Stop() ( listener.IsListening est devenu faux).

Comment pourrait-il être écrit?

EDIT : Que pensez-vous de cette solution? Est-ce sûr?

 public void Stop() { lock (this) { isStopping = true; } resetEvent.WaitOne(); //initially set to true listener.Stop(); } private void ListenerCallback(IAsyncResult ar) { lock (this) { if (isStopping) return; resetEvent.Reset(); numberOfRequests++; } var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff lock (this) { if (--numberOfRequests == 0) resetEvent.Set(); } } 

Pour être complet, voici à quoi cela ressemblerait si vous gérez vos propres threads de travail:

 class HttpServer : IDisposable { private readonly HttpListener _listener; private readonly Thread _listenerThread; private readonly Thread[] _workers; private readonly ManualResetEvent _stop, _ready; private Queue _queue; public HttpServer(int maxThreads) { _workers = new Thread[maxThreads]; _queue = new Queue(); _stop = new ManualResetEvent(false); _ready = new ManualResetEvent(false); _listener = new HttpListener(); _listenerThread = new Thread(HandleRequests); } public void Start(int port) { _listener.Prefixes.Add(Ssortingng.Format(@"http://+:{0}/", port)); _listener.Start(); _listenerThread.Start(); for (int i = 0; i < _workers.Length; i++) { _workers[i] = new Thread(Worker); _workers[i].Start(); } } public void Dispose() { Stop(); } public void Stop() { _stop.Set(); _listenerThread.Join(); foreach (Thread worker in _workers) worker.Join(); _listener.Stop(); } private void HandleRequests() { while (_listener.IsListening) { var context = _listener.BeginGetContext(ContextReady, null); if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) return; } } private void ContextReady(IAsyncResult ar) { try { lock (_queue) { _queue.Enqueue(_listener.EndGetContext(ar)); _ready.Set(); } } catch { return; } } private void Worker() { WaitHandle[] wait = new[] { _ready, _stop }; while (0 == WaitHandle.WaitAny(wait)) { HttpListenerContext context; lock (_queue) { if (_queue.Count > 0) context = _queue.Dequeue(); else { _ready.Reset(); continue; } } try { ProcessRequest(context); } catch (Exception e) { Console.Error.WriteLine(e); } } } public event Action ProcessRequest; } 

Eh bien, il existe plusieurs façons de résoudre ce problème … Voici un exemple simple qui utilise un sémaphore pour suivre le travail en cours et un signal qui est émis lorsque tous les travailleurs sont terminés. Cela devrait vous donner une idée de base à partir de laquelle travailler.

La solution ci-dessous n’est pas idéale, idéalement, nous devrions acquérir le sémaphore avant d’appeler BeginGetContext. Cela rend l’arrêt plus difficile, j’ai donc choisi d’utiliser cette approche plus simplifiée. Si je le faisais pour de «vrais», j’écrirais probablement ma propre gestion de threads plutôt que de compter sur le ThreadPool. Cela permettrait un arrêt plus fiable.

Quoi qu’il en soit, voici l’exemple complet:

 class TestHttp { static void Main() { using (HttpServer srvr = new HttpServer(5)) { srvr.Start(8085); Console.WriteLine("Press [Enter] to quit."); Console.ReadLine(); } } } class HttpServer : IDisposable { private readonly int _maxThreads; private readonly HttpListener _listener; private readonly Thread _listenerThread; private readonly ManualResetEvent _stop, _idle; private readonly Semaphore _busy; public HttpServer(int maxThreads) { _maxThreads = maxThreads; _stop = new ManualResetEvent(false); _idle = new ManualResetEvent(false); _busy = new Semaphore(maxThreads, maxThreads); _listener = new HttpListener(); _listenerThread = new Thread(HandleRequests); } public void Start(int port) { _listener.Prefixes.Add(Ssortingng.Format(@"http://+:{0}/", port)); _listener.Start(); _listenerThread.Start(); } public void Dispose() { Stop(); } public void Stop() { _stop.Set(); _listenerThread.Join(); _idle.Reset(); //aquire and release the semaphore to see if anyone is running, wait for idle if they are. _busy.WaitOne(); if(_maxThreads != 1 + _busy.Release()) _idle.WaitOne(); _listener.Stop(); } private void HandleRequests() { while (_listener.IsListening) { var context = _listener.BeginGetContext(ListenerCallback, null); if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) return; } } private void ListenerCallback(IAsyncResult ar) { _busy.WaitOne(); try { HttpListenerContext context; try { context = _listener.EndGetContext(ar); } catch (HttpListenerException) { return; } if (_stop.WaitOne(0, false)) return; Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); context.Response.SendChunked = true; using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) { tw.WriteLine("

Hello World

"); for (int i = 0; i < 5; i++) { tw.WriteLine("

{0} @ {1}

", i, DateTime.Now); tw.Flush(); Thread.Sleep(1000); } tw.WriteLine(""); } } finally { if (_maxThreads == 1 + _busy.Release()) _idle.Set(); } } }

J’ai consulté mon code dans la partie EDIT de ma question et j’ai décidé de l’accepter avec quelques modifications:

 public void Stop() { lock (locker) { isStopping = true; } resetEvent.WaitOne(); //initially set to true listener.Stop(); } private void ListenerCallback(IAsyncResult ar) { lock (locker) //locking on this is a bad idea, but I forget about it before { if (isStopping) return; resetEvent.Reset(); numberOfRequests++; } try { var listener = ar.AsyncState as HttpListener; var context = listener.EndGetContext(ar); //do some stuff } finally //to make sure that bellow code will be executed { lock (locker) { if (--numberOfRequests == 0) resetEvent.Set(); } } } 

Appeler simplement listener.Stop () devrait faire l’affaire. Cela ne mettra pas fin aux connexions déjà établies mais empêchera toute nouvelle connexion.

Ceci utilise la queue typée BlockingCollection pour traiter les demandes. Il est utilisable tel quel. Vous devriez dériver une classe de celle-ci et remplacer Response.

 using System; using System.Collections.Concurrent; using System.Net; using System.Text; using System.Threading; namespace Service { class HttpServer : IDisposable { private HttpListener httpListener; private Thread listenerLoop; private Thread[] requestProcessors; private BlockingCollection messages; public HttpServer(int threadCount) { requestProcessors = new Thread[threadCount]; messages = new BlockingCollection(); httpListener = new HttpListener(); } public virtual int Port { get; set; } = 80; public virtual ssortingng[] Prefixes { get { return new ssortingng[] {ssortingng.Format(@"http://+:{0}/", Port )}; } } public void Start(int port) { listenerLoop = new Thread(HandleRequests); foreach( ssortingng prefix in Prefixes ) httpListener.Prefixes.Add( prefix ); listenerLoop.Start(); for (int i = 0; i < requestProcessors.Length; i++) { requestProcessors[i] = StartProcessor(i, messages); } } public void Dispose() { Stop(); } public void Stop() { messages.CompleteAdding(); foreach (Thread worker in requestProcessors) worker.Join(); httpListener.Stop(); listenerLoop.Join(); } private void HandleRequests() { httpListener.Start(); try { while (httpListener.IsListening) { Console.WriteLine("The Linstener Is Listening!"); HttpListenerContext context = httpListener.GetContext(); messages.Add(context); Console.WriteLine("The Linstener has added a message!"); } } catch(Exception e) { Console.WriteLine (e.Message); } } private Thread StartProcessor(int number, BlockingCollection messages) { Thread thread = new Thread(() => Processor(number, messages)); thread.Start(); return thread; } private void Processor(int number, BlockingCollection messages) { Console.WriteLine ("Processor {0} started.", number); try { for (;;) { Console.WriteLine ("Processor {0} awoken.", number); HttpListenerContext context = messages.Take(); Console.WriteLine ("Processor {0} dequeued message.", number); Response (context); } } catch { } Console.WriteLine ("Processor {0} terminated.", number); } public virtual void Response(HttpListenerContext context) { SendReply(context, new SsortingngBuilder("NULLThis site not yet implementd.") ); } public static void SendReply(HttpListenerContext context, SsortingngBuilder responseSsortingng ) { byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseSsortingng.ToSsortingng()); context.Response.ContentLength64 = buffer.Length; System.IO.Stream output = context.Response.OutputStream; output.Write(buffer, 0, buffer.Length); output.Close(); } } } 

Ceci est un exemple d’utilisation. Pas besoin d’utiliser des événements ou des blocs de locking. BlockingCollection résout tous ces problèmes.

 using System; using System.Collections.Concurrent; using System.IO; using System.Net; using System.Text; using System.Threading; namespace Service { class Server { public static void Main (ssortingng[] args) { HttpServer Service = new QuizzServer (8); Service.Start (80); for (bool coninute = true; coninute ;) { ssortingng input = Console.ReadLine ().ToLower(); switch (input) { case "stop": Console.WriteLine ("Stop command accepted."); Service.Stop (); coninute = false; break; default: Console.WriteLine ("Unknown Command: '{0}'.",input); break; } } } } }