c# - ravendb vs mongodb
RavenDB Stream para resultados ilimitados-Resiliencia de conexión (1)
Según la sugerencia de @StriplingWarrior, he recreado la solución utilizando suscripciones de datos .
Usando este enfoque, pude iterar en todas las 2 millones de filas (aunque es cierto que con mucho menos procesamiento por elemento); 2 puntos aquí que habrían ayudado cuando intentamos implementar la misma lógica utilizando Streams:
- Los lotes solo se eliminan de la "cola" de suscripción una vez reconocidos (como la mayoría de las colas estándar)
- El
IObserver<T>
suscritoIObserver<T>
completarse correctamente para que se establezca este reconocimiento. - Esta información es manejada por el servidor en lugar del cliente, por lo que permite que el cliente se reinicie sin afectar la última posición exitosa procesada en la suscripción
- Vea aquí para más detalles
- El
- Como lo indicó @StriplingWarrior porque puede crear suscripciones con filtros hasta el nivel de propiedad, sería posible reproducir con un conjunto de resultados más pequeño en el caso de una excepción dentro de la propia suscripción.
- El primer punto realmente supera esto; pero nos permite una flexibilidad adicional que no se ve en la API de Stream
El entorno de prueba es una base de datos RavenDB 3.0 (máquina local, que se ejecuta como un servicio de Windows) con configuración predeterminada contra una colección de 2 millones de registros.
Código para generar los registros ficticios:
using (IDocumentStore store = GetDocumentStore())
{
store.Initialize();
using (var bulkInsert = store.BulkInsert())
{
for (var i = 0; i != recordsToCreate; i++)
{
var person = new Person
{
Id = Guid.NewGuid(),
Firstname = NameGenerator.GenerateFirstName(),
Lastname = NameGenerator.GenerateLastName()
};
bulkInsert.Store(person);
}
}
}
Suscribirse a esta colección es entonces un caso de:
using (IDocumentStore store = GetDocumentStore())
{
store.Initialize();
var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());
var personSubscription = store.Subscriptions.Open<Person>(
subscriptionId, new SubscriptionConnectionOptions()
{
BatchOptions = new SubscriptionBatchOptions()
{
// Max number of docs that can be sent in a single batch
MaxDocCount = 16 * 1024,
// Max total batch size in bytes
MaxSize = 4 * 1024 * 1024,
// Max time the subscription needs to confirm that the batch
// has been successfully processed
AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
},
IgnoreSubscribersErrors = false,
ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
});
personSubscription.Subscribe(new PersonObserver());
while (true)
{
Thread.Sleep(TimeSpan.FromMilliseconds(500));
}
}
Tenga en cuenta el PersonObserver
; Esta es solo una implementación básica de IObserver así:
public class PersonObserver : IObserver<Person>
{
public void OnCompleted()
{
Console.WriteLine("Completed");
}
public void OnError(Exception error)
{
Console.WriteLine("Error occurred: " + error.ToString());
}
public void OnNext(Person person)
{
Console.WriteLine($"Received ''{person.Firstname} {person.Lastname}''");
}
}
Estamos utilizando la funcionalidad Stream en RavenDB para cargar, transformar y migrar datos entre 2 bases de datos, como:
var query = originSession.Query<T>(IndexForQuery);
using (var stream = originSession.Advanced.Stream(query))
{
while (stream.MoveNext())
{
var streamedDocument = stream.Current.Document;
OpenSessionAndMigrateSingleDocument(streamedDocument);
}
}
El problema es que una de las colecciones tiene millones de filas, y seguimos recibiendo una IOException
en el siguiente formato:
Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
at System.IO.StreamReader.Read(Char[], Int32, Int32)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
at MigrateToNewSchema.Program.Main(System.String[])
Esto ocurre bastante lejos en la transmisión y, por supuesto, durante este período de tiempo ocurrirán problemas de conexión transitorios (se demoran horas en completarse).
Sin embargo, cuando reintentamos, ya que estamos utilizando una Query
debemos comenzar desde cero. Entonces, en última instancia, si hay una falla en la conexión durante todo el Stream
entonces debemos intentarlo nuevamente, y hasta que funcione de extremo a extremo.
Sé que puede usar ETag
con flujo para reiniciar de manera efectiva en un punto determinado, sin embargo, no hay una sobrecarga para hacer esto con una Query
que necesitamos para filtrar los resultados que se migran y especificar la recopilación correcta.
Entonces, en RavenDB, ¿hay alguna manera de mejorar la capacidad de recuperación interna de la conexión (propiedad de la cadena de conexión, configuración interna, etc.) o "recuperar" efectivamente una secuencia en un error?