Servidor TCP de alto rendimiento en C#
scalable (5)
Soy un desarrollador experimentado de C #, pero hasta ahora no he desarrollado una aplicación de servidor TCP. Ahora tengo que desarrollar un servidor altamente escalable y de alto rendimiento que pueda manejar al menos 5-10 mil conexiones simultáneas: obtener datos de bytes de byte a través de GPRS desde dispositivos GPS.
Un proceso de comunicación común debería verse así:
- El dispositivo GPS inicia una conexión a mi servidor
- mi servidor responde si quiero obtener datos
- dispositivo enviar datos de GPS
- mi servidor envía un informe al dispositivo sobre cómo obtenerlo (sg como suma de comprobación)
- obteniendo nuevos datos del GPS, reportm y esto sucede una y otra vez
- luego, el DISPOSITIVO GPS cierra la conexión
Entonces, en mi servidor necesito
- rastrear clientes conectados / activos
- para cerrar cualquier cliente del lado del servidor
- atrapar el evento, cuando un dispositivo cierra la conexión
- obtener datos de bytes
- enviar datos a los clientes
Comencé a leer sobre este tema en Internet, pero parece ser una pesadilla para mí. Hay muchas maneras, pero no pude averiguar cuál es la mejor.
Los métodos de socket asíncrono parecen ser los mejores para mí, pero escribir código en este estilo asíncrono es terrible y no es fácil de depurar.
Entonces mi pregunta es: ¿cuál crees que es la mejor manera de implementar un servidor TCP de alto rendimiento en C #? ¿Conoces algún buen componente de código abierto para hacer esto? (Probé varios, pero no pude encontrar uno bueno).
Como dice Remus arriba, debes utilizar la función asincrónica para mantener el rendimiento alto. Ese es el método Begin ... / End ... en .NET.
Debajo del capó para enchufes, estos métodos hacen uso de IO Completion Ports que parece ser la forma más eficaz de procesar muchos sockets en los sistemas operativos Windows.
Como dice Jim, la clase TcpClient puede ayudar aquí y es bastante fácil de usar. Aquí hay un ejemplo del uso de TcpListener para escuchar conexiones entrantes y TcpClient para manejarlas, con las llamadas BeginAccept y BeginRead iniciales como asincrónicas.
Este ejemplo asume que se usa un protocolo basado en mensajes en los sockets y que se omite, excepto que los primeros 4 bytes de cada transmisión son la longitud, pero eso le permite usar una lectura sincrónica en la transmisión para obtener el resto de los datos eso ya está amortiguado
Aquí está el código:
class ClientContext
{
public TcpClient Client;
public Stream Stream;
public byte[] Buffer = new byte[4];
public MemoryStream Message = new MemoryStream();
}
class Program
{
static void OnMessageReceived(ClientContext context)
{
// process the message here
}
static void OnClientRead(IAsyncResult ar)
{
ClientContext context = ar.AsyncState as ClientContext;
if (context == null)
return;
try
{
int read = context.Stream.EndRead(ar);
context.Message.Write(context.Buffer, 0, read);
int length = BitConverter.ToInt32(context.Buffer, 0);
byte[] buffer = new byte[1024];
while (length > 0)
{
read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
context.Message.Write(buffer, 0, read);
length -= read;
}
OnMessageReceived(context);
}
catch (System.Exception)
{
context.Client.Close();
context.Stream.Dispose();
context.Message.Dispose();
context = null;
}
finally
{
if (context != null)
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
}
}
static void OnClientAccepted(IAsyncResult ar)
{
TcpListener listener = ar.AsyncState as TcpListener;
if (listener == null)
return;
try
{
ClientContext context = new ClientContext();
context.Client = listener.EndAcceptTcpClient(ar);
context.Stream = context.Client.GetStream();
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
}
finally
{
listener.BeginAcceptTcpClient(OnClientAccepted, listener);
}
}
static void Main(string[] args)
{
TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
listener.Start();
listener.BeginAcceptTcpClient(OnClientAccepted, listener);
Console.Write("Press enter to exit...");
Console.ReadLine();
listener.Stop();
}
}
Demuestra cómo manejar las llamadas asincrónicas, pero necesitará un manejo de errores para asegurarse de que el TcpListener siempre acepta nuevas conexiones y más manejo de errores para cuando los clientes se desconectan inesperadamente. Además, parece haber algunos casos en los que no todos los datos llegan de una sola vez y que también deberían ser tratados.
Creo que también estás buscando técnicas UDP. Para 10k clientes, es rápido, pero el problema es que debe implementar el acuse de recibo para cada mensaje que recibió el mensaje. En UDP no necesita abrir un socket para cada cliente, pero necesita implementar un mecanismo de latido / ping después de x segundos para verificar qué cliente está conectado o no.
Debe ser asincrónico, no hay forma de evitar esto. El alto rendimiento y la escalabilidad no se mezclan con un hilo por zócalo. Puedes echar un vistazo a lo que están haciendo los propios StackExchange, ver asincronización. Redis aguarda BookSleeve que aprovecha las funciones CTP de la próxima versión de C # (por lo tanto, está al límite y sujeto a cambios, pero es genial). Para obtener aún más ventajas, las soluciones evolucionan aprovechando la clase SocketAsyncEventArgs, que lleva las cosas un paso más allá al eliminar las asignaciones frecuentes de controladores asíncronos asociados con el procesamiento asíncrono C # clásico:
La clase SocketAsyncEventArgs es parte de un conjunto de mejoras a la clase System.Net.Sockets.Socket que proporciona un patrón asincrónico alternativo que puede ser utilizado por aplicaciones especializadas de socket de alto rendimiento. Esta clase se diseñó específicamente para aplicaciones de servidor de red que requieren alto rendimiento. Una aplicación puede usar el patrón asíncrono mejorado exclusivamente o solo en áreas calientes específicas (por ejemplo, cuando se reciben grandes cantidades de datos).
Larga historia corta: aprende asincrónico o muere intentándolo ...
Por cierto, si preguntas por qué asíncrona, lee los tres artículos vinculados de esta publicación: Programas de Windows de alto rendimiento . La respuesta final es: el diseño del sistema operativo subyacente lo requiere.
Puede usar mi TCP CSharpServer que he creado, es muy simple de implementar, solo implemente IClientRequest en una de sus clases.
using System;
using System.Collections.Generic;
using System.Linq;
namespace cSharpServer
{
public interface IClientRequest
{
/// <summary>
/// this needs to be set, otherwise the server will not beable to handle the request.
/// </summary>
byte IdType { get; set; } // This is used for Execution.
/// <summary>
/// handle the process by the client.
/// </summary>
/// <param name="data"></param>
/// <param name="client"></param>
/// <returns></returns>
byte[] Process(BinaryBuffer data, Client client);
}
}
BinaryBuffer le permite leer los datos enviados al servidor realmente fácil.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace cSharpServer
{
public class BinaryBuffer
{
private const string Str0001 = "You are at the End of File!";
private const string Str0002 = "You are Not Reading from the Buffer!";
private const string Str0003 = "You are Currenlty Writing to the Buffer!";
private const string Str0004 = "You are Currenlty Reading from the Buffer!";
private const string Str0005 = "You are Not Writing to the Buffer!";
private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
private bool _inRead;
private bool _inWrite;
private List<byte> _newBytes;
private int _pointer;
public byte[] ByteBuffer;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override string ToString()
{
return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(string data)
: this(Helper.DefaultEncoding.GetBytes(data))
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer()
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(byte[] data)
: this(ref data)
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(ref byte[] data)
{
ByteBuffer = data;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void IncrementPointer(int add)
{
if (add < 0)
{
throw new Exception(Str0006);
}
_pointer += add;
if (EofBuffer())
{
throw new Exception(Str0001);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPointer()
{
return _pointer;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string GetString(ref byte[] buffer)
{
return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string GetString(byte[] buffer)
{
return GetString(ref buffer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
if (_inRead)
{
throw new Exception(Str0004);
}
_inWrite = true;
_newBytes = new List<byte>();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(float value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(BitConverter.GetBytes(value));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(byte value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.Add(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(int value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(BitConverter.GetBytes(value));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(long value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
byte[] byteArray = new byte[8];
unsafe
{
fixed (byte* bytePointer = byteArray)
{
*((long*)bytePointer) = value;
}
}
_newBytes.AddRange(byteArray);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int UncommitedLength()
{
return _newBytes == null ? 0 : _newBytes.Count;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void WriteField(string value)
{
Write(value.Length);
Write(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(string value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
_newBytes.AddRange(byteArray);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(decimal value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
int[] intArray = decimal.GetBits(value);
Write(intArray[0]);
Write(intArray[1]);
Write(intArray[2]);
Write(intArray[3]);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetInt(int value, int pos)
{
byte[] byteInt = BitConverter.GetBytes(value);
for (int i = 0; i < byteInt.Length; i++)
{
_newBytes[pos + i] = byteInt[i];
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetLong(long value, int pos)
{
byte[] byteInt = BitConverter.GetBytes(value);
for (int i = 0; i < byteInt.Length; i++)
{
_newBytes[pos + i] = byteInt[i];
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(byte[] value)
{
Write(ref value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(ref byte[] value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
if (ByteBuffer != null)
{
_newBytes.InsertRange(0, ByteBuffer);
}
ByteBuffer = _newBytes.ToArray();
_newBytes = null;
_inWrite = false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
_inRead = false;
_pointer = 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
if (_inWrite)
{
throw new Exception(Str0003);
}
_inRead = true;
_pointer = 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte ReadByte()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer())
{
throw new Exception(Str0001);
}
return ByteBuffer[_pointer++];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int ReadInt()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(4))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += 4;
return BitConverter.ToInt32(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public float[] ReadFloatArray()
{
float[] dataFloats = new float[ReadInt()];
for (int i = 0; i < dataFloats.Length; i++)
{
dataFloats[i] = ReadFloat();
}
return dataFloats;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public float ReadFloat()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(sizeof(float)))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += sizeof(float);
return BitConverter.ToSingle(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public decimal ReadDecimal()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(16))
{
throw new Exception(Str0001);
}
return new decimal(new[] { ReadInt(),
ReadInt(),
ReadInt(),
ReadInt()
});
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long ReadLong()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(8))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += 8;
return BitConverter.ToInt64(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string ReadString(int size)
{
return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte[] ReadByteArray(int size)
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(size))
{
throw new Exception(Str0001);
}
byte[] newBuffer = new byte[size];
Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);
_pointer += size;
return newBuffer;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool EofBuffer(int over = 1)
{
return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
}
}
}
El proyecto completo está en GitHub CSharpServer
Puedes hacer esto con la clase TcpClient , aunque a decir verdad no sé si podrías tener 10 mil enchufes abiertos. Eso es bastante. Pero regularmente uso TcpClient
para manejar docenas de sockets concurrentes. Y el modelo asíncrono es realmente muy bueno de usar.
Tu mayor problema no será hacer funcionar a TcpClient
. Con 10 mil conexiones simultáneas, creo que el ancho de banda y la escalabilidad serán problemas. Ni siquiera sé si una máquina puede manejar todo ese tráfico. Supongo que depende de qué tan grandes sean los paquetes y cuán a menudo estén llegando. Pero será mejor que hagas una estimación de respaldo antes de comprometerte a implementar todo esto en una sola computadora.