你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
迁移指南:Elasticsearch 到 Azure 数据资源管理器
在本指南中,你会了解如何使用Logstash将 Elasticsearch 数据迁移到 Azure 数据资源管理器。
在本指南中,要迁移的数据位于名为vehicle的 Elasticsearch 索引中,该索引具有以下数据架构:
{
"Manufacturer": "string",
"Model": "string",
"ReleaseYear": "int",
"ReleaseDate": "datetime"
}
先决条件
要将 Elasticsearch 数据迁移到 Azure 数据资源管理器,需要:
- Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
- Azure 数据资源管理器群集和数据库。 可以 创建免费群集 或 创建完整的群集。 若要确定最适合你,请检查 功能比较。
- 用于访问 Azure 数据资源管理器群集的应用 ID 和委派权限。 有关详细信息,请参阅创建 Microsoft Entra 应用。 需要应用 ID、机密和租户 ID 来配置 Logstash 管道。
- Logstash 版本 6+ 安装说明。
预迁移
满足先决条件后,就已准备就绪,可以探索环境的拓扑并评估 Azure 云迁移的可行性了。
在 Azure 数据资源管理器群集中创建目标架构
要正确引入和构造用于查询和分析的数据,需要在 Azure 数据资源管理器群集中创建表架构和映射。
表的架构和要迁移的数据应匹配。 引入映射对于建立 ELK 中源列到表中目标列的映射非常重要。
要在群集中创建表架构和引入映射:
登录到 Azure 数据资源管理器 Web UI。
选择要在其中为迁移数据创建表架构的数据库。
在数据库查询窗口中运行以下命令,以创建表架构。
.create tables Vehicle ( Manufacturer: string, Model: string, ReleaseYear: int, ReleaseDate: datetime )
运行以下命令以创建引入映射。
.create table Vehicle ingestion json mapping 'VechicleMapping' '[' ' {"column":"Manufacturer", "path":"$.manufacturer"},' ' {"column":"Model", "path":"$.model"},' ' {"column":"ReleaseYear", "path":"$.releaseYear"},' ' {"column":"ReleaseDate", "path":"$.releaseDate"}' ']'
准备用于迁移的 Logstash
将数据迁移到 Azure 数据资源管理器群集时,必须正确设置Logstash 管道。 管道可确保数据格式正确并传输到目标表。
如果需要从多个 Elasticsearch 群集或索引移动数据,可以在管道配置文件中创建多个输入节。 为此,可以为每个 Elasticsearch 群集或索引定义输入部分,并根据需要使用标记对其进行分类。 然后,可以在“输出”部分的条件语句中使用这些标记,从而将这些数据集定向到特定的 Azure 数据资源管理器群集表
要设置 Logstash 管道:
在命令 shell 中,导航到 Logstash 根目录,然后运行以下命令以安装Logstash 输出插件。 有关插件的详细信息,请参阅从 Logstash 引入数据。
bin/logstash-plugin install logstash-output-kusto
使用以下设置创建 Logstash 管道配置文件:
input { elasticsearch { hosts => "http://localhost:9200" index => "vehicle" query => '{ "query": { "range" : { "releaseDate": { "gte": "2019-01-01", "lte": "2023-12-31" }}}}' user => "<elasticsearch_username>" password => "<elasticsearch_password>" ssl => true ca_file => "<certification_file>" } } filter { ruby { code => "event.set('[@metadata][timebucket]', Time.now().to_i/10)" } } output { kusto { path => "/tmp/region1/%{+YYYY-MM-dd}-%{[@metadata][timebucket]}.txt" ingest_url => "https://ingest-<azure_data_explorer_cluster_name>.<region>.kusto.windows.net" app_id => "<app_id>" app_key => "<app_secret>" app_tenant => "<app_tenant_id>" database => "<your_database>" table => "Vehicle" // The table schema you created earlier json_mapping => "vehicleMapping" // The ingestion mapping you created earlier } }
输入参数
参数名称 说明 hosts Elasticsearch 群集的 URL。 index 要迁移的索引的名称。 query 用于从索引获取特定数据的可选查询。 user 用于连接到 Elasticsearch 群集的用户名。 password 用于连接到 Elasticsearch 群集的密码。 tags 用于识别数据源的可选标记。 例如,在 tags => ["vehicle"]
elasticsearch部分指定 ,然后使用if "vehicle" in [tags] { ... }
包装kusto部分进行筛选。ssl 指定是否需要 SSL 证书。 ca_file 要传递用于身份验证的证书文件。 筛选器参数
Ruby 筛选器每 10 秒设置 Elasticsearch 数据文件的唯一时间戳,从而防止将重复数据引入群集。 这是最佳做法,即将数据分块到具有唯一时间戳的文件中,确保正确处理数据以用于迁移。
输出参数
参数名称 说明 路径 Logstash 插件会将事件写入临时文件,然后将其发送到群集。 此参数描述了保存临时文件的路径,以及用于触发上传到群集的文件轮换的时间表达式。 ingest_url 用于进行引入相关通信的群集终结点。 app_id、app_key 和 app_tenant 连接到群集所需的凭据。 请确保使用具有引入特权的应用程序。 有关详细信息,请参阅先决条件。 database 要将事件放入到的数据库的名称。 table 要将事件放入到的目标表的名称。 json_mapping 映射用于将传入事件的 json 字符串映射为正确的行格式(定义哪个 ELK 属进入哪个表架构列)。
迁移
完成迁移前准备步骤后,下一步是执行迁移过程。 必须在数据迁移过程中监视管道,从而确保管道平稳运行,以便解决可能出现的任何问题。
要迁移数据,请在命令 shell 中导航到 Logstash 根目录,然后运行以下命令:
bin/logstash -f <your_pipeline>.conf
应看到打印到屏幕上的信息。
迁移后
成功完成迁移后还需要执行一系列的迁移后任务,从而验证数据并确保所有操作尽可能顺利高效。
特定索引的数据验证过程通常包括以下活动:
数据比较:将 Azure 数据资源管理器群集中的迁移数据与 Elasticsearch 中的原始数据进行比较。 可以使用 ELK 堆栈中的 Kibana 等工具执行此操作,该工具允许在两个环境中查询和可视化数据。
查询执行:针对 Azure 数据资源管理器群集中的迁移数据运行一系列查询,从而确保数据准确、完整。 这包括运行测试不同字段之间关系的查询,以及测试数据完整性的查询。
检查缺失数据:将群集中的迁移数据与 Elasticsearch 中的数据进行比较,从而检查缺失数据、重复数据或任何其他数据不一致。
验证性能:测试群集中迁移数据的性能,并将其与 Elasticsearch 中的数据性能进行比较。 这可以包括运行查询并可视化数据以测试响应时间,并确保群集中的数据已针对性能进行优化。
重要
如果对迁移的数据或群集进行了任何更改,请重复数据验证过程,从而确保数据仍然准确、完整。
下面是一些可运行以验证群集中的数据的查询示例:
在 Elasticsearch 中,运行以下查询以获取:
// Gets the total record count of the index GET vehicle/_count // Gets the total record count of the index based on a datetime query GET vehicle/_count { "query": { "range" : { "releaseDate": { "gte": "2021-01-01", "lte": "2021-12-31" } } } } // Gets the count of all vehicles that has manufacturer as "Honda". GET vehicle/_count { "query": { "bool" : { "must" : { "term" : { "manufacturer" : "Honda" } } } } } // Get the record count where a specific property doesn't exist. // This is helpful especially when some records don't have NULL properties. GET vehicle/_count { "query": { "bool": { "must_not": { "exists": { "field": "description" } } } } }
在数据库查询窗口中,运行以下相应的查询:
// Gets the total record count in the table Vehicle | count // Gets the total record count where a given property is NOT empty/null Vehicle | where isnotempty(Manufacturer) // Gets the total record count where a given property is empty/null Vehicle | where isempty(Manufacturer) // Gets the total record count by a property value Vehicle | where Manufacturer == "Honda" | count
比较两组查询的结果,从而确保群集中的数据准确、完整。