OneLake と Azure Databricks の統合

この記事では、Azure Databricks から OneLake データにアクセスする方法について説明します。 どちらの方法でも、サービス プリンシパル認証と OneLake ABFS エンドポイントが使用されます。 Databricks コンピューティングの種類に一致するセクションを選択します。

  • Standard またはジョブ クラスター: OAuth 構成の Spark ABFS ドライバーを使用して、Spark DataFrames を介して直接データの読み取りと書き込みを行います。
  • サーバーレス コンピューティング: サーバーレス ランタイムでは、カスタム Spark 構成プロパティを設定できません。 代わりに、Microsoft Authentication Library (MSAL) と Python deltalake ライブラリを使用して、Delta テーブルの認証と読み取りまたは書き込みを行います。

関連する Databricks 統合シナリオについては、次のリソースを参照してください。

Scenario ドキュメント
コピーせずに Unity カタログから OneLake データにクエリを実行する OneLake カタログフェデレーションを有効にする
Fabricから Databricks Unity カタログ データにアクセスする Azure Databricks Unity カタログのミラーリング

[前提条件]

接続する前に、次の内容を確認してください。

  • ファブリック ワークスペースとレイクハウス。
  • Premium Azure Databricks ワークスペース。
  • 少なくとも 共同作成者 ワークスペース ロールが割り当てられているサービス プリンシパル。
  • シークレットを格納および取得するための Databricks シークレットまたは Azure Key Vault (AKV)。 この記事の例では、Databricks シークレットを使用します。

標準クラスターを使用して OneLake に接続する

正しい OneLake ABFS パス形式を使用する

次のいずれかの URI 形式を使用します。

  • abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Files/<path>
  • abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Tables/<path>

ID または名前を使用できます。 名前を使用する場合は、ワークスペース名とレイクハウス名に特殊文字と空白文字を使用しないでください。

サービス プリンシパル認証を使用する

このオプションは、自動化されたジョブと一元化されたシークレットローテーションに使用します。

workspace_name = "<workspace_name>"
lakehouse_name = "<lakehouse_name>"
tenant_id = dbutils.secrets.get(scope="<scope-name>", key="<tenant-id-key>")
service_principal_id = dbutils.secrets.get(scope="<scope-name>", key="<client-id-key>")
service_principal_secret = dbutils.secrets.get(scope="<scope-name>", key="<client-secret-key>")

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set(
   "fs.azure.account.oauth.provider.type",
   "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set("fs.azure.account.oauth2.client.id", service_principal_id)
spark.conf.set("fs.azure.account.oauth2.client.secret", service_principal_secret)
spark.conf.set(
   "fs.azure.account.oauth2.client.endpoint",
   f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)

# Read
df = spark.read.format("parquet").load(
   f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Files/data"
)
df.show(10)

# Write
df.write.format("delta").mode("overwrite").save(
   f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Tables/dbx_delta_spn"
)

サーバーレス コンピューティングを使用して OneLake に接続する

Databricks サーバーレス コンピューティング を使用すると、クラスターをプロビジョニングせずにワークロードを実行できますが、 サポートされている Spark プロパティのサブセットのみが許可されます。 標準クラスターで使用される fs.azure.* Spark 構成を設定することはできません。

この制限は、Azure Databricks に固有の制限ではありません。 アマゾン ウェブ サービス (AWS)Google Cloud での Databricks サーバーレス実装の動作は同じです。

サーバーレス ノートブックでサポートされていない Spark 構成を設定しようとすると、CONFIG_NOT_AVAILABLE エラーが返されます。

ユーザーがサーバーレス コンピューティングでサポートされていない Spark 構成を変更しようとした場合のエラー メッセージを示すスクリーンショット。

代わりに、MSAL を使用して OAuth トークンを取得し、Python deltalake ライブラリを使用して、そのトークンを使用して Delta テーブルを読み書きします。

サーバーレス ノートブックを設定する

  1. Databricks ワークスペースにノートブックを作成し、サーバーレス コンピューティングにアタッチします。

    Databricks ノートブックをサーバーレス コンピューティングに接続する方法を示すスクリーンショット。

  2. Pythonモジュールをインポートします。 このサンプルでは、次の 2 つのモジュールを使用します。

    • msal はMicrosoft ID プラットフォームで認証します。
    • deltalake Pythonを使用して Delta Lake テーブルを読み書きします。
    from msal import ConfidentialClientApplication
    from deltalake import DeltaTable, write_deltalake
    
  3. アプリケーション ID を含む Microsoft Entra テナントの変数を宣言します。 Microsoft Fabric がデプロイされているテナントのテナント ID を使用します。

    # Fetch from Databricks secrets.
    tenant_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for tenant_id>")
    client_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for client_id>")
    client_secret = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for secret>")
    
  4. Fabric ワークスペース変数を宣言します。

    workspace_id = "<replace with workspace name>"
    lakehouse_id = "<replace with lakehouse name>"
    table_to_read = "<name of lakehouse table to read>"
    onelake_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/{table_to_read}"
    
  5. トークンを取得するためにクライアントを初期化します。

    authority = f"https://login.microsoftonline.com/{tenant_id}"
    
    app = ConfidentialClientApplication(
        client_id,
        authority=authority,
        client_credential=client_secret
    )
    
    result = app.acquire_token_for_client(scopes=["https://onelake.fabric.microsoft.com/.default"])
    
    if "access_token" in result:
        print("Access token acquired.")
        token_val = result['access_token']
    else:
        raise Exception(f"Failed to acquire token: {result.get('error_description', result)}")
    
  6. OneLake から Delta テーブルを読み取る。

    dt = DeltaTable(onelake_uri, storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"})
    df = dt.to_pandas()
    print(df.head())
    
  7. Delta テーブルを OneLake に書き込みます。

    target_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/<target_table_name>"
    write_deltalake(
        target_uri,
        df,
        mode="overwrite",
        storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"}
    )
    

設計上の考慮事項

  • 可能な場合は、テーブル パスごとに 1 つのライター パターンを使用します。 複数のコンピューティング エンジンまたはランタイム バージョンから同じストレージ パスに書き込むと、競合が発生する可能性があります。
  • サービス プリンシパルの資格情報にはシークレット管理を使用します。
  • 別のレイクハウスの場所にデータを物理的に書き込むのではなく、仮想化されたアクセスが必要な場合は、 OneLake ショートカット を使用します。