この記事では、Azure Databricks から
- Standard またはジョブ クラスター: OAuth 構成の Spark ABFS ドライバーを使用して、Spark DataFrames を介して直接データの読み取りと書き込みを行います。
-
サーバーレス コンピューティング: サーバーレス ランタイムでは、カスタム Spark 構成プロパティを設定できません。 代わりに、Microsoft Authentication Library (MSAL) と Python
deltalakeライブラリを使用して、Delta テーブルの認証と読み取りまたは書き込みを行います。
関連する Databricks 統合シナリオについては、次のリソースを参照してください。
| Scenario | ドキュメント |
|---|---|
| コピーせずに Unity カタログから OneLake データにクエリを実行する | OneLake カタログフェデレーションを有効にする |
| Fabricから Databricks Unity カタログ データにアクセスする | Azure Databricks Unity カタログのミラーリング |
[前提条件]
接続する前に、次の内容を確認してください。
- ファブリック ワークスペースとレイクハウス。
- Premium Azure Databricks ワークスペース。
- 少なくとも 共同作成者 ワークスペース ロールが割り当てられているサービス プリンシパル。
- シークレットを格納および取得するための Databricks シークレットまたは Azure Key Vault (AKV)。 この記事の例では、Databricks シークレットを使用します。
標準クラスターを使用して OneLake に接続する
正しい OneLake ABFS パス形式を使用する
次のいずれかの URI 形式を使用します。
abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Files/<path>abfss://<workspace_id_or_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_id_or_name>.lakehouse/Tables/<path>
ID または名前を使用できます。 名前を使用する場合は、ワークスペース名とレイクハウス名に特殊文字と空白文字を使用しないでください。
サービス プリンシパル認証を使用する
このオプションは、自動化されたジョブと一元化されたシークレットローテーションに使用します。
workspace_name = "<workspace_name>"
lakehouse_name = "<lakehouse_name>"
tenant_id = dbutils.secrets.get(scope="<scope-name>", key="<tenant-id-key>")
service_principal_id = dbutils.secrets.get(scope="<scope-name>", key="<client-id-key>")
service_principal_secret = dbutils.secrets.get(scope="<scope-name>", key="<client-secret-key>")
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set(
"fs.azure.account.oauth.provider.type",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set("fs.azure.account.oauth2.client.id", service_principal_id)
spark.conf.set("fs.azure.account.oauth2.client.secret", service_principal_secret)
spark.conf.set(
"fs.azure.account.oauth2.client.endpoint",
f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)
# Read
df = spark.read.format("parquet").load(
f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Files/data"
)
df.show(10)
# Write
df.write.format("delta").mode("overwrite").save(
f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.lakehouse/Tables/dbx_delta_spn"
)
サーバーレス コンピューティングを使用して OneLake に接続する
Databricks サーバーレス コンピューティング を使用すると、クラスターをプロビジョニングせずにワークロードを実行できますが、 サポートされている Spark プロパティのサブセットのみが許可されます。 標準クラスターで使用される fs.azure.* Spark 構成を設定することはできません。
注
この制限は、Azure Databricks に固有の制限ではありません。 アマゾン ウェブ サービス (AWS) と Google Cloud での Databricks サーバーレス実装の動作は同じです。
サーバーレス ノートブックでサポートされていない Spark 構成を設定しようとすると、CONFIG_NOT_AVAILABLE エラーが返されます。
代わりに、MSAL を使用して OAuth トークンを取得し、Python deltalake ライブラリを使用して、そのトークンを使用して Delta テーブルを読み書きします。
サーバーレス ノートブックを設定する
Databricks ワークスペースにノートブックを作成し、サーバーレス コンピューティングにアタッチします。
Pythonモジュールをインポートします。 このサンプルでは、次の 2 つのモジュールを使用します。
- msal はMicrosoft ID プラットフォームで認証します。
- deltalake Pythonを使用して Delta Lake テーブルを読み書きします。
from msal import ConfidentialClientApplication from deltalake import DeltaTable, write_deltalakeアプリケーション ID を含む Microsoft Entra テナントの変数を宣言します。 Microsoft Fabric がデプロイされているテナントのテナント ID を使用します。
# Fetch from Databricks secrets. tenant_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for tenant_id>") client_id = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for client_id>") client_secret = dbutils.secrets.get(scope="<replace-scope-name>",key="<replace value with key value for secret>")Fabric ワークスペース変数を宣言します。
workspace_id = "<replace with workspace name>" lakehouse_id = "<replace with lakehouse name>" table_to_read = "<name of lakehouse table to read>" onelake_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/{table_to_read}"トークンを取得するためにクライアントを初期化します。
authority = f"https://login.microsoftonline.com/{tenant_id}" app = ConfidentialClientApplication( client_id, authority=authority, client_credential=client_secret ) result = app.acquire_token_for_client(scopes=["https://onelake.fabric.microsoft.com/.default"]) if "access_token" in result: print("Access token acquired.") token_val = result['access_token'] else: raise Exception(f"Failed to acquire token: {result.get('error_description', result)}")OneLake から Delta テーブルを読み取る。
dt = DeltaTable(onelake_uri, storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"}) df = dt.to_pandas() print(df.head())Delta テーブルを OneLake に書き込みます。
target_uri = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}.lakehouse/Tables/<target_table_name>" write_deltalake( target_uri, df, mode="overwrite", storage_options={"bearer_token": f"{token_val}", "use_fabric_endpoint": "true"} )
設計上の考慮事項
- 可能な場合は、テーブル パスごとに 1 つのライター パターンを使用します。 複数のコンピューティング エンジンまたはランタイム バージョンから同じストレージ パスに書き込むと、競合が発生する可能性があります。
- サービス プリンシパルの資格情報にはシークレット管理を使用します。
- 別のレイクハウスの場所にデータを物理的に書き込むのではなく、仮想化されたアクセスが必要な場合は、 OneLake ショートカット を使用します。