Microsoft Spark Yardımcı Programlarına Giriş

Microsoft Spark Yardımcı Programları (MSSparkUtils), ortak görevleri kolayca gerçekleştirmenize yardımcı olan yerleşik bir pakettir. MSSparkUtils'i dosya sistemleriyle çalışmak, ortam değişkenlerini almak, not defterlerini birbirine zincirleme ve gizli dizilerle çalışmak için kullanabilirsiniz. MSSparkUtils, , Scala.NET Spark (C#), ve not defterleri ile R (Preview) Synapse işlem hatlarında kullanılabilirPySpark (Python).

Önkoşullar

Azure Data Lake Storage 2. Nesil erişimini yapılandırma

Synapse not defterleri ADLS 2. Nesil hesaplarına erişmek için Microsoft Entra geçişini kullanır. ADLS 2. Nesil hesabına (veya klasörüne) erişmek için Depolama Blob Veri Katkıda Bulunanı olmanız gerekir.

Synapse işlem hatları, depolama hesaplarına erişmek için çalışma alanının Yönetilen Hizmet Kimliği'ni (MSI) kullanır. İşlem hattı etkinliklerinizde MSSparkUtils'i kullanmak için ADLS 2. Nesil hesabına (veya klasörüne) erişmek için çalışma alanı kimliğinizin blob veri katkıda bulunanı Depolama olması gerekir.

Microsoft Entra Id ve çalışma alanı MSI'nizin ADLS 2. Nesil hesabına erişimi olduğundan emin olmak için şu adımları izleyin:

  1. Azure portalını ve erişmek istediğiniz depolama hesabını açın. Erişmek istediğiniz kapsayıcıya gidebilirsiniz.

  2. Sol panelden Erişim denetimini (IAM) seçin.

  3. Rol ataması ekle sayfasını açmak için Rol ataması ekle'yi>seçin.

  4. Aşağıdaki rolü atayın. Ayrıntılı adımlar için bkz. Azure portalı kullanarak Azure rolleri atama.

    Ayar Value
    Role Depolama Blobu Veri Katılımcısı
    Şu kişiye erişim ata: USER ve MANAGEDIDENTITY
    Üyeler Microsoft Entra hesabınız ve çalışma alanı kimliğiniz

    Not

    Yönetilen kimlik adı aynı zamanda çalışma alanı adıdır.

    Add role assignment page in Azure portal.

  5. Kaydet'i seçin.

Aşağıdaki URL aracılığıyla Synapse Spark ile ADLS 2. Nesil'de verilere erişebilirsiniz:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

Azure Blob Depolama erişimini yapılandırma

Synapse, Azure Blob Depolama erişmek için Paylaşılan erişim imzası (SAS) kullanır. Kodda SAS anahtarlarının açığa çıkartılmasını önlemek için Synapse çalışma alanında erişmek istediğiniz Azure Blob Depolama hesabına yeni bir bağlı hizmet oluşturmanızı öneririz.

Azure Blob Depolama hesabına yeni bir bağlı hizmet eklemek için şu adımları izleyin:

  1. Azure Synapse Studio'yu açın.
  2. Sol panelden Yönet'i seçin ve Dış bağlantılar'ın altında Bağlı hizmetler'iseçin.
  3. Sağdaki Yeni bağlı Hizmet panelinde Azure Blob Depolama arayın.
  4. Devam'ı seçin.
  5. Bağlı hizmet adına erişmek ve yapılandırmak için Azure Blob Depolama Hesabını seçin. Kimlik Doğrulama yöntemi için Hesap anahtarını kullanmayı önerin.
  6. Ayarların doğru olduğunu doğrulamak için Bağlantıyı test et'i seçin.
  7. Değişikliklerinizi kaydetmek için önce Oluştur'u seçin ve Tümünü yayımla'ya tıklayın.

Synapse Spark ile Azure Blob Depolama verilerine aşağıdaki URL aracılığıyla erişebilirsiniz:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

Aşağıda bir kod örneği verilmiştir:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

Azure Key Vault'a erişimi yapılandırma

Synapse'te kimlik bilgilerinizi yönetmek için bağlı hizmet olarak bir Azure Key Vault ekleyebilirsiniz. Azure Key Vault'u Synapse bağlı hizmeti olarak eklemek için şu adımları izleyin:

  1. Azure Synapse Studio'yu açın.

  2. Sol panelden Yönet'i seçin ve Dış bağlantılar'ın altında Bağlı hizmetler'iseçin.

  3. Sağ taraftaki Yeni bağlı Hizmet panelinde Azure Key Vault'ta arama yapma.

  4. Bağlı hizmet adına erişmek ve bu adı yapılandırmak için Azure Key Vault Hesabını seçin.

  5. Ayarların doğru olduğunu doğrulamak için Bağlantıyı test et'i seçin.

  6. Önce Oluştur'u seçin ve değişikliğinizi kaydetmek için Tümünü yayımla'ya tıklayın.

Synapse not defterleri, Azure Key Vault'a erişmek için Microsoft Entra geçişini kullanır. Synapse işlem hatları, Azure Key Vault'a erişmek için çalışma alanı kimliğini (MSI) kullanır. Kodunuzun hem not defterinde hem de Synapse işlem hattında çalıştığından emin olmak için hem Microsoft Entra hesabınız hem de çalışma alanı kimliğiniz için gizli dizi erişimi izni vermenizi öneririz.

Çalışma alanı kimliğinize gizli dizi erişimi vermek için şu adımları izleyin:

  1. Erişmek istediğiniz Azure portalını ve Azure Key Vault'ı açın.
  2. Sol panelden Erişim ilkeleri'ni seçin.
  3. Erişim İlkesi Ekle'yi seçin:
    • Yapılandırma şablonu olarak Anahtar, Gizli Dizi ve Sertifika Yönetimi'ni seçin.
    • Seçme sorumlusunda Microsoft Entra hesabınızı ve çalışma alanı kimliğinizi (çalışma alanı adınızla aynı) seçin veya zaten atanmış olduğundan emin olun.
  4. Seç ve Ekle'yi seçin.
  5. Değişiklikleri işlemek için Kaydet düğmesini seçin.

Dosya sistemi yardımcı programları

mssparkutils.fsAzure Data Lake Storage 2. Nesil (ADLS 2. Nesil) ve Azure Blob Depolama gibi çeşitli dosya sistemleriyle çalışmak için yardımcı programlar sağlar. Azure Data Lake Storage 2. Nesil ve Azure Blob Depolama erişimini uygun şekilde yapılandırdığınızdan emin olun.

Kullanılabilir yöntemlere genel bakış için aşağıdaki komutları çalıştırın:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

Sonuçlar:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

Dosyaları listeleme

Dizinin içeriğini listeleme.

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

Dosya özelliklerini görüntüleme

Dosya adı, dosya yolu, dosya boyutu, dosya değiştirme süresi ve bunun bir dizin ve dosya olup olmadığı gibi dosya özelliklerini döndürür.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

Yeni dizin oluşturma

Mevcut değilse ve gerekli üst dizinler varsa verilen dizini oluşturur.

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

Dosyayı kopyalama

Bir dosya veya dizin kopyalar. Dosya sistemleri arasında kopyalamayı destekler.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

Performanslı kopyalama dosyası

Bu yöntem, özellikle büyük hacimli verileri kopyalamak veya taşımak için daha hızlı bir yol sağlar.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

Not

Yöntemi yalnızca Spark 3.3 ve Spark 3.4'te destekler.

Dosya içeriğini önizleme

Verilen dosyanın ilk 'maxBytes' baytlarına kadar UTF-8 ile kodlanmış bir Dize olarak döndürür.

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

Dosyayı taşıma

Bir dosyayı veya dizini taşır. Dosya sistemleri arasında taşımayı destekler.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Dosya yazma

Verilen dizeyi UTF-8 ile kodlanmış bir dosyaya yazar.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Dosyaya içerik ekleme

Verilen dizeyi UTF-8 ile kodlanmış bir dosyaya ekler.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Dosya veya dizin silme

Bir dosyayı veya dizini kaldırır.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Not defteri yardımcı programları

Desteklenmiyor.

MSSparkUtils Not Defteri Yardımcı Programları'nı kullanarak bir not defteri çalıştırabilir veya not defterinden bir değerle çıkabilirsiniz. Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutu çalıştırın:

mssparkutils.notebook.help()

Sonuçları alın:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Not

Not defteri yardımcı programları Apache Spark iş tanımları (SJD) için geçerli değildir.

Not defterine başvurma

Not defterine başvurun ve çıkış değerini döndürür. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz. Başvurulan not defteri, not defterinin bu işlevi çağırdığı Spark havuzunda çalışır.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Örneğin:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

Çalıştırma tamamlandıktan sonra, hücre çıkışında 'Not defteri çalıştırmasını görüntüle: Not Defteri Adı' adlı bir anlık görüntü bağlantısı görürsünüz. Bu özel çalıştırmanın anlık görüntüsünü görmek için bağlantıya tıklayabilirsiniz.

Screenshot of a snap link python

Başvuru birden çok not defterini paralel olarak çalıştırma

yöntemi mssparkutils.notebook.runMultiple() , birden çok not defterini paralel veya önceden tanımlanmış bir topolojik yapıyla çalıştırmanıza olanak tanır. API, spark oturumunda çok iş parçacıklı bir uygulama mekanizması kullanıyor ve bu da işlem kaynaklarının başvuru not defteri çalıştırmaları tarafından paylaşıldığı anlamına gelir.

ile mssparkutils.notebook.runMultiple()şunları yapabilirsiniz:

  • Her birinin bitip bitmeden birden çok not defterini aynı anda yürütür.

  • Basit bir JSON biçimi kullanarak not defterleriniz için bağımlılıkları ve yürütme sırasını belirtin.

  • Spark işlem kaynaklarının kullanımını iyileştirin ve Synapse projelerinizin maliyetini azaltın.

  • Çıktıdaki her not defteri çalıştırma kaydının anlık görüntülerini görüntüleyin ve not defteri görevlerinizin hatalarını rahatça ayıklayın/izleyin.

  • Her yönetici etkinliğinin çıkış değerini alın ve bunları aşağı akış görevlerinde kullanın.

Örneği ve ayrıntılı kullanımı bulmak için mssparkutils.notebook.help("runMultiple") komutunu çalıştırmayı da deneyebilirsiniz.

Aşağıda, bu yöntemi kullanarak not defterlerinin listesini paralel olarak çalıştırmanın basit bir örneği verilmiştir:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Kök not defterinin yürütme sonucu aşağıdaki gibidir:

Screenshot of reference a list of notebooks.

Aşağıda kullanarak mssparkutils.notebook.runMultiple()topolojik yapıya sahip not defterlerini çalıştırma örneği verilmiştir. Bir kod deneyimi aracılığıyla not defterlerini kolayca düzenlemek için bu yöntemi kullanın.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

Not

  • Yöntemi yalnızca Spark 3.3 ve Spark 3.4'te destekler.
  • Birden çok not defteri çalıştırmasının paralellik derecesi, Spark oturumunun toplam kullanılabilir işlem kaynağıyla sınırlıdır.

Not defterinden çıkma

Not defterinden bir değerle çıkar. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz.

  • Etkileşimli olarak bir not defterinden exit() işlevini çağırdığınızda, Azure Synapse bir özel durum oluşturur, alt sorgu hücrelerini çalıştırmayı atlar ve Spark oturumunu canlı tutar.

  • Synapse işlem hattında işlev çağıran bir exit() not defterini düzenlerseniz Azure Synapse bir çıkış değeri döndürür, işlem hattı çalıştırmasını tamamlar ve Spark oturumunu durdurur.

  • Başvurulmakta olan bir not defterindeki bir exit() işlevi çağırdığınızda, Azure Synapse başvurulmakta olan not defterinde daha fazla yürütmeyi durdurur ve not defterinde işlevi çağıran sonraki hücreleri çalıştırmaya run() devam eder. Örneğin: Notebook1 üç hücreye sahiptir ve ikinci hücredeki bir exit() işlevi çağırır. Not Defteri2'de üçüncü hücrede beş hücre ve çağrı run(notebook1) vardır. Notebook2'yi çalıştırdığınızda, not defteri1 işleve basıldığında ikinci hücrede exit() durdurulur. Not Defteri2, dördüncü hücresini ve beşinci hücresini çalıştırmaya devam eder.

mssparkutils.notebook.exit("value string")

Örneğin:

Örnek1 not defteri, aşağıdaki iki hücreye sahip klasörün/ altında bulunur:

  • hücre 1, varsayılan değeri 10 olarak ayarlanmış bir giriş parametresi tanımlar.
  • 2. hücre, çıkış değeri olarak giriş içeren not defterinden çıkar.

Screenshot of a sample notebook

Sample1'i varsayılan değerlerle başka bir not defterinde çalıştırabilirsiniz:


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Sonuçlar:

Sample1 run success with input is 10

Sample1'i başka bir not defterinde çalıştırabilir ve giriş değerini 20 olarak ayarlayabilirsiniz:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

Sonuçlar:

Sample1 run success with input is 20

MSSparkUtils Not Defteri Yardımcı Programları'nı kullanarak bir not defteri çalıştırabilir veya not defterinden bir değerle çıkabilirsiniz. Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutu çalıştırın:

mssparkutils.notebook.help()

Sonuçları alın:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Not defterine başvurma

Not defterine başvurun ve çıkış değerini döndürür. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz. Başvurulan not defteri, not defterinin bu işlevi çağırdığı Spark havuzunda çalışır.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Örneğin:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

Çalıştırma tamamlandıktan sonra, hücre çıkışında 'Not defteri çalıştırmasını görüntüle: Not Defteri Adı' adlı bir anlık görüntü bağlantısı görürsünüz. Bu özel çalıştırmanın anlık görüntüsünü görmek için bağlantıya tıklayabilirsiniz.

Screenshot of a snap link scala

Not defterinden çıkma

Not defterinden bir değerle çıkar. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz.

  • Bir exit() işlevi etkileşimli olarak bir not defteri olarak çağırdığınızda Azure Synapse bir özel durum oluşturur, alt sorgu hücrelerini çalıştırmayı atlar ve Spark oturumunu canlı tutar.

  • Synapse işlem hattında işlev çağıran bir exit() not defterini düzenlerseniz Azure Synapse bir çıkış değeri döndürür, işlem hattı çalıştırmasını tamamlar ve Spark oturumunu durdurur.

  • Başvurulmakta olan bir not defterindeki bir exit() işlevi çağırdığınızda, Azure Synapse başvurulmakta olan not defterinde daha fazla yürütmeyi durdurur ve not defterinde işlevi çağıran sonraki hücreleri çalıştırmaya run() devam eder. Örneğin: Notebook1 üç hücreye sahiptir ve ikinci hücredeki bir exit() işlevi çağırır. Not Defteri2'de üçüncü hücrede beş hücre ve çağrı run(notebook1) vardır. Notebook2'yi çalıştırdığınızda, not defteri1 işleve basıldığında ikinci hücrede exit() durdurulur. Not Defteri2, dördüncü hücresini ve beşinci hücresini çalıştırmaya devam eder.

mssparkutils.notebook.exit("value string")

Örneğin:

Sample1 not defteri mssparkutils/folder/ altında aşağıdaki iki hücreyi bulur:

  • hücre 1, varsayılan değeri 10 olarak ayarlanmış bir giriş parametresi tanımlar.
  • 2. hücre, çıkış değeri olarak giriş içeren not defterinden çıkar.

Screenshot of a sample notebook

Sample1'i varsayılan değerlerle başka bir not defterinde çalıştırabilirsiniz:


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

Sonuçlar:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

Sample1'i başka bir not defterinde çalıştırabilir ve giriş değerini 20 olarak ayarlayabilirsiniz:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

Sonuçlar:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

MSSparkUtils Not Defteri Yardımcı Programları'nı kullanarak bir not defteri çalıştırabilir veya not defterinden bir değerle çıkabilirsiniz. Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutu çalıştırın:

mssparkutils.notebook.help()

Sonuçları alın:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Not defterine başvurma

Not defterine başvurun ve çıkış değerini döndürür. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz. Başvurulan not defteri, not defterinin bu işlevi çağırdığı Spark havuzunda çalışır.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Örneğin:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

Çalıştırma tamamlandıktan sonra, hücre çıkışında 'Not defteri çalıştırmasını görüntüle: Not Defteri Adı' adlı bir anlık görüntü bağlantısı görürsünüz. Bu özel çalıştırmanın anlık görüntüsünü görmek için bağlantıya tıklayabilirsiniz.

Not defterinden çıkma

Not defterinden bir değerle çıkar. İç içe işlev çağrılarını etkileşimli olarak veya işlem hattında bir not defterinde çalıştırabilirsiniz.

  • Bir exit() işlevi etkileşimli olarak bir not defteri olarak çağırdığınızda Azure Synapse bir özel durum oluşturur, alt sorgu hücrelerini çalıştırmayı atlar ve Spark oturumunu canlı tutar.

  • Synapse işlem hattında işlev çağıran bir exit() not defterini düzenlerseniz Azure Synapse bir çıkış değeri döndürür, işlem hattı çalıştırmasını tamamlar ve Spark oturumunu durdurur.

  • Başvurulmakta olan bir not defterindeki bir exit() işlevi çağırdığınızda, Azure Synapse başvurulmakta olan not defterinde daha fazla yürütmeyi durdurur ve not defterinde işlevi çağıran sonraki hücreleri çalıştırmaya run() devam eder. Örneğin: Notebook1 üç hücreye sahiptir ve ikinci hücredeki bir exit() işlevi çağırır. Not Defteri2'de üçüncü hücrede beş hücre ve çağrı run(notebook1) vardır. Notebook2'yi çalıştırdığınızda, not defteri1 işleve basıldığında ikinci hücrede exit() durdurulur. Not Defteri2, dördüncü hücresini ve beşinci hücresini çalıştırmaya devam eder.

mssparkutils.notebook.exit("value string")

Örneğin:

Örnek1 not defteri, aşağıdaki iki hücreye sahip klasörün/ altında bulunur:

  • hücre 1, varsayılan değeri 10 olarak ayarlanmış bir giriş parametresi tanımlar.
  • 2. hücre, çıkış değeri olarak giriş içeren not defterinden çıkar.

Screenshot of a sample notebook

Sample1'i varsayılan değerlerle başka bir not defterinde çalıştırabilirsiniz:


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Sonuçlar:

Sample1 run success with input is 10

Sample1'i başka bir not defterinde çalıştırabilir ve giriş değerini 20 olarak ayarlayabilirsiniz:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

Sonuçlar:

Sample1 run success with input is 20

Kimlik bilgileri yardımcı programları

Bağlı hizmetlerin erişim belirteçlerini almak ve Azure Key Vault'ta gizli dizileri yönetmek için MSSparkUtils Kimlik Bilgileri Yardımcı Programları'nı kullanabilirsiniz.

Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutu çalıştırın:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

Sonucu al:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Not

Şu anda getSecretWithLS(linkedService, secret) C# dilinde desteklenmiyor.

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Belirteç alma

Belirli bir hedef kitle için Microsoft Entra belirtecini döndürür, ad (isteğe bağlı). Aşağıdaki tabloda kullanılabilir tüm hedef kitle türleri listelenmiştir:

hedef kitle türü API çağrısında kullanılacak dize değişmez değeri
Azure Depolama Storage
Azure Key Vault Vault
Azure Yönetimi AzureManagement
Azure SQL Veri Ambarı (Ayrılmış ve Sunucusuz) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Veri Gezgini AzureDataExplorer
MySQL için Azure Veritabanı AzureOSSDB
MariaDB için Azure Veritabanı AzureOSSDB
PostgreSQL için Azure Veritabanı AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

Belirteci doğrulama

Belirtecin süresi dolmadıysa true döndürür.

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

Bağlı hizmet için bağlantı dizesi veya kimlik bilgilerini alma

Bağlı hizmet için bağlantı dizesi veya kimlik bilgilerini döndürür.

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

Çalışma alanı kimliğini kullanarak gizli dizi alma

Çalışma alanı kimliğini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini döndürür. Azure Key Vault erişimini uygun şekilde yapılandırdığınızdan emin olun.

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

Kullanıcı kimlik bilgilerini kullanarak gizli dizi alma

Kullanıcı kimlik bilgilerini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini döndürür.

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

Çalışma alanı kimliğini kullanarak gizli dizi koyma

Çalışma alanı kimliğini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir. Azure Key Vault erişimini uygun şekilde yapılandırdığınızdan emin olun.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Çalışma alanı kimliğini kullanarak gizli dizi koyma

Çalışma alanı kimliğini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir. Azure Key Vault erişimini uygun şekilde yapılandırdığınızdan emin olun.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

Çalışma alanı kimliğini kullanarak gizli dizi koyma

Çalışma alanı kimliğini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir. Azure Key Vault erişimini uygun şekilde yapılandırdığınızdan emin olun.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Kullanıcı kimlik bilgilerini kullanarak gizli dizi koyma

Kullanıcı kimlik bilgilerini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Kullanıcı kimlik bilgilerini kullanarak gizli dizi koyma

Kullanıcı kimlik bilgilerini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Kullanıcı kimlik bilgilerini kullanarak gizli dizi koyma

Kullanıcı kimlik bilgilerini kullanarak belirli bir Azure Key Vault adı, gizli dizi adı ve bağlı hizmet adı için Azure Key Vault gizli dizisini yerleştirir.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

Ortam yardımcı programları

Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutları çalıştırın:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

Sonucu al:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

Kullanıcı adını alma

Geçerli kullanıcı adını döndürür.

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

Kullanıcı kimliğini alma

Geçerli kullanıcı kimliğini döndürür.

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

İş kimliğini alma

İş kimliğini döndürür.

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

Çalışma alanı adını alma

Çalışma alanı adını döndürür.

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

Havuz adını alma

Spark havuzu adını döndürür.

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

Küme kimliğini alma

Geçerli küme kimliğini döndürür.

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

Çalışma Zamanı Bağlamı

3 çalışma zamanı özelliği kullanıma sunulan Mssparkutils çalışma zamanı yardımcı programları, aşağıdaki gibi listelenen özellikleri almak için mssparkutils çalışma zamanı bağlamını kullanabilirsiniz:

  • Notebookname - Geçerli not defterinin adı her zaman hem etkileşimli mod hem de işlem hattı modu için değer döndürür.
  • Pipelinejobid - İşlem hattı çalıştırma kimliği, işlem hattı modunda değer döndürür ve boş dizeyi etkileşimli modda döndürür.
  • Activityrunid - Not defteri etkinlik çalıştırma kimliği, işlem hattı modunda değer döndürür ve boş dizeyi etkileşimli modda döndürür.

Şu anda çalışma zamanı bağlamı hem Python hem de Scala'ya destek sağlar.

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

Oturum yönetimi

Etkileşimli oturumu durdurma

Durdur düğmesine el ile tıklamak yerine, bazen kodda bir API çağırarak etkileşimli oturumu durdurmak daha uygundur. Bu gibi durumlarda kod aracılığıyla etkileşimli oturumun durdurulmasını destekleyen bir API mssparkutils.session.stop() sağlıyoruz. Scala ve Python için kullanılabilir.

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

mssparkutils.session.stop() API, geçerli etkileşimli oturumu arka planda zaman uyumsuz olarak durdurur, Spark oturumunu durdurur ve oturum tarafından kullanılan kaynakları aynı havuzdaki diğer oturumların kullanımına sunulması için serbest bırakır.

Not

Scala'da veya sys.exit() kodunuzda Python'da olduğu gibi sys.exit dil yerleşik API'lerini çağırmanızı önermeyiz çünkü bu API'ler yalnızca yorumlayıcı işlemini sonlandırarak Spark oturumunu canlı bırakır ve kaynaklar yayımlanmaz.

Paket Bağımlılıkları

Yerel olarak not defterleri veya işler geliştirmek istiyorsanız ve derleme/IDE ipuçları için ilgili paketlere başvurmanız gerekiyorsa, aşağıdaki paketleri kullanabilirsiniz.

Sonraki adımlar