你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Synapse Analytics 中的文件装载/卸载 API 简介

Azure Synapse Studio 团队在 Microsoft Spark 实用工具 (mssparkutils) 包中构建了两个新的装载/卸载 API。 可以使用这些 API 将远程存储(Azure Blob 存储 或 Azure Data Lake Storage Gen2)附加到所有工作节点(驱动程序节点和工作器节点)。 存储就位后,可以使用本地文件 API 访问数据,如同数据存储在本地文件系统中一样。 有关详细信息,请参阅 Microsoft Spark 实用工具简介

本文演示如何在工作区中使用装载/卸载 API。 学习内容:

  • 如何装载 Data Lake Storage Gen2 或 Blob 存储。
  • 如何通过本地文件系统 API 访问装入点下的文件。
  • 如何使用 mssparktuils fs API 访问装入点下的文件。
  • 如何使用 Spark 读取 API 访问装入点下的文件。
  • 如何卸载装入点。

警告

Azure 文件共享装载会暂时禁用。 可以改用 Data Lake Storage Gen2 或 Azure Blob 存储装载,如下一部分所述。

不支持 Azure Data Lake Storage Gen1 存储。 在使用装载 API 之前,可以按照 Azure Data Lake Storage Gen1 到 Gen2 迁移指南迁移到 Data Lake Storage Gen2。

装载存储

此部分说明如何逐步装载 Data Lake Storage Gen2,以此作为示例。 装载 Blob 存储的方式是类似的。

该示例假设你具有一个名为 storegen2 的 Data Lake Storage Gen2 帐户。 该帐户有一个名为 mycontainer 的容器,你要将该容器装载到 Spark 池中的 /test

Screenshot of a Data Lake Storage Gen2 storage account.

若要装载名为 mycontainer 的容器,mssparkutils 首先需要检查你是否具有访问该容器的权限。 目前,Azure Synapse Analytics 对触发器装载操作支持三种身份验证方法:LinkedServiceaccountKeysastoken

建议通过链接服务进行触发器装载。 此方法可避免安全泄漏,因为 mssparkutils 不会存储任何机密或身份验证值本身。 相反,mssparkutils 始终从链接服务提取身份验证值,以便从远程存储请求 Blob 数据。

Screenshot of linked services.

可以为 Data Lake Storage Gen2 或 Blob 存储创建链接服务。 目前,Azure Synapse Analytics 在创建链接服务时支持两种身份验证方法:

  • 使用帐户密钥创建链接服务

    Screenshot of selections for creating a linked service by using an account key.

  • 使用托管标识创建链接服务

    Screenshot of selections for creating a linked service by using a managed identity.

重要

  • 如果上面创建的到 Azure Data Lake Storage Gen2 的链接服务使用托管专用终结点(具有 dfs URI),则需要使用 Azure Blob 存储选项(具有 blob URI)创建另一个辅助托管专用终结点,以确保内部 fsspec/adlfs 代码可以使用 BlobServiceClient 接口进行连接。
  • 如果未正确配置辅助托管专用终结点,我们将看到类似于 ServiceRequestError: 无法连接到主机 [storageaccountname].blob.core.windows.net:443 ssl:True [名称或服务未知] 的错误消息

Screenshot of creating a managed private end-point to an ADLS Gen2 storage using blob endpoint.

注意

如果使用托管标识作为身份验证方法来创建链接服务,请确保工作区 MSI 文件具有已装载容器的存储 Blob 数据参与者角色。

成功创建链接服务后,可以使用以下 Python 代码将容器轻松装载到 Spark 池:

mssparkutils.fs.mount( 
    "abfss://mycontainer@<accountname>.dfs.core.windows.net", 
    "/test", 
    {"LinkedService":"mygen2account"} 
) 

注意

如果 mssparkutils 不可用,可能需要将它导入:

from notebookutils import mssparkutils 

装载参数:

  • fileCacheTimeout:默认情况下,Blob 将在本地临时文件夹中缓存 120 秒。 在此期间,BlobFuse 不会检查文件是否为最新状态。 可以将参数设置为更改默认超时时间。 当多个客户端同时修改文件时,为了避免本地文件与远程文件之间的不一致,我们建议缩短缓存时间,甚至将其更改为 0,并始终从服务器获取最新文件。
  • timeout:默认情况下,装载操作超时为 120 秒。 可以将参数设置为更改默认超时时间。 当执行程序过多或装载超时时,建议增加值。
  • scope:范围参数用于指定装载的范围。 默认值为“job”。如果范围设置为“job”,则装载仅对当前群集可见。 如果范围设置为“workspace”,则装载对当前工作区中的所有笔记本都可见,如果不存在装入点,则会自动创建。 将相同的参数添加到卸载 API 以卸载装入点。 工作区级别装载仅支持链接服务身份验证。

可以使用如下所示的这些参数:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"linkedService":"mygen2account", "fileCacheTimeout": 120, "timeout": 120}
)

无论使用哪种身份验证方法,都不建议装载根文件夹。

通过共享访问签名令牌或帐户密钥进行装载

除了通过链接服务进行装载外,mssparkutils 还支持显式传递帐户密钥或共享访问签名 (SAS) 令牌作为参数来装载目标。

出于安全原因,建议将帐户密钥或 SAS 令牌存储在 Azure Key Vault 中(如以下示例屏幕截图所示)。 随后可以使用 mssparkutil.credentials.getSecret API 检索它们。 有关详细信息,请参阅使用 Key Vault 和 Azure CLI(旧版)管理存储帐户密钥

Screenshot that shows a secret stored in a key vault.

下面是示例代码:

from notebookutils import mssparkutils  

accountKey = mssparkutils.credentials.getSecret("MountKV","mySecret")  
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
) 

注意

出于安全原因,请勿在代码中存储凭据。

使用 mssparkutils fs API 访问装入点下的文件

装载操作的主要目的是让客户能够使用本地文件系统 API 访问远程存储帐户中存储的数据。 你也可以使用 mssparkutils fs API 以装载路径作为参数来访问数据。 此处使用的路径格式略有不同。

假设已使用装载 API 将 Data Lake Storage Gen2 容器 mycontainer 装载到 /test。 当你使用本地文件系统 API 访问数据时,路径格式如下所示:

/synfs/{jobId}/test/{filename}

建议使用 mssparkutils.fs.getMountPath() 来获取准确的路径:

path = mssparkutils.fs.getMountPath("/test") # equals to /synfs/{jobId}/test

当你要使用 mssparkutils fs API 访问数据时,路径格式如下所示:

synfs:/{jobId}/test/{filename}

在本例中,你可以看到 synfs 用作架构,而不是装载路径的一部分。

以下三个示例演示如何使用 mssparkutils fs 通过装入点路径访问文件。 在示例中,49 是我们通过调用 mssparkutils.env.getJobId() 而获取的 Spark 作业 ID。

  • 列出目录:

    mssparkutils.fs.ls("synfs:/49/test") 
    
  • 读取文件内容:

    mssparkutils.fs.head("synfs:/49/test/myFile.txt") 
    
  • 创建目录:

    mssparkutils.fs.mkdirs("synfs:/49/test/newdir") 
    

使用 Spark 读取 API 访问装入点下的文件

可以提供参数以通过 Spark 读取 API 访问数据。 此处的路径格式与使用 mssparkutils fs API 时相同:

synfs:/{jobId}/test/{filename}

从装载的 Data Lake Storage Gen2 存储帐户读取文件

以下示例假设已装载 Data Lake Storage Gen2 存储帐户,然后使用装载路径读取文件:

%%pyspark 

df = spark.read.load("synfs:/49/test/myFile.csv", format='csv') 
df.show() 

注意

使用链接服务装载存储时,在使用 synfs 架构访问数据之前,应始终显式设置 Spark 链接服务配置。 有关详细信息,请参阅 ADLS Gen2 存储与链接服务

从装载的 Blob 存储帐户中读取文件

如果装载了 Blob 存储帐户并且要使用 mssparkutils 或 Spark API 访问该帐户,则需要在使用装载 API 尝试装载容器之前通过 Spark 配置显式配置 SAS 令牌:

  1. 若要在进行触发器装载后使用 mssparkutils 或 Spark API 访问 Blob 存储帐户,请更新 Spark 配置,如以下代码示例所示。 如果在装载后只想使用本地文件 API 访问 Spark 配置,则可以绕过此步骤。

    blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds("myblobstorageaccount") 
    
    spark.conf.set('fs.azure.sas.mycontainer.<blobStorageAccountName>.blob.core.windows.net', blob_sas_token) 
    
  2. 创建链接服务 myblobstorageaccount,并使用该链接服务装载 Blob 存储帐户:

    %%spark 
    mssparkutils.fs.mount( 
        "wasbs://mycontainer@<blobStorageAccountName>.blob.core.windows.net", 
        "/test", 
        Map("LinkedService" -> "myblobstorageaccount") 
    ) 
    
  3. 装载 Blob 存储容器,然后通过本地文件 API 使用装载路径读取文件:

        # mount the Blob Storage container, and then read the file by using a mount path
        with open("/synfs/64/test/myFile.txt") as f:
        print(f.read())
    
  4. 通过 Spark 读取 API 从装载的 Blob 存储容器读取数据:

    %%spark
    // mount blob storage container and then read file using mount path
    val df = spark.read.text("synfs:/49/test/myFile.txt")
    df.show()
    

卸载装入点

使用以下代码可卸载装入点(在此示例中为 /test):

mssparkutils.fs.unmount("/test") 

已知的限制

  • mssparkutils fs help 函数尚未添加有关装载/卸载部分的说明。

  • 卸载机制不是自动进行的。 应用程序运行完成后,若要卸载装入点以释放磁盘空间,需要在代码中显式调用卸载 API。 否则,应用程序运行结束后,装入点仍存在于节点中。

  • 目前不支持装载 Data Lake Storage Gen1 存储帐户。

后续步骤