Processändringsflöde i Azure Blob Storage
Ändringsflöde innehåller transaktionsloggar för alla ändringar som sker i blobarna och blobmetadata i ditt lagringskonto. Den här artikeln visar hur du läser ändringsflödesposter med hjälp av biblioteket för blobändringsflödesprocessorn.
Mer information om ändringsflödet finns i Ändringsflöde i Azure Blob Storage.
Konfigurera projektet
Det här avsnittet beskriver hur du förbereder ett projekt för att arbeta med klientbiblioteket för Blobs Change Feed för .NET.
Installera paket
Från projektkatalogen installerar du paketet för Azure Storage Blobs Change Feed-klientbiblioteket för .NET med hjälp av dotnet add package
kommandot . I det här exemplet lägger vi till --prerelease
flaggan i kommandot för att installera den senaste förhandsversionen.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Kodexemplen i den här artikeln använder även Azure Blob Storage - och Azure Identity-paketen .
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Lägga till using
direktiv
Lägg till följande using
direktiv i kodfilen:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Skapa ett klientobjekt
Om du vill ansluta programmet till Blob Storage skapar du en instans av BlobServiceClient
klassen. I följande exempel visas hur du skapar ett klientobjekt med hjälp av DefaultAzureCredential
för auktorisering. Mer information finns i Auktorisera åtkomst och ansluta till Blob Storage. Om du vill arbeta med ändringsflödet behöver du den inbyggda Azure RBAC-rollen Storage Blob Data Reader eller senare.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Klientobjektet skickas som en parameter till några av de metoder som visas i den här artikeln.
Läsa poster i ändringsflödet
Kommentar
Ändringsflödet är en oföränderlig och skrivskyddad entitet i ditt lagringskonto. Valfritt antal program kan läsa och bearbeta ändringsflödet samtidigt och oberoende av varandra på egen hand. Poster tas inte bort från ändringsflödet när ett program läser dem. Läs- eller iterationstillståndet för varje förbrukande läsare är oberoende och underhålls endast av ditt program.
Följande kodexempel itererar genom alla poster i ändringsflödet, lägger till dem i en lista och returnerar sedan listan över ändringsflödeshändelser:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
I följande kodexempel skrivs vissa värden ut från listan över ändringsflödeshändelser:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Återuppta läsning av poster från en sparad position
Du kan välja att spara läspositionen i ändringsflödet och sedan återuppta iterationen genom posterna vid en framtida tidpunkt. Du kan spara läspositionen genom att hämta ändringsflödesmarkören. Markören är en sträng och ditt program kan spara strängen på alla sätt som passar programmets design, till exempel till en fil eller databas.
Det här exemplet itererar genom alla poster i ändringsflödet, lägger till dem i en lista och sparar markören. Listan och markören returneras till anroparen.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Dataströmbearbetning av poster
Du kan välja att bearbeta ändringsflödesposter när de checkas in i ändringsflödet. Se Specifikationer. Ändringshändelserna publiceras i ändringsflödet i genomsnitt under 60 sekunder. Vi rekommenderar att du söker efter nya ändringar med den här perioden i åtanke när du anger ditt avsökningsintervall.
Det här exemplet söker regelbundet efter ändringar. Om det finns ändringsposter bearbetar den här koden dessa poster och sparar markör för ändringsflöde. Om processen stoppas och sedan startas igen kan programmet använda markören för att återuppta bearbetningsposter där den senast slutade. I det här exemplet sparas markören i en lokal fil i demonstrationssyfte, men programmet kan spara den i valfri form som passar bäst för ditt scenario.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Läsa poster inom ett visst tidsintervall
Du kan läsa poster som ligger inom ett visst tidsintervall. Det här exemplet itererar genom alla poster i ändringsflödet som ligger inom ett visst datum- och tidsintervall, lägger till dem i en lista och returnerar listan:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Starttiden som du anger avrundas ned till närmaste timme och sluttiden avrundas upp till närmaste timme. Det är möjligt att användarna kan se händelser som inträffat före starttiden och efter sluttiden. Det är också möjligt att vissa händelser som inträffar mellan start- och sluttiden inte visas. Det beror på att händelser kan registreras under timmen före starttiden eller under timmen efter sluttiden.