DynamoDB: actividad de la tabla

Las transmisiones de DynamoDB le permiten realizar un seguimiento y responder a los cambios de elementos de la tabla. Emplee esta funcionalidad para crear una aplicación que responda a los cambios actualizando la información entre fuentes. Sincronice datos para miles de usuarios de un gran sistema multiusuario. Úselo para enviar notificaciones a los usuarios sobre actualizaciones. Sus aplicaciones resultan diversas y sustanciales. Los flujos de DynamoDB sirven como la herramienta principal utilizada para lograr esta funcionalidad.

Los flujos capturan secuencias ordenadas por tiempo que contienen modificaciones de elementos dentro de una tabla. Conservan estos datos durante un máximo de 24 horas. Las aplicaciones los utilizan para ver los elementos originales y modificados, casi en tiempo real.

Las secuencias habilitadas en una tabla capturan todas las modificaciones. En cualquier operación CRUD, DynamoDB crea un registro de flujo con los atributos de clave principal de los elementos modificados. Puede configurar transmisiones para obtener información adicional, como imágenes de antes y después.

Los Streams tienen dos garantías:

  • Cada registro aparece una vez en la secuencia y

  • Cada modificación de elemento da como resultado registros de flujo del mismo orden que el de las modificaciones.

Todos los flujos se procesan en tiempo real para permitirle emplearlos para funciones relacionadas en aplicaciones.

Administrar transmisiones

En la creación de tablas, puede habilitar una secuencia. Las tablas existentes permiten la desactivación de transmisiones o cambios de configuración. Los flujos ofrecen la función de operación asíncrona, lo que significa que no hay impacto en el rendimiento de la tabla.

Utilice la consola de administración de AWS para una administración de transmisiones simple. Primero, navegue a la consola y elijaTables. En la pestaña Resumen, elijaManage Stream. Dentro de la ventana, seleccione la información agregada a una secuencia sobre modificaciones de datos de la tabla. Después de ingresar todas las configuraciones, seleccioneEnable.

Si desea deshabilitar cualquier transmisión existente, seleccione Manage Stream, y entonces Disable.

También puede utilizar las API CreateTable y UpdateTable para habilitar o cambiar una transmisión. Utilice el parámetro StreamSpecification para configurar la secuencia. StreamEnabled especifica el estado, lo que significa verdadero para habilitado y falso para deshabilitado.

StreamViewType especifica la información agregada a la transmisión: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE y NEW_AND_OLD_IMAGES.

Lectura de corriente

Lea y procese flujos conectándose a un punto final y realizando solicitudes de API. Cada flujo consta de registros de flujo y cada registro existe como una única modificación que posee el flujo. Los registros de flujo incluyen un número de secuencia que revela el orden de publicación. Los registros pertenecen a grupos también conocidos como fragmentos. Los fragmentos funcionan como contenedores para varios registros y también contienen la información necesaria para acceder y atravesar registros. Después de 24 horas, los registros se eliminan automáticamente.

Estos fragmentos se generan y eliminan según sea necesario y no duran mucho. También se dividen en varios fragmentos nuevos automáticamente, generalmente en respuesta a picos de actividad de escritura. Al desactivar la transmisión, los fragmentos abiertos se cierran. La relación jerárquica entre los fragmentos significa que las aplicaciones deben priorizar los fragmentos principales para el orden de procesamiento correcto. Puede usar Kinesis Adapter para hacer esto automáticamente.

Note - Las operaciones que no producen cambios no escriben registros de flujo.

El acceso y procesamiento de registros requiere realizar las siguientes tareas:

  • Determine el ARN del flujo de destino.
  • Determine los fragmentos de la transmisión que contienen los registros de destino.
  • Acceda a los fragmentos para recuperar los registros deseados.

Note- Debe haber un máximo de 2 procesos leyendo un fragmento a la vez. Si supera los 2 procesos, entonces puede estrangular la fuente.

Las acciones de la API de transmisión disponibles incluyen

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

Puede revisar el siguiente ejemplo de la lectura del flujo:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}