Udostępnij za pośrednictwem


Łącznik usługi Azure Data Explorer dla platformy Apache Spark

Apache Spark to ujednolicony aparat analityczny do przetwarzania danych na dużą skalę. Usługa Azure Data Explorer to szybka, w pełni zarządzana usługa do analizy danych, która pozwala w czasie rzeczywistym analizować duże woluminy danych.

Łącznik Kusto dla platformy Spark to projekt typu open source, który można uruchomić w dowolnym klastrze Spark. Implementuje źródło danych i ujście danych do przenoszenia danych między klastrami usługi Azure Data Explorer i Spark. Korzystając z usług Azure Data Explorer i Apache Spark, można tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych. Na przykład uczenie maszynowe (ML), wyodrębnianie i przekształcanie obciążenia (ETL) i usługa Log Analytics. Dzięki łącznikowi usługa Azure Data Explorer staje się prawidłowym magazynem danych dla standardowych operacji źródła i ujścia platformy Spark, takich jak zapis, odczyt i zapisStream.

Możesz zapisywać dane w usłudze Azure Data Explorer za pośrednictwem pozyskiwania w kolejce lub pozyskiwania strumieniowego. Odczyt z usługi Azure Data Explorer obsługuje oczyszczanie kolumn i wypychanie predykatu, które filtruje dane w usłudze Azure Data Explorer, zmniejszając ilość przesyłanych danych.

Uwaga

Aby uzyskać informacje na temat pracy z łącznikiem usługi Synapse Spark dla usługi Azure Data Explorer, zobacz Nawiązywanie połączenia z usługą Azure Data Explorer przy użyciu platformy Apache Spark dla usługi Azure Synapse Analytics.

W tym temacie opisano sposób instalowania i konfigurowania łącznika Spark usługi Azure Data Explorer oraz przenoszenia danych między usługami Azure Data Explorer i Apache Spark.

Uwaga

Chociaż niektóre z poniższych przykładów odnoszą się do klastra Platformy Spark usługi Azure Databricks , łącznik Spark usługi Azure Data Explorer nie bierze bezpośrednich zależności od usługi Databricks ani jakiejkolwiek innej dystrybucji platformy Spark.

Wymagania wstępne

Napiwek

Obsługiwane są również wersje platformy Spark 2.3.x, ale mogą wymagać pewnych zmian w zależnościach pom.xml.

Jak utworzyć łącznik spark

Począwszy od wersji 2.3.0 wprowadzamy nowe identyfikatory artefaktów zastępujące łącznik spark-kusto-connector: kusto-spark_3.0_2.12 przeznaczone dla platform Spark 3.x i Scala 2.12.

Uwaga

Wersje wcześniejsze niż 2.5.1 nie działają już w celu pozyskiwania do istniejącej tabeli. Zaktualizuj do nowszej wersji. To krok jest opcjonalny. Jeśli używasz wstępnie utworzonych bibliotek, na przykład narzędzia Maven, zobacz Konfigurowanie klastra Spark.

Wymagania wstępne dotyczące kompilacji

  1. Zapoznaj się z tym źródłem w celu utworzenia łącznika Spark.

  2. W przypadku aplikacji Scala/Java korzystających z definicji projektu Maven połącz aplikację z najnowszym artefaktem. Znajdź najnowszy artefakt w usłudze Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Jeśli nie używasz wstępnie utworzonych bibliotek, musisz zainstalować biblioteki wymienione w zależnościach, w tym następujące biblioteki zestawu Sdk języka Java kusto. Aby znaleźć odpowiednią wersję do zainstalowania, zapoznaj się z odpowiednią wersją:

    1. Aby skompilować plik jar i uruchomić wszystkie testy:

      mvn clean package -DskipTests
      
    2. Aby skompilować plik jar, uruchom wszystkie testy i zainstaluj plik jar w lokalnym repozytorium Maven:

      mvn clean install -DskipTests
      

Aby uzyskać więcej informacji, zobacz użycie łącznika.

Konfiguracja klastra Spark

Uwaga

Zaleca się użycie najnowszej wersji łącznika Kusto Spark podczas wykonywania poniższych kroków.

  1. Skonfiguruj następujące ustawienia klastra Spark na podstawie klastra usługi Azure Databricks Spark 3.0.1 i Scala 2.12:

    Ustawienia klastra usługi Databricks.

  2. Zainstaluj najnowszą bibliotekę spark-kusto-connector z narzędzia Maven:

    Importowanie bibliotek.Wybierz pozycję Spark-Kusto-Connector.

  3. Sprawdź, czy wszystkie wymagane biblioteki są zainstalowane:

    Sprawdź zainstalowane biblioteki.

  4. W przypadku instalacji przy użyciu pliku JAR sprawdź, czy zainstalowano inne zależności:

    Dodaj zależności.

Uwierzytelnianie

Łącznik Kusto Spark umożliwia uwierzytelnianie za pomocą identyfikatora Entra firmy Microsoft przy użyciu jednej z następujących metod:

Uwierzytelnianie aplikacji Firmy Microsoft Entra

Uwierzytelnianie aplikacji Microsoft Entra jest najprostszą i najbardziej typową metodą uwierzytelniania i jest zalecane dla łącznika Kusto Spark.

  1. Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.

    az login
    
  2. Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z zwróconych danych JSON skopiuj wartości appId, passwordi tenant do użycia w przyszłości.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Utworzono aplikację Microsoft Entra i jednostkę usługi.

Łącznik platformy Spark używa następujących właściwości aplikacji Entra do uwierzytelniania:

Właściwości Ciąg opcji opis
KUSTO_AAD_APP_ID kustoAadAppId Identyfikator aplikacji Microsoft Entra (klienta).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Urząd uwierzytelniania Firmy Microsoft Entra. Identyfikator katalogu entra firmy Microsoft (dzierżawy). Opcjonalne — wartości domyślne do microsoft.com. Aby uzyskać więcej informacji, zobacz Urząd firmy Microsoft Entra.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Klucz aplikacji Entra firmy Microsoft dla klienta.
KUSTO_ACCESS_TOKEN kustoAccessToken Jeśli masz już token accessToken, który został utworzony z dostępem do usługi Kusto, można go również przekazać do łącznika, a także do uwierzytelniania.

Uwaga

Starsze wersje interfejsu API (mniejsze niż 2.0.0) mają następujące nazwy: "kustoAADClientID", "kustoClientADClientPassword", "kustoAADAuthorityID"

Uprawnienia usługi Kusto

Przyznaj następujące uprawnienia po stronie usługi Kusto na podstawie operacji platformy Spark, którą chcesz wykonać.

Operacja platformy Spark Uprawnienia
Odczyt — tryb pojedynczy Czytelnik
Odczyt — wymuszanie trybu rozproszonego Czytelnik
Zapis — tryb w kolejce z opcją tworzenia tabeli CreateTableIfNotExist Administrator
Zapis — tryb kolejkowany z opcją tworzenia tabeli FailIfNotExist Ingestor
Zapis — TransactionalMode Administrator

Aby uzyskać więcej informacji na temat ról głównych, zobacz Kontrola dostępu oparta na rolach. Aby zarządzać rolami zabezpieczeń, zobacz Zarządzanie rolami zabezpieczeń.

Ujście platformy Spark: zapisywanie w usłudze Kusto

  1. Konfigurowanie parametrów ujścia:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Zapisz ramkę danych platformy Spark w klastrze Kusto jako partię:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Możesz też użyć uproszczonej składni:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Zapisywanie danych przesyłanych strumieniowo:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Źródło platformy Spark: odczytywanie z usługi Kusto

  1. Podczas odczytywania małych ilości danych zdefiniuj zapytanie dotyczące danych:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Opcjonalnie: Jeśli podasz przejściowy magazyn obiektów blob (a nie Kusto), obiekty blob są tworzone na podstawie odpowiedzialności obiektu wywołującego. Obejmuje to aprowizowanie magazynu, rotację kluczy dostępu i usuwanie przejściowych artefaktów. Moduł KustoBlobStorageUtils zawiera funkcje pomocnicze do usuwania obiektów blob na podstawie współrzędnych konta i kontenera oraz poświadczeń konta lub pełnego adresu URL sygnatury dostępu współdzielonego z uprawnieniami do zapisu, odczytu i listy. Gdy odpowiedni rdD nie jest już potrzebny, każda transakcja przechowuje przejściowe artefakty obiektów blob w osobnym katalogu. Ten katalog jest przechwytywany w ramach dzienników informacji o transakcji odczytu zgłoszonych w węźle Sterownik platformy Spark.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    W powyższym przykładzie usługa Key Vault nie jest dostępna przy użyciu interfejsu łącznika; Jest używana prostsza metoda używania wpisów tajnych usługi Databricks.

  3. Przeczytaj z kusto.

    • Jeśli podasz przejściowy magazyn obiektów blob, odczyt z usługi Kusto w następujący sposób:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Jeśli usługa Kusto udostępnia przejściowy magazyn obiektów blob, odczyt z usługi Kusto w następujący sposób:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)