你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
将数据从 FHIR 服务复制到 Azure Synapse Analytics
本文介绍了将数据从 Azure Health Data Services 中的 FHIR® 服务 复制到 Azure Synapse Analytics 的三种方法,这是一种无限的分析服务,它将数据集成、企业数据仓库和大数据分析相结合。
- 使用 FHIR 到 Synapse 同步代理 OSS 工具
- 使用 FHIR 到 CDM 管道生成器 OSS 工具
- 使用 $export并使用 T-SQL 将数据加载到 Synapse
使用 FHIR 到 Synapse 同步代理 OSS 工具
注意
FHIR to Synapse Sync Agent 是 MIT 许可证下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 涵盖。
FHIR 到 Synapse 同步代理 是 MIT 许可证下发布的Microsoft OSS 项目。 它是一个 Azure 函数,它使用 FHIR 资源 API 从 FHIR 服务器提取数据,将其转换为分层 Parquet 文件,并近乎实时地将其写入 Azure Data Lake。 这还包含一个脚本,用于在指向 Parquet 文件的 Synapse 无服务器 SQL 池中创建外部表和视图。
借助此解决方案,可以使用 Synapse Studio、SSMS 和 Power BI 等工具来查询整个 FHIR 数据。 还可以直接从 Synapse Spark 池访问 Parquet 文件。 如果要近乎实时地访问所有 FHIR 数据,并且希望将自定义转换推迟到下游系统,则应考虑此解决方案。
按照 OSS 文档 进行安装和使用说明。
使用 FHIR 到 CDM 管道生成器 OSS 工具
注意
FHIR to CDM 管道生成器是 MIT 许可证下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 涵盖。
FHIR 到 CDM 管道生成器是 MIT 许可证下发布的Microsoft OSS 项目。 它是使用 $export API 从 FHIR 服务器复制数据的快照、将其转换为 csv 格式以及写入 Azure Data Lake Storage Gen 2 中的 CDM 文件夹 的工具。 该工具需要用户创建的配置文件,其中包含将 FHIR 资源和字段投影和平展到表中的说明。 还可以按照说明在 Synapse 工作区中创建下游管道,将数据从 CDM 文件夹移动到 Synapse 专用 SQL 池。
使用此解决方案,可以将数据转换为表格格式,因为它写入 CDM 文件夹。 如果要在从 FHIR 服务器中提取 FHIR 数据后将其转换为自定义架构,则应考虑此解决方案。
按照 OSS 文档 进行安装和使用说明。
使用 T-SQL 将导出的数据加载到 Synapse
在此方法中,使用 FHIR 操作以格式将 FHIR $export
资源复制到 Azure Data Lake Gen 2 (ADL Gen 2) Blob 存储NDJSON
中。 然后,使用 T-SQL 将数据从存储加载到 Synapse 中的无服务器或专用 SQL 池 中。 可以使用 Synapse 管道将这些步骤转换为可靠的数据移动管道。
用于 $export
复制数据
在 FHIR 服务器中配置$export
Azure Health Data Services 中的 FHIR 服务器实现 $export
由 FHIR 规范定义的操作,以采用格式导出所有或筛选的 FHIR 数据 NDJSON
子集。 此外,它还支持取消标识导出,以在导出过程中匿名处理 FHIR 数据。
若要将 FHIR 数据导出到 Azure Blob 存储,首先需要将 FHIR 服务器配置为将数据导出到存储帐户。 需要(1)启用托管标识,(2)转到存储帐户中的访问控制,然后添加角色分配,(3)选择存储帐户$export
。 可在此处找到更多分步操作。
可以将服务器配置为将数据导出到任何类型的 Azure 存储帐户,但我们建议导出到 ADL Gen 2,以便与 Synapse 保持高度一致。
使用 $export
命令
配置 FHIR 服务器后,可以按照文档中的内容在系统、患者或组级别导出 FHIR 资源。 例如,可以使用以下 $export
命令导出与 Group
中患者相关的所有 FHIR 数据,其中可以在字段 {{BlobContainer}}
中指定 ADL Gen 2 Blob 存储名称:
https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}
还可以在前面的$export
调用中使用_type
参数来限制要导出的资源。 例如,以下调用仅Patient
MedicationRequest
导出和资源Observation
:
https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}&
_type=Patient,MedicationRequest,Condition
有关支持的不同参数的详细信息,请查看有关查询参数的 $export
页面部分。
使用 Synapse for Analytics
创建 Synapse 工作区
在使用 Synapse 之前,需要一个 Synapse 工作区。 在 Azure 门户 上创建 Azure Synapse Analytics 服务。 可在此处找到更多分步指南。 需要 ADLSGEN2
帐户才能创建工作区。 Azure Synapse 工作区将使用此存储帐户来存储 Synapse 工作区数据。
创建工作区后,可以通过登录工作区或在 Azure 门户 中启动 Synapse Studio,在 Synapse Studio 中查看工作区https://web.azuresynapse.net。
在 Azure 存储和 Synapse 之间创建链接服务
若要将数据复制到 Synapse,需要创建一个链接服务,用于连接 Azure 存储 帐户,其中已使用 Synapse 导出数据。 可在此处找到更多分步说明。
- 在 Synapse Studio 中,浏览到“管理”选项卡,然后在“外部连接”下选择“链接服务”。
- 选择“新建”以添加新的链接服务。
- 从列表中选择“Azure Data Lake Storage Gen2”磁贴,然后选择“继续”。
- 输入你的身份验证凭据。 完成后,选择“创建”。
有了 ADL Gen 2 存储和 Synapse 之间的链接服务后,即可使用 Synapse SQL 池来加载和分析 FHIR 数据。
在无服务器池与专用 SQL 池之间做出决定
Azure Synapse Analytics 提供两个不同的 SQL 池:无服务器 SQL 池和专用 SQL 池。 无服务器 SQL 池提供了使用无服务器 SQL 终结点直接在 Blob 存储中查询数据的灵活性,而无需进行任何资源预配。 专用 SQL 池具有高性能和并发处理能力,建议用于企业规模数据仓储功能。 有关这两种 SQL 池的详细信息,请查看有关 SQL 体系结构的 Synapse 文档页。
使用无服务器 SQL 池
由于无服务器,因此没有要设置的基础结构或要维护的群集。 创建工作区后,即可从 Synapse Studio 查询数据。
例如,以下查询可用于将所选字段从 Patient.ndjson
转换为表格结构:
SELECT * FROM
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson',
FORMAT = 'csv',
FIELDTERMINATOR ='0x0b',
FIELDQUOTE = '0x0b')
WITH (doc NVARCHAR(MAX)) AS rows
CROSS APPLY OPENJSON(doc)
WITH (
ResourceId VARCHAR(64) '$.id',
Active VARCHAR(10) '$.active',
FullName VARCHAR(100) '$.name[0].text',
Gender VARCHAR(20) '$.gender',
...
)
在前面的查询中,函数OPENROWSET
访问Azure 存储中的文件,并OPENJSON
分析 JSON 文本,并将 JSON 输入属性作为行和列返回。 每次执行此查询时,无服务器 SQL 池都会从 Blob 存储中读取文件、分析 JSON 并提取字段。
还可以在外部表中以 Parquet 格式具体化结果,以获得更好的查询性能,如下所示。
-- Create External data source where the parquet file will be written
CREATE EXTERNAL DATA SOURCE [MyDataSource] WITH (
LOCATION = 'https://{{youraccount}}.blob.core.windows.net/{{exttblcontainer}}'
);
GO
-- Create External File Format
CREATE EXTERNAL FILE FORMAT [ParquetFF] WITH (
FORMAT_TYPE = PARQUET,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);
GO
CREATE EXTERNAL TABLE [dbo].[Patient] WITH (
LOCATION = 'PatientParquet/',
DATA_SOURCE = [MyDataSource],
FILE_FORMAT = [ParquetFF]
) AS
SELECT * FROM
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson'
-- Use rest of the SQL statement from the previous example --
使用专用 SQL 池
专用 SQL 池支持托管表和分层缓存,能够提高内存中性能。 可以使用简单的 T-SQL 查询导入大数据,然后利用分布式查询引擎的功能运行高性能分析。
将数据从存储加载到专用 SQL 池的最简单快捷的方法是使用 T-SQL 中的 命令,该命令可以读取 CSV、Parquet 和 ORC 文件COPY
。 如以下示例查询中所示,使用 COPY
命令将 NDJSON
行加载到表格结构中。
-- Create table with HEAP, which is not indexed and does not have a column width limitation of NVARCHAR(4000)
CREATE TABLE StagingPatient (
Resource NVARCHAR(MAX)
) WITH (HEAP)
COPY INTO StagingPatient
FROM 'https://{{yourblobaccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson'
WITH (
FILE_TYPE = 'CSV',
ROWTERMINATOR='0x0a',
FIELDQUOTE = '',
FIELDTERMINATOR = '0x00'
)
GO
在上表中具有 JSON 行 StagingPatient
后,可以使用函数创建数据的不同表格格式 OPENJSON
,并将结果存储到表中。 下面是一个示例 SQL 查询,用于通过从Patient
资源中提取几个字段来创建Patient
表:
SELECT RES.*
INTO Patient
FROM StagingPatient
CROSS APPLY OPENJSON(Resource)
WITH (
ResourceId VARCHAR(64) '$.id',
FullName VARCHAR(100) '$.name[0].text',
FamilyName VARCHAR(50) '$.name[0].family',
GivenName VARCHAR(50) '$.name[0].given[0]',
Gender VARCHAR(20) '$.gender',
DOB DATETIME2 '$.birthDate',
MaritalStatus VARCHAR(20) '$.maritalStatus.coding[0].display',
LanguageOfCommunication VARCHAR(20) '$.communication[0].language.text'
) AS RES
GO
后续步骤
本文介绍了将 FHIR 数据复制到 Synapse 的三种不同的方法。
接下来,可以了解如何在将数据导出到 Synapse 时取消标识 FHIR 数据,以保护 PHI。
注意
FHIR® 是 HL7 的注册商标,经 HL7 许可使用。