Rychlý start: Správa dat pomocí konektoru Azure Cosmos DB Spark 3 OLTP pro rozhraní API pro NoSQL

PLATÍ PRO: NoSQL

Tento kurz je úvodní příručka, která vám ukáže, jak používat konektor Spark služby Azure Cosmos DB ke čtení nebo zápisu do služby Azure Cosmos DB. Konektor Spark pro Azure Cosmos DB podporuje Spark 3.1.x a 3.2.x a 3.3.x.

V tomto rychlém kurzu spoléháme na Azure Databricks Runtime 12.2 se Sparkem 3.3.2 a Jupyter Notebook, který vám ukáže, jak používat konektor Spark pro Azure Cosmos DB.

Měli byste být schopni používat libovolný jazyk podporovaný Sparkem (PySpark, Scala, Java atd.) nebo jakékoli rozhraní Sparku, které znáte (Jupyter Notebook, Livy atd.).

Požadavky

SLF4J je potřeba jenom v případě, že plánujete používat protokolování, ale také stáhnout vazbu SLF4J, která propojí rozhraní API SLF4J s implementací protokolování podle vašeho výběru. Další informace najdete v uživatelské příručce K SLF4J .

Nainstalujte konektor Spark služby Azure Cosmos DB do clusteru Spark s použitím nejnovější verze pro Spark 3.3.x.

Příručka Začínáme je založená na PySpark/Scala a následující fragment kódu můžete spustit v poznámkovém bloku PySpark/Scala v Azure Databricks.

Vytváření databází a kontejnerů

Nejprve nastavte přihlašovací údaje účtu služby Azure Cosmos DB a název databáze Azure Cosmos DB a název kontejneru.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Dále můžete pomocí nového rozhraní API pro katalog vytvořit databázi a kontejner Azure Cosmos DB prostřednictvím Sparku.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create an Azure Cosmos DB database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create an Azure Cosmos DB container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

Při vytváření kontejnerů pomocí rozhraní API pro katalog můžete nastavit propustnost a cestu ke klíči oddílu pro kontejner, který se má vytvořit.

Další informace najdete v úplné dokumentaci k rozhraní API pro katalog .

Ingestace dat

Název zdroje dat je cosmos.oltpa následující příklad ukazuje, jak do služby Azure Cosmos DB zapsat datový rámec paměti, který se skládá ze dvou položek:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive") \
   .write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save()

Všimněte si, že id pole je pro službu Azure Cosmos DB povinné.

Další informace týkající se ingestování dat najdete v úplné dokumentaci ke konfiguraci zápisu .

Dotazování dat

Pomocí stejného cosmos.oltp zdroje dat se můžeme dotazovat na data a použít filter k nasdílení filtrů:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()

Další informace týkající se dotazování na data najdete v úplné dokumentaci ke konfiguraci dotazů .

Částečná aktualizace dokumentu pomocí opravy

Pomocí stejného cosmos.oltp zdroje dat můžeme provést částečnou aktualizaci ve službě Azure Cosmos DB pomocí rozhraní API pro opravy:

cfgPatch = {"spark.cosmos.accountEndpoint": cosmosEndpoint,
          "spark.cosmos.accountKey": cosmosMasterKey,
          "spark.cosmos.database": cosmosDatabaseName,
          "spark.cosmos.container": cosmosContainerName,
          "spark.cosmos.write.strategy": "ItemPatch",
          "spark.cosmos.write.bulk.enabled": "false",
          "spark.cosmos.write.patch.defaultOperationType": "Set",
          "spark.cosmos.write.patch.columnConfigs": "[col(name).op(set)]"
          }

id = "<document-id>"
query = "select * from cosmosCatalog.{}.{} where id = '{}';".format(
    cosmosDatabaseName, cosmosContainerName, id)

dfBeforePatch = spark.sql(query)
print("document before patch operation")
dfBeforePatch.show()

data = [{"id": id, "name": "Joel Brakus"}]
patchDf = spark.createDataFrame(data)

patchDf.write.format("cosmos.oltp").mode("Append").options(**cfgPatch).save()

dfAfterPatch = spark.sql(query)
print("document after patch operation")
dfAfterPatch.show()

Další ukázky související s částečnou aktualizací dokumentů najdete v ukázce kódu GitHubu v ukázce opravy.

Odvozování schémat

Při dotazování na data může konektor Sparku odvodit schéma na základě vzorkování existujících položek nastavením spark.cosmos.read.inferSchema.enabled na truehodnotu .

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()


# Alternatively, you can pass the custom schema you want to be used to read the data:

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

# If no custom schema is specified and schema inference is disabled, then the resulting data will be returning the raw Json content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

Další informace související s odvozováním schématu najdete v úplné dokumentaci ke konfiguraci odvozování schématu .

Referenční informace ke konfiguraci

Konektor Azure Cosmos DB Spark 3 OLTP pro rozhraní API pro NoSQL obsahuje kompletní referenční informace o konfiguraci, které poskytují pokročilejší nastavení pro zápis a dotazování dat, serializaci, streamování pomocí kanálu změn, správu oddílů a propustnosti a další. Kompletní výpis s podrobnostmi najdete v referenčních informacích ke konfiguraci konektorů Spark na GitHubu.

Ověřování služby Azure Active Directory

  1. Postupujte podle pokynů k registraci aplikace pomocí Azure AD a vytvoření instančního objektu.

  2. Stále byste měli být v Azure Portal > Registrace aplikací Azure Active Directory>. V části Certificates & secrets vytvořte nový tajný kód. Uložte si hodnotu na později.

  3. Klikněte na kartu Přehled a vyhledejte hodnoty pro clientId a tenantId, společně s clientSecret hodnotami, které jste vytvořili dříve a cosmosEndpoint, subscriptionIda resourceGroupNameze svého účtu. Následujícím způsobem vytvořte poznámkový blok a nahraďte konfigurace příslušnými hodnotami:

    cosmosDatabaseName = "AADsampleDB"
    cosmosContainerName = "sampleContainer"
    authType = "ServicePrinciple"
    cosmosEndpoint = "<replace with URI of your Cosmos DB account>"
    subscriptionId = "<replace with subscriptionId>"
    tenantId = "<replace with Directory (tenant) ID from the portal>"
    resourceGroupName = "<replace with the resourceGroup name>"
    clientId = "<replace with Application (client) ID from the portal>"
    clientSecret = "<replace with application secret value you created earlier>"
    
    cfg = {
        "spark.cosmos.accountEndpoint" : cosmosEndpoint,
        "spark.cosmos.auth.type" : authType,
        "spark.cosmos.account.subscriptionId" : subscriptionId,
        "spark.cosmos.account.tenantId" : tenantId,
        "spark.cosmos.account.resourceGroupName" : resourceGroupName,
        "spark.cosmos.auth.aad.clientId" : clientId,
        "spark.cosmos.auth.aad.clientSecret" : clientSecret,
        "spark.cosmos.database" : cosmosDatabaseName,
        "spark.cosmos.container" : cosmosContainerName
    }
    
    # Configure Catalog Api to be used
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", authType)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", subscriptionId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", tenantId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", resourceGroupName)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", clientId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", clientSecret)
    
    # create an Azure Cosmos DB database using catalog api
    spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
    
    # create an Azure Cosmos DB container using catalog api
    spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
    
    spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
      .toDF("id","name","age","isAlive") \
       .write\
       .format("cosmos.oltp")\
       .options(**cfg)\
       .mode("APPEND")\
       .save()
    
    

    Tip

    V tomto příkladu rychlého startu se přihlašovací údaje přiřazují k proměnným ve formátu nemazaného textu, ale z důvodu zabezpečení doporučujeme používat tajné kódy. Projděte si pokyny k zabezpečení přihlašovacích údajů v Azure Synapse Apache Sparku s propojenými službami pomocí TokenLibrary. Nebo pokud používáte Databricks, přečtěte si, jak vytvořit obor tajných kódů Azure Key Vault nebo obor tajných kódů založený na Databricks. Informace o konfiguraci tajných kódů najdete v tématu Přidání tajných kódů do konfigurace Sparku.

  4. Vytvořte roli pomocí az role definition create příkazu . Předejte název účtu cosmos DB a skupinu prostředků a za nimi text JSON, který definuje vlastní roli. Následující příklad vytvoří roli s názvem SparkConnectorAAD s oprávněním ke čtení a zápisu položek v kontejnerech Cosmos DB. Role je také vymezená na úroveň účtu pomocí /příkazu .

    resourceGroupName='<myResourceGroup>'
    accountName='<myCosmosAccount>'
    az cosmosdb sql role definition create \
       --account-name $accountName \
       --resource-group $resourceGroupName \
       --body '{
           "RoleName": "SparkConnectorAAD",
           "Type": "CustomRole",
           "AssignableScopes": ["/"],
           "Permissions": [{
               "DataActions": [
                   "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                   "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                   "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
               ]
           }]
       }'
    
  5. Teď vypište definici role, kterou jste vytvořili pro načtení jejího ID:

        az cosmosdb sql role definition list --account-name $accountName --resource-group $resourceGroupName 
    
  6. Měla by se vrátit odpověď podobná níže. Poznamenejte si id hodnotu.

        [
          {
            "assignableScopes": [
              "/subscriptions/<mySubscriptionId>/resourceGroups/<myResourceGroup>/providers/Microsoft.DocumentDB/databaseAccounts/<myCosmosAccount>"
            ],
            "id": "/subscriptions/<mySubscriptionId>/resourceGroups/<myResourceGroup>/providers/Microsoft.DocumentDB/databaseAccounts/<myCosmosAccount>/sqlRoleDefinitions/<roleDefinitionId>",
            "name": "<roleDefinitionId>",
            "permissions": [
              {
                "dataActions": [
                  "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                  "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                  "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
                ],
                "notDataActions": []
              }
            ],
            "resourceGroup": "<myResourceGroup>",
            "roleName": "MyReadWriteRole",
            "sqlRoleDefinitionGetResultsType": "CustomRole",
            "type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleDefinitions"
          }
        ]
    
  7. Teď přejděte na Azure Portal >podnikové aplikace Azure Active Directory > a vyhledejte aplikaci, kterou jste vytvořili dříve. Tady si poznamenejte ID objektu.

    Poznámka

    Nezapomeňte použít jeho ID objektu, které najdete v části Podnikové aplikace v okně portálu Azure Active Directory (a ne v oddílu Registrace aplikací, který jste použili dříve).

  8. Teď vytvořte přiřazení role. Nahraďte id objektu <aadPrincipalId> za ID objektu, které jste si poznamenali výše (všimněte si, že se NEJEDNÁ o ID objektu viditelné v zobrazení registrace aplikací, které jste viděli dříve). Také nahraďte <myResourceGroup> a <myCosmosAccount> odpovídajícím způsobem v níže uvedeném příkladu. id Nahraďte <roleDefinitionId> hodnotou načtenou ze spuštění příkazu, který jste spustili az cosmosdb sql role definition list výše. Pak v Azure CLI spusťte příkaz:

    resourceGroupName='<myResourceGroup>'
    accountName='<myCosmosAccount>'
    readOnlyRoleDefinitionId='<roleDefinitionId>' # as fetched above
    # For Service Principals make sure to use the Object ID as found in the Enterprise applications section of the Azure Active Directory portal blade.
    principalId='<aadPrincipalId>'
    az cosmosdb sql role assignment create --account-name $accountName --resource-group $resourceGroupName --scope "/" --principal-id $principalId --role-definition-id $readOnlyRoleDefinitionId
    
  9. Teď, když jste vytvořili instanční objekt a aplikaci Azure Active Directory, vytvořili vlastní roli a přiřadili oprávnění této role k vašemu účtu služby Cosmos DB, byste měli být schopni spustit poznámkový blok.

Migrace na konektor Spark 3

Pokud používáte náš starší konektor Spark 2.4, tady najdete informace o migraci na konektor Spark 3.

Další kroky