第 1 部分:使用 Apache Spark 将数据引入 Microsoft Fabric 湖屋

在本教程中,你将以 delta lake 格式将数据引入 Fabric 湖屋。 需要理解的一些重要术语:

  • 湖屋 - 湖屋是文件/文件夹/表的集合,表示数据湖上由 Spark 引擎和 SQL 引擎用于大数据处理的数据库,其中包括在使用开放源代码 Delta 格式表时用于 ACID 事务的增强功能。

  • Delta Lake - Delta Lake 是一个开放源代码存储层,可将 ACID 事务、可缩放的元数据管理以及批处理和流式数据处理引入 Apache Spark。 Delta Lake 表是一种数据表格式,它通过基于文件的事务日志扩展了 Parquet 数据文件,可以处理 ACID 事务和可缩放的元数据。

  • Azure 开放数据集是精选的公共数据集,可用于将特定于方案的特征添加到机器学习解决方案,以提高模型的准确度。 开放数据集位于 Microsoft Azure 存储上的云中,可通过各种方法(包括 Apache Spark、REST API、数据工厂和其他工具)进行访问。

在本教程中,你将使用 Apache Spark 执行以下操作:

  • 读取 Azure 开放数据集容器中的数据。
  • 将数据写入 Fabric 湖屋增量表。

先决条件

  • 将湖屋添加到此笔记本。 你将从公共 Blob 下载数据,然后将数据存储在湖屋中。

在笔记本中继续操作

1-ingest-data.ipynb 是本教程随附的笔记本。

若要打开本教程随附的笔记本,请按照让系统为数据科学做好准备教程中的说明操作,将该笔记本导入到工作区。

如果要从此页面复制并粘贴代码,则可以创建新的笔记本

在开始运行代码之前,请务必将湖屋连接到笔记本

银行流失数据

该数据集包含 10,000 个客户的流失状态。 它还包括可能影响流失的属性,例如:

  • 信用评分
  • 地理位置(德国、法国、西班牙)
  • 性别(男性、女性)
  • 帐龄
  • 保有时间(成为银行客户的年数)
  • 帐户余额
  • 估计工资
  • 客户通过银行购买的产品数
  • 信用卡状态(客户是否持有信用卡)
  • 活动成员状态(是否为活动银行的客户)

数据集还包括行号、客户 ID 和客户姓氏等列,这些列不应影响客户停止与银行业务往来的决定。

定义客户流失的事件是客户银行账户销户。 数据集中的列 exited 是指客户的放弃。 关于这些属性并未提供太多可用的上下文,因此必须在不掌握数据集背景信息的情况下继续操作。 目标是了解这些属性对 exited 状态的影响。

数据集中的示例行:

"CustomerID" "Surname" "CreditScore" "Geography" "Gender" "Age" "Tenure" "Balance" "NumOfProducts" "HasCrCard" "IsActiveMember" "EstimatedSalary" "Exited"
15634602 Hargrave 619 France Female 42 2 0.00 1 1 1 101348.88 1
15647311 Hill 608 西班牙 Female 41 1 83807.86 1 0 1 112542.58 0

下载数据集并上传到湖屋

提示

通过定义以下参数,你可以轻松地将此笔记本应用于不同的数据集。

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

DATA_ROOT = "/lakehouse/default"
DATA_FOLDER = "Files/churn"  # folder with data files
DATA_FILE = "churn.csv"  # data file name

以下代码将下载数据集的公开可用版本,然后将其存储在 Fabric 湖屋中。

重要

确保在运行笔记本之前向笔记本中添加湖屋。 否则可能会导致出错。

import os, requests
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    file_list = [DATA_FILE]
    download_path = f"{DATA_ROOT}/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

你将使用刚刚引入的数据: