c# - ventajas - la escalabilidad horizontal
Rendimiento WCF, latencia y escalabilidad. (3)
Estoy tratando de portar un simple servidor TCP asíncrono en F # a C # 4. El servidor recibe una conexión, lee una sola solicitud y transmite una secuencia de respuestas antes de cerrar la conexión.
Async en C # 4 se ve tedioso y propenso a errores, así que pensé que intentaría usar WCF en su lugar. No es improbable que este servidor vea 1.000 solicitudes simultáneas en la naturaleza, por lo que creo que tanto el rendimiento como la latencia son interesantes.
He escrito un servicio web WCF dúplex mínimo y un cliente de consola en C #. Aunque estoy usando WCF en lugar de sockets sin procesar, esto ya es de 175 líneas de código en comparación con 80 líneas para el original. Pero me preocupa más el rendimiento y la escalabilidad:
- La latencia es 154 × peor con WCF.
- El rendimiento es 54 × peor con WCF.
- TCP maneja 1,000 conexiones simultáneas con facilidad, pero WCF se ahoga con solo 20.
En primer lugar, estoy usando la configuración predeterminada para todo, así que me pregunto si hay algo que pueda modificar para mejorar estas cifras de rendimiento.
En segundo lugar, me pregunto si alguien está usando WCF para este tipo de cosas o si es la herramienta incorrecta para el trabajo.
Aquí está mi servidor WCF en C #:
IService1.cs
[DataContract]
public class Stock
{
[DataMember]
public DateTime FirstDealDate { get; set; }
[DataMember]
public DateTime LastDealDate { get; set; }
[DataMember]
public DateTime StartDate { get; set; }
[DataMember]
public DateTime EndDate { get; set; }
[DataMember]
public decimal Open { get; set; }
[DataMember]
public decimal High { get; set; }
[DataMember]
public decimal Low { get; set; }
[DataMember]
public decimal Close { get; set; }
[DataMember]
public decimal VolumeWeightedPrice { get; set; }
[DataMember]
public decimal TotalQuantity { get; set; }
}
[ServiceContract(CallbackContract = typeof(IPutStock))]
public interface IStock
{
[OperationContract]
void GetStocks();
}
public interface IPutStock
{
[OperationContract]
void PutStock(Stock stock);
}
Service1.svc
<%@ ServiceHost Language="C#" Debug="true" Service="DuplexWcfService2.Stocks" CodeBehind="Service1.svc.cs" %>
Service1.svc.cs
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
public class Stocks : IStock
{
IPutStock callback;
#region IStock Members
public void GetStocks()
{
callback = OperationContext.Current.GetCallbackChannel<IPutStock>();
Stock st = null;
st = new Stock
{
FirstDealDate = System.DateTime.Now,
LastDealDate = System.DateTime.Now,
StartDate = System.DateTime.Now,
EndDate = System.DateTime.Now,
Open = 495,
High = 495,
Low = 495,
Close = 495,
VolumeWeightedPrice = 495,
TotalQuantity = 495
};
for (int i=0; i<1000; ++i)
callback.PutStock(st);
}
#endregion
}
Web.config
<?xml version="1.0"?>
<configuration>
<system.web>
<compilation debug="true" targetFramework="4.0" />
</system.web>
<system.serviceModel>
<services>
<service name="DuplexWcfService2.Stocks">
<endpoint address="" binding="wsDualHttpBinding" contract="DuplexWcfService2.IStock">
<identity>
<dns value="localhost"/>
</identity>
</endpoint>
<endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/>
</service>
</services>
<behaviors>
<serviceBehaviors>
<behavior>
<serviceMetadata httpGetEnabled="true"/>
<serviceDebug includeExceptionDetailInFaults="true"/>
</behavior>
</serviceBehaviors>
</behaviors>
<serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
</system.serviceModel>
<system.webServer>
<modules runAllManagedModulesForAllRequests="true"/>
</system.webServer>
</configuration>
Aquí está el cliente C # WCF:
Program.cs
[CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, UseSynchronizationContext = false)]
class Callback : DuplexWcfService2.IStockCallback
{
System.Diagnostics.Stopwatch timer;
int n;
public Callback(System.Diagnostics.Stopwatch t)
{
timer = t;
n = 0;
}
public void PutStock(DuplexWcfService2.Stock st)
{
++n;
if (n == 1)
Console.WriteLine("First result in " + this.timer.Elapsed.TotalSeconds + "s");
if (n == 1000)
Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s");
}
}
class Program
{
static void Test(int i)
{
var timer = System.Diagnostics.Stopwatch.StartNew();
var ctx = new InstanceContext(new Callback(timer));
var proxy = new DuplexWcfService2.StockClient(ctx);
proxy.GetStocks();
Console.WriteLine(i + " connected");
}
static void Main(string[] args)
{
for (int i=0; i<10; ++i)
{
int j = i;
new System.Threading.Thread(() => Test(j)).Start();
}
}
}
Aquí está mi código de servidor y cliente TCP asíncrono en F #:
type AggregatedDeals =
{
FirstDealTime: System.DateTime
LastDealTime: System.DateTime
StartTime: System.DateTime
EndTime: System.DateTime
Open: decimal
High: decimal
Low: decimal
Close: decimal
VolumeWeightedPrice: decimal
TotalQuantity: decimal
}
let read (stream: System.IO.Stream) = async {
let! header = stream.AsyncRead 4
let length = System.BitConverter.ToInt32(header, 0)
let! body = stream.AsyncRead length
let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter()
use stream = new System.IO.MemoryStream(body)
return fmt.Deserialize(stream)
}
let write (stream: System.IO.Stream) value = async {
let body =
let fmt = System.Runtime.Serialization.Formatters.Binary.BinaryFormatter()
use stream = new System.IO.MemoryStream()
fmt.Serialize(stream, value)
stream.ToArray()
let header = System.BitConverter.GetBytes body.Length
do! stream.AsyncWrite header
do! stream.AsyncWrite body
}
let endPoint = System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 4502)
let server() = async {
let listener = System.Net.Sockets.TcpListener(endPoint)
listener.Start()
while true do
let client = listener.AcceptTcpClient()
async {
use stream = client.GetStream()
let! _ = stream.AsyncRead 1
for i in 1..1000 do
let aggregatedDeals =
{
FirstDealTime = System.DateTime.Now
LastDealTime = System.DateTime.Now
StartTime = System.DateTime.Now
EndTime = System.DateTime.Now
Open = 1m
High = 1m
Low = 1m
Close = 1m
VolumeWeightedPrice = 1m
TotalQuantity = 1m
}
do! write stream aggregatedDeals
} |> Async.Start
}
let client() = async {
let timer = System.Diagnostics.Stopwatch.StartNew()
use client = new System.Net.Sockets.TcpClient()
client.Connect endPoint
use stream = client.GetStream()
do! stream.AsyncWrite [|0uy|]
for i in 1..1000 do
let! _ = read stream
if i=1 then lock stdout (fun () ->
printfn "First result in %fs" timer.Elapsed.TotalSeconds)
lock stdout (fun () ->
printfn "1,000 results in %fs" timer.Elapsed.TotalSeconds)
}
do
server() |> Async.Start
seq { for i in 1..100 -> client() }
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
Para responder primero a su segunda pregunta, WCF siempre tendrá una sobrecarga en comparación con los sockets en bruto. Pero tiene una tonelada de funcionalidad (como seguridad, confiabilidad, interoperabilidad, múltiples protocolos de transporte, rastreo, etc.) en comparación con los sockets sin procesar, ya que la compensación es aceptable para usted y se basa en su escenario. Parece que está haciendo alguna aplicación de negociación financiera y WCF posiblemente no sea adecuado para su caso (aunque no estoy en la industria financiera para calificar esto con experiencia).
Para su primera pregunta, en lugar de un enlace http doble, intente hospedar un servicio WCF separado en el cliente para que el cliente pueda ser un servicio por sí mismo y, si es posible, utilice el enlace netTCP. Ajustar los atributos en el elemento serviceThrottling en el comportamiento del servicio. Los valores por defecto eran más bajos antes de .Net 4.
WCF selecciona valores muy seguros para casi todos sus valores predeterminados. Esto sigue la filosofía de no dejar que el desarrollador novato se dispare a sí mismo. Sin embargo, si conoce las limitaciones que deben cambiarse y los enlaces que debe usar, puede obtener un rendimiento y una escala razonables.
En mi núcleo i5-2400 (cuádruple núcleo, sin subprocesos, 3,10 GHz), la solución a continuación ejecutará 1000 clientes con 1000 devoluciones de llamada cada uno para un tiempo de ejecución total promedio de 20 segundos. Eso es 1,000,000 llamadas de WCF en 20 segundos.
Desafortunadamente, no pude hacer que tu programa F # se ejecutara para una comparación directa. Si ejecuta mi solución en su caja, ¿podría publicar algunos números de comparación de desempeño F # vs C # WCF?
Descargo de responsabilidad : el siguiente está destinado a ser una prueba de concepto. Algunos de estos ajustes no tienen sentido para la producción.
Lo que hice:
- Se eliminó el enlace dúplex y los clientes crearon sus propios hosts de servicio para recibir las devoluciones de llamada. Esto es esencialmente lo que hace un enlace dúplex bajo el capó. (También es la sugerencia de Pratik)
- Se cambió el enlace a netTcpBinding.
- Valores de regulación modificados:
- WCF: maxConcurrentCalls, maxConcurrentSessions, maxConcurrentInstances todos a 1000
- Enlace TCP : maxConnections = 1000
- Threadpool: Min threads de trabajo = 1000, Min IO threads = 2000
- Se agregó IsOneWay a las operaciones de servicio
Tenga en cuenta que en este prototipo todos los servicios y clientes están en el mismo dominio de aplicación y comparten el mismo grupo de subprocesos.
Que aprendí:
- Cuando un cliente obtuvo un "No se pudo establecer conexión porque la máquina de destino lo rechazó activamente" excepción
- Posibles causas:
- Se ha alcanzado el límite de WCF
- Se ha alcanzado el límite de TCP
- No había un hilo de E / S disponible para manejar la llamada.
- La solución para el # 3 fue:
- Aumente el número mínimo de hilos IO -O-
- Haga que StockService realice sus devoluciones de llamada en un subproceso de trabajo (esto aumenta el tiempo de ejecución total)
- Posibles causas:
- Al agregar IsOneWay, se reduce el tiempo de ejecución a la mitad (de 40 segundos a 20 segundos).
La salida del programa se ejecuta en un Core i5-2400. Tenga en cuenta que los temporizadores se utilizan de forma diferente a la pregunta original (consulte el código).
All client hosts open.
Service Host opened. Starting timer...
Press ENTER to close the host one you see ''ALL DONE''.
Client #100 completed 1,000 results in 0.0542168 s
Client #200 completed 1,000 results in 0.0794684 s
Client #300 completed 1,000 results in 0.0673078 s
Client #400 completed 1,000 results in 0.0527753 s
Client #500 completed 1,000 results in 0.0581796 s
Client #600 completed 1,000 results in 0.0770291 s
Client #700 completed 1,000 results in 0.0681298 s
Client #800 completed 1,000 results in 0.0649353 s
Client #900 completed 1,000 results in 0.0714947 s
Client #1000 completed 1,000 results in 0.0450857 s
ALL DONE. Total number of clients: 1000 Total runtime: 19323 msec
Código todo en un archivo de aplicación de consola:
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.Diagnostics;
using System.Threading;
using System.Runtime.Serialization;
namespace StockApp
{
[DataContract]
public class Stock
{
[DataMember]
public DateTime FirstDealDate { get; set; }
[DataMember]
public DateTime LastDealDate { get; set; }
[DataMember]
public DateTime StartDate { get; set; }
[DataMember]
public DateTime EndDate { get; set; }
[DataMember]
public decimal Open { get; set; }
[DataMember]
public decimal High { get; set; }
[DataMember]
public decimal Low { get; set; }
[DataMember]
public decimal Close { get; set; }
[DataMember]
public decimal VolumeWeightedPrice { get; set; }
[DataMember]
public decimal TotalQuantity { get; set; }
}
[ServiceContract]
public interface IStock
{
[OperationContract(IsOneWay = true)]
void GetStocks(string address);
}
[ServiceContract]
public interface IPutStock
{
[OperationContract(IsOneWay = true)]
void PutStock(Stock stock);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class StocksService : IStock
{
public void SendStocks(object obj)
{
string address = (string)obj;
ChannelFactory<IPutStock> factory = new ChannelFactory<IPutStock>("CallbackClientEndpoint");
IPutStock callback = factory.CreateChannel(new EndpointAddress(address));
Stock st = null; st = new Stock
{
FirstDealDate = System.DateTime.Now,
LastDealDate = System.DateTime.Now,
StartDate = System.DateTime.Now,
EndDate = System.DateTime.Now,
Open = 495,
High = 495,
Low = 495,
Close = 495,
VolumeWeightedPrice = 495,
TotalQuantity = 495
};
for (int i = 0; i < 1000; ++i)
callback.PutStock(st);
//Console.WriteLine("Done calling {0}", address);
((ICommunicationObject)callback).Shutdown();
factory.Shutdown();
}
public void GetStocks(string address)
{
/// WCF service methods execute on IO threads.
/// Passing work off to worker thread improves service responsiveness... with a measurable cost in total runtime.
System.Threading.ThreadPool.QueueUserWorkItem(new System.Threading.WaitCallback(SendStocks), address);
// SendStocks(address);
}
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class Callback : IPutStock
{
public static int CallbacksCompleted = 0;
System.Diagnostics.Stopwatch timer = Stopwatch.StartNew();
int n = 0;
public void PutStock(Stock st)
{
++n;
if (n == 1000)
{
//Console.WriteLine("1,000 results in " + this.timer.Elapsed.TotalSeconds + "s");
int compelted = Interlocked.Increment(ref CallbacksCompleted);
if (compelted % 100 == 0)
{
Console.WriteLine("Client #{0} completed 1,000 results in {1} s", compelted, this.timer.Elapsed.TotalSeconds);
if (compelted == Program.CLIENT_COUNT)
{
Console.WriteLine("ALL DONE. Total number of clients: {0} Total runtime: {1} msec", Program.CLIENT_COUNT, Program.ProgramTimer.ElapsedMilliseconds);
}
}
}
}
}
class Program
{
public const int CLIENT_COUNT = 1000; // TEST WITH DIFFERENT VALUES
public static System.Diagnostics.Stopwatch ProgramTimer;
static void StartCallPool(object uriObj)
{
string callbackUri = (string)uriObj;
ChannelFactory<IStock> factory = new ChannelFactory<IStock>("StockClientEndpoint");
IStock proxy = factory.CreateChannel();
proxy.GetStocks(callbackUri);
((ICommunicationObject)proxy).Shutdown();
factory.Shutdown();
}
static void Test()
{
ThreadPool.SetMinThreads(CLIENT_COUNT, CLIENT_COUNT * 2);
// Create all the hosts that will recieve call backs.
List<ServiceHost> callBackHosts = new List<ServiceHost>();
for (int i = 0; i < CLIENT_COUNT; ++i)
{
string port = string.Format("{0}", i).PadLeft(3, ''0'');
string baseAddress = "net.tcp://localhost:7" + port + "/";
ServiceHost callbackHost = new ServiceHost(typeof(Callback), new Uri[] { new Uri( baseAddress)});
callbackHost.Open();
callBackHosts.Add(callbackHost);
}
Console.WriteLine("All client hosts open.");
ServiceHost stockHost = new ServiceHost(typeof(StocksService));
stockHost.Open();
Console.WriteLine("Service Host opened. Starting timer...");
ProgramTimer = Stopwatch.StartNew();
foreach (var callbackHost in callBackHosts)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(StartCallPool), callbackHost.BaseAddresses[0].AbsoluteUri);
}
Console.WriteLine("Press ENTER to close the host once you see ''ALL DONE''.");
Console.ReadLine();
foreach (var h in callBackHosts)
h.Shutdown();
stockHost.Shutdown();
}
static void Main(string[] args)
{
Test();
}
}
public static class Extensions
{
static public void Shutdown(this ICommunicationObject obj)
{
try
{
obj.Close();
}
catch (Exception ex)
{
Console.WriteLine("Shutdown exception: {0}", ex.Message);
obj.Abort();
}
}
}
}
app.config:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<services>
<service name="StockApp.StocksService">
<host>
<baseAddresses>
<add baseAddress="net.tcp://localhost:8123/StockApp/"/>
</baseAddresses>
</host>
<endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IStock">
<identity>
<dns value="localhost"/>
</identity>
</endpoint>
</service>
<service name="StockApp.Callback">
<host>
<baseAddresses>
<!-- Base address defined at runtime. -->
</baseAddresses>
</host>
<endpoint address="" binding="netTcpBinding" bindingConfiguration="tcpConfig" contract="StockApp.IPutStock">
<identity>
<dns value="localhost"/>
</identity>
</endpoint>
</service>
</services>
<client>
<endpoint name="StockClientEndpoint"
address="net.tcp://localhost:8123/StockApp/"
binding="netTcpBinding"
bindingConfiguration="tcpConfig"
contract="StockApp.IStock" >
</endpoint>
<!-- CallbackClientEndpoint address defined at runtime. -->
<endpoint name="CallbackClientEndpoint"
binding="netTcpBinding"
bindingConfiguration="tcpConfig"
contract="StockApp.IPutStock" >
</endpoint>
</client>
<behaviors>
<serviceBehaviors>
<behavior>
<!--<serviceMetadata httpGetEnabled="true"/>-->
<serviceDebug includeExceptionDetailInFaults="true"/>
<serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="1000" maxConcurrentInstances="1000" />
</behavior>
</serviceBehaviors>
</behaviors>
<bindings>
<netTcpBinding>
<binding name="tcpConfig" listenBacklog="100" maxConnections="1000">
<security mode="None"/>
<reliableSession enabled="false" />
</binding>
</netTcpBinding>
</bindings>
</system.serviceModel>
</configuration>
Actualización : Acabo de probar la solución anterior con un netNamedPipeBinding:
<netNamedPipeBinding >
<binding name="pipeConfig" maxConnections="1000" >
<security mode="None"/>
</binding>
</netNamedPipeBinding>
En realidad fue 3 segundos más lento (de 20 a 23 segundos). Dado que este ejemplo en particular es todo entre procesos, no estoy seguro de por qué. Si alguien tiene alguna idea, por favor comente.
Yo diría que depende de tus metas. Si desea impulsar su hardware en la medida de lo posible, es posible obtener más de 10,000 clientes conectados fácilmente, el secreto es minimizar el tiempo empleado en el recolector de basura y usar sockets de manera eficiente.
Tengo algunas publicaciones sobre Sockets en F # aquí: http://moiraesoftware.com
Estoy haciendo un trabajo continuo con una biblioteca llamada Fracture-IO aquí: https://github.com/fractureio/fracture
Es posible que desee revisar esos para ideas ...