Interakcja z usługą Azure Cosmos DB przy użyciu platformy Apache Spark 3 w usłudze Azure Synapse Link

W tym artykule dowiesz się, jak korzystać z usługi Azure Cosmos DB przy użyciu usługi Synapse Apache Spark 3. Dzięki pełnej obsłudze języków Scala, Python, SparkSQL i C# usługa Synapse Apache Spark 3 jest kluczowa dla analizy, inżynierii danych, nauki o danych i scenariuszy eksploracji danych w usłudze Azure Synapse Link dla usługi Azure Cosmos DB.

Następujące możliwości są obsługiwane podczas interakcji z usługą Azure Cosmos DB:

  • Usługa Synapse Apache Spark 3 umożliwia analizowanie danych w kontenerach usługi Azure Cosmos DB, które są włączone za pomocą usługi Azure Synapse Link w czasie niemal rzeczywistym bez wpływu na wydajność obciążeń transakcyjnych. Następujące dwie opcje są dostępne do wykonywania zapytań względem magazynu analitycznego usługi Azure Cosmos DB z platformy Spark:
    • Ładowanie do ramki danych platformy Spark
    • Tworzenie tabeli platformy Spark
  • Usługa Synapse Apache Spark umożliwia również pozyskiwanie danych do usługi Azure Cosmos DB. Należy pamiętać, że dane są zawsze pozyskiwane do kontenerów usługi Azure Cosmos DB za pośrednictwem magazynu transakcyjnego. Po włączeniu Synapse Link wszystkie nowe wstawki, aktualizacje i usunięcia są automatycznie synchronizowane z magazynem analitycznym.
  • Usługa Synapse Apache Spark obsługuje również przesyłanie strumieniowe ze strukturą platformy Spark z usługą Azure Cosmos DB jako źródłem, a także ujściem.

W poniższych sekcjach przedstawiono składnię powyższych możliwości. Możesz również wyewidencjonować moduł Learn, aby dowiedzieć się, jak wykonywać zapytania dotyczące usługi Azure Cosmos DB za pomocą platformy Apache Spark na potrzeby usługi Azure Synapse Analytics. Gesty w obszarze roboczym usługi Azure Synapse Analytics zostały zaprojektowane w celu zapewnienia łatwego gotowego środowiska do rozpoczęcia pracy. Gesty są widoczne po kliknięciu prawym przyciskiem myszy kontenera usługi Azure Cosmos DB na karcie Dane obszaru roboczego usługi Synapse. Za pomocą gestów można szybko wygenerować kod i dostosować go do własnych potrzeb. Gesty są również idealne do odnajdywania danych jednym kliknięciem.

Ważne

Należy pamiętać o pewnych ograniczeniach w schemacie analitycznym, które mogą prowadzić do nieoczekiwanego zachowania operacji ładowania danych. Na przykład tylko pierwsze 1000 właściwości ze schematu transakcyjnego są dostępne w schemacie analitycznym, właściwości z spacjami są niedostępne itp. Jeśli występują nieoczekiwane wyniki, sprawdź ograniczenia schematu magazynu analitycznego , aby uzyskać więcej szczegółów.

Wykonywanie zapytań względem magazynu analitycznego usługi Azure Cosmos DB

Przed zapoznaniem się z dwiema możliwymi opcjami wykonywania zapytań względem magazynu analitycznego usługi Azure Cosmos DB, ładowanie do ramki danych Platformy Spark i tworzenie tabeli Spark warto zapoznać się z różnicami w środowisku, aby można było wybrać opcję, która działa dla Twoich potrzeb.

Różnica w środowisku polega na tym, czy podstawowe zmiany danych w kontenerze usługi Azure Cosmos DB powinny zostać automatycznie odzwierciedlone w analizie wykonywanej na platformie Spark. Po zarejestrowaniu ramki danych platformy Spark lub utworzeniu tabeli Spark w magazynie analitycznym kontenera metadane wokół bieżącej migawki danych w magazynie analitycznym są pobierane do platformy Spark w celu wydajnego wypychania kolejnych analiz. Należy pamiętać, że ponieważ platforma Spark jest zgodna z leniwymi zasadami oceny, chyba że akcja jest wywoływana na ramce danych Spark lub w zapytaniu SparkSQL jest wykonywana względem tabeli Spark, rzeczywiste dane nie są pobierane z magazynu analitycznego kontenera bazowego.

W przypadku ładowania do ramki danych Spark pobrane metadane są buforowane przez okres istnienia sesji Spark. Tym samym kolejne akcje wywoływane dla ramki danych są oceniane względem migawki magazynu analitycznego z czasu utworzenia ramki danych.

Z drugiej strony w przypadku utworzenia tabeli Spark metadane stanu magazynu analitycznego nie są buforowane na platformie Spark i są ponownie ładowane przy każdym wykonaniu zapytania SparkSQL względem tabeli Spark.

W związku z tym możesz wybrać między ładowaniem do ramki danych Spark i utworzeniem tabeli Spark w zależności od tego, czy analiza platformy Spark ma być oceniana względem, odpowiednio, stałej migawki magazynu analitycznego, czy najnowszej migawki magazynu analitycznego.

Uwaga

Aby wykonywać zapytania dotyczące kont usługi Azure Cosmos DB dla bazy danych MongoDB, dowiedz się więcej o pełnej reprezentacji schematu wierności w magazynie analitycznym i rozszerzonych nazwach właściwości do użycia.

Uwaga

Pamiętaj, że wszystkie options poniższe polecenia są uwzględniane w wielkości liter.

Ładowanie do ramki danych platformy Spark

W tym przykładzie utworzysz ramkę danych platformy Spark wskazującą magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać dodatkową analizę, wywołując akcje platformy Spark względem ramki danych. Ta operacja nie ma wpływu na magazyn transakcyjny.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Równoważna składnia w języku Scala byłaby następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Tworzenie tabeli platformy Spark

W tym przykładzie utworzysz tabelę Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać dodatkową analizę, wywołując zapytania SparkSQL względem tabeli. Ta operacja nie wpływa ani na magazyn transakcyjny, ani nie powoduje żadnego przenoszenia danych. Jeśli zdecydujesz się usunąć tę tabelę Spark, bazowy kontener usługi Azure Cosmos DB i odpowiedni magazyn analityczny nie będą miały wpływu.

Ten scenariusz jest wygodny do ponownego użycia tabel Platformy Spark za pomocą narzędzi innych firm i zapewnia dostępność bazowych danych w czasie wykonywania.

Składnia do utworzenia tabeli Spark jest następująca:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Uwaga

Jeśli masz scenariusze, w których schemat bazowego kontenera usługi Azure Cosmos DB zmienia się w czasie; a jeśli chcesz, aby zaktualizowany schemat automatycznie odzwierciedlał zapytania względem tabeli Spark, możesz to osiągnąć, ustawiając spark.cosmos.autoSchemaMerge opcję na true wartość w opcjach tabeli Spark.

Zapisywanie ramki danych Spark w kontenerze usługi Azure Cosmos DB

W tym przykładzie napiszesz ramkę danych Spark do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Równoważna składnia w języku Scala byłaby następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Ładowanie przesyłania strumieniowego ramki danych z kontenera

W tym gestzie użyjesz możliwości przesyłania strumieniowego platformy Spark, aby załadować dane z kontenera do ramki danych. Dane będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików) połączonym z obszarem roboczym.

Uwaga

Jeśli chcesz odwołać się do bibliotek zewnętrznych w usłudze Synapse Apache Spark, dowiedz się więcej tutaj. Jeśli na przykład chcesz pozyskać ramkę danych Spark do kontenera usługi Azure Cosmos DB dla bazy danych MongoDB, możesz skorzystać z łącznika bazy danych MongoDB dla platformy Spark tutaj.

Ładowanie przesyłania strumieniowego ramki danych z kontenera usługi Azure Cosmos DB

W tym przykładzie użyjesz funkcji przesyłania strumieniowego ze strukturą platformy Spark, aby załadować dane z kontenera usługi Azure Cosmos DB do przesyłania strumieniowego platformy Spark przy użyciu funkcji zestawienia zmian w usłudze Azure Cosmos DB. Dane punktu kontrolnego używane przez platformę Spark będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików), które zostało połączone z obszarem roboczym.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Równoważna składnia w języku Scala byłaby następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Zapisywanie przesyłania strumieniowego ramki danych do kontenera usługi Azure Cosmos DB

W tym przykładzie napiszesz obiekt DataFrame przesyłania strumieniowego do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych. Jeśli folder /localWriteCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie on automatycznie utworzony.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Równoważna składnia w języku Scala byłaby następująca:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Następne kroki