إنشاء اتصال بيانات مركز الأحداث لمستكشف بيانات Azure Synapse باستخدام Python (معاينة)

مستكشف البيانات Azure Synapse عبارة عن خدمة استكشاف بيانات سريعة وقابلة للتطوير بدرجة كبيرة لبيانات السجل وتتبع الاستخدام. يوفر Azure Synapse Data Explorer الاستيعاب (تحميل البيانات) من مراكز الأحداث ومراكز IoT والكائنات الثنائية كبيرة الحجم المكتوبة إلى حاويات الكائن الثنائي كبير الحجم.

في هذه المقالة، يمكنك إنشاء اتصال بيانات مركز الأحداث لمستكشف البيانات Azure Synapse باستخدام Python.

المتطلبات الأساسية

  • اشتراك Azure. لدي حساب Azure مجاني.

  • إنشاء تجمع مستكشف بيانات باستخدام أستوديو Synapse أو مدخل Azure

  • إنشاء قاعدة بيانات Data Explorer.

    1. في Synapse Studio، في الجزء الأيسر، حدد البيانات.

    2. حدد +(Add new resource)> Data Explorer pool، واستخدم المعلومات التالية:

      الإعدادات القيمة المقترحة ‏‏الوصف
      اسم التجمع contosodataexplorer اسم تجمع Data Explorer لاستخدامه
      الاسم TestDatabase يجب أن يكون اسم قاعدة البيانات فريدًا داخل الكتلة.
      فترة الاستبقاء الافتراضية 365 الفترة الزمنية (بالأيام) التي تضمن لها الاحتفاظ بالبيانات متاحة للاستعلام. يتم قياس الفترة الزمنية من وقت تناول البيانات.
      فترة التخزين المؤقت الافتراضية 31 الفترة الزمنية (بالأيام) التي يجب الاحتفاظ بالبيانات التي يتم الاستعلام عنها بشكل متكرر والمتوفرة في تخزين SSD أو ذاكرة الوصول العشوائي ، بدلًا من التخزين على المدى الطويل.
    3. حدد Create لإنشاء قاعدة البيانات. عادة ما يستغرق الإنشاء أقل من دقيقة.

إنشاء جدول على نظام مجموعة الاختبار الخاص بك

أنشئ جدولاً باسم StormEvents يطابق مخطط البيانات في الملفStormEvents.csv.

تلميح

تنشئ قصاصات التعليمات البرمجية التالية مثيل عميل لكل استدعاء تقريبًا. يتم ذلك لجعل كل قصاصة برمجية قابلة للتشغيل بصورة فردية. في الإنتاج، تتم إعادة إدخال مثيلات العميل، ويجب الاحتفاظ بها طالما لزم الأمر. يكفي وجود مثيل عميل واحد لكل URI، حتى عند العمل مع قواعد بيانات متعددة (يمكن تحديد قاعدة البيانات على مستوى الأمر).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

تعريف تعيين الاستيعاب

تعيين بيانات CSV الواردة إلى أسماء الأعمدة المستخدمة عند إنشاء الجدول. توفير عنصر تخطيط عمود CSV على هذا الجدول.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

تثبيت حزمة Python

لتثبيت حزمة Python لمستكشف بيانات Azure Synapse، افتح موجه أوامر يحتوي على Python في مساره. شغّل الأمر التالي:

pip install azure-common
pip install azure-mgmt-kusto

المصادقة

لتشغيل المثال التالي، تحتاج إلى تطبيق Microsoft Entra ومدير الخدمة الذي يمكنه الوصول إلى الموارد. لإنشاء تطبيق Microsoft Entra مجاني وإضافة تعيين دور على مستوى الاشتراك، راجع إنشاء تطبيق Microsoft Entra. تحتاج أيضا إلى معرف الدليل (المستأجر) ومعرف التطبيق وسر العميل.

أضف اتصال بيانات مركز الأحداث

يبين لك المثال التالي كيفية إضافة اتصال قاعدة بيانات Event Hub برمجيًا. راجع الاتصال بمركز الأحداث لإضافة اتصال بيانات مركز الأحداث باستخدام مدخل Microsoft Azure.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
الإعداد القيمة المقترحة وصف الحقل
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرّف المستأجر الخاص بك. يعرف أيضا باسم معرف الدليل.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرف الاشتراك الذي تستخدمه عند إنشاء الموارد.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرف العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك.
client_secret xxxxxxxxxxxxxx سر العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك.
resource_group_name testrg اسم مجموعة الموارد التي تحتوي على نظام المجموعة الخاص بك.
cluster_name mykustocluster اسم نظام المجموعة الخاص بك.
database_name mykustodatabase اسم قاعدة البيانات الهدف في نظام المجموعة الخاص بك.
data_connection_name myeventhubconnect الاسم المطلوب لاتصال قاعدة البيانات الخاص بك.
table_name StormEvents اسم الجدول الهدف في قاعدة البيانات الهدف.
mapping_rule_name StormEvents_CSV_Mapping اسم تخطيط العمود الخاص بك المرتبط بالجدول الهدف.
data_format csv تنسيق البيانات الخاصة بالرسالة.
event_hub_resource_id معرف المورد معرف المورد لمركز الأحداث الذي يحتوي على البيانات لاستيعابها.
consumer_group $Default مجموعة المستهلكين في مركز الأحداث الخاص بك.
موقع وسط الولايات المتحدة موقع مورد اتصال قاعدة البيانات.

تنظيف الموارد

لحذف اتصال البيانات، استخدم الأمر التالي:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

الخطوات التالية