Compartir a través de


Modelo de extracción de la fuente de cambios de Azure Cosmos DB

SE APLICA A: NoSQL

Con el modelo de extracción de fuente de cambios, puede consumir la fuente de cambios de Azure Cosmos DB a su propio ritmo. Al igual que con el procesador de fuente de cambios, puede usar el modelo de extracción de la fuente de cambios para ejecutar en paralelo el procesamiento de los cambios en varios consumidores de fuente de cambios.

Comparación con el procesador de fuente de cambios

Muchos escenarios pueden procesar la fuente de cambios con el procesador de fuente de cambios o el modelo de extracción de fuente de cambios. Tanto los tokens de continuación del modelo de extracción como los contenedores de concesión del procesador de la fuente de cambios son "marcadores" del último elemento (o lote de elementos) que se procesó en la fuente de cambios.

Pero no puede convertir los tokens de continuación en una concesión (o viceversa).

Nota

En la mayoría de los casos, cuando necesite leer de la fuente de cambios, la opción más sencilla será usar el procesador de la fuente de cambios.

Plantéese el uso del modelo de extracción en los siguientes escenarios:

  • Leer los cambios de una clave de partición específica.
  • Controlar el ritmo con el que el cliente recibe los cambios para procesarlos.
  • Realizar una lectura única de los datos existentes en la fuente de cambios (por ejemplo, para realizar una migración de datos).

Estas son algunas diferencias clave entre el procesador de fuente de cambios y el modelo de extracción de fuente de cambios:

Característica Procesador de fuente de cambios Modelo de extracción de fuente de cambios
Realizar un seguimiento del punto actual en el procesamiento de la fuente de cambios Concesión (almacenada en un contenedor de Azure Cosmos DB). Token de continuación (almacenado en la memoria o guardado manualmente).
Funcionalidad para reproducir los cambios anteriores. Sí, con el modelo de inserción. Sí, con el modelo de extracción.
Sondeo para cambios futuros. Comprueba automáticamente si hay cambios según el valor WithPollInterval especificado por el usuario Manual
Comportamiento en el que no hay cambios nuevos Espera automáticamente el valor de WithPollInterval y vuelve a comprobarlo Debe comprobar el estado y volver a comprobar manualmente
Procesar cambios de todo un contenedor Sí, y se ejecuta en paralelo automáticamente en varios subprocesos o máquinas que consumen el mismo contenedor Sí, y se ejecuta en paralelo de forma manual mediante FeedRange
Procesar los cambios de una sola clave de partición No compatible

Nota

A diferencia de cuando se lee mediante el procesador de fuente de cambios, con el modelo de extracción usted debe controlar explícitamente los casos en los que no haya cambios nuevos.

Trabajar con el modelo de extracción

Para procesar la fuente de cambios mediante el modelo de extracción, cree una instancia de FeedIterator. Al crear por primera vez una instancia de FeedIterator, debe especificar un valor ChangeFeedStartFrom obligatorio que consta de la posición de inicio para leer los cambios y el valor que quiere usar para FeedRange . FeedRange es un intervalo de valores de claves de partición y especifica los elementos que se pueden leer de la fuente de cambios con ese valor FeedIterator específico. También debe especificar un valor ChangeFeedMode obligatorio para el modo en el que quiere procesar los cambios: versión más reciente o todas las versiones y eliminaciones. Use ChangeFeedMode.LatestVersion o ChangeFeedMode.AllVersionsAndDeletes para indicar en qué modo desea usar para leer la fuente de cambios. Cuando usa el modo todas las versiones y eliminaciones, debe seleccionar un valor de inicio de fuente de cambios de Now() o de un token de continuación específico.

También puede especificar ChangeFeedRequestOptions para establecer un objeto PageSizeHint. Al establecerse, esta propiedad establece la cantidad máxima de elementos recibidos por página. Si las operaciones en la colección supervisada se realizan mediante procedimientos almacenados, el ámbito de transacción se conserva al leer los elementos de la fuente de cambios. Como resultado, es posible que el número de elementos recibidos sea mayor que el valor especificado, de modo que los elementos que cambien en la misma transacción se devuelvan como parte de un lote atómico.

Este es un ejemplo sobre cómo obtener un valor FeedIterator en el modo de versión más reciente que devuelve objetos de entidad; en este caso, un objeto User:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Sugerencia

Antes de la versión 3.34.0, se puede usar la versión más reciente estableciendo ChangeFeedMode.Incremental. Tanto Incremental como LatestVersion hacen referencia a la versión más reciente de la fuente de cambios y las aplicaciones que usan cualquiera de los modos verán el mismo comportamiento.

El modo de todas las versiones y eliminaciones está en versión preliminar y se puede usar con las versiones preliminares > = 3.32.0-preview del SDK de .NET. Este es un ejemplo para obtener FeedIterator en todas las versiones y elimina el modo que devuelve User objetos:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Nota:

En el modo de versión más reciente, recibirá objetos que representan el elemento que ha cambiado con algunos metadatos adicionales. El modo de todas las versiones y eliminaciones devuelve un modelo de datos. Para obtener más información, vea Analizar el objeto de respuesta.

Puede obtener el ejemplo completo de modo de versión más reciente o todas las versiones y elimina el modo.

Consumo de la fuente de cambios a través de flujos

FeedIterator para ambos modos de fuente de cambios tiene dos opciones. Además de los ejemplos que devuelven objetos de entidad, también puede obtener la respuesta mediante la compatibilidad con Stream. Las secuencias le permiten leer datos sin tener que deserializarlos primero, de modo que se ahorran recursos del cliente.

Este es un ejemplo sobre cómo obtener un valor FeedIterator en el modo de versión más reciente que devuelve Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consumir los cambios para un contenedor completo

Si no suministra un FeedRange a un parámetro FeedIterator, puede procesar la fuente de cambios de un contenedor completo a su propio ritmo. A continuación se muestra un ejemplo en el que se empiezan a leer todos los cambios a partir de la hora actual en el modo de versión más reciente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Dado que la fuente de cambios es realmente una lista infinita de elementos que abarcan todas las escrituras y actualizaciones futuras, el valor de HasMoreResults siempre es true. Al intentar leer la fuente de cambios y no haber nuevos cambios disponibles, recibirá una respuesta con el estado NotModified. Es por ello que en el código anterior, se espera cinco segundos antes de volver a comprobar los cambios.

Consumir los cambios de una clave de partición

En algunos casos, es posible que desee procesar solo los cambios de una clave de partición específica. Puede obtener FeedIterator para una clave de partición específica y procesar los cambios de la misma manera en que lo haría para todo un contenedor.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Uso de FeedRange para la paralelización

En el procesador de fuente de cambios, el trabajo se distribuye de manera automática entre varios consumidores. En el modelo de extracción de la fuente de cambios, puede usar FeedRange para ejecutar en paralelo el procesamiento de la fuente de cambios. FeedRange representa un intervalo de valores de clave de partición.

Este es un ejemplo que muestra cómo obtener una lista de intervalos para su contenedor:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Al obtener la lista de valores FeedRange para el contenedor, obtiene un FeedRange por cada partición física.

Con un FeedRange, puede crear un FeedIterator para ejecutar en paralelo el procesamiento de la fuente de cambios en varios equipos o subprocesos. A diferencia del ejemplo anterior, en el que se mostró cómo obtener un FeedIterator para todo el contenedor o una sola clave de partición, puede usar FeedRanges para obtener varios FeedIterators que puedan procesar la fuente de cambios en paralelo.

En caso de que quiera usar FeedRanges, debe tener un proceso de orquestador que obtenga FeedRanges y los distribuya en esas máquinas. Esta distribución puede ser:

  • Use FeedRange.ToJsonString y distribuya este valor de cadena. Los consumidores pueden usar este valor con FeedRange.FromJsonString.
  • Si la distribución está en proceso, pase la referencia de objeto FeedRange.

Este es un ejemplo que muestra cómo leer la fuente de cambios de un contenedor desde el principio mediante dos máquinas independientes hipotéticas que realizan la lectura en paralelo:

Máquina 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Máquina 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Guardar tokens de continuación

Puede obtener el token de continuación para guardar la posición de FeedIterator. Un token de continuación es un valor de cadena que realiza un seguimiento de los últimos cambios que procesó FeedIterator y permite que FeedIterator se reanude más tarde desde este punto. El token de continuación, si se especifica, tiene prioridad sobre los valores de hora de inicio y iniciar desde el principio. El código siguiente lee la fuente de cambios desde la creación del contenedor. Cuando ya no haya más cambios disponibles, se conservará un token de continuación para que el consumo de la fuente de cambios se pueda reanudar más tarde.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Cuando se usa el modo de versión más reciente, el token de continuación FeedIterator nunca expirará, siempre y cuando el contenedor de Azure Cosmos DB siga existiendo. Cuando se usan todas las versiones y el modo de eliminación, el token de continuación FeedIterator es válido siempre y cuando se produzcan cambios en la ventana de retención para las copias de seguridad continuas.

Pasos siguientes