Zestawienie zmian procesów w usłudze Azure Blob Storage
Zestawienie zmian zawiera dzienniki transakcji wszystkich zmian występujących w obiektach blob i metadanych obiektu blob na koncie magazynu. W tym artykule pokazano, jak odczytywać rekordy zestawienia zmian przy użyciu biblioteki procesora zestawienia zmian obiektów blob.
Aby dowiedzieć się więcej na temat zestawienia zmian, zobacz Zestawienie zmian w usłudze Azure Blob Storage.
konfigurowanie projektu
Ta sekcja przeprowadzi Cię przez proces przygotowywania projektu do pracy z biblioteką klienta zestawienia zmian obiektów blob dla platformy .NET.
Instalowanie pakietów
Z katalogu projektu zainstaluj pakiet dla biblioteki klienta zestawienia zmian obiektów blob usługi Azure Storage dla platformy .NET przy użyciu dotnet add package
polecenia . W tym przykładzie dodamy flagę --prerelease
do polecenia w celu zainstalowania najnowszej wersji zapoznawczej.
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
Przykłady kodu w tym artykule używają również pakietów usługi Azure Blob Storage i Azure Identity .
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
Dodawanie using
dyrektyw
Dodaj następujące using
dyrektywy do pliku kodu:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
Tworzenie obiektu klienta
Aby połączyć aplikację z usługą Blob Storage, utwórz wystąpienie BlobServiceClient
klasy . W poniższym przykładzie pokazano, jak utworzyć obiekt klienta przy użyciu DefaultAzureCredential
autoryzacji. Aby dowiedzieć się więcej, zobacz Autoryzowanie dostępu i nawiązywanie połączenia z usługą Blob Storage. Aby pracować z kanałem informacyjnym zmian, potrzebujesz wbudowanej roli Czytelnik danych obiektu blob usługi Azure Storage lub nowszego.
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.windows.net"),
new DefaultAzureCredential());
Obiekt klienta jest przekazywany jako parametr do niektórych metod przedstawionych w tym artykule.
Odczytywanie rekordów w kanale zmian
Uwaga
Źródło zmian to niezmienna i tylko do odczytu jednostka na koncie magazynu. Dowolna liczba aplikacji może odczytywać i przetwarzać zestawienie zmian jednocześnie i niezależnie w ich własnej wygodzie. Rekordy nie są usuwane z zestawienia zmian, gdy aplikacja je odczytuje. Stan odczytu lub iteracji każdego czytnika zużywającego jest niezależny i obsługiwany tylko przez aplikację.
Poniższy przykładowy kod iteruje wszystkie rekordy w kanale zmian, dodaje je do listy, a następnie zwraca listę zdarzeń zestawienia zmian:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Poniższy przykład kodu wyświetla niektóre wartości z listy zdarzeń zestawienia zmian:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
Wznawianie odczytywania rekordów z zapisanej pozycji
Możesz zapisać pozycję odczytu w kanale informacyjnym zmian, a następnie wznowić iterację rekordów w przyszłości. Możesz zapisać pozycję odczytu, uzyskując kursor zestawienia zmian. Kursor jest ciągiem, a aplikacja może zapisać ten ciąg w dowolny sposób, który ma sens w projekcie aplikacji, na przykład w pliku lub bazie danych.
W tym przykładzie iteruje wszystkie rekordy w kanale zmian, dodaje je do listy i zapisuje kursor. Lista i kursor są zwracane do obiektu wywołującego.
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
Przetwarzanie strumieniowe rekordów
Możesz przetworzyć rekordy zestawienia zmian w miarę ich zatwierdzenia do zestawienia zmian. Zobacz Specyfikacje. Zdarzenia zmiany są publikowane w kanale zmian średnio przez 60 sekund. Zalecamy sondowanie nowych zmian w tym okresie podczas określania interwału sondowania.
Ten przykład okresowo sonduje pod kątem zmian. Jeśli rekordy zmiany istnieją, ten kod przetwarza te rekordy i zapisuje kursor zestawienia zmian. W ten sposób, jeśli proces zostanie zatrzymany, a następnie uruchomiony ponownie, aplikacja może użyć kursora do wznowienia przetwarzania rekordów, w których ostatnio została przerwana. W tym przykładzie kursor jest zapisywany w pliku lokalnym w celach demonstracyjnych, ale aplikacja może zapisać go w dowolnej formie, która jest najbardziej zrozumiała dla danego scenariusza.
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
Odczytywanie rekordów w określonym zakresie czasu
Możesz odczytywać rekordy należące do określonego zakresu czasu. W tym przykładzie iteruje wszystkie rekordy w kanale informacyjnym zmian należącym do określonego zakresu dat i godzin, dodaje je do listy i zwraca listę:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
Czas rozpoczęcia, który podajesz, jest zaokrąglany w dół do najbliższej godziny, a czas zakończenia jest zaokrąglany do najbliższej godziny. Istnieje możliwość, że użytkownicy mogą zobaczyć zdarzenia, które wystąpiły przed godziną rozpoczęcia i po godzinie zakończenia. Istnieje również możliwość, że niektóre zdarzenia występujące między czasem rozpoczęcia i zakończenia nie będą wyświetlane. Dzieje się tak, ponieważ zdarzenia mogą być rejestrowane w ciągu godziny poprzedniej do godziny rozpoczęcia lub godziny po godzinie zakończenia.