Поделиться через


Прием данных с помощью Go SDK для Azure Data Explorer

Azure Data Explorer — это быстрая и высокомасштабируемая служба для изучения данных журналов и телеметрии. Она предоставляет клиентскую библиотеку Go SDK для взаимодействия со службой Azure Data Explorer. Go SDK можно использовать для приема, управления и запроса данных в кластерах Azure Data Explorer.

В этой статье вы сначала создадите таблицу и сопоставление данных в тестовом кластере. Затем вы поставите в очередь прием данных в кластер с помощью Go SDK и проверите результаты.

Необходимые компоненты

Установка пакета Go SDK

Пакет Go SDK для Azure Data Explorer будет автоматически установлен при запуске [примера приложения, использующего модули Go. Если вы установили пакет Go SDK для другого приложения, создайте модуль Go и получите пакет Azure Data Explorer (с помощью go get), например:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Зависимость пакета будет добавлена в файл go.mod. Используйте ее в приложении Go.

Просмотр кода

Этот раздел Проверка кода является необязательным. Если вам интересно узнать, как работает код, вы можете просмотреть следующие фрагменты кода. В противном случае вы можете сразу перейти к разделу Выполнение приложения.

Аутентификация

Перед выполнением каких-либо операций программе необходимо пройти проверку подлинности в службе Azure Data Explorer.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Экземпляр kusto.Authorization создан с использованием учетных данных субъекта-службы. Затем он использовался для создания kusto.Client с новой функцией, которая также принимает конечную точку кластера.

Создать таблицу

Команда «Создать таблицу» представлена инструкцией Kusto. Для выполнения команд управления используется функция Mgmt. Он используется для выполнения команды создания таблицы.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Совет

Инструкция Kusto по умолчанию является константой, это необходимо для повышения уровня безопасности. NewStmt принимает строковые константы. API UnsafeStmt позволяет использовать сегменты инструкций, не являющихся константами, но так делать не рекомендуется.

Команда Kusto «Создать таблицу» выглядит следующим образом:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Создать сопоставление

Сопоставление данных используется во время приема данных для сопоставления входящих данных со столбцами в таблицах Azure Data Explorer. Дополнительные сведения см. в статье Сопоставление данных. Сопоставление создается таким же образом, как и таблица, с использованием функции Mgmt с указанием имени базы данных и соответствующей команды. Полная версия команды доступна в репозитории GitHub для примера.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Прием данных

Прием данных помещается в очередь с помощью файла из существующего контейнера хранилища Blob-объектов Azure.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

Клиент приема создается с помощью ingest.New. Функция FromFile используется для ссылки на универсальный код ресурса (URI) хранилища Blob-объектов Azure. Ссылочное имя сопоставления и тип данных передаются в виде FileOption.

Выполнение приложения

  1. Клонируйте образец кода с GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Запустите пример кода, как показано в следующем фрагменте из main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Совет

    Чтобы попробовать различные комбинации операций, вы можете раскомментировать или закомментировать соответствующие функции в main.go.

    Когда вы запускаете пример кода, выполняются следующие действия:

    1. Удаление таблицы: StormEvents удаление таблицы (если она существует).
    2. Создание таблицы: StormEvents создание таблицы.
    3. Создание сопоставления: StormEvents_CSV_Mapping создание сопоставления.
    4. Прием файла: CSV-файл (в хранилище Blob-объектов Azure) помещается в очередь для приема данных.
  3. Чтобы создать субъект-службу для проверки подлинности с помощью Azure CLI, используйте команду az ad sp create-for-rbac. Укажите в качестве сведений о субъекте-службе конечную точку кластера и имя базы данных в виде переменных среды, которые будут использоваться программой:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Запустите программу.

    go run main.go
    

    Вы получите подобные выходные данные:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Проверка и устранение неполадок

Подождите 5–10 минут, пока не будет запланирован прием данных в очереди, и загрузите данные в Azure Data Explorer.

  1. Войдите в https://dataexplorer.azure.com и подключитесь к кластеру. Затем выполните следующую команду, чтобы получить количество записей в таблице StormEvents.

    StormEvents | count
    
  2. Выполните в своей базе данных следующую команду, чтобы проверить, не было ли в ней сбоев приема за последние четыре часа. Замените имя базы данных перед запуском.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Выполните следующую команду, чтобы узнать состояние всех операций приема за последние четыре часа. Замените имя базы данных перед запуском.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Очистка ресурсов

Если вы планируете следить за другими нашими статьями, сохраните созданные вами ресурсы. В противном случае выполните в своей базе данных следующую команду, чтобы удалить таблицу StormEvents.

.drop table StormEvents

Следующий шаг