API を使用して、KQL データベースを使用して Eventhouses のデプロイを完全に自動化できます。 Fabric API を使用すると、ワークスペース内のアイテムを作成、更新、削除できます。 次のいずれかの方法を使用して、テーブルの作成やポリシーの変更などのアクションを実行することで、Eventhouse とデータベースを管理できます。
- Fabric API と定義: データベースを構成するための KQL データベース定義 の一部として、データベース スキーマ スクリプトを指定できます。
- Kusto API: Kusto API を使用して、データベースを構成 管理コマンドを実行できます。
この記事では、次の方法について説明します。
- 環境を設定する
- イベントハウスを作成する
- KQL データベースとスキーマを作成する
- 操作の完了を監視する
前提 条件
- Microsoft Fabric 対応の 容量を持つ ワークスペース。
- あなたのワークスペースID。 詳細については、「ワークスペース IDを識別する」を参照してください。
適切な方法を選択する
Eventhouse と KQL データベースを管理するための適切な方法を選択する場合は、次の点を考慮してください。
- Fabric API と定義: データベース定義の一部としてデータベースのスキーマを定義する場合は、このメソッドを使用します。 この方法は、デプロイ全体で 1 つの一貫性のある API を使用してデータベースのスキーマを定義する場合に便利です。
- Kusto API: 管理コマンドを実行してデータベースを構成する場合は、このメソッドを使用します。 この方法は、管理コマンドを実行してデータベースを構成する場合に便利です。
環境を設定する
この記事では、Fabric ノートブックを使用して python コード スニペット実行します。 Python パッケージ semantic-link 内の sempy.fabric パッケージを使用して、資格情報を使用した API 呼び出しを行います。 使用するツールに関係なく、API 呼び出しとペイロードは同じです。
環境のセットアップ:
既存のノートブックに移動するか、新しいノートブックを作成します。
コード セルに、パッケージをインポートするコードを入力します。
!pip install semantic-link --q import sempy.fabric as fabric import time import uuid import base64 import json
API 呼び出しを行い、ワークスペース ID の変数と UUID を設定して、名前が一意になるようにクライアントを設定します。
client = fabric.FabricRestClient() workspace_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee' uuid = uuid.uuid4()
イベントハウスを作成する
イベントハウス名の変数を追加します。
eventhouse_name = f"{'SampleEventhouse'}_{uuid}"
Fabric Create Eventhouse API を使用して、新しい Eventhouse を作成します。 変数に Eventhouse ID を設定します。
url = f"v1/workspaces/{workspace_id}/eventhouses" payload = { "displayName": f"{eventhouse_name}" } response = client.post(url, json=payload) eventhouse_id = response.json()['id']
KQL データベースとスキーマを作成する
Fabric Create KQL Database API では、base64 文字列を必要とするデータベース プロパティとスキーマ 項目定義が使用されます。 プロパティはデータベース レベルの保持ポリシーを設定し、データベース スキーマ スクリプトにはデータベース エンティティを作成するために実行するコマンドが含まれています。
データベース プロパティ定義を作成する
データベース プロパティの base64 文字列を作成します。 データベース のプロパティは、データベース レベルの保持ポリシーを設定します。 この定義は、データベース作成 API 呼び出しの一部として使用して、新しい KQL データベースを作成します。
KQL データベースを構成するための変数を追加します。
database_name = f"{'SampleDatabase'}_{uuid}" database_cache = "3d" database_storage = "30d"
データベース プロパティの base64 文字列を作成します。
database_properties = { "databaseType": "ReadWrite", "parentEventhouseItemId": f"{eventhouse_id}", "oneLakeCachingPeriod": f"{database_cache}", "oneLakeStandardStoragePeriod": f"{database_storage}" } database_properties = json.dumps(database_properties) database_properties_string = database_properties.encode('utf-8') database_properties_bytes = base64.b64encode(database_properties_string) database_properties_string = database_properties_bytes.decode('utf-8')
データベース スキーマ定義を作成する
データベース スキーマの base64 文字列を作成します。 データベース スキーマ スクリプトには、データベース エンティティを作成するために実行するコマンドが含まれています。 この定義は、データベース作成 API 呼び出しの一部として使用して、新しい KQL データベースを作成します。
データベース スキーマの base64 文字列を作成します。
database_schema=""".create-merge table T(a:string, b:string)
.alter table T policy retention @'{"SoftDeletePeriod":"10.00:00:00","Recoverability":"Enabled"}'
.alter table T policy caching hot = 3d
"""
database_schema_string = database_schema.encode('utf-8')
database_schema_bytes = base64.b64encode(database_schema_string)
database_schema_string = database_schema_bytes.decode('utf-8')
データベース作成 API を実行する
Fabric Create KQL Database API を使用して、定義した保持ポリシーとスキーマを基に新しい KQL データベースを作成します。
url = f"v1/workspaces/{workspace_id}/kqlDatabases"
payload = {
"displayName": f"{database_name}",
"definition": {
"parts": [
{
"path": "DatabaseProperties.json",
"payload": f"{database_properties_string}",
"payloadType": "InlineBase64"
},
{
"path": "DatabaseSchema.kql",
"payload": f"{database_schema_string}",
"payloadType": "InlineBase64"
}
]
}
}
response = client.post(url, json=payload)
操作の完了を監視する
定義を使用して項目を作成することは、非同期的に実行される実行時間の長い操作です。 次のように、Create KQL Database API 呼び出しからの応答オブジェクト内の status_code と ロケーション の情報を使用して、操作を監視できます。
print(f"Create request status code: {response.status_code}")
print(response.headers['Location'])
async_result_polling_url = response.headers['Location']
while True:
async_response = client.get(async_result_polling_url)
async_status = async_response.json().get('status').lower()
print(f"Long running operation status: {async_status}")
if async_status != 'running':
break
time.sleep(3)
print(f"Long running operation reached terminal state: '{async_status}'")
if async_status == 'succeeded':
print("The operation completed successfully.")
final_result_url= async_response.headers['Location']
final_result = client.get(final_result_url)
print(f"Final result: {final_result.json()}")
elif async_status == 'failed':
print("The operation failed.")
else:
print(f"The operation is in an unexpected state: {status}")