Interakcja z usługą Azure Cosmos DB przy użyciu platformy Apache Spark 2 w usłudze Azure Synapse Link

Uwaga

Aby uzyskać link Azure Synapse dla usługi Azure Cosmos DB przy użyciu platformy Spark 3, zapoznaj się z tym artykułem Azure Synapse Link dla usługi Azure Cosmos DB na platformie Spark 3

Z tego artykułu dowiesz się, jak korzystać z usługi Azure Cosmos DB przy użyciu usługi Synapse Apache Spark 2. Dzięki pełnej obsłudze języków Scala, Python, SparkSQL i C# usługa Synapse Apache Spark jest centralną funkcją analizy, inżynierii danych, nauki o danych i eksploracji danych w usłudze Azure Synapse Link dla usługi Azure Cosmos DB.

Podczas interakcji z usługą Azure Cosmos DB obsługiwane są następujące możliwości:

  • Usługa Synapse Apache Spark umożliwia analizowanie danych w kontenerach usługi Azure Cosmos DB, które są włączone za pomocą usługi Azure Synapse Link w czasie niemal rzeczywistym bez wpływu na wydajność obciążeń transakcyjnych. Dostępne są następujące dwie opcje umożliwiające wykonywanie zapytań do magazynu analitycznego usługi Azure Cosmos DB z platformy Spark:
    • Ładowanie do ramki danych platformy Spark
    • Tworzenie tabeli platformy Spark
  • Usługa Synapse Apache Spark umożliwia również pozyskiwanie danych do usługi Azure Cosmos DB. Należy pamiętać, że dane są zawsze pozyskiwane do kontenerów usługi Azure Cosmos DB za pośrednictwem magazynu transakcyjnego. Po włączeniu Synapse Link wszystkie nowe wstawki, aktualizacje i usunięcia są automatycznie synchronizowane z magazynem analitycznym.
  • Usługa Synapse Apache Spark obsługuje również przesyłanie strumieniowe ze strukturą platformy Spark z usługą Azure Cosmos DB jako źródłem, a także ujściem.

W poniższych sekcjach przedstawiono składnię powyższych możliwości. Możesz również wyewidencjonować moduł Learn, aby dowiedzieć się, jak wykonywać zapytania dotyczące usługi Azure Cosmos DB przy użyciu platformy Apache Spark na potrzeby Azure Synapse Analytics. Gesty w obszarze roboczym usługi Azure Synapse Analytics zostały zaprojektowane w celu zapewnienia łatwego gotowego środowiska do rozpoczęcia pracy. Gesty są widoczne po kliknięciu prawym przyciskiem myszy kontenera usługi Azure Cosmos DB na karcie Dane obszaru roboczego usługi Synapse. Za pomocą gestów można szybko wygenerować kod i dostosować go do własnych potrzeb. Gesty są również idealne do odnajdywania danych jednym kliknięciem.

Ważne

Należy pamiętać o pewnych ograniczeniach w schemacie analitycznym, które mogą prowadzić do nieoczekiwanego zachowania operacji ładowania danych. Na przykład tylko pierwsze 1000 właściwości ze schematu transakcyjnego są dostępne w schemacie analitycznym, właściwości z spacjami są niedostępne itp. Jeśli występują nieoczekiwane wyniki, sprawdź ograniczenia schematu magazynu analitycznego , aby uzyskać więcej szczegółów.

Wykonywanie zapytań względem magazynu analitycznego usługi Azure Cosmos DB

Przed zapoznaniem się z dwiema możliwymi opcjami wykonywania zapytań dotyczących magazynu analitycznego usługi Azure Cosmos DB, załadowanie do ramki danych platformy Spark i utworzenie tabeli Platformy Spark warto zapoznać się z różnicami w środowisku, aby można było wybrać opcję, która będzie odpowiednia dla Twoich potrzeb.

Różnica w środowisku polega na tym, czy zmiany danych bazowych w kontenerze usługi Azure Cosmos DB powinny być automatycznie odzwierciedlane w analizie wykonywanej na platformie Spark. Po zarejestrowaniu ramki danych platformy Spark lub utworzeniu tabeli Platformy Spark w magazynie analitycznym kontenera metadane wokół bieżącej migawki danych w magazynie analitycznym są pobierane do platformy Spark w celu wydajnego wypychania kolejnych analiz. Należy pamiętać, że ponieważ platforma Spark jest zgodna z leniwymi zasadami oceny, chyba że akcja jest wywoływana na ramce danych Spark lub w tabeli SparkSQL wykonywane jest zapytanie Spark, rzeczywiste dane nie są pobierane z magazynu analitycznego bazowego kontenera.

W przypadku ładowania do ramki danych Spark pobrane metadane są buforowane przez okres istnienia sesji Spark. Tym samym kolejne akcje wywoływane dla ramki danych są oceniane względem migawki magazynu analitycznego z czasu utworzenia ramki danych.

Z drugiej strony w przypadku utworzenia tabeli Spark metadane stanu magazynu analitycznego nie są buforowane na platformie Spark i są ponownie ładowane przy każdym wykonaniu zapytania SparkSQL względem tabeli Spark.

W związku z tym możesz wybrać między ładowaniem do ramki danych Spark i utworzeniem tabeli Spark w zależności od tego, czy analiza platformy Spark ma być oceniana względem, odpowiednio, stałej migawki magazynu analitycznego, czy najnowszej migawki magazynu analitycznego.

Jeśli zapytania analityczne mają często używane filtry, możesz partycjonować na podstawie tych pól w celu uzyskania lepszej wydajności zapytań. Można okresowo wykonywać zadanie partycjonowania z notesu platformy Spark Azure Synapse w celu wyzwolenia partycjonowania w magazynie analitycznym. Ten podzielony na partycje magazyn wskazuje podstawowe konto magazynu usługi ADLS Gen2 połączone z obszarem roboczym Azure Synapse. Aby dowiedzieć się więcej, zobacz wprowadzenie do partycjonowania niestandardowego i sposób konfigurowania niestandardowych artykułów dotyczących partycjonowania .

Uwaga

Aby wykonywać zapytania dotyczące kont usługi Azure Cosmos DB dla bazy danych MongoDB, dowiedz się więcej o reprezentacji schematu pełnej wierności w magazynie analitycznym i rozszerzonych nazwach właściwości do użycia.

Uwaga

Należy pamiętać, że w options poniższych poleceniach jest uwzględniana wielkość liter. Na przykład należy użyć funkcji Gateway while, aby gateway zwrócić błąd.

Ładowanie do ramki danych platformy Spark

W tym przykładzie utworzysz ramkę danych platformy Spark wskazującą magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać dodatkową analizę, wywołując akcje platformy Spark względem ramki danych. Ta operacja nie ma wpływu na magazyn transakcyjny.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Równoważna składnia w języku Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Tworzenie tabeli platformy Spark

W tym przykładzie utworzysz tabelę platformy Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz przeprowadzić dodatkową analizę, wywołując zapytania SparkSQL względem tabeli. Ta operacja nie wpływa ani na magazyn transakcyjny, ani nie powoduje żadnego przenoszenia danych. Jeśli zdecydujesz się usunąć tę tabelę Spark, podstawowy kontener usługi Azure Cosmos DB i odpowiedni magazyn analityczny nie będą miały wpływu.

Ten scenariusz jest wygodny do ponownego użycia tabel Platformy Spark za pomocą narzędzi innych firm i zapewnia ułatwienia dostępu do danych bazowych w czasie wykonywania.

Składnia do utworzenia tabeli Spark jest następująca:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Uwaga

Jeśli masz scenariusze, w których schemat bazowego kontenera usługi Azure Cosmos DB zmienia się w czasie; I jeśli chcesz, aby zaktualizowany schemat automatycznie odzwierciedlał zapytania względem tabeli Spark, możesz to osiągnąć, ustawiając spark.cosmos.autoSchemaMerge opcję na true w opcjach tabeli spark.

Zapisywanie ramki danych platformy Spark w kontenerze usługi Azure Cosmos DB

W tym przykładzie napiszesz ramkę danych Platformy Spark do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Równoważna składnia w języku Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Ładowanie przesyłania strumieniowego ramki danych z kontenera

W tym gestie użyjesz funkcji przesyłania strumieniowego platformy Spark, aby załadować dane z kontenera do ramki danych. Dane będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików) połączonym z obszarem roboczym.

Uwaga

Jeśli chcesz odwołać się do bibliotek zewnętrznych w usłudze Synapse Apache Spark, dowiedz się więcej tutaj. Jeśli na przykład chcesz pozyskać ramkę danych platformy Spark do kontenera usługi Azure Cosmos DB dla bazy danych MongoDB, możesz użyć łącznika bazy danych MongoDB dla platformy Spark.

Ładowanie przesyłania strumieniowego ramki danych z kontenera usługi Azure Cosmos DB

W tym przykładzie użyjesz funkcji przesyłania strumieniowego ze strukturą platformy Spark, aby załadować dane z kontenera usługi Azure Cosmos DB do ramki danych przesyłania strumieniowego Spark przy użyciu funkcji zestawienia zmian w usłudze Azure Cosmos DB. Dane punktu kontrolnego używane przez platformę Spark będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików), które zostało połączone z obszarem roboczym.

Jeśli folder /localReadCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Równoważna składnia w języku Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Zapisywanie przesyłania strumieniowego ramki danych do kontenera usługi Azure Cosmos DB

W tym przykładzie napiszesz obiekt DataFrame przesyłania strumieniowego do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych. Jeśli folder /localWriteCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Równoważna składnia w języku Scala będzie następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Następne kroki