c# .net tsql sql-server-2012 sqlclr

c# - Compresión de conjunto de filas utilizando CLR y GZIP



.net tsql (2)

En su lugar, ¿ha considerado crear una nueva base de datos de ''Archivado'' (tal vez configurada como modelo de recuperación simple), donde puede volcar todos sus datos antiguos? A eso se puede acceder fácilmente en las consultas para que no haya dolor, por ejemplo

SELECT * FROM archive..olddata

Cuando crea el archivo db, lo coloca en otro disco y lo maneja de forma diferente en el procedimiento de copia de seguridad; tal vez haga el procedimiento de archivo una vez a la semana, luego solo necesita ser copiado luego de eso, y después de haberlo aplastado hasta casi cero con 7zip / rar.

No intente comprimir el db utilizando compresión NTFS, el servidor SQL no lo admite; me di cuenta de eso, una tarde muy avanzada :)

Quiero comprimir una tabla grande que contenga datos históricos que rara vez o no se leen. Primero intento usar las compresiones row ( row , page , column stored , column-stored archive ) pero ninguna de ellas puede comprimir valores fuera de la fila ( varchar(max) , nvarchar(max) ) y finalmente terminar tratando de usar la solución CLR

La solución de ejemplo de Rowset comprimido de SQL Server está comprimiendo todo el conjunto de filas devuelto por una consulta determinada utilizando el tipo de CLR definido por el usuario.

Por ejemplo:

CREATE TABLE Archive ( [Date] DATETIME2 DEFAULT(GETUTCDATE()) ,[Data] [dbo].[CompressedRowset] ) INSERT INTO Archive([Data]) SELECT [dbo].[CompressQueryResults](''SELECT * FROM [dbo].[A]'')

Está funcionando, pero me encontré con los siguientes problemas:

  • cuando intento comprimir un gran conjunto de filas de resultados, aparece el siguiente error :

    Msg 0, nivel 11, estado 0, línea 0 Se produjo un error grave en el comando actual. Los resultados, si los hay, deben descartarse.

    Además, la siguiente declaración está funcionando:

    SELECT [dbo].[CompressQueryResults] (''SELECT * FROM [dbo].[LargeA]'')

    pero estos no son:

    INSERT INTO Archive SELECT [dbo].[CompressQueryResults] (''SELECT * FROM [dbo].[LargeA]'' DECLARE @A [dbo].[CompressedRowset] SELECT @A = [dbo].[CompressQueryResults] (''SELECT * FROM [dbo].[LargeA]'')

  • para comprimir un conjunto de filas, el tipo t-sql type debe asignarse al .net type ; desafortunadamente, esto no es cierto para todos los tipos de sql - Asignación de datos de parámetros CLR ; Ya he expandido la siguiente función para manejar más tipos, pero cómo manejar tipos como la geography por ejemplo :

    static SqlDbType ToSqlType(Type t){ if (t == typeof(int)){ return SqlDbType.Int; } ... if (t == typeof(Byte[])){ return SqlDbType.VarBinary; } else { throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); } }

Aquí está el código completo de .net :

using System; using System.Data; using System.Data.SqlClient; using System.Data.SqlTypes; using Microsoft.SqlServer.Server; using System.IO; using System.Runtime.Serialization.Formatters.Binary; using System.IO.Compression; using System.Xml.Serialization; using System.Xml; [Serializable] [Microsoft.SqlServer.Server.SqlUserDefinedType ( Format.UserDefined ,IsByteOrdered = false ,IsFixedLength = false ,MaxByteSize = -1 ) ] public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable { DataTable rowset; public DataTable Data { get { return this.rowset; } set { this.rowset = value; } } public override string ToString() { using (var sw = new StringWriter()) using (var xw = new XmlTextWriter(sw)) { WriteXml(xw); xw.Flush(); sw.Flush(); return sw.ToString(); } } public bool IsNull { get { return (this.rowset == null);} } public static CompressedRowset Null { get { CompressedRowset h = new CompressedRowset(); return h; } } public static CompressedRowset Parse(SqlString s) { using (var sr = new StringReader(s.Value)) using (var xr = new XmlTextReader(sr)) { var c = new CompressedRowset(); c.ReadXml(xr); return c; } } #region "Stream Wrappers" abstract class WrapperStream : Stream { public override bool CanSeek { get { return false; } } public override bool CanWrite { get { return false; } } public override void Flush() { } public override long Length { get { throw new NotImplementedException(); } } public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } public override void SetLength(long value) { throw new NotImplementedException(); } } class BinaryWriterStream : WrapperStream { BinaryWriter br; public BinaryWriterStream(BinaryWriter br) { this.br = br; } public override bool CanRead { get { return false; } } public override bool CanWrite { get { return true; } } public override int Read(byte[] buffer, int offset, int count) { throw new NotImplementedException(); } public override void Write(byte[] buffer, int offset, int count) { br.Write(buffer, offset, count); } } class BinaryReaderStream : WrapperStream { BinaryReader br; public BinaryReaderStream(BinaryReader br) { this.br = br; } public override bool CanRead { get { return true; } } public override bool CanWrite { get { return false; } } public override int Read(byte[] buffer, int offset, int count) { return br.Read(buffer, offset, count); } public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); } } #endregion #region "IBinarySerialize" public void Read(System.IO.BinaryReader r) { using (var rs = new BinaryReaderStream(r)) using (var cs = new GZipStream(rs, CompressionMode.Decompress)) { var ser = new BinaryFormatter(); this.rowset = (DataTable)ser.Deserialize(cs); } } public void Write(System.IO.BinaryWriter w) { if (this.IsNull) return; rowset.RemotingFormat = SerializationFormat.Binary; var ser = new BinaryFormatter(); using (var binaryWriterStream = new BinaryWriterStream(w)) using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress)) { ser.Serialize(compressionStream, rowset); } } #endregion /// <summary> /// This procedure takes an arbitrary query, runs it and compresses the results into a varbinary(max) blob. /// If the query has a large result set, then this procedure will use a large amount of memory to buffer the results in /// a DataTable, and more to copy it into a compressed buffer to return. /// </summary> /// <param name="query"></param> /// <param name="results"></param> //[Microsoft.SqlServer.Server.SqlProcedure] [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)] public static CompressedRowset CompressQueryResults(string query) { //open a context connection using (var con = new SqlConnection("Context Connection=true")) { con.Open(); var cmd = new SqlCommand(query, con); var dt = new DataTable(); using (var rdr = cmd.ExecuteReader()) { dt.Load(rdr); } //configure the DataTable for binary serialization dt.RemotingFormat = SerializationFormat.Binary; var bf = new BinaryFormatter(); var cdt = new CompressedRowset(); cdt.rowset = dt; return cdt; } } /// <summary> /// partial Type mapping between SQL and .NET /// </summary> /// <param name="t"></param> /// <returns></returns> static SqlDbType ToSqlType(Type t) { if (t == typeof(int)) { return SqlDbType.Int; } if (t == typeof(string)) { return SqlDbType.NVarChar; } if (t == typeof(Boolean)) { return SqlDbType.Bit; } if (t == typeof(decimal)) { return SqlDbType.Decimal; } if (t == typeof(float)) { return SqlDbType.Real; } if (t == typeof(double)) { return SqlDbType.Float; } if (t == typeof(DateTime)) { return SqlDbType.DateTime; } if (t == typeof(Int64)) { return SqlDbType.BigInt; } if (t == typeof(Int16)) { return SqlDbType.SmallInt; } if (t == typeof(byte)) { return SqlDbType.TinyInt; } if ( t == typeof(Guid)) { return SqlDbType.UniqueIdentifier; } //!!!!!!!!!!!!!!!!!!! if (t == typeof(Byte[])) { return SqlDbType.VarBinary; } else { throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); } } /// <summary> /// This stored procedure takes a compressed DataTable and returns it as a resultset to the clinet /// or into a table using exec .... into ... /// </summary> /// <param name="results"></param> [Microsoft.SqlServer.Server.SqlProcedure] public static void UnCompressRowset(CompressedRowset results) { if (results.IsNull) return; DataTable dt = results.rowset; var fields = new SqlMetaData[dt.Columns.Count]; for (int i = 0; i < dt.Columns.Count; i++) { var col = dt.Columns[i]; var sqlType = ToSqlType(col.DataType); var colName = col.ColumnName; if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary) { fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength); } else { fields[i] = new SqlMetaData(colName, sqlType); } } var record = new SqlDataRecord(fields); SqlContext.Pipe.SendResultsStart(record); foreach (DataRow row in dt.Rows) { record.SetValues(row.ItemArray); SqlContext.Pipe.SendResultsRow(record); } SqlContext.Pipe.SendResultsEnd(); } public System.Xml.Schema.XmlSchema GetSchema() { return null; } public void ReadXml(System.Xml.XmlReader reader) { if (rowset != null) { throw new InvalidOperationException("rowset already read"); } var ser = new XmlSerializer(typeof(DataTable)); rowset = (DataTable)ser.Deserialize(reader); } public void WriteXml(System.Xml.XmlWriter writer) { if (String.IsNullOrEmpty(rowset.TableName)) rowset.TableName = "Rows"; var ser = new XmlSerializer(typeof(DataTable)); ser.Serialize(writer, rowset); } }

y aquí está la creación de objetos SQL:

CREATE TYPE [dbo].[CompressedRowset] EXTERNAL NAME [CompressedRowset].[CompressedRowset]; GO CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000)) RETURNS [dbo].[CompressedRowset] AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults]; GO CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset] AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset]; GO


Probablemente sea demasiado tarde para la pregunta original, pero esto podría valer la pena considerar para otros tropezar: en SQL Server 2016 hay funciones de compresión y descompresión (ver aquí y aquí ) que pueden ser útiles aquí si los datos que intentas archivar contienen grandes valores en columnas [N]VARCHAR y VARBINARY .

Necesitaría hornear esto en su capa de lógica de negocios o producir algún arreglo en SQL Server mediante el cual replique su tabla descomprimida como una vista en una tabla de respaldo (donde están los valores comprimidos) y derivando los datos descomprimidos a través de DECOMPRESS y teniendo INSTEAD OF activadores que actualizan la tabla de respaldo (por lo que la vista se comporta como la tabla original para seleccionar / insertar / actualizar / eliminar, además de las diferencias de rendimiento). Un poco hacky, pero funcionaría ...

Para versiones anteriores de SQL, probablemente también podría escribir una función CLR para hacer el trabajo.

Obviamente, este método no funcionará para conjuntos de datos que están compuestos de pequeños campos, por supuesto, este estilo de compresión simplemente no logrará nada con valores pequeños (de hecho, los hará más grandes).