إنشاء اتصال بيانات مركز الأحداث لمستكشف بيانات Azure Synapse باستخدام Python (معاينة)
مستكشف البيانات Azure Synapse عبارة عن خدمة استكشاف بيانات سريعة وقابلة للتطوير بدرجة كبيرة لبيانات السجل وتتبع الاستخدام. يوفر Azure Synapse Data Explorer الاستيعاب (تحميل البيانات) من مراكز الأحداث ومراكز IoT والكائنات الثنائية كبيرة الحجم المكتوبة إلى حاويات الكائن الثنائي كبير الحجم.
في هذه المقالة، يمكنك إنشاء اتصال بيانات مركز الأحداث لمستكشف البيانات Azure Synapse باستخدام Python.
المتطلبات الأساسية
اشتراك Azure. لدي حساب Azure مجاني.
إنشاء تجمع مستكشف بيانات باستخدام أستوديو Synapse أو مدخل Azure
إنشاء قاعدة بيانات Data Explorer.
في Synapse Studio، في الجزء الأيسر، حدد البيانات.
حدد +(Add new resource)> Data Explorer pool، واستخدم المعلومات التالية:
الإعدادات القيمة المقترحة الوصف اسم التجمع contosodataexplorer اسم تجمع Data Explorer لاستخدامه الاسم TestDatabase يجب أن يكون اسم قاعدة البيانات فريدًا داخل الكتلة. فترة الاستبقاء الافتراضية 365 الفترة الزمنية (بالأيام) التي تضمن لها الاحتفاظ بالبيانات متاحة للاستعلام. يتم قياس الفترة الزمنية من وقت تناول البيانات. فترة التخزين المؤقت الافتراضية 31 الفترة الزمنية (بالأيام) التي يجب الاحتفاظ بالبيانات التي يتم الاستعلام عنها بشكل متكرر والمتوفرة في تخزين SSD أو ذاكرة الوصول العشوائي ، بدلًا من التخزين على المدى الطويل. حدد Create لإنشاء قاعدة البيانات. عادة ما يستغرق الإنشاء أقل من دقيقة.
إنشاء جدول على نظام مجموعة الاختبار الخاص بك
أنشئ جدولاً باسم StormEvents
يطابق مخطط البيانات في الملفStormEvents.csv
.
تلميح
تنشئ قصاصات التعليمات البرمجية التالية مثيل عميل لكل استدعاء تقريبًا. يتم ذلك لجعل كل قصاصة برمجية قابلة للتشغيل بصورة فردية. في الإنتاج، تتم إعادة إدخال مثيلات العميل، ويجب الاحتفاظ بها طالما لزم الأمر. يكفي وجود مثيل عميل واحد لكل URI، حتى عند العمل مع قواعد بيانات متعددة (يمكن تحديد قاعدة البيانات على مستوى الأمر).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
تعريف تعيين الاستيعاب
تعيين بيانات CSV الواردة إلى أسماء الأعمدة المستخدمة عند إنشاء الجدول. توفير عنصر تخطيط عمود CSV على هذا الجدول.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
تثبيت حزمة Python
لتثبيت حزمة Python لمستكشف بيانات Azure Synapse، افتح موجه أوامر يحتوي على Python في مساره. شغّل الأمر التالي:
pip install azure-common
pip install azure-mgmt-kusto
المصادقة
لتشغيل المثال التالي، تحتاج إلى تطبيق Microsoft Entra ومدير الخدمة الذي يمكنه الوصول إلى الموارد. لإنشاء تطبيق Microsoft Entra مجاني وإضافة تعيين دور على مستوى الاشتراك، راجع إنشاء تطبيق Microsoft Entra. تحتاج أيضا إلى معرف الدليل (المستأجر) ومعرف التطبيق وسر العميل.
أضف اتصال بيانات مركز الأحداث
يبين لك المثال التالي كيفية إضافة اتصال قاعدة بيانات Event Hub برمجيًا. راجع الاتصال بمركز الأحداث لإضافة اتصال بيانات مركز الأحداث باستخدام مدخل Microsoft Azure.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
الإعداد | القيمة المقترحة | وصف الحقل |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | معرّف المستأجر الخاص بك. يعرف أيضا باسم معرف الدليل. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | معرف الاشتراك الذي تستخدمه عند إنشاء الموارد. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | معرف العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك. |
client_secret | xxxxxxxxxxxxxx | سر العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك. |
resource_group_name | testrg | اسم مجموعة الموارد التي تحتوي على نظام المجموعة الخاص بك. |
cluster_name | mykustocluster | اسم نظام المجموعة الخاص بك. |
database_name | mykustodatabase | اسم قاعدة البيانات الهدف في نظام المجموعة الخاص بك. |
data_connection_name | myeventhubconnect | الاسم المطلوب لاتصال قاعدة البيانات الخاص بك. |
table_name | StormEvents | اسم الجدول الهدف في قاعدة البيانات الهدف. |
mapping_rule_name | StormEvents_CSV_Mapping | اسم تخطيط العمود الخاص بك المرتبط بالجدول الهدف. |
data_format | csv | تنسيق البيانات الخاصة بالرسالة. |
event_hub_resource_id | معرف المورد | معرف المورد لمركز الأحداث الذي يحتوي على البيانات لاستيعابها. |
consumer_group | $Default | مجموعة المستهلكين في مركز الأحداث الخاص بك. |
موقع | وسط الولايات المتحدة | موقع مورد اتصال قاعدة البيانات. |
تنظيف الموارد
لحذف اتصال البيانات، استخدم الأمر التالي:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)