Pobieranie danych z platformy Apache Spark
Apache Spark to ujednolicony aparat analityczny do przetwarzania danych na dużą skalę.
Łącznik Kusto dla platformy Spark to projekt typu open source, który można uruchomić w dowolnym klastrze Spark. Implementuje źródło danych i ujście danych do przenoszenia danych między klastrami usługi Azure Data Explorer i Spark. Korzystając z usługi Eventhouse i Apache Spark, można tworzyć szybkie i skalowalne aplikacje przeznaczone dla scenariuszy opartych na danych. Na przykład uczenie maszynowe (ML), wyodrębnianie i przekształcanie obciążenia (ETL) i usługa Log Analytics. Dzięki łącznikowi usługa Eventhouses staje się prawidłowym magazynem danych dla standardowych operacji źródła i ujścia platformy Spark, takich jak zapis, odczyt i zapisStream.
Możesz zapisywać dane w usłudze Eventhouse za pośrednictwem pozyskiwania w kolejce lub pozyskiwania przesyłania strumieniowego. Odczyt z usługi Eventhouses obsługuje oczyszczanie kolumn i wypychanie predykatu, które filtruje dane w usłudze Eventhouse, zmniejszając ilość przesyłanych danych.
W tym artykule opisano sposób instalowania i konfigurowania łącznika Spark oraz przenoszenia danych między klastrami Eventhouse i Apache Spark.
Uwaga
Chociaż niektóre z poniższych przykładów odnoszą się do klastra Platformy Spark usługi Azure Databricks , łącznik spark nie bierze bezpośrednich zależności od usługi Databricks ani żadnej innej dystrybucji platformy Spark.
Wymagania wstępne
- Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure. Jest to używane do uwierzytelniania przy użyciu identyfikatora Entra firmy Microsoft.
- Baza danych KQL w usłudze Microsoft Fabric. Skopiuj identyfikator URI tej bazy danych, korzystając z instrukcji w temacie Uzyskiwanie dostępu do istniejącej bazy danych KQL.
- Klaster Spark
- Zainstaluj bibliotekę łącznika:
- Wstępnie utworzone biblioteki dla platformy Spark 2.4+Scala 2.11 lub Spark 3+scala 2.12
- Repozytorium Maven
- Zainstalowano program Maven 3.x
Napiwek
Obsługiwane są również wersje platformy Spark 2.3.x, ale mogą wymagać pewnych zmian w zależnościach pom.xml.
Jak utworzyć łącznik spark
Począwszy od wersji 2.3.0 wprowadzamy nowe identyfikatory artefaktów zastępujące łącznik spark-kusto-connector: kusto-spark_3.0_2.12 przeznaczone dla platform Spark 3.x i Scala 2.12.
Uwaga
Wersje wcześniejsze niż 2.5.1 nie działają już w celu pozyskiwania do istniejącej tabeli. Zaktualizuj do nowszej wersji. To krok jest opcjonalny. Jeśli używasz wstępnie utworzonych bibliotek, na przykład narzędzia Maven, zobacz Konfigurowanie klastra Spark.
Wymagania wstępne dotyczące kompilacji
Zapoznaj się z tym źródłem w celu utworzenia łącznika Spark.
W przypadku aplikacji Scala/Java korzystających z definicji projektu Maven połącz aplikację z najnowszym artefaktem. Znajdź najnowszy artefakt w usłudze Maven Central.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
Jeśli nie używasz wstępnie utworzonych bibliotek, musisz zainstalować biblioteki wymienione w zależnościach, w tym następujące biblioteki zestawu Sdk języka Java kusto. Aby znaleźć odpowiednią wersję do zainstalowania, zapoznaj się z odpowiednią wersją:
Aby skompilować plik jar i uruchomić wszystkie testy:
mvn clean package -DskipTests
Aby skompilować plik jar, uruchom wszystkie testy i zainstaluj plik jar w lokalnym repozytorium Maven:
mvn clean install -DskipTests
Aby uzyskać więcej informacji, zobacz użycie łącznika.
Konfiguracja klastra Spark
Uwaga
Zaleca się użycie najnowszej wersji łącznika Kusto Spark podczas wykonywania poniższych kroków.
Skonfiguruj następujące ustawienia klastra Spark na podstawie klastra usługi Azure Databricks Spark 3.0.1 i Scala 2.12:
Zainstaluj najnowszą bibliotekę spark-kusto-connector z narzędzia Maven:
Sprawdź, czy wszystkie wymagane biblioteki są zainstalowane:
W przypadku instalacji przy użyciu pliku JAR sprawdź, czy zainstalowano inne zależności:
Uwierzytelnianie
Łącznik Kusto Spark umożliwia uwierzytelnianie za pomocą identyfikatora Entra firmy Microsoft przy użyciu jednej z następujących metod:
- Aplikacja Firmy Microsoft Entra
- Token dostępu firmy Microsoft Entra
- Uwierzytelnianie urządzenia (w scenariuszach nieprodukcyjnych)
- Usługa Azure Key Vault Aby uzyskać dostęp do zasobu usługi Key Vault , zainstaluj pakiet azure-keyvault i podaj poświadczenia aplikacji.
Uwierzytelnianie aplikacji Firmy Microsoft Entra
Uwierzytelnianie aplikacji Microsoft Entra jest najprostszą i najbardziej typową metodą uwierzytelniania i jest zalecane dla łącznika Kusto Spark.
Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.
az login
Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z zwróconych danych JSON skopiuj wartości
appId
,password
itenant
do użycia w przyszłości.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Utworzono aplikację Microsoft Entra i jednostkę usługi.
Łącznik platformy Spark używa następujących właściwości aplikacji Entra do uwierzytelniania:
Właściwości | Ciąg opcji | opis |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Identyfikator aplikacji Microsoft Entra (klienta). |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Urząd uwierzytelniania Firmy Microsoft Entra. Identyfikator katalogu entra firmy Microsoft (dzierżawy). Opcjonalne — wartości domyślne do microsoft.com. Aby uzyskać więcej informacji, zobacz Urząd firmy Microsoft Entra. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Klucz aplikacji Entra firmy Microsoft dla klienta. |
KUSTO_ACCESS_TOKEN | kustoAccessToken | Jeśli masz już token accessToken, który został utworzony z dostępem do usługi Kusto, można go również przekazać do łącznika, a także do uwierzytelniania. |
Uwaga
Starsze wersje interfejsu API (mniejsze niż 2.0.0) mają następujące nazwy: "kustoAADClientID", "kustoClientADClientPassword", "kustoAADAuthorityID"
Uprawnienia usługi Kusto
Przyznaj następujące uprawnienia po stronie usługi Kusto na podstawie operacji platformy Spark, którą chcesz wykonać.
Operacja platformy Spark | Uprawnienia |
---|---|
Odczyt — tryb pojedynczy | Czytelnik |
Odczyt — wymuszanie trybu rozproszonego | Czytelnik |
Zapis — tryb w kolejce z opcją tworzenia tabeli CreateTableIfNotExist | Administracja |
Zapis — tryb kolejkowany z opcją tworzenia tabeli FailIfNotExist | Ingestor |
Zapis — TransactionalMode | Administracja |
Aby uzyskać więcej informacji na temat ról głównych, zobacz Kontrola dostępu oparta na rolach. Aby zarządzać rolami zabezpieczeń, zobacz Zarządzanie rolami zabezpieczeń.
Ujście platformy Spark: zapisywanie w usłudze Kusto
Konfigurowanie parametrów ujścia:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Zapisz ramkę danych platformy Spark w klastrze Kusto jako partię:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Możesz też użyć uproszczonej składni:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Zapisywanie danych przesyłanych strumieniowo:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Źródło platformy Spark: odczytywanie z usługi Kusto
Podczas odczytywania małych ilości danych zdefiniuj zapytanie dotyczące danych:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Opcjonalnie: Jeśli podasz przejściowy magazyn obiektów blob (a nie Kusto), obiekty blob są tworzone na podstawie odpowiedzialności obiektu wywołującego. Obejmuje to aprowizowanie magazynu, rotację kluczy dostępu i usuwanie przejściowych artefaktów. Moduł KustoBlobStorageUtils zawiera funkcje pomocnicze do usuwania obiektów blob na podstawie współrzędnych konta i kontenera oraz poświadczeń konta lub pełnego adresu URL sygnatury dostępu współdzielonego z uprawnieniami do zapisu, odczytu i listy. Gdy odpowiedni rdD nie jest już potrzebny, każda transakcja przechowuje przejściowe artefakty obiektów blob w osobnym katalogu. Ten katalog jest przechwytywany w ramach dzienników informacji o transakcji odczytu zgłoszonych w węźle Sterownik platformy Spark.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
W powyższym przykładzie usługa Key Vault nie jest dostępna przy użyciu interfejsu łącznika; Jest używana prostsza metoda używania wpisów tajnych usługi Databricks.
Przeczytaj z kusto.
Jeśli podasz przejściowy magazyn obiektów blob, odczyt z usługi Kusto w następujący sposób:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Jeśli usługa Kusto udostępnia przejściowy magazyn obiektów blob, odczyt z usługi Kusto w następujący sposób:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)