อ่านในภาษาอังกฤษ

แชร์ผ่าน


ปรับใช้อีเวนต์เฮ้าส์โดยใช้ Fabric API

คุณสามารถปรับใช้อีเวนต์เฮ้าส์ของคุณโดยอัตโนมัติด้วยฐานข้อมูล KQL โดยใช้ API Fabric API ช่วยให้คุณสามารถสร้าง อัปเดต และลบรายการภายในพื้นที่ทํางานของคุณได้ คุณสามารถจัดการ Eventhouses และฐานข้อมูลของคุณได้โดยการดําเนินการต่าง ๆ เช่น การสร้างตารางและการเปลี่ยนนโยบายโดยใช้วิธีใดวิธีหนึ่งต่อไปนี้:

  • Fabric API ที่มีข้อกําหนด : คุณสามารถระบุสคริปต์ Schema ฐานข้อมูลให้เป็นส่วนหนึ่งของข้อกําหนด KQL Database เพื่อกําหนดค่าฐานข้อมูลของคุณได้
  • Kusto API: คุณสามารถใช้ Kusto API เพื่อดําเนินการคําสั่งการจัดการ เพื่อกําหนดค่าฐานข้อมูลของคุณได้

ในบทความนี้ คุณจะได้เรียนรู้วิธีการ:

  • ตั้งค่าสภาพแวดล้อมของคุณ
  • สร้างอีเวนต์เฮ้าส์
  • สร้างฐานข้อมูล KQL และ schema
  • ตรวจสอบการดําเนินการสําหรับการดําเนินการให้เสร็จสมบูรณ์

ข้อกําหนดเบื้องต้น

เลือกวิธีการที่เหมาะสม

เมื่อเลือกวิธีที่เหมาะสมในการจัดการฐานข้อมูล Eventhouse และ KQL ให้พิจารณาประเด็นเหล่านี้:

  • Fabric API ที่มีข้อกําหนด: ใช้วิธีนี้ถ้าคุณต้องการกําหนด schema ของฐานข้อมูลของคุณให้เป็นส่วนหนึ่งของข้อกําหนดของฐานข้อมูล วิธีนี้มีประโยชน์เมื่อคุณต้องการกําหนด Schema ของฐานข้อมูลของคุณโดยใช้ API ที่สอดคล้องกันเพียงครั้งเดียวสําหรับการปรับใช้ทั้งหมด
  • Kusto API: ใช้วิธีนี้ถ้าคุณต้องการดําเนินการคําสั่งการจัดการเพื่อกําหนดค่าฐานข้อมูลของคุณ วิธีนี้มีประโยชน์เมื่อคุณต้องการดําเนินการคําสั่งการจัดการเพื่อกําหนดค่าฐานข้อมูลของคุณ

ตั้งค่าสภาพแวดล้อมของคุณ

สําหรับบทความนี้ คุณใช้สมุดบันทึกสําหรับ Fabric เพื่อเรียกใช้ python ส่วนย่อยของโค้ด ใช้แพคเกจ sempy.fabric ใน ลิงก์ความหมาย แพคเกจ Python เพื่อเรียกใช้ API โดยใช้ข้อมูลประจําตัวของคุณ การเรียกใช้ API และเพย์โหลดจะเหมือนกัน โดยไม่คํานึงถึงเครื่องมือที่คุณใช้

การตั้งค่าสภาพแวดล้อมของคุณ:

  1. นําทางไปยังสมุดบันทึกที่มีอยู่ หรือสร้างขึ้นใหม่

  2. ในเซลล์โค้ด ให้ป้อนรหัสเพื่อนําเข้าแพคเกจ:

    !pip install semantic-link --q
    
    import sempy.fabric as fabric
    import time
    import uuid
    import base64
    import json
    
  3. ตั้งค่าไคลเอ็นต์ของคุณเพื่อเรียกใช้ API และตั้งค่าตัวแปรสําหรับ ID พื้นที่ทํางานของคุณและ UUID เพื่อให้แน่ใจว่าชื่อไม่ซ้ํากัน:

    client = fabric.FabricRestClient()
    workspace_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
    uuid = uuid.uuid4()
    

สร้างอีเวนต์เฮ้าส์

  1. เพิ่มตัวแปรสําหรับชื่ออีเวนต์เฮ้าส์

    eventhouse_name = f"{'SampleEventhouse'}_{uuid}"
    
  2. ใช้ Fabric Create Eventhouse API เพื่อสร้าง Eventhouse ใหม่ ตั้งค่า ID Eventhouse ในตัวแปร:

    url = f"v1/workspaces/{workspace_id}/eventhouses"
    payload = {
      "displayName": f"{eventhouse_name}"
    }
    
    response = client.post(url, json=payload)
    eventhouse_id = response.json()['id']
    

สร้างฐานข้อมูล KQL และ schema

Fabric Create KQL Database API ใช้ข้อกําหนดรายการ สําหรับคุณสมบัติฐานข้อมูลและ schema ที่จําเป็นต้องมีสตริง base64 คุณสมบัติตั้งค่านโยบายการเก็บข้อมูลระดับฐานข้อมูลและสคริปต์ Schema ของฐานข้อมูลประกอบด้วยคําสั่งที่จะเรียกใช้เพื่อสร้างเอนทิตีฐานข้อมูล

สร้างข้อกําหนดคุณสมบัติของฐานข้อมูล

สร้างสตริง base64 สําหรับคุณสมบัติฐานข้อมูล คุณสมบัติฐานข้อมูลตั้งค่านโยบายการเก็บข้อมูลระดับฐานข้อมูล คุณใช้ข้อกําหนดเป็นส่วนหนึ่งของการเรียกใช้ API การสร้างฐานข้อมูลเพื่อสร้างฐานข้อมูล KQL ใหม่

  1. เพิ่มตัวแปรสําหรับการกําหนดค่าฐานข้อมูล KQL

    database_name = f"{'SampleDatabase'}_{uuid}"
    database_cache = "3d"
    database_storage = "30d"
    
  2. สร้างสตริง base64 สําหรับคุณสมบัติฐานข้อมูล:

    database_properties = {
      "databaseType": "ReadWrite",
      "parentEventhouseItemId": f"{eventhouse_id}",
      "oneLakeCachingPeriod": f"{database_cache}",
      "oneLakeStandardStoragePeriod": f"{database_storage}"
    }
    database_properties = json.dumps(database_properties)
    
    database_properties_string = database_properties.encode('utf-8')
    database_properties_bytes = base64.b64encode(database_properties_string)
    database_properties_string = database_properties_bytes.decode('utf-8')
    

สร้างข้อกําหนด Schema ของฐานข้อมูล

สร้างสตริง base64 สําหรับ schema ของฐานข้อมูล สคริปต์ Schema ฐานข้อมูลประกอบด้วยคําสั่งที่จะเรียกใช้เพื่อสร้างเอนทิตีฐานข้อมูล คุณใช้ข้อกําหนดเป็นส่วนหนึ่งของการเรียกใช้ API การสร้างฐานข้อมูลเพื่อสร้างฐานข้อมูล KQL ใหม่

สร้างสตริง base64 สําหรับ Schema ของฐานข้อมูล:

database_schema=""".create-merge table T(a:string, b:string)
.alter table T policy retention @'{"SoftDeletePeriod":"10.00:00:00","Recoverability":"Enabled"}'
.alter table T policy caching hot = 3d
"""

database_schema_string = database_schema.encode('utf-8')
database_schema_bytes = base64.b64encode(database_schema_string)
database_schema_string = database_schema_bytes.decode('utf-8')

เรียกใช้ API การสร้างฐานข้อมูล

ใช้ Fabric Create KQL Database API เพื่อสร้างฐานข้อมูล KQL ใหม่ด้วยนโยบายการเก็บข้อมูลและ schema ที่คุณกําหนด

url = f"v1/workspaces/{workspace_id}/kqlDatabases"

payload = {
  "displayName": f"{database_name}",
  "definition": {
    "parts": [
      {
        "path": "DatabaseProperties.json",
        "payload": f"{database_properties_string}",
        "payloadType": "InlineBase64"
      },
      {
        "path": "DatabaseSchema.kql",
        "payload": f"{database_schema_string}",
        "payloadType": "InlineBase64"
      }
    ]
  }
}

response = client.post(url, json=payload)

ตรวจสอบการดําเนินการสําหรับการดําเนินการให้เสร็จสมบูรณ์

การสร้างรายการที่มีข้อกําหนดเป็นการดําเนินการที่ใช้เวลานานซึ่งทํางานแบบอะซิงโครนัส คุณสามารถตรวจสอบการดําเนินการโดยใช้ status_code และตําแหน่งที่ตั้ง ข้อมูลในวัตถุการตอบสนองจากการเรียกใช้ API สร้างฐานข้อมูล KQL ดังนี้:

print(f"Create request status code: {response.status_code}")
print(response.headers['Location'])
async_result_polling_url = response.headers['Location']

while True:
  async_response = client.get(async_result_polling_url)
  async_status = async_response.json().get('status').lower()
  print(f"Long running operation status: {async_status}")
  if async_status != 'running':
    break

  time.sleep(3)

print(f"Long running operation reached terminal state: '{async_status}'")

if async_status == 'succeeded':
  print("The operation completed successfully.")
  final_result_url= async_response.headers['Location']
  final_result = client.get(final_result_url)
  print(f"Final result: {final_result.json()}")
elif async_status == 'failed':
  print("The operation failed.")
else:
  print(f"The operation is in an unexpected state: {status}")