Aracılığıyla paylaş


Apache Spark bağlayıcısı: SQL Server ve Azure SQL

SQL Server ve Azure SQL için Apache Spark bağlayıcısı, işlem verilerini büyük veri analizine dahil etmek ve geçici sorgular veya raporlama için sonuçları kalıcı hale getirmek için kullanabileceğiniz yüksek performanslı bir bağlayıcıdır. Bağlayıcıyı kullanarak, Spark işleri için giriş veri kaynağı veya çıkış veri havuzu olarak herhangi bir SQL veritabanını, şirket içinde veya bulutta kullanabilirsiniz.

Uyarı

Bu bağlayıcı etkin bir şekilde korunmaz. Bu makale yalnızca arşivleme amacıyla saklanır.

Bu kitaplık, SQL Server ve Azure SQL platformları için Apache Spark Bağlayıcısı'nın kaynak kodunu içerir.

Apache Spark , büyük ölçekli veri işlemeye yönelik birleşik bir analiz altyapısıdır.

Bağlayıcının iki sürümü Maven aracılığıyla kullanılabilir: 2.4.x uyumlu bir sürüm ve 3.0.x uyumlu bir sürüm. Bağlayıcıları maven.org'dan indirin ve ardından koordinatları kullanarak içeri aktarın.

Bağlayıcı Maven Koordinatı
Spark 2.4.x uyumlu bağlayıcı com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x uyumlu bağlayıcı com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x uyumlu bağlayıcı com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Ayrıca bağlayıcıyı kaynaktan derleyebilir veya GitHub'daki Yayın bölümünden JAR'yi indirebilirsiniz. Bağlayıcı hakkında en son bilgiler için bkz. SQL Spark bağlayıcısı GitHub deposu.

Desteklenen özellikler

  • Tüm Spark bağlamaları için destek (Scala, Python, R)
  • Temel kimlik doğrulaması ve Active Directory (AD) Anahtar Sekmesi desteği
  • Yeniden düzenlenmiş dataframe yazma desteği
  • SQL Server Büyük Veri Kümelerinde SQL Server Tekli örneğe ve Veri Havuzuna yazma desteği
  • Sql Server Tek Örneği için güvenilir bağlayıcı desteği
Bileşen Desteklenen sürümler
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala programlama dili 2.11, 2.12
SQL Server için Microsoft JDBC sürücüsü 8.4
Microsoft SQL Server SQL Server 2008 veya üzeri
Azure SQL Veritabanları Destekleniyor

Desteklenen seçenekler

SQL Server ve Azure SQL için Apache Spark Bağlayıcısı , SQL DataSource JDBC makalesinde tanımlanan seçenekleri destekler.

Buna ek olarak, bağlayıcı aşağıdaki seçenekleri destekler:

Seçenek Varsayılan Açıklama
reliabilityLevel BEST_EFFORT BEST_EFFORT veya NO_DUPLICATES. NO_DUPLICATES yürütücü yeniden başlatma senaryolarında güvenilir bir ekleme uygular
dataPoolDataSource none none değerin ayarlanmadığını ve bağlayıcının SQL Server tek örneğe yazması gerektiğini belirtir. Büyük Veri Kümelerinde bir veri havuzu tablosu yazmak için bu değeri veri kaynağı adı olarak ayarlayın.
isolationLevel READ_COMMITTED Yalıtım düzeyini belirtin
tableLock false TABLOCK seçeneğiyle yazma performansını iyileştiren bir ekleme uygular
schemaCheckEnabled true False olarak ayarlandığında katı veri çerçevesi ve SQL tablo şeması denetimini devre dışı bırakır

Diğer toplu kopyalama seçeneklerini üzerinde dataframeseçenekler olarak ayarlayın. Bağlayıcı, bu seçenekleri yazma işlemi sırasında bulkcopy API'lerine geçirir.

Performans karşılaştırması

SQL Server ve Azure SQL için Apache Spark Bağlayıcısı, SQL Server'a yazmak için genel JDBC bağlayıcısından 15 kat daha hızlıdır. Performans özellikleri türüne, veri hacmine, kullanılan seçeneklere göre değişir ve her çalıştırma arasındaki varyasyonları gösterebilir. Aşağıdaki performans sonuçları, spark dataframe içinde 143,9M satır içeren bir SQL tablosunun üzerine yazılma süresidir. Spark, spark dataframestore_sales kullanılarak oluşturulan HDFS tablosu okunarak oluşturulur. Okuma süresi, store_sales ile dataframe arasındaki zaman hariç tutulur. Sonuçlar, üç çalıştırmadan elde edilen ortalamadır.

Bağlayıcı Türü Seçenekler Açıklama Yazma zamanı
JDBCConnector Varsayılan Varsayılan seçeneklerle genel JDBC bağlayıcısı 1.385 saniye
sql-spark-connector BEST_EFFORT Varsayılan seçeneklerle en iyi çaba gösterme sql-spark-connector 580 saniye
sql-spark-connector NO_DUPLICATES Güvenilir sql-spark-connector 709 saniye
sql-spark-connector BEST_EFFORT + tabLock=true Tablo kilidi etkinken en iyi çaba sql-spark-connector 72 saniye
sql-spark-connector NO_DUPLICATES + tabLock=true Tablo kilidi etkinleştirildiğinde güvenilir sql-spark-connector 198 saniye

Yapılandırma

  • Spark yapılandırması: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Data Gen yapılandırması: "scale_factor"=50, "partitioned_tables"=true
  • Satır sayısı 143.997.590 olan veri dosyası store_sales

Çevre

Sık karşılaşılan sorunlar

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Bu hata, Hadoop ortamınızda sürücünün eski bir sürümünü mssql kullandığınızda oluşur. Bağlayıcı artık bu sürücüyü içerir. Daha önce Azure SQL Bağlayıcısı'nı kullandıysanız ve Microsoft Entra kimlik doğrulaması uyumluluğu için kümenize sürücüleri el ile yüklediyseniz, bu sürücüleri kaldırın.

Hatayı düzeltmek için:

  1. Genel bir Hadoop ortamı kullanıyorsanız, aşağıdaki komutla JAR'ı denetleyin ve kaldırın mssql : rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Databricks kullanıyorsanız, sürücünün eski sürümlerini klasörden mssql/databricks/jars kaldırmak için genel veya küme başlatma betiği ekleyin veya bu satırı mevcut bir betike ekleyin: rm /databricks/jars/*mssql*

  2. adal4j ve mssql paketlerini ekleyin. Örneğin, Maven'ı kullanabilirsiniz, ancak başka yöntemler de işe yarayacaktır.

    Dikkat

    SQL Spark bağlayıcısını bu şekilde yüklemeyin.

  3. Sürücü sınıfını bağlantı yapılandırmanıza ekleyin. Örneğin:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Daha fazla bilgi için çözümüne https://github.com/microsoft/sql-spark-connector/issues/26 bakın.

Başlayın

SQL Server ve Azure SQL için Apache Spark Bağlayıcısı, Spark DataSourceV1 API'sini ve SQL Server Toplu API'sini temel alır. Yerleşik JDBC Spark-SQL bağlayıcısı ile aynı arabirimi kullanır. Bu tümleştirmeyi kullanarak, com.microsoft.sqlserver.jdbc.spark format parametresini güncelleyip bağlayıcıyı entegre edebilir ve mevcut Spark işlerinizi kolayca geçirebilirsiniz.

Bağlayıcıyı projelerinize eklemek için bu depoyu indirin ve SBT kullanarak JAR'yi oluşturun.

Yeni bir SQL tablosuna yazma

Uyarı

Öncelikle, overwrite mod veritabanında zaten varsa tabloyu siler. Beklenmeyen veri kaybını önlemek için bu seçeneği dikkatli kullanın.

Tabloyu yeniden oluştururken modu overwrite seçeneği truncate olmadan kullanırsanız, işlem dizinleri kaldırır. Ayrıca, sütun deposu tablosu yığın tablosuna dönüşür. Mevcut dizinleri korumak için truncate seçeneğini true olarak ayarlayın. Örneğin, .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

SQL tablosuna ekle

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Yalıtım düzeyini belirtin

Bu bağlayıcı, verileri veritabanına toplu olarak eklediğinde varsayılan olarak yalıtım düzeyini kullanır READ_COMMITTED . Yalıtım düzeyini geçersiz kılmak için şu mssqlIsolationLevel seçeneği kullanın:

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

SQL tablosundan okuma

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra kimlik doğrulama

Hizmet sorumlusu ile Python örneği

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory parolası ile Python örneği

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Active Directory kullanarak kimlik doğrulaması yapmak için gerekli bağımlılığı yükleyin.

Kullandığınız zaman ActiveDirectoryPassword, user değeri, username@domainname.com gibi UPN biçiminde olmalıdır.

Scala için yapıtı com.microsoft.aad.adal4j yükleyin.

Python için kitaplığı yükleyinadal. Bu kitaplık pip paket yöneticisi aracılığıyla kullanılabilir.

Örnekler için örnek not defterlerine bakın.