c# - Multi-threading con.Net HttpListener
multithreading (5)
Tengo un oyente:
listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();
listenerThread = new Thread(HandleRequests);
listenerThread.Start();
Y estoy manejando peticiones:
private void HandleRequests()
{
while (listener.IsListening)
{
var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
context.AsyncWaitHandle.WaitOne();
}
}
private void ListenerCallback(IAsyncResult ar)
{
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
}
Me gustaría escribir void Stop()
de tal manera, que:
- Se bloqueará hasta que finalicen todas las solicitudes actualmente manejadas (es decir, esperará a que todos los subprocesos "hagan algunas cosas").
- Si bien esperará solicitudes ya iniciadas, no permitirá más solicitudes (es decir, volver al principio de
ListenerCallback
). - Después de eso, llamará a
listener.Stop()
(listener.IsListening
volvió falso).
¿Cómo podría ser escribir?
EDIT : ¿Qué piensas acerca de esta solución? ¿Es seguro?
public void Stop()
{
lock (this)
{
isStopping = true;
}
resetEvent.WaitOne(); //initially set to true
listener.Stop();
}
private void ListenerCallback(IAsyncResult ar)
{
lock (this)
{
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
}
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
lock (this)
{
if (--numberOfRequests == 0)
resetEvent.Set();
}
}
Bueno, hay varias maneras de resolver esto ... Este es un ejemplo simple que usa un semáforo para rastrear el trabajo en curso, y una señal que surge cuando todos los trabajadores han terminado. Esto debería darle una idea básica para trabajar.
La solución a continuación no es la ideal, idealmente deberíamos adquirir el semáforo antes de llamar a BeginGetContext. Eso hace que el cierre sea más difícil, por lo que he optado por utilizar este enfoque más simplificado. Si estuviera haciendo esto de manera "real", probablemente escribiría mi propia administración de subprocesos en lugar de confiar en ThreadPool. Esto permitiría un cierre más confiable.
De todos modos aquí está el ejemplo completo:
class TestHttp
{
static void Main()
{
using (HttpServer srvr = new HttpServer(5))
{
srvr.Start(8085);
Console.WriteLine("Press [Enter] to quit.");
Console.ReadLine();
}
}
}
class HttpServer : IDisposable
{
private readonly int _maxThreads;
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly ManualResetEvent _stop, _idle;
private readonly Semaphore _busy;
public HttpServer(int maxThreads)
{
_maxThreads = maxThreads;
_stop = new ManualResetEvent(false);
_idle = new ManualResetEvent(false);
_busy = new Semaphore(maxThreads, maxThreads);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
}
public void Start(int port)
{
_listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
_listener.Start();
_listenerThread.Start();
}
public void Dispose()
{ Stop(); }
public void Stop()
{
_stop.Set();
_listenerThread.Join();
_idle.Reset();
//aquire and release the semaphore to see if anyone is running, wait for idle if they are.
_busy.WaitOne();
if(_maxThreads != 1 + _busy.Release())
_idle.WaitOne();
_listener.Stop();
}
private void HandleRequests()
{
while (_listener.IsListening)
{
var context = _listener.BeginGetContext(ListenerCallback, null);
if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
return;
}
}
private void ListenerCallback(IAsyncResult ar)
{
_busy.WaitOne();
try
{
HttpListenerContext context;
try
{ context = _listener.EndGetContext(ar); }
catch (HttpListenerException)
{ return; }
if (_stop.WaitOne(0, false))
return;
Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl);
context.Response.SendChunked = true;
using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
{
tw.WriteLine("<html><body><h1>Hello World</h1>");
for (int i = 0; i < 5; i++)
{
tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now);
tw.Flush();
Thread.Sleep(1000);
}
tw.WriteLine("</body></html>");
}
}
finally
{
if (_maxThreads == 1 + _busy.Release())
_idle.Set();
}
}
}
Esto utiliza la cola escrita BlockingCollection para atender las solicitudes. Es utilizable como es. Debería derivar una clase de esta y anular la Respuesta.
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
{
class HttpServer : IDisposable
{
private HttpListener httpListener;
private Thread listenerLoop;
private Thread[] requestProcessors;
private BlockingCollection<HttpListenerContext> messages;
public HttpServer(int threadCount)
{
requestProcessors = new Thread[threadCount];
messages = new BlockingCollection<HttpListenerContext>();
httpListener = new HttpListener();
}
public virtual int Port { get; set; } = 80;
public virtual string[] Prefixes
{
get { return new string[] {string.Format(@"http://+:{0}/", Port )}; }
}
public void Start(int port)
{
listenerLoop = new Thread(HandleRequests);
foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );
listenerLoop.Start();
for (int i = 0; i < requestProcessors.Length; i++)
{
requestProcessors[i] = StartProcessor(i, messages);
}
}
public void Dispose() { Stop(); }
public void Stop()
{
messages.CompleteAdding();
foreach (Thread worker in requestProcessors) worker.Join();
httpListener.Stop();
listenerLoop.Join();
}
private void HandleRequests()
{
httpListener.Start();
try
{
while (httpListener.IsListening)
{
Console.WriteLine("The Linstener Is Listening!");
HttpListenerContext context = httpListener.GetContext();
messages.Add(context);
Console.WriteLine("The Linstener has added a message!");
}
}
catch(Exception e)
{
Console.WriteLine (e.Message);
}
}
private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
{
Thread thread = new Thread(() => Processor(number, messages));
thread.Start();
return thread;
}
private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
{
Console.WriteLine ("Processor {0} started.", number);
try
{
for (;;)
{
Console.WriteLine ("Processor {0} awoken.", number);
HttpListenerContext context = messages.Take();
Console.WriteLine ("Processor {0} dequeued message.", number);
Response (context);
}
} catch { }
Console.WriteLine ("Processor {0} terminated.", number);
}
public virtual void Response(HttpListenerContext context)
{
SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>") );
}
public static void SendReply(HttpListenerContext context, StringBuilder responseString )
{
byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
context.Response.ContentLength64 = buffer.Length;
System.IO.Stream output = context.Response.OutputStream;
output.Write(buffer, 0, buffer.Length);
output.Close();
}
}
}
Esta es una muestra de cómo usarlo. No es necesario utilizar eventos o bloqueos de bloqueo. El BlockingCollection resuelve todos estos problemas.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
namespace Service
{
class Server
{
public static void Main (string[] args)
{
HttpServer Service = new QuizzServer (8);
Service.Start (80);
for (bool coninute = true; coninute ;)
{
string input = Console.ReadLine ().ToLower();
switch (input)
{
case "stop":
Console.WriteLine ("Stop command accepted.");
Service.Stop ();
coninute = false;
break;
default:
Console.WriteLine ("Unknown Command: ''{0}''.",input);
break;
}
}
}
}
}
He consultado mi código en EDITAR parte de mi pregunta y he decidido aceptarlo con algunas modificaciones:
public void Stop()
{
lock (locker)
{
isStopping = true;
}
resetEvent.WaitOne(); //initially set to true
listener.Stop();
}
private void ListenerCallback(IAsyncResult ar)
{
lock (locker) //locking on this is a bad idea, but I forget about it before
{
if (isStopping)
return;
resetEvent.Reset();
numberOfRequests++;
}
try
{
var listener = ar.AsyncState as HttpListener;
var context = listener.EndGetContext(ar);
//do some stuff
}
finally //to make sure that bellow code will be executed
{
lock (locker)
{
if (--numberOfRequests == 0)
resetEvent.Set();
}
}
}
Para completar, aquí es cómo se vería si administras tus propios hilos de trabajo:
class HttpServer : IDisposable
{
private readonly HttpListener _listener;
private readonly Thread _listenerThread;
private readonly Thread[] _workers;
private readonly ManualResetEvent _stop, _ready;
private Queue<HttpListenerContext> _queue;
public HttpServer(int maxThreads)
{
_workers = new Thread[maxThreads];
_queue = new Queue<HttpListenerContext>();
_stop = new ManualResetEvent(false);
_ready = new ManualResetEvent(false);
_listener = new HttpListener();
_listenerThread = new Thread(HandleRequests);
}
public void Start(int port)
{
_listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
_listener.Start();
_listenerThread.Start();
for (int i = 0; i < _workers.Length; i++)
{
_workers[i] = new Thread(Worker);
_workers[i].Start();
}
}
public void Dispose()
{ Stop(); }
public void Stop()
{
_stop.Set();
_listenerThread.Join();
foreach (Thread worker in _workers)
worker.Join();
_listener.Stop();
}
private void HandleRequests()
{
while (_listener.IsListening)
{
var context = _listener.BeginGetContext(ContextReady, null);
if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
return;
}
}
private void ContextReady(IAsyncResult ar)
{
try
{
lock (_queue)
{
_queue.Enqueue(_listener.EndGetContext(ar));
_ready.Set();
}
}
catch { return; }
}
private void Worker()
{
WaitHandle[] wait = new[] { _ready, _stop };
while (0 == WaitHandle.WaitAny(wait))
{
HttpListenerContext context;
lock (_queue)
{
if (_queue.Count > 0)
context = _queue.Dequeue();
else
{
_ready.Reset();
continue;
}
}
try { ProcessRequest(context); }
catch (Exception e) { Console.Error.WriteLine(e); }
}
}
public event Action<HttpListenerContext> ProcessRequest;
}
Simplemente llamando a listener.Stop () debería hacer el truco. Esto no terminará ninguna conexión que ya haya sido establecida, pero evitará cualquier conexión nueva.