教程第 1 部分:使用 Apache Spark 将数据引入 Microsoft Fabric Lakehouse

在本教程中,你将以 delta lake 格式将数据引入 Fabric 湖屋。 要理解的一些重要术语:

  • 湖屋 - 湖屋是文件/文件夹/表的集合,表示数据湖上由 Spark 引擎和 SQL 引擎用于大数据处理的数据库,其中包括在使用开放源代码 Delta 格式表时用于 ACID 事务的增强功能

  • Delta Lake - Delta Lake 是一个开源存储层,用于将 ACID 事务、可缩放的元数据管理和批处理和流式数据处理引入 Apache Spark。 Delta Lake 表是一种数据表格式,它将 Parquet 数据文件与基于文件的事务日志相结合,以支持 ACID 事务和可扩展的元数据管理。

  • Azure 开放数据集 是特选的公共数据集,可用于向机器学习解决方案添加特定于方案的功能,以便更准确的模型。 开放数据集位于 Microsoft Azure 存储上的云中,可通过各种方法(包括 Apache Spark、REST API、数据工厂和其他工具)访问。

在本教程中,你将使用 Apache Spark 来:

  • 从 Azure 开放数据集容器读取数据。
  • 将数据写入 Fabric 湖屋 Delta 表。

先决条件

  • 将湖屋添加到此笔记本。 你将从公共 Blob 下载数据,然后将数据存储在湖屋中。

在笔记本中继续操作

1-ingest-data.ipynb 是本教程随附的笔记本。

银行流失数据

数据集包含 10,000 个客户的流失状态。 它还包括可能影响客户流失的属性,例如:

  • 信用评分
  • 地理位置(德国、法国、西班牙)
  • 性别(男性、女性)
  • 年龄
  • 客户年限(成为银行客户的年数)
  • 帐户余额
  • 估计工资
  • 客户通过银行购买的产品数
  • 信用卡状态(客户是否有信用卡)
  • 成员活跃状态(不论是否为活跃银行的客户)

数据集还包括行号、客户 ID 和客户姓氏等列,这些列不应影响客户决定离开银行。

定义客户流失的事件是关闭客户的银行账户。 数据集中的列 exited 是指客户的放弃。 这些属性没有太多上下文可用,因此无需提供有关数据集的背景信息即可继续操作。 目的是了解这些属性对 exited 状态的贡献。

数据集中的示例行:

“CustomerID” "Surname" “CreditScore” 地理 性别 年龄 “任期” “平衡” "NumOfProducts" “HasCrCard” "IsActiveMember" "EstimatedSalary" "Exited"
15634602 哈格雷夫 619 法国 女性 42 2 0.00 1 1 1 101348.88 1
15647311 Hill 608 西班牙 女性 41 1 83807.86 1 0 1 112542.58 0

下载数据集并上传到 Lakehouse

提示

通过定义以下参数,可以轻松地将此笔记本用于不同的数据集。

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

DATA_ROOT = "/lakehouse/default"
DATA_FOLDER = "Files/churn"  # folder with data files
DATA_FILE = "churn.csv"  # data file name

此代码下载数据集的公开可用版本,然后将其存储在 Fabric Lakehouse 中。

重要

确保在运行笔记本之前向笔记本中添加湖屋。 否则将导致错误。

import os, requests
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    file_list = [DATA_FILE]
    download_path = f"{DATA_ROOT}/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

你将使用刚刚引入的数据: