sql - tiempo - cómo insertar datos parellel en tres tablas diferentes
insertar datos en tablas relacionadas mysql java (5)
¿Son las tres tablas idénticas en estructura y contenido? De ser así, utilice la replicación transaccional / de fusión
Alternativamente, cree un disparador en la primera tabla para insertar en la segunda y tercera tabla
Tengo un procedimiento almacenado que insertará la mayor parte de los registros, ahora existe la posibilidad de insertar datos en 3 tablas en paralelo;
- Primera tabla que inserta 1 millón de registros.
- Segunda tabla que inserta 1,5 millones de registros.
- Tercera tabla que inserta 500k registros
Según mi conocimiento, la inserción del procedimiento está sucediendo una después de la otra.
Entonces, ¿cómo puedo implementar la carga en paralelo?
Suponiendo que desea tener el mismo valor de fecha de inserción para todas las inserciones, defina un parámetro de fecha establecido en la fecha actual como se muestra.
DECLARE @InsertDate as date
SET @InsertDate = GetDate()
Luego, pase el parámetro de fecha de inserción a su procedimiento insert insertado y actualice este procedimiento almacenado para usar esa entrada. Esto asegurará que se use el mismo valor de fecha de inserción para todas las inserciones.
EXEC dbo.InsertTables123 @p1 = @InsertDate
El parámetro de entrada @InsertDate también se puede asignar manualmente si se necesita algo distinto de la fecha actual.
Puede intentar crear tres trabajos y ejecutar scripts de inserción en paralelo de la siguiente manera:
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N''Job1'',
@enabled=1,
@description=N''No description available.'',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N''Insert into First Table'',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N''TSQL'',
@command=N''--Insert script for first table'',
@database_name=N''Test'',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N''(local)''
GO
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N''Job2'',
@enabled=1,
@description=N''No description available.'',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N''Insert into second Table'',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N''TSQL'',
@command=N''--Insert script for second table'',
@database_name=N''Test'',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N''(local)''
GO
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N''Job3'',
@enabled=1,
@description=N''No description available.'',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N''Insert into Third Table'',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N''TSQL'',
@command=N''--Insert script for third table'',
@database_name=N''Test'',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N''(local)''
GO
EXEC msdb.dbo.sp_start_job N''Job1'' ; --All will execute in parallel
EXEC msdb.dbo.sp_start_job N''Job2'' ;
EXEC msdb.dbo.sp_start_job N''Job3'' ;
Las declaraciones se ejecutan sincrónicamente dentro de un lote de T-SQL. Para ejecutar varias instrucciones de forma asincrónica y en paralelo desde un procedimiento almacenado, deberá usar múltiples conexiones de bases de datos concurrentes. Tenga en cuenta que la parte complicada con la ejecución asincrónica no solo determina cuándo se han completado todas las tareas, sino también si han tenido éxito o han fallado.
Método 1: paquete de SSIS
Cree un paquete SSIS para ejecutar las 3 sentencias SQL en paralelo. En SQL 2012 y versiones posteriores, ejecute el paquete utilizando los procedimientos almacenados del catálogo de SSIS. Pre-SQL 2012, deberá crear un trabajo de SQL Agent para el paquete y ejecutarlo con sp_start_job.
Deberá verificar el estado de ejecución de SSIS o el estado del trabajo del Agente SQL para determinar la finalización y el resultado de éxito / falla.
Método 2: Powershell y SQL Agent
Ejecute un trabajo del Agente SQL que ejecute un script de Powershell que ejecute las consultas en paralelo utilizando los trabajos en segundo plano de Powershell (comando Start-Job). La secuencia de comandos puede devolver un código de salida, cero para el éxito y distinto de cero para la falla, de modo que SQL Agent pueda determinar si tuvo éxito. Verifique el estado del trabajo del Agente SQL para determinar la finalización y el resultado correcto / incorrecto.
Método 3: múltiples trabajos del Agente SQL
Ejecute múltiples trabajos de SQL Agent al mismo tiempo, cada uno con un paso de trabajo T-SQL que contiene el script de importación. Compruebe el estado del trabajo del Agente SQL de cada trabajo para determinar la finalización y el resultado correcto / incorrecto.
Método 4: Service Broker Use un proc activado de cola para ejecutar los scripts de importación en paralelo. Esto puede ser obtuso si no ha utilizado el intermediario de servicios antes y es importante seguir los patrones examinados. He incluido un ejemplo para comenzar (reemplace THROW con RAISERROR para pre-SQL 2012). La base de datos debe tener habilitado Service Broker, que está habilitado por defecto pero apagado después de una restauración o adjuntar.
USE YourDatabase;
Go
--create proc that will be automatically executed (activated) when requests are waiting
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;
DECLARE
@TSqlJobConversationHandle uniqueidentifier = NEWID()
, @TSqlExecutionRequestMessage xml
, @TSqlExecutionResultMessage xml
, @TSqlExecutionResult varchar(10)
, @TSqlExecutionResultDetails nvarchar(MAX)
, @TSqlScript nvarchar(MAX)
, @TSqlTaskName sysname
, @RowsAffected int
, @message_type_name sysname;
WHILE 1 = 1
BEGIN
--get the next task to execute
WAITFOR (
RECEIVE TOP (1)
@TSqlJobConversationHandle = conversation_handle
, @TSqlExecutionRequestMessage = CAST(message_body AS xml)
, @message_type_name = message_type_name
FROM dbo.TSqlExecutionQueue
), TIMEOUT 1000;
IF @@ROWCOUNT = 0
BEGIN
--no work to do - exit
BREAK;
END;
IF @message_type_name = N''TSqlExecutionRequest''
BEGIN
--get task name and script
SELECT
@TSqlTaskName = @TSqlExecutionRequestMessage.value(''(/TSqlTaskName)[1]'', ''sysname'')
, @TSqlScript = @TSqlExecutionRequestMessage.value(''(/TSqlScript)[1]'', ''nvarchar(MAX)'');
--execute script
BEGIN TRY
EXEC sp_executesql @TSqlScript;
SET @RowsAffected = @@ROWCOUNT;
SET @TSqlExecutionResult = ''Completed'';
SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + '' rows affected'';
END TRY
BEGIN CATCH
SET @TSqlExecutionResult = ''Erred'';
SET @TSqlExecutionResultDetails =
''Msg '' + CAST(ERROR_NUMBER() AS varchar(10))
+ '', Level '' + CAST(ERROR_SEVERITY() AS varchar(2))
+ '', State '' + CAST(ERROR_STATE() AS varchar(10))
+ '', Line '' + CAST(ERROR_LINE() AS varchar(10))
+ '': '' + ERROR_MESSAGE();
END CATCH;
--send execution result back to initiator
SET @TSqlExecutionResultMessage = ''<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />'';
SET @TSqlExecutionResultMessage.modify(''insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] '');
SET @TSqlExecutionResultMessage.modify(''insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] '');
SET @TSqlExecutionResultMessage.modify(''insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] '');
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlExecutionResult
(@TSqlExecutionResultMessage);
END
ELSE
BEGIN
IF @message_type_name = N''TSqlJobComplete''
BEGIN
--service has ended conversation so we''re not going to get any more execution requests
END CONVERSATION @TSqlJobConversationHandle;
END
ELSE
BEGIN
END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = ''Unexpected message type received by ExecuteTSqlTask'';
RAISERROR(''Unexpected message type received (%s) by ExecuteTSqlTask'', 16, 1, @message_type_name);
END;
END;
END;
GO
CREATE QUEUE dbo.TSqlResultQueue;
CREATE QUEUE dbo.TSqlExecutionQueue
WITH STATUS=ON,
ACTIVATION (
STATUS = ON
, PROCEDURE_NAME = dbo.ExecuteTSqlTask
, MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances
, EXECUTE AS OWNER
);
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT TSqlExecutionContract (
TSqlExecutionRequest SENT BY INITIATOR
, TSqlJobComplete SENT BY INITIATOR
, TSqlExecutionResult SENT BY TARGET
);
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO
CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;
DECLARE
@TSqlJobConversationHandle uniqueidentifier
, @TSqlExecutionRequestMessage xml
, @TSqlExecutionResultMessage xml
, @TSqlExecutionResult varchar(10)
, @TSqlExecutionResultDetails nvarchar(MAX)
, @TSqlTaskName sysname
, @CompletedCount int = 0
, @ErredCount int = 0
, @message_type_name sysname;
DECLARE @TsqlTask TABLE(
TSqlTaskName sysname NOT NULL PRIMARY KEY
, TSqlScript nvarchar(MAX) NOT NULL
);
BEGIN TRY
--insert a row for each import task
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N''ImportScript1'', N''INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;'');
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N''ImportScript2'', N''INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;'');
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N''ImportScript3'', N''INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;'');
--start a conversation for this import process
BEGIN DIALOG CONVERSATION @TsqlJobConversationHandle
FROM SERVICE TSqlJobService
TO SERVICE ''TSqlExecutorService'', ''CURRENT DATABASE''
ON CONTRACT TSqlExecutionContract
WITH ENCRYPTION = OFF;
--send import tasks to executor service for parallel execution
DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
SELECT (SELECT TSqlTaskName, TSqlScript
FROM @TsqlTask AS task
WHERE task.TSqlTaskName = job.TSqlTaskName
FOR XML PATH(''''), TYPE) AS TSqlExecutionRequest
FROM @TsqlTask AS job;
OPEN JobTasks;
WHILE 1 = 1
BEGIN
FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
IF @@FETCH_STATUS = -1 BREAK;
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlExecutionRequest
(@TSqlExecutionRequestMessage);
END;
CLOSE JobTasks;
DEALLOCATE JobTasks;
--get each parallel task execution result until all are complete
WHILE 1 = 1
BEGIN
--get next task result
WAITFOR (
RECEIVE TOP (1)
@TSqlExecutionResultMessage = CAST(message_body AS xml)
, @message_type_name = message_type_name
FROM dbo.TSqlResultQueue
WHERE conversation_handle = @TSqlJobConversationHandle
), TIMEOUT 1000;
IF @@ROWCOUNT <> 0
BEGIN
IF @message_type_name = N''TSqlExecutionResult''
BEGIN
--get result of import script execution
SELECT
@TSqlTaskName = @TSqlExecutionResultMessage.value(''(/TSqlTaskName)[1]'', ''sysname'')
, @TSqlExecutionResult = @TSqlExecutionResultMessage.value(''(/TSqlExecutionResult)[1]'', ''varchar(10)'')
, @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value(''(/TSqlExecutionResultDetails)[1]'', ''nvarchar(MAX)''), N'''');
RAISERROR(''Import task %s %s: %s'', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
IF @TSqlExecutionResult = ''Completed''
BEGIN
SET @CompletedCount += 1;
END
ELSE
BEGIN
SET @ErredCount += 1;
END;
--remove task from tracking table after completion
DELETE FROM @TSqlTask
WHERE TSqlTaskName = @TSqlTaskName;
IF NOT EXISTS(SELECT 1 FROM @TsqlTask)
BEGIN
--all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlJobComplete;
END
END
ELSE
BEGIN
IF @message_type_name = N''http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog''
BEGIN
--executor service has ended conversation so we''re done
END CONVERSATION @TSqlJobConversationHandle;
BREAK;
END
ELSE
BEGIN
END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = ''Unexpected message type received by ExecuteParallelInserts'';
RAISERROR(''Unexpected message type received (%s) by ExecuteParallelInserts'', 16, 1, @message_type_name);
END;
END
END;
END;
RAISERROR(''Import processing completed. CompletedCount=%d, ErredCount=%d.'', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
THROW;
END CATCH;
GO
--execute import scripts in parallel
EXEC dbo.ExecuteParallelImportScripts;
GO
Para su procedimiento, supongo que tiene TableName y la ubicación del archivo como parámetros.
Si tiene un archivo grande que tiene 3 millones de registros, debe dividir el archivo en 3 pequeños (si conoce otro idioma que no sea sql), después de eso, puede abrir 3 consolas de servidores Sql y llamar el procedimiento en cada consola. Hará que la inserción sea paralela. O conoce otros lenguajes de programación, puede usar varios hilos para llamar al procedimiento.