Inizia con l'acquisizione di dati delle modifiche nell'archivio analitico per Azure Cosmos DB

Importante

Collegamento a Synapse per Cosmos DB non è più disponibile per i nuovi progetti. Non usare questa funzionalità.

Si prega di utilizzare il mirroring di Azure Cosmos DB per Microsoft Fabric, che è ora disponibile a livello generale. Il mirroring offre gli stessi vantaggi zero-ETL ed è completamente integrato con Microsoft Fabric. Per maggiori informazioni, vedere Panoramica del mirroring di Cosmos DB.

Usare Change Data Capture (CDC) nello store analitico di Azure Cosmos DB come origine per Azure Data Factory o Azure Synapse Analytics per catturare modifiche specifiche sui tuoi dati.

Prerequisiti

  • Un account di Azure Cosmos DB esistente.

Abilitare l'archivio analitico

Abilitare prima di tutto Azure Collegamento a Synapse a livello di account e quindi abilitare l'archivio analitico per i contenitori appropriati per il carico di lavoro.

  1. Abilitare Azure Collegamento a Synapse: Abilita Collegamento a Synapse di Azure per un account Azure Cosmos DB

  2. Abilitare l'archivio analitico per i contenitori. Per ulteriori informazioni e istruzioni specifiche, vedere abilita Azure Collegamento a Synapse per i tuoi nuovi contenitori.

Creare una risorsa Azure di destinazione usando i flussi di dati

La funzionalità Change Data Capture dell'archivio analitico è disponibile tramite la funzionalità flusso di dati di Azure Data Factory o Azure Synapse Analytics. Per questa guida, usare Azure Data Factory.

Importante

In alternativa, è possibile usare Azure Synapse Analytics. Prima di tutto, creare un'area di lavoro Azure Synapse, se non ne esiste già una. Nell'area di lavoro appena creata selezionare la scheda Sviluppo, quindi Aggiungi nuova risorsa e infine Flusso di dati.

  1. Creare un Azure Data Factory, se non ne hai già uno.

    Suggerimento

    Se possibile, creare la data factory nella stessa area in cui risiede l'account Azure Cosmos DB.

  2. Avviare la data factory appena creata.

  3. Nella data factory selezionare la scheda Flussi di dati e quindi selezionare Nuovo flusso di dati.

  4. Assegnare al flusso di dati appena creato un nome univoco. In questo esempio il flusso di dati è denominato cosmoscdc.

    Screnshot di un nuovo flusso di dati con il nome cosmoscdc.

Configurare le impostazioni di origine per il contenitore dell'archivio analitico

Creare e configurare un'origine per il flusso dei dati dall'archivio analitico dell'account Azure Cosmos DB.

Annotazioni

Il connettore Collegamento a Synapse Spark di Azure Cosmos DB non supporta l'identità gestita.

  1. Selezionare Aggiungi origine.

    Screenshot dell'opzione di menu Aggiungi origine.

  2. Nel campo Nome flusso di output immettere cosmos.

    Screenshot dell'assegnazione di un nome all'origine cosmos appena creata.

  3. Nella sezione Tipo di origine selezionare Inline.

    Screenshot della selezione del tipo di origine inline.

  4. Nel campo Dataset selezionare Azure - Azure Cosmos DB per NoSQL.

    Screenshot della selezione di Azure Cosmos DB per NoSQL come tipologia di set di dati.

  5. Creare un nuovo servizio collegato per l'account denominato cosmoslinkedservice. Selezionare il Azure Cosmos DB esistente per NoSQL account nella finestra di dialogo popup Nuovo servizio collegato quindi selezionare Ok. In questo esempio viene selezionato un Azure Cosmos DB preesistente per NoSQL account denominato msdocs-cosmos-source e un database denominato cosmicworks.

    Screenshot della finestra di dialogo Nuovo servizio collegato con un account Azure Cosmos DB selezionato.

  6. Selezionare Analitico per il tipo di archivio.

    Screenshot dell'opzione analitica selezionata per un servizio collegato.

  7. Selezionare la scheda Opzioni origine.

  8. In Opzioni origine selezionare il contenitore di destinazione e abilitare Debug del flusso di dati. In questo esempio il contenitore è denominato products.

    Screenshot di un contenitore di origine selezionato per i prodotti denominati.

  9. Selezionare Debug flusso di dati. Nella finestra di dialogo popup Attiva il debug del flusso di dati mantenere le opzioni predefinite e quindi scegliere OK.

    Screenshot dell'opzione Attiva/Disattiva per abilitare il debug del flusso di dati.

  10. La scheda Opzioni origine contiene anche altre opzioni che è possibile abilitare. Questa tabella descrive tali opzioni:

Opzione Description
Acquisisci aggiornamenti intermedi Abilitare questa opzione se si vuole acquisire la cronologia delle modifiche apportate agli elementi, incluse le modifiche intermedie tra le letture di Change Data Capture.
Acquisisci eliminazioni Abilitare questa opzione per acquisire i record eliminati dall'utente e applicarli nel sink. Le operazioni di eliminazione non possono essere applicate ai sink di Esplora dati di Azure e Azure Cosmos DB.
Acquisisci record TTL dell'archivio transazionale Abilitare questa opzione per acquisire i record eliminati dell'archivio transazionale di Azure Cosmos DB con TTL (time-to-live) e applicarli alla destinazione (sink). Le eliminazioni TTL non possono essere applicate ai sink di Esplora dati di Azure e di Azure Cosmos DB.
Dimensioni batch in byte Questa impostazione è infatti espressa in gigabyte. Specificare le dimensioni in gigabyte se si vuole raggruppare in batch i feed di Change Data Capture
Configurazioni aggiuntive Altre Azure Cosmos DB configurazioni dell'archivio analitico e i relativi valori. (Ad esempio: spark.cosmos.allowWhiteSpaceInFieldNames -> true)

Utilizzo delle opzioni origine

Quando si seleziona una delle opzioni Capture intermediate updates, Capture Deletes e Capture Transactional store TTLs, il processo CDC creerà e popola il campo __usr_opType nel sink con i valori seguenti:

Value Description Opzione
1 Aggiornamento Acquisisci aggiornamenti intermedi
2 INSERT Non esiste un'opzione per gli inserimenti, è attivata per impostazione predefinita
3 USER_DELETE Acquisisci eliminazioni
4 TTL_DELETE Acquisisci record TTL dell'archivio transazionale

Se è necessario distinguere i record TTL eliminati dai documenti eliminati da utenti o applicazioni, è possibile controllare entrambe le Capture intermediate updates opzioni e Capture Transactional store TTLs . È quindi necessario adattare i processi, le applicazioni o le query CDC per usare __usr_opType in base alle esigenze aziendali.

Suggerimento

Se è necessario che i consumer downstream ripristinino l'ordine degli aggiornamenti con l'opzione "Acquisisci aggiornamenti intermedi" selezionata, il campo timestamp _ts di sistema può essere usato come campo di ordinamento.

Creare e configurare le impostazioni del sink per le operazioni di aggiornamento ed eliminazione

Prima di tutto, creare un sink Archiviazione BLOB di Azure e quindi configurare il sink per filtrare i dati solo per operazioni specifiche.

  1. Creare un account e un contenitore Archiviazione BLOB di Azure, se non ne è già disponibile uno. Per gli esempi successivi si userà un account denominato msdocsblobstorage e un contenitore denominato output.

    Suggerimento

    Se possibile, creare l'account di archiviazione nella stessa area in cui risiede l'account Azure Cosmos DB.

  2. Tornare in Azure Data Factory e creare un nuovo sink per i dati delle modifiche acquisiti dall'origine cosmos.

    Screenshot dell'aggiunta di un nuovo sink connesso all'origine esistente.

  3. Assegnare al sink un nome univoco. In questo esempio il sink è denominato storage.

    Screenshot dell'assegnazione di un nome alla risorsa di archiviazione sink appena creata.

  4. Nella sezione Tipo di sink selezionare Inline. Nel campo Set di dati selezionare Delta.

    Screenshot della selezione e del tipo di set di dati Delta inline per il sink.

  5. Creare un nuovo servizio collegato per l'account usando Archiviazione BLOB di Azure denominato storagelinkedservice. Selezionare l'account di Archiviazione BLOB di Azure esistente nella finestra di dialogo popup Nuovo servizio collegato quindi selezionare Ok. In questo esempio viene selezionato un account Archiviazione BLOB di Azure preesistente denominato msdocsblobstorage.

    Screenshot delle opzioni del tipo di servizio per un nuovo servizio collegato Delta.

    Screenshot della finestra di dialogo 'Nuovo servizio collegato' con un account Archiviazione BLOB di Azure selezionato.

  6. Selezionare la scheda Impostazioni.

  7. In Impostazioni impostare Percorso cartella sul nome del contenitore BLOB. In questo esempio il nome del contenitore è output.

    Screenshot del contenitore BLOB denominato output impostato come destinazione del sink.

  8. Individuare la sezione Metodo aggiornamento e modificare le selezioni in modo da consentire solo le operazioni di eliminazione e aggiornamento. Specificare anche le Colonne chiave come Elenco di colonne utilizzando il campo {_rid} come identificatore univoco.

    Screenshot dei metodi di aggiornamento e delle colonne chiave specificati per il sink.

  9. Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare il flusso di dati.

    Screenshot dell'opzione per convalidare e quindi pubblicare il flusso di dati corrente.

Pianificare l'esecuzione di Change Data Capture

Dopo la pubblicazione di un flusso di dati, è possibile aggiungere una nuova pipeline per spostare e trasformare i dati.

  1. Crea una nuova pipeline. Assegnare alla pipeline un nome univoco. In questo esempio la pipeline è denominata cosmoscdcpipeline.

    Screenshot della nuova opzione della pipeline all'interno della sezione resources.

  2. Nella sezione Attività espandere l'opzione Sposta e trasforma e quindi selezionare Flusso di dati.

    Screenshot dell'opzione attività flusso di dati all'interno della sezione attività.

  3. Assegnare un nome univoco all'attività del flusso di dati. In questo esempio l'attività è denominata cosmoscdcactivity.

  4. Nella scheda Impostazioni selezionare il flusso di dati denominato cosmoscdc creato in precedenza in questa guida. Selezionare quindi una dimensione di calcolo in base al volume dei dati e alla latenza necessaria per il carico di lavoro.

    Screenshot delle impostazioni di configurazione per il flusso di dati e le dimensioni di calcolo per l'attività.

    Suggerimento

    Per dimensioni incrementali dei dati maggiori di 100 GB, è consigliabile usare le dimensioni Personalizzate con un numero di core pari a 32 (+16 core driver).

  5. Selezionare + Aggiungi trigger. Pianificare l'esecuzione di questa pipeline alla cadenza appropriata per il carico di lavoro. In questo esempio la pipeline è configurata per l'esecuzione ogni cinque minuti.

    Screenshot del pulsante Aggiungi trigger per una nuova pipeline.

    Screenshot della configurazione di un trigger basato su pianificazione, a partire dall'anno 2023, che viene eseguito ogni cinque minuti.

    Annotazioni

    La finestra di ricorrenza minima per le esecuzioni di Change Data Capture è di un minuto.

  6. Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare la pipeline.

  7. Osservare i dati inseriti nel contenitore di Archiviazione BLOB di Azure come output del flusso di dati usando la cattura dei dati delle modifiche nello store analitico di Azure Cosmos DB.

    Screenshot dei file di output della pipeline nel contenitore di Archiviazione BLOB di Azure.

    Annotazioni

    Il tempo di avvio iniziale del cluster può richiedere fino a tre minuti. Per evitare il tempo di avvio del cluster nelle esecuzioni successive di Change Data Capture, configurare il valore Durata (TTL) del cluster del flusso di dati. Per ulteriori informazioni sul runtime di integrazione e sul tempo di vita (TTL), vedere integration runtime in Azure Data Factory.

Processi simultanei

Le dimensioni del batch nelle opzioni per l'origine o i casi in cui il sink è lento nell'inserimento del flusso delle modifiche, possono causare l'esecuzione di più processi contemporaneamente. Per evitare questa situazione, impostare l'opzione Concorrenza su 1 nelle impostazioni della pipeline per assicurarsi che le nuove esecuzioni non vengano attivate fino al completamento dell'esecuzione corrente.

Passaggi successivi