إنشاء اتصال قاعدة بيانات Event Hub لمستكشف بيانات Azure Synapse باستخدام C\# (إصدار أولي)

مستكشف البيانات Azure Synapse عبارة عن خدمة استكشاف بيانات سريعة وقابلة للتطوير بدرجة كبيرة لبيانات السجل وتتبع الاستخدام. يوفر Azure Synapse Data Explorer الاستيعاب (تحميل البيانات) من مراكز الأحداث ومراكز IoT والكائنات الثنائية كبيرة الحجم المكتوبة إلى حاويات الكائن الثنائي كبير الحجم.

في هذه المقالة، بإمكانك إنشاء اتصال قاعدة بيانات مركز الأحداث لمستكشف البيانات Azure Synapse باستخدام C\#.

المتطلبات الأساسية

  • اشتراك Azure. لدي حساب Azure مجاني.

  • إنشاء تجمع مستكشف بيانات باستخدام أستوديو Synapse أو مدخل Azure

  • إنشاء قاعدة بيانات Data Explorer.

    1. في Synapse Studio، في الجزء الأيسر، حدد البيانات.

    2. حدد +(Add new resource)> Data Explorer pool، واستخدم المعلومات التالية:

      الإعدادات القيمة المقترحة ‏‏الوصف
      اسم التجمع contosodataexplorer اسم تجمع Data Explorer لاستخدامه
      الاسم TestDatabase يجب أن يكون اسم قاعدة البيانات فريدًا داخل الكتلة.
      فترة الاستبقاء الافتراضية 365 الفترة الزمنية (بالأيام) التي تضمن لها الاحتفاظ بالبيانات متاحة للاستعلام. يتم قياس الفترة الزمنية من وقت تناول البيانات.
      فترة التخزين المؤقت الافتراضية 31 الفترة الزمنية (بالأيام) التي يجب الاحتفاظ بالبيانات التي يتم الاستعلام عنها بشكل متكرر والمتوفرة في تخزين SSD أو ذاكرة الوصول العشوائي ، بدلًا من التخزين على المدى الطويل.
    3. حدد Create لإنشاء قاعدة البيانات. عادة ما يستغرق الإنشاء أقل من دقيقة.

إشعار

لن يعمل استيعاب البيانات من "مركز الأحداث" في تجمعات "مستكشف البيانات" إذا كانت مساحة عمل Synapse تستخدم شبكة ظاهرية مُدارة مع تمكين حماية نقل غير مصرّح للبيانات.

  • Visual Studio 2019، قم بتنزيل واستخدام Visual Studio 2019 Community Edition المجاني. قم بتمكين تطوير Azure أثناء إعداد Visual Studio.

إنشاء جدول على نظام مجموعة الاختبار الخاص بك

أنشئ جدولاً باسم StormEvents يطابق مخطط البيانات في الملفStormEvents.csv.

تلميح

تنشئ قصاصات التعليمات البرمجية التالية مثيل عميل لكل استدعاء تقريبًا. يتم ذلك لجعل كل قصاصة برمجية قابلة للتشغيل بصورة فردية. في الإنتاج، تتم إعادة إدخال مثيلات العميل، ويجب الاحتفاظ بها طالما لزم الأمر. يكفي وجود مثيل عميل واحد لكل URI، حتى عند العمل مع قواعد بيانات متعددة (يمكن تحديد قاعدة البيانات على مستوى الأمر).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

تعريف تعيين الاستيعاب

تعيين بيانات CSV الواردة إلى أسماء الأعمدة المستخدمة عند إنشاء الجدول. توفير عنصر تخطيط عمود CSV على هذا الجدول.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

تثبيت C# NuGet

المصادقة

لتشغيل المثال التالي، تحتاج إلى تطبيق Microsoft Entra ومدير الخدمة الذي يمكنه الوصول إلى الموارد. لإنشاء تطبيق Microsoft Entra مجاني وإضافة تعيين دور على مستوى الاشتراك، راجع إنشاء تطبيق Microsoft Entra. تحتاج أيضا إلى معرف الدليل (المستأجر) ومعرف التطبيق وسر العميل.

أضف اتصال بيانات مركز الأحداث

يبين لك المثال التالي كيفية إضافة اتصال قاعدة بيانات Event Hub برمجيًا. راجع الاتصال بـ Event Hub لمعرفة معلومات عن إضافة اتصال قاعدة البيانات Event Hub باستخدام مدخل Microsoft Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
الإعداد القيمة المقترحة وصف الحقل
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرّف المستأجر الخاص بك. يعرف أيضا باسم معرف الدليل.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرف الاشتراك الذي تستخدمه عند إنشاء الموارد.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx معرف العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك.
clientSecret xxxxxxxxxxxxxx سر العميل للتطبيق الذي بإمكانه الوصول إلى الموارد في المستأجر الخاص بك.
resourceGroupName testrg اسم مجموعة الموارد التي تحتوي على نظام المجموعة الخاص بك.
clusterName mykustocluster اسم نظام المجموعة الخاص بك.
databaseName mykustodatabase اسم قاعدة البيانات الهدف في نظام المجموعة الخاص بك.
dataConnectionName myeventhubconnect الاسم المطلوب لاتصال قاعدة البيانات الخاص بك.
اسم الجدول StormEvents اسم الجدول الهدف في قاعدة البيانات الهدف.
mappingRuleName StormEvents_CSV_Mapping اسم تخطيط العمود الخاص بك المرتبط بالجدول الهدف.
dataFormat csv تنسيق البيانات الخاصة بالرسالة.
eventHubResourceId معرف المورد معرف المورد لمركز الأحداث الذي يحتوي على البيانات لاستيعابها.
ConsumerGroup $Default مجموعة المستهلكين في مركز الأحداث الخاص بك.
مكان وسط الولايات المتحدة موقع مورد اتصال قاعدة البيانات.
ضغط Gzip أو None نوع ضغط البيانات.

إنشاء البيانات

راجع التطبيق العينة الذي يقوم بإنشاء البيانات وإرسالها إلى Event Hub.

يمكن أن يحتوي الحدث على سجل أو أكثر، حتى حد حجمه. نرسل حديثين في النموذج، واحد يحتوي كل واحد منهما على خمسة سجلات ملحقة:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

تنظيف الموارد

لحذف اتصال البيانات، استخدم الأمر التالي:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

الخطوات التالية