Apache Spark için Azure Veri Gezgini Bağlayıcısı
Apache Spark , büyük ölçekli veri işlemeye yönelik birleşik bir analiz altyapısıdır. Azure Veri Gezgini büyük miktarda veri üzerinde gerçek zamanlı analiz yapmaya yönelik hızlı ve tam olarak yönetilen bir veri analizi hizmetidir.
Spark için Azure Veri Gezgini bağlayıcısı, herhangi bir Spark kümesinde çalışabilen açık kaynak bir projedir. Verileri Azure Veri Gezgini ve Spark kümeleri arasında taşımak için veri kaynağı ve veri havuzu uygular. Azure Veri Gezgini ve Apache Spark kullanarak veri temelli senaryoları hedefleyen hızlı ve ölçeklenebilir uygulamalar oluşturabilirsiniz. Örneğin, makine öğrenmesi (ML), Ayıklama-Dönüştürme-Yükleme (ETL) ve Log Analytics. Bağlayıcı ile Azure Veri Gezgini yazma, okuma ve writeStream gibi standart Spark kaynağı ve havuz işlemleri için geçerli bir veri deposu haline gelir.
Azure Veri Gezgini'a toplu iş veya akış modunda yazabilirsiniz. Azure Veri Gezgini'dan okuma, azure Veri Gezgini verileri filtreleyerek aktarılan veri hacmini azaltan sütun ayıklama ve koşul gönderme işlemlerini destekler.
Not
Azure Veri Gezgini için Synapse Spark bağlayıcısı ile çalışma hakkında bilgi için bkz. Azure Synapse Analytics için Apache Spark kullanarak Azure Veri Gezgini bağlanma.
Bu konuda, Azure Veri Gezgini Spark bağlayıcısının nasıl yükleneceği ve yapılandırıldığı ve Azure Veri Gezgini ile Apache Spark kümeleri arasında verilerin nasıl taşındığı açıklanır.
Not
Aşağıdaki örneklerden bazıları Azure Databricks Spark kümesine başvuruda bulunmakla birlikte, Azure Veri Gezgini Spark bağlayıcısı Databricks'e veya başka bir Spark dağıtımına doğrudan bağımlılıklar almaz.
Önkoşullar
- Azure aboneliği. Ücretsiz bir Azure hesabı oluşturun.
- Azure Veri Gezgini kümesi ve veritabanı. Ücretsiz bir küme oluşturabilir veya tam küme oluşturabilirsiniz. Hangisinin sizin için en uygun olduğuna karar vermek için özellik karşılaştırmasını denetleyin.
- Spark kümesi
- Azure Veri Gezgini bağlayıcı kitaplığını yükleyin:
- Spark 2.4+Scala 2.11 veya Spark 3+scala 2.12 için önceden oluşturulmuş kitaplıklar
- Maven deposu
- Maven 3.x yüklü
İpucu
Spark 2.3.x sürümleri de desteklenir, ancak pom.xml bağımlılıklarında bazı değişiklikler gerekebilir.
Spark bağlayıcısı oluşturma
Sürüm 2.3.0'dan başlayarak spark-kusto-connector: kusto-spark_3.0_2.12, Spark 3.x ve Scala 2.12 ile Spark 2.4.x ve scala 2.11'i hedefleyen kusto-spark_2.4_2.11'in yerini alan yeni yapıt kimlikleri kullanıma sunuldu.
Not
2.5.1 öncesi sürümler artık mevcut bir tabloya almak için çalışmıyor, lütfen daha sonraki bir sürüme güncelleştirin. Bu adım isteğe bağlıdır. Maven gibi önceden oluşturulmuş kitaplıklar kullanıyorsanız bkz. Spark kümesi kurulumu.
Derleme önkoşulları
Önceden oluşturulmuş kitaplıklar kullanmıyorsanız, aşağıdaki Kusto Java SDK kitaplıkları dahil olmak üzere bağımlılıklarda listelenen kitaplıkları yüklemeniz gerekir. Yüklenecek doğru sürümü bulmak için ilgili sürümün pom'una bakın:
Spark Bağlayıcısı oluşturmak için bu kaynağa bakın.
Maven proje tanımlarını kullanan Scala/Java uygulamaları için uygulamanızı aşağıdaki yapıtla bağlayın (en son sürüm farklı olabilir):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
Derleme komutları
Jar oluşturmak ve tüm testleri çalıştırmak için:
mvn clean package
Jar oluşturmak için tüm testleri çalıştırın ve jar dosyasını yerel Maven deponuza yükleyin:
mvn clean install
Daha fazla bilgi için bkz. bağlayıcı kullanımı.
Spark kümesi kurulumu
Not
Aşağıdaki adımları gerçekleştirirken en son Azure Veri Gezgini Spark bağlayıcı sürümünü kullanmanız önerilir.
Spark 2.4.4 ve Scala 2.11 veya Spark 3.0.1 ve Scala 2.12 kullanan Azure Databricks kümesini temel alan aşağıdaki Spark kümesi ayarlarını yapılandırın:
Maven'dan en son spark-kusto-connector kitaplığını yükleyin:
Tüm gerekli kitaplıkların yüklü olduğunu doğrulayın:
JAR dosyası kullanarak yükleme için ek bağımlılıkların yüklendiğini doğrulayın:
Kimlik Doğrulaması
Azure Veri Gezgini Spark bağlayıcısı, aşağıdaki yöntemlerden birini kullanarak Azure Active Directory (Azure AD) ile kimlik doğrulaması yapmanıza olanak tanır:
- Azure AD uygulaması
- Azure AD erişim belirteci
- Cihaz kimlik doğrulaması (üretim dışı senaryolar için)
- Azure Key Vault Key Vault kaynağına erişmek için azure-keyvault paketini yükleyin ve uygulama kimlik bilgilerini sağlayın.
Uygulama kimlik doğrulama Azure AD
Azure AD uygulama kimlik doğrulaması en basit ve en yaygın kimlik doğrulama yöntemidir ve Azure Veri Gezgini Spark bağlayıcısı için önerilir.
Özellikler | Seçenek Dizesi | Açıklama |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Azure AD uygulama (istemci) tanımlayıcısı. |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | kimlik doğrulama yetkilisini Azure AD. dizin (kiracı) kimliğini Azure AD. İsteğe bağlı - varsayılan olarak microsoft.com. Daha fazla bilgi için bkz. AAD yetkilisi. |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | İstemci için uygulama anahtarını Azure AD. |
Not
Eski API sürümleri (2.0.0'dan küçük) şu adlandırmaya sahiptir: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
Azure Veri Gezgini ayrıcalıkları
Azure Veri Gezgini kümesinde aşağıdaki ayrıcalıkları verin:
- Okuma (veri kaynağı) için Azure AD kimliğinin hedef veritabanında görüntüleyici ayrıcalıklarına veya hedef tabloda yönetici ayrıcalıklarına sahip olması gerekir.
- Yazma (veri havuzu) için Azure AD kimliğinin hedef veritabanında alma ayrıcalıklarına sahip olması gerekir. Ayrıca yeni tablolar oluşturmak için hedef veritabanında kullanıcı ayrıcalıklarına sahip olması gerekir. Hedef tablo zaten varsa, hedef tabloda yönetici ayrıcalıklarını yapılandırmanız gerekir.
Azure Veri Gezgini asıl rolleri hakkında daha fazla bilgi için bkz. rol tabanlı yetkilendirme. Güvenlik rollerini yönetmek için bkz. güvenlik rolleri yönetimi.
Spark havuzu: Azure Veri Gezgini'ye yazma
Havuz parametrelerini ayarlama:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Spark DataFrame'i Azure Veri Gezgini kümesine toplu olarak yazın:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Veya basitleştirilmiş söz dizimini kullanın:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Akış verilerini yazma:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark kaynağı: Azure Veri Gezgini'dan okuma
Az miktarda veri okurken veri sorgusunu tanımlayın:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
İsteğe bağlı: Geçici blob depolamayı (Azure Veri Gezgini değil) sağlarsanız bloblar çağıranın sorumluluğu altında oluşturulur. Bu, depolamayı sağlamayı, erişim anahtarlarını döndürmeyi ve geçici yapıtları silmeyi içerir. KustoBlobStorageUtils modülü, hesap ve kapsayıcı koordinatları ile hesap kimlik bilgilerini temel alarak blobları silmeye yönelik yardımcı işlevler ya da yazma, okuma ve liste izinlerine sahip tam BIR SAS URL'si içerir. karşılık gelen RDD artık gerekli olmadığında, her işlem geçici blob yapıtlarını ayrı bir dizinde depolar. Bu dizin, Spark Sürücüsü düğümünde bildirilen okuma işlemi bilgi günlüklerinin bir parçası olarak yakalanır.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
Yukarıdaki örnekte bağlayıcı arabirimi kullanılarak Key Vault erişilemiyor; Databricks gizli dizilerini kullanmanın daha basit bir yöntemi kullanılıyor.
Azure Veri Gezgini'dan okuyun.
Geçici blob depolamayı sağlarsanız Azure Veri Gezgini'dan aşağıdaki gibi okuyun:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Azure Veri Gezgini geçici blob depolama alanı sağlıyorsa Azure Veri Gezgini'dan aşağıdaki gibi okuyun:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Sonraki adımlar
- Azure Veri Gezgini Spark Bağlayıcısı'nın diğer seçeneklerini kullanmayı öğrenin
- Scala ve Python için örnek kod