You can fully automate the deployment of your eventhouses and their KQL databases using APIs. Fabric APIs allow you to create, update, and delete items within your workspace. You can manage your eventhouses and databases, performing actions such as creating tables and changing policies, using one of the following methods:
- Fabric API with definition: You can specify a database schema script as part of the KQL Database definition to configure your database.
- Kusto API: You can use the Kusto API to execute management commands to configure your database.
In this article, you learn to:
- Set up your environment
- Create an eventhouse
- Create a KQL database and schema
- Monitor the operation for completion
Prerequisites
- A workspace with a Microsoft Fabric-enabled capacity.
- Your workspace ID. For more information, see Identify your workspace ID.
Choose the right method
When choosing the right method to manage your Eventhouse and KQL Database, consider these points:
- Fabric API with definition: Use this method if you want to define the schema of your database as part of the database definition, so that a single, consistent API handles the entire deployment.
- Kusto API: Use this method if you want to configure your database by executing management commands against it directly. A minimal sketch of this approach follows this list; the rest of this article uses the Fabric API with definition method.
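For illustration only, here's a minimal sketch of the Kusto API approach. It assumes the azure-kusto-data Python package is installed and uses placeholder values for the Eventhouse query URI and database name; replace them with the values from your own Eventhouse.
# Minimal sketch of the Kusto API approach (assumes: pip install azure-kusto-data).
# The query URI and database name below are placeholders.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

query_uri = "https://<your-eventhouse-query-uri>"  # placeholder
database_name = "SampleDatabase"                   # placeholder

kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(query_uri)
kusto_client = KustoClient(kcsb)

# Run a management command to create a table in the database.
kusto_client.execute_mgmt(database_name, ".create-merge table T(a:string, b:string)")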
Set up your environment
For this article, you use Fabric notebooks to run Python code snippets. Use the sempy.fabric module from the semantic-link Python package to make API calls with your credentials. The API calls and payloads are identical regardless of the tool you use.
To set up your environment:
Navigate to an existing notebook or create a new one.
In a code cell, enter the following code to install and import the required packages:
!pip install semantic-link --q

import sempy.fabric as fabric
import time
import uuid
import base64
import json
Set up your client to make the API calls, and set variables for your workspace ID and a UUID to ensure that the item names are unique:
client = fabric.FabricRestClient()

workspace_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
uuid = uuid.uuid4()
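Alternatively, if the notebook is attached to the workspace you're deploying to, you might be able to resolve the workspace ID programmatically instead of pasting it. A minimal sketch, assuming your semantic-link version exposes fabric.get_workspace_id():
# Optional alternative (assumes fabric.get_workspace_id() is available in your
# semantic-link version): use the workspace the notebook is attached to.
workspace_id = fabric.get_workspace_id()
print(workspace_id)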
Create an eventhouse
Add a variable for the eventhouse name.
eventhouse_name = f"{'SampleEventhouse'}_{uuid}"
Use the Fabric Create Eventhouse API to create a new eventhouse, and store the eventhouse ID in a variable:
url = f"v1/workspaces/{workspace_id}/eventhouses" payload = { "displayName": f"{eventhouse_name}" } response = client.post(url, json=payload) eventhouse_id = response.json()['id']
Create a KQL database and schema
The Fabric Create KQL Database API uses item definitions in which the database properties and schema are provided as base64-encoded strings. The properties set the database-level retention policies, and the database schema script contains the commands to run to create database entities.
Create the database properties definition
Create the base64 string for the database properties. The database properties set the database-level retention policies. You use this definition as part of the database creation API call to create a new KQL database.
Add variables for configuring the KQL database.
database_name = f"{'SampleDatabase'}_{uuid}"
database_cache = "3d"
database_storage = "30d"
Create a base64 string for the database properties:
database_properties = {
  "databaseType": "ReadWrite",
  "parentEventhouseItemId": f"{eventhouse_id}",
  "oneLakeCachingPeriod": f"{database_cache}",
  "oneLakeStandardStoragePeriod": f"{database_storage}"
}

database_properties = json.dumps(database_properties)
database_properties_string = database_properties.encode('utf-8')
database_properties_bytes = base64.b64encode(database_properties_string)
database_properties_string = database_properties_bytes.decode('utf-8')
Create the database schema definition
Create the base64 string for the database schema. The database schema script contains the commands to run to create database entities. You use the definition as part of the database creation API call to create a new KQL database.
Create a base64 string for the database schema:
database_schema=""".create-merge table T(a:string, b:string)
.alter table T policy retention @'{"SoftDeletePeriod":"10.00:00:00","Recoverability":"Enabled"}'
.alter table T policy caching hot = 3d
"""
database_schema_string = database_schema.encode('utf-8')
database_schema_bytes = base64.b64encode(database_schema_string)
database_schema_string = database_schema_bytes.decode('utf-8')
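Optionally, you can sanity-check the encoded definition parts by decoding them back before sending the request. This sketch uses only the variables defined above:
# Optional sanity check: decode the base64 parts back to confirm their contents.
decoded_properties = json.loads(base64.b64decode(database_properties_string).decode('utf-8'))
decoded_schema = base64.b64decode(database_schema_string).decode('utf-8')

print(decoded_properties['parentEventhouseItemId'])  # should match eventhouse_id
print(decoded_schema)                                # should match the schema script above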
Run the database creation API
Use the Fabric Create KQL Database API to create a new KQL database with the retention policies and schema you defined.
url = f"v1/workspaces/{workspace_id}/kqlDatabases"
payload = {
"displayName": f"{database_name}",
"definition": {
"parts": [
{
"path": "DatabaseProperties.json",
"payload": f"{database_properties_string}",
"payloadType": "InlineBase64"
},
{
"path": "DatabaseSchema.kql",
"payload": f"{database_schema_string}",
"payloadType": "InlineBase64"
}
]
}
}
response = client.post(url, json=payload)
Monitor the operation for completion
Creating an item with a definition is a long-running operation that runs asynchronously. You can monitor the operation using the status code and Location header in the response object from the Create KQL Database API call, as follows:
print(f"Create request status code: {response.status_code}")
print(response.headers['Location'])
async_result_polling_url = response.headers['Location']
while True:
    async_response = client.get(async_result_polling_url)
    async_status = async_response.json().get('status').lower()
    print(f"Long running operation status: {async_status}")
    if async_status != 'running':
        break
    time.sleep(3)

print(f"Long running operation reached terminal state: '{async_status}'")

if async_status == 'succeeded':
    print("The operation completed successfully.")
    final_result_url = async_response.headers['Location']
    final_result = client.get(final_result_url)
    print(f"Final result: {final_result.json()}")
elif async_status == 'failed':
    print("The operation failed.")
else:
    print(f"The operation is in an unexpected state: {async_status}")