你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

迁移指南:Elasticsearch 到 Azure 数据资源管理器

在本指南中,你会了解如何使用Logstash将 Elasticsearch 数据迁移到 Azure 数据资源管理器。

在本指南中,要迁移的数据位于名为vehicle的 Elasticsearch 索引中,该索引具有以下数据架构:

{
  "Manufacturer": "string",
  "Model": "string",
  "ReleaseYear": "int",
  "ReleaseDate": "datetime"
}

先决条件

要将 Elasticsearch 数据迁移到 Azure 数据资源管理器,需要:

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • Azure 数据资源管理器群集和数据库。 可以 创建免费群集创建完整的群集。 若要确定最适合你,请检查 功能比较
  • 用于访问 Azure 数据资源管理器群集的应用 ID 和委派权限。 有关详细信息,请参阅创建 Microsoft Entra 应用。 需要应用 ID、机密和租户 ID 来配置 Logstash 管道。
  • Logstash 版本 6+ 安装说明

预迁移

满足先决条件后,就已准备就绪,可以探索环境的拓扑并评估 Azure 云迁移的可行性了。

在 Azure 数据资源管理器群集中创建目标架构

要正确引入和构造用于查询和分析的数据,需要在 Azure 数据资源管理器群集中创建表架构和映射。

表的架构和要迁移的数据应匹配。 引入映射对于建立 ELK 中源列到表中目标列的映射非常重要。

要在群集中创建表架构和引入映射:

  1. 登录到 Azure 数据资源管理器 Web UI

  2. 向群集添加连接

  3. 选择要在其中为迁移数据创建表架构的数据库。

  4. 在数据库查询窗口中运行以下命令,以创建表架构。

    .create tables Vehicle (
      Manufacturer: string,
      Model: string,
      ReleaseYear: int,
      ReleaseDate: datetime
      )
    
  5. 运行以下命令以创建引入映射。

    .create table Vehicle ingestion json mapping 'VechicleMapping'
      '['
      '  {"column":"Manufacturer", "path":"$.manufacturer"},'
      '  {"column":"Model", "path":"$.model"},'
      '  {"column":"ReleaseYear", "path":"$.releaseYear"},'
      '  {"column":"ReleaseDate", "path":"$.releaseDate"}'
      ']'
    

准备用于迁移的 Logstash

将数据迁移到 Azure 数据资源管理器群集时,必须正确设置Logstash 管道。 管道可确保数据格式正确并传输到目标表。

如果需要从多个 Elasticsearch 群集或索引移动数据,可以在管道配置文件中创建多个输入节。 为此,可以为每个 Elasticsearch 群集或索引定义输入部分,并根据需要使用标记对其进行分类。 然后,可以在“输出”部分的条件语句中使用这些标记,从而将这些数据集定向到特定的 Azure 数据资源管理器群集表

要设置 Logstash 管道:

  1. 在命令 shell 中,导航到 Logstash 根目录,然后运行以下命令以安装Logstash 输出插件。 有关插件的详细信息,请参阅从 Logstash 引入数据

    bin/logstash-plugin install logstash-output-kusto
    
  2. 使用以下设置创建 Logstash 管道配置文件:

    input {
      elasticsearch {
        hosts => "http://localhost:9200"
        index => "vehicle"
        query => '{ "query": { "range" : { "releaseDate": { "gte": "2019-01-01", "lte": "2023-12-31" }}}}'
        user => "<elasticsearch_username>"
        password => "<elasticsearch_password>"
        ssl => true
        ca_file => "<certification_file>"
      }
    }
    
    filter
    {
      ruby
      {
        code => "event.set('[@metadata][timebucket]', Time.now().to_i/10)"
      }
    }
    
    output {
      kusto {
        path => "/tmp/region1/%{+YYYY-MM-dd}-%{[@metadata][timebucket]}.txt"
        ingest_url => "https://ingest-<azure_data_explorer_cluster_name>.<region>.kusto.windows.net"
        app_id => "<app_id>"
        app_key => "<app_secret>"
        app_tenant => "<app_tenant_id>"
        database => "<your_database>"
        table => "Vehicle" // The table schema you created earlier
        json_mapping => "vehicleMapping" // The ingestion mapping you created earlier
      }
    }
    

    输入参数

    参数名称 说明
    hosts Elasticsearch 群集的 URL。
    index 要迁移的索引的名称。
    query 用于从索引获取特定数据的可选查询。
    user 用于连接到 Elasticsearch 群集的用户名。
    password 用于连接到 Elasticsearch 群集的密码。
    tags 用于识别数据源的可选标记。 例如,在tags => ["vehicle"]elasticsearch部分指定 ,然后使用if "vehicle" in [tags] { ... }包装kusto部分进行筛选。
    ssl 指定是否需要 SSL 证书。
    ca_file 要传递用于身份验证的证书文件。

    筛选器参数

    Ruby 筛选器每 10 秒设置 Elasticsearch 数据文件的唯一时间戳,从而防止将重复数据引入群集。 这是最佳做法,即将数据分块到具有唯一时间戳的文件中,确保正确处理数据以用于迁移。

    输出参数

    参数名称 说明
    路径 Logstash 插件会将事件写入临时文件,然后将其发送到群集。 此参数描述了保存临时文件的路径,以及用于触发上传到群集的文件轮换的时间表达式。
    ingest_url 用于进行引入相关通信的群集终结点。
    app_idapp_keyapp_tenant 连接到群集所需的凭据。 请确保使用具有引入特权的应用程序。 有关详细信息,请参阅先决条件
    database 要将事件放入到的数据库的名称。
    table 要将事件放入到的目标表的名称。
    json_mapping 映射用于将传入事件的 json 字符串映射为正确的行格式(定义哪个 ELK 属进入哪个表架构列)。

迁移

完成迁移前准备步骤后,下一步是执行迁移过程。 必须在数据迁移过程中监视管道,从而确保管道平稳运行,以便解决可能出现的任何问题。

要迁移数据,请在命令 shell 中导航到 Logstash 根目录,然后运行以下命令:

bin/logstash -f <your_pipeline>.conf

应看到打印到屏幕上的信息。

迁移后

成功完成迁移后还需要执行一系列的迁移后任务,从而验证数据并确保所有操作尽可能顺利高效。

特定索引的数据验证过程通常包括以下活动:

数据比较:将 Azure 数据资源管理器群集中的迁移数据与 Elasticsearch 中的原始数据进行比较。 可以使用 ELK 堆栈中的 Kibana 等工具执行此操作,该工具允许在两个环境中查询和可视化数据。

查询执行:针对 Azure 数据资源管理器群集中的迁移数据运行一系列查询,从而确保数据准确、完整。 这包括运行测试不同字段之间关系的查询,以及测试数据完整性的查询。

检查缺失数据:将群集中的迁移数据与 Elasticsearch 中的数据进行比较,从而检查缺失数据、重复数据或任何其他数据不一致。

验证性能:测试群集中迁移数据的性能,并将其与 Elasticsearch 中的数据性能进行比较。 这可以包括运行查询并可视化数据以测试响应时间,并确保群集中的数据已针对性能进行优化。

重要

如果对迁移的数据或群集进行了任何更改,请重复数据验证过程,从而确保数据仍然准确、完整。

下面是一些可运行以验证群集中的数据的查询示例:

  1. 在 Elasticsearch 中,运行以下查询以获取:

    // Gets the total record count of the index
    GET vehicle/_count
    
    // Gets the total record count of the index based on a datetime query
    GET vehicle/_count
    {
      "query": {
        "range" : {
          "releaseDate": { "gte": "2021-01-01", "lte": "2021-12-31" }
                  }
              }
    }
    
    // Gets the count of all vehicles that has manufacturer as "Honda".
    GET vehicle/_count
    {
      "query": {
        "bool" : {
          "must" : {
            "term" : { "manufacturer" : "Honda" }
          }
        }
      }
    }
    
    // Get the record count where a specific property doesn't exist.
    // This is helpful especially when some records don't have NULL properties.
    GET vehicle/_count
    {
      "query": {
        "bool": {
          "must_not": {
            "exists": {
              "field": "description"
            }
          }
        }
      }
    }
    
  2. 在数据库查询窗口中,运行以下相应的查询:

    // Gets the total record count in the table
    Vehicle
    | count
    
    // Gets the total record count where a given property is NOT empty/null
    Vehicle
    | where isnotempty(Manufacturer)
    
    // Gets the total record count where a given property is empty/null
    Vehicle
    | where isempty(Manufacturer)
    
    // Gets the total record count by a property value
    Vehicle
    | where Manufacturer == "Honda"
    | count
    
  3. 比较两组查询的结果,从而确保群集中的数据准确、完整。

要了解有关 Azure 数据资源管理器数据库的详细信息,请参阅:

有关云迁移的框架和采用周期的详细信息,请参阅: