你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将数据从 FHIR 服务复制到 Azure Synapse Analytics

本文介绍了将数据从 Azure Health Data Services 中的 FHIR® 服务 复制到 Azure Synapse Analytics 的三种方法,这是一种无限的分析服务,它将数据集成、企业数据仓库和大数据分析相结合。

使用 FHIR 到 Synapse 同步代理 OSS 工具

注意

FHIR to Synapse Sync Agent 是 MIT 许可证下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 涵盖。

FHIR 到 Synapse 同步代理 是 MIT 许可证下发布的Microsoft OSS 项目。 它是一个 Azure 函数,它使用 FHIR 资源 API 从 FHIR 服务器提取数据,将其转换为分层 Parquet 文件,并近乎实时地将其写入 Azure Data Lake。 这还包含一个脚本,用于在指向 Parquet 文件的 Synapse 无服务器 SQL 池中创建外部表和视图。

借助此解决方案,可以使用 Synapse Studio、SSMS 和 Power BI 等工具来查询整个 FHIR 数据。 还可以直接从 Synapse Spark 池访问 Parquet 文件。 如果要近乎实时地访问所有 FHIR 数据,并且希望将自定义转换推迟到下游系统,则应考虑此解决方案。

按照 OSS 文档 进行安装和使用说明。

使用 FHIR 到 CDM 管道生成器 OSS 工具

注意

FHIR to CDM 管道生成器是 MIT 许可证下发布的开放源代码工具,不受 Azure 服务的 Microsoft SLA 涵盖。

FHIR 到 CDM 管道生成器是 MIT 许可证下发布的Microsoft OSS 项目。 它是使用 $export API 从 FHIR 服务器复制数据的快照、将其转换为 csv 格式以及写入 Azure Data Lake Storage Gen 2 中的 CDM 文件夹 的工具。 该工具需要用户创建的配置文件,其中包含将 FHIR 资源和字段投影和平展到表中的说明。 还可以按照说明在 Synapse 工作区中创建下游管道,将数据从 CDM 文件夹移动到 Synapse 专用 SQL 池。

使用此解决方案,可以将数据转换为表格格式,因为它写入 CDM 文件夹。 如果要在从 FHIR 服务器中提取 FHIR 数据后将其转换为自定义架构,则应考虑此解决方案。

按照 OSS 文档 进行安装和使用说明。

使用 T-SQL 将导出的数据加载到 Synapse

在此方法中,使用 FHIR 操作以格式将 FHIR $export 资源复制到 Azure Data Lake Gen 2 (ADL Gen 2) Blob 存储NDJSON中。 然后,使用 T-SQL 将数据从存储加载到 Synapse 中的无服务器或专用 SQL 池 中。 可以使用 Synapse 管道将这些步骤转换为可靠的数据移动管道

使用 $export 将 Azure 存储存储到 Synapse。

用于 $export 复制数据

在 FHIR 服务器中配置$export

Azure Health Data Services 中的 FHIR 服务器实现 $export 由 FHIR 规范定义的操作,以采用格式导出所有或筛选的 FHIR 数据 NDJSON 子集。 此外,它还支持取消标识导出,以在导出过程中匿名处理 FHIR 数据。

若要将 FHIR 数据导出到 Azure Blob 存储,首先需要将 FHIR 服务器配置为将数据导出到存储帐户。 需要(1)启用托管标识,(2)转到存储帐户中的访问控制,然后添加角色分配,(3)选择存储帐户$export。 可在此处找到更多分步操作

可以将服务器配置为将数据导出到任何类型的 Azure 存储帐户,但我们建议导出到 ADL Gen 2,以便与 Synapse 保持高度一致。

使用 $export 命令

配置 FHIR 服务器后,可以按照文档中的内容在系统、患者或组级别导出 FHIR 资源。 例如,可以使用以下 $export 命令导出与 Group 中患者相关的所有 FHIR 数据,其中可以在字段 {{BlobContainer}} 中指定 ADL Gen 2 Blob 存储名称:

https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}  

还可以在前面的$export调用中使用_type参数来限制要导出的资源。 例如,以下调用仅PatientMedicationRequest导出和资源Observation

https://{{FHIR service base URL}}/Group/{{GroupId}}/$export?_container={{BlobContainer}}&
_type=Patient,MedicationRequest,Condition

有关支持的不同参数的详细信息,请查看有关查询参数$export 页面部分。

使用 Synapse for Analytics

创建 Synapse 工作区

在使用 Synapse 之前,需要一个 Synapse 工作区。 在 Azure 门户 上创建 Azure Synapse Analytics 服务。 可在此处找到更多分步指南。 需要 ADLSGEN2 帐户才能创建工作区。 Azure Synapse 工作区将使用此存储帐户来存储 Synapse 工作区数据。

创建工作区后,可以通过登录工作区或在 Azure 门户 中启动 Synapse Studio,在 Synapse Studio 中查看工作区https://web.azuresynapse.net

在 Azure 存储和 Synapse 之间创建链接服务

若要将数据复制到 Synapse,需要创建一个链接服务,用于连接 Azure 存储 帐户,其中已使用 Synapse 导出数据。 可在此处找到更多分步说明。

  1. 在 Synapse Studio 中,浏览到“管理”选项卡,然后在“外部连接”下选择“链接服务”。
  2. 选择“新建”以添加新的链接服务
  3. 从列表中选择“Azure Data Lake Storage Gen2”磁贴,然后选择“继续”
  4. 输入你的身份验证凭据。 完成后,选择“创建”

有了 ADL Gen 2 存储和 Synapse 之间的链接服务后,即可使用 Synapse SQL 池来加载和分析 FHIR 数据。

在无服务器池与专用 SQL 池之间做出决定

Azure Synapse Analytics 提供两个不同的 SQL 池:无服务器 SQL 池和专用 SQL 池。 无服务器 SQL 池提供了使用无服务器 SQL 终结点直接在 Blob 存储中查询数据的灵活性,而无需进行任何资源预配。 专用 SQL 池具有高性能和并发处理能力,建议用于企业规模数据仓储功能。 有关这两种 SQL 池的详细信息,请查看有关 SQL 体系结构的 Synapse 文档页

使用无服务器 SQL 池

由于无服务器,因此没有要设置的基础结构或要维护的群集。 创建工作区后,即可从 Synapse Studio 查询数据。

例如,以下查询可用于将所选字段从 Patient.ndjson 转换为表格结构:

SELECT * FROM  
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson', 
FORMAT = 'csv', 
FIELDTERMINATOR ='0x0b', 
FIELDQUOTE = '0x0b')  
WITH (doc NVARCHAR(MAX)) AS rows     
CROSS APPLY OPENJSON(doc)     
WITH ( 
    ResourceId VARCHAR(64) '$.id', 
    Active VARCHAR(10) '$.active', 
    FullName VARCHAR(100) '$.name[0].text', 
    Gender VARCHAR(20) '$.gender', 
       ...
) 

在前面的查询中,函数OPENROWSET访问Azure 存储中的文件,并OPENJSON分析 JSON 文本,并将 JSON 输入属性作为行和列返回。 每次执行此查询时,无服务器 SQL 池都会从 Blob 存储中读取文件、分析 JSON 并提取字段。

还可以在外部表中以 Parquet 格式具体化结果,以获得更好的查询性能,如下所示。

-- Create External data source where the parquet file will be written 
CREATE EXTERNAL DATA SOURCE [MyDataSource] WITH ( 
    LOCATION = 'https://{{youraccount}}.blob.core.windows.net/{{exttblcontainer}}' 
); 
GO 

-- Create External File Format 
CREATE EXTERNAL FILE FORMAT [ParquetFF] WITH ( 
    FORMAT_TYPE = PARQUET, 
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec' 
); 
GO 

CREATE EXTERNAL TABLE [dbo].[Patient] WITH ( 
        LOCATION = 'PatientParquet/', 
        DATA_SOURCE = [MyDataSource], 
        FILE_FORMAT = [ParquetFF] 
) AS 
SELECT * FROM  
OPENROWSET(bulk 'https://{{youraccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson' 
-- Use rest of the SQL statement from the previous example --

使用专用 SQL 池

专用 SQL 池支持托管表和分层缓存,能够提高内存中性能。 可以使用简单的 T-SQL 查询导入大数据,然后利用分布式查询引擎的功能运行高性能分析。

将数据从存储加载到专用 SQL 池的最简单快捷的方法是使用 T-SQL 中的 命令,该命令可以读取 CSV、Parquet 和 ORC 文件COPY。 如以下示例查询中所示,使用 COPY 命令将 NDJSON 行加载到表格结构中。

-- Create table with HEAP, which is not indexed and does not have a column width limitation of NVARCHAR(4000) 
CREATE TABLE StagingPatient ( 
Resource NVARCHAR(MAX) 
) WITH (HEAP) 
COPY INTO StagingPatient 
FROM 'https://{{yourblobaccount}}.blob.core.windows.net/{{yourcontainer}}/Patient.ndjson' 
WITH ( 
FILE_TYPE = 'CSV', 
ROWTERMINATOR='0x0a', 
FIELDQUOTE = '', 
FIELDTERMINATOR = '0x00' 
) 
GO

在上表中具有 JSON 行 StagingPatient 后,可以使用函数创建数据的不同表格格式 OPENJSON ,并将结果存储到表中。 下面是一个示例 SQL 查询,用于通过从Patient资源中提取几个字段来创建Patient表:

SELECT RES.* 
INTO Patient 
FROM StagingPatient
CROSS APPLY OPENJSON(Resource)   
WITH (
  ResourceId VARCHAR(64) '$.id',
  FullName VARCHAR(100) '$.name[0].text',
  FamilyName VARCHAR(50) '$.name[0].family',
  GivenName VARCHAR(50) '$.name[0].given[0]',
  Gender VARCHAR(20) '$.gender',
  DOB DATETIME2 '$.birthDate',
  MaritalStatus VARCHAR(20) '$.maritalStatus.coding[0].display',
  LanguageOfCommunication VARCHAR(20) '$.communication[0].language.text'
) AS RES 
GO

后续步骤

本文介绍了将 FHIR 数据复制到 Synapse 的三种不同的方法。

接下来,可以了解如何在将数据导出到 Synapse 时取消标识 FHIR 数据,以保护 PHI。

注意

FHIR® 是 HL7 的注册商标,经 HL7 许可使用。