Apache Spark için Azure Veri Gezgini Bağlayıcısı

Apache Spark , büyük ölçekli veri işlemeye yönelik birleşik bir analiz altyapısıdır. Azure Veri Gezgini büyük miktarda veri üzerinde gerçek zamanlı analiz yapmaya yönelik hızlı ve tam olarak yönetilen bir veri analizi hizmetidir.

Spark için Azure Veri Gezgini bağlayıcısı, herhangi bir Spark kümesinde çalışabilen açık kaynak bir projedir. Verileri Azure Veri Gezgini ve Spark kümeleri arasında taşımak için veri kaynağı ve veri havuzu uygular. Azure Veri Gezgini ve Apache Spark kullanarak veri temelli senaryoları hedefleyen hızlı ve ölçeklenebilir uygulamalar oluşturabilirsiniz. Örneğin, makine öğrenmesi (ML), Ayıklama-Dönüştürme-Yükleme (ETL) ve Log Analytics. Bağlayıcı ile Azure Veri Gezgini yazma, okuma ve writeStream gibi standart Spark kaynağı ve havuz işlemleri için geçerli bir veri deposu haline gelir.

Azure Veri Gezgini'a toplu iş veya akış modunda yazabilirsiniz. Azure Veri Gezgini'dan okuma, azure Veri Gezgini verileri filtreleyerek aktarılan veri hacmini azaltan sütun ayıklama ve koşul gönderme işlemlerini destekler.

Not

Azure Veri Gezgini için Synapse Spark bağlayıcısı ile çalışma hakkında bilgi için bkz. Azure Synapse Analytics için Apache Spark kullanarak Azure Veri Gezgini bağlanma.

Bu konuda, Azure Veri Gezgini Spark bağlayıcısının nasıl yükleneceği ve yapılandırıldığı ve Azure Veri Gezgini ile Apache Spark kümeleri arasında verilerin nasıl taşındığı açıklanır.

Not

Aşağıdaki örneklerden bazıları Azure Databricks Spark kümesine başvuruda bulunmakla birlikte, Azure Veri Gezgini Spark bağlayıcısı Databricks'e veya başka bir Spark dağıtımına doğrudan bağımlılıklar almaz.

Önkoşullar

İpucu

Spark 2.3.x sürümleri de desteklenir, ancak pom.xml bağımlılıklarında bazı değişiklikler gerekebilir.

Spark bağlayıcısı oluşturma

Sürüm 2.3.0'dan başlayarak spark-kusto-connector: kusto-spark_3.0_2.12, Spark 3.x ve Scala 2.12 ile Spark 2.4.x ve scala 2.11'i hedefleyen kusto-spark_2.4_2.11'in yerini alan yeni yapıt kimlikleri kullanıma sunuldu.

Not

2.5.1 öncesi sürümler artık mevcut bir tabloya almak için çalışmıyor, lütfen daha sonraki bir sürüme güncelleştirin. Bu adım isteğe bağlıdır. Maven gibi önceden oluşturulmuş kitaplıklar kullanıyorsanız bkz. Spark kümesi kurulumu.

Derleme önkoşulları

  1. Önceden oluşturulmuş kitaplıklar kullanmıyorsanız, aşağıdaki Kusto Java SDK kitaplıkları dahil olmak üzere bağımlılıklarda listelenen kitaplıkları yüklemeniz gerekir. Yüklenecek doğru sürümü bulmak için ilgili sürümün pom'una bakın:

  2. Spark Bağlayıcısı oluşturmak için bu kaynağa bakın.

  3. Maven proje tanımlarını kullanan Scala/Java uygulamaları için uygulamanızı aşağıdaki yapıtla bağlayın (en son sürüm farklı olabilir):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>kusto-spark_3.0_2.12</artifactId>
         <version>2.5.1</version>
       </dependency>
    

Derleme komutları

Jar oluşturmak ve tüm testleri çalıştırmak için:

mvn clean package

Jar oluşturmak için tüm testleri çalıştırın ve jar dosyasını yerel Maven deponuza yükleyin:

mvn clean install

Daha fazla bilgi için bkz. bağlayıcı kullanımı.

Spark kümesi kurulumu

Not

Aşağıdaki adımları gerçekleştirirken en son Azure Veri Gezgini Spark bağlayıcı sürümünü kullanmanız önerilir.

  1. Spark 2.4.4 ve Scala 2.11 veya Spark 3.0.1 ve Scala 2.12 kullanan Azure Databricks kümesini temel alan aşağıdaki Spark kümesi ayarlarını yapılandırın:

    Databricks kümesi ayarları.

  2. Maven'dan en son spark-kusto-connector kitaplığını yükleyin:

    Kitaplıkları içeri aktar.Spark-Kusto-Connector'ı seçin.

  3. Tüm gerekli kitaplıkların yüklü olduğunu doğrulayın:

    Yüklü kitaplıkları doğrulayın.

  4. JAR dosyası kullanarak yükleme için ek bağımlılıkların yüklendiğini doğrulayın:

    Bağımlılıkları ekleyin.

Kimlik Doğrulaması

Azure Veri Gezgini Spark bağlayıcısı, aşağıdaki yöntemlerden birini kullanarak Azure Active Directory (Azure AD) ile kimlik doğrulaması yapmanıza olanak tanır:

Uygulama kimlik doğrulama Azure AD

Azure AD uygulama kimlik doğrulaması en basit ve en yaygın kimlik doğrulama yöntemidir ve Azure Veri Gezgini Spark bağlayıcısı için önerilir.

Özellikler Seçenek Dizesi Açıklama
KUSTO_AAD_APP_ID kustoAadAppId Azure AD uygulama (istemci) tanımlayıcısı.
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID kimlik doğrulama yetkilisini Azure AD. dizin (kiracı) kimliğini Azure AD. İsteğe bağlı - varsayılan olarak microsoft.com. Daha fazla bilgi için bkz. AAD yetkilisi.
KUSTO_AAD_APP_SECRET kustoAadAppSecret İstemci için uygulama anahtarını Azure AD.

Not

Eski API sürümleri (2.0.0'dan küçük) şu adlandırmaya sahiptir: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"

Azure Veri Gezgini ayrıcalıkları

Azure Veri Gezgini kümesinde aşağıdaki ayrıcalıkları verin:

  • Okuma (veri kaynağı) için Azure AD kimliğinin hedef veritabanında görüntüleyici ayrıcalıklarına veya hedef tabloda yönetici ayrıcalıklarına sahip olması gerekir.
  • Yazma (veri havuzu) için Azure AD kimliğinin hedef veritabanında alma ayrıcalıklarına sahip olması gerekir. Ayrıca yeni tablolar oluşturmak için hedef veritabanında kullanıcı ayrıcalıklarına sahip olması gerekir. Hedef tablo zaten varsa, hedef tabloda yönetici ayrıcalıklarını yapılandırmanız gerekir.

Azure Veri Gezgini asıl rolleri hakkında daha fazla bilgi için bkz. rol tabanlı yetkilendirme. Güvenlik rollerini yönetmek için bkz. güvenlik rolleri yönetimi.

Spark havuzu: Azure Veri Gezgini'ye yazma

  1. Havuz parametrelerini ayarlama:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame'i Azure Veri Gezgini kümesine toplu olarak yazın:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Veya basitleştirilmiş söz dizimini kullanın:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Akış verilerini yazma:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark kaynağı: Azure Veri Gezgini'dan okuma

  1. Az miktarda veri okurken veri sorgusunu tanımlayın:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. İsteğe bağlı: Geçici blob depolamayı (Azure Veri Gezgini değil) sağlarsanız bloblar çağıranın sorumluluğu altında oluşturulur. Bu, depolamayı sağlamayı, erişim anahtarlarını döndürmeyi ve geçici yapıtları silmeyi içerir. KustoBlobStorageUtils modülü, hesap ve kapsayıcı koordinatları ile hesap kimlik bilgilerini temel alarak blobları silmeye yönelik yardımcı işlevler ya da yazma, okuma ve liste izinlerine sahip tam BIR SAS URL'si içerir. karşılık gelen RDD artık gerekli olmadığında, her işlem geçici blob yapıtlarını ayrı bir dizinde depolar. Bu dizin, Spark Sürücüsü düğümünde bildirilen okuma işlemi bilgi günlüklerinin bir parçası olarak yakalanır.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    Yukarıdaki örnekte bağlayıcı arabirimi kullanılarak Key Vault erişilemiyor; Databricks gizli dizilerini kullanmanın daha basit bir yöntemi kullanılıyor.

  3. Azure Veri Gezgini'dan okuyun.

    • Geçici blob depolamayı sağlarsanız Azure Veri Gezgini'dan aşağıdaki gibi okuyun:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Azure Veri Gezgini geçici blob depolama alanı sağlıyorsa Azure Veri Gezgini'dan aşağıdaki gibi okuyun:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Sonraki adımlar