Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Collegamento a Synapse per Cosmos DB non è più disponibile per i nuovi progetti. Non usare questa funzionalità.
Si prega di utilizzare il mirroring di Azure Cosmos DB per Microsoft Fabric, che è ora disponibile a livello generale. Il mirroring offre gli stessi vantaggi zero-ETL ed è completamente integrato con Microsoft Fabric. Per maggiori informazioni, vedere Panoramica del mirroring di Cosmos DB.
Usare Change Data Capture (CDC) nello store analitico di Azure Cosmos DB come origine per Azure Data Factory o Azure Synapse Analytics per catturare modifiche specifiche sui tuoi dati.
Prerequisiti
- Un account di Azure Cosmos DB esistente.
- Se si dispone di una sottoscrizione di Azure, creare un nuovo account.
- Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
Abilitare l'archivio analitico
Abilitare prima di tutto Azure Collegamento a Synapse a livello di account e quindi abilitare l'archivio analitico per i contenitori appropriati per il carico di lavoro.
Abilitare Azure Collegamento a Synapse: Abilita Collegamento a Synapse di Azure per un account Azure Cosmos DB
Abilitare l'archivio analitico per i contenitori. Per ulteriori informazioni e istruzioni specifiche, vedere abilita Azure Collegamento a Synapse per i tuoi nuovi contenitori.
Creare una risorsa Azure di destinazione usando i flussi di dati
La funzionalità Change Data Capture dell'archivio analitico è disponibile tramite la funzionalità flusso di dati di Azure Data Factory o Azure Synapse Analytics. Per questa guida, usare Azure Data Factory.
Importante
In alternativa, è possibile usare Azure Synapse Analytics. Prima di tutto, creare un'area di lavoro Azure Synapse, se non ne esiste già una. Nell'area di lavoro appena creata selezionare la scheda Sviluppo, quindi Aggiungi nuova risorsa e infine Flusso di dati.
Creare un Azure Data Factory, se non ne hai già uno.
Suggerimento
Se possibile, creare la data factory nella stessa area in cui risiede l'account Azure Cosmos DB.
Avviare la data factory appena creata.
Nella data factory selezionare la scheda Flussi di dati e quindi selezionare Nuovo flusso di dati.
Assegnare al flusso di dati appena creato un nome univoco. In questo esempio il flusso di dati è denominato
cosmoscdc.
Configurare le impostazioni di origine per il contenitore dell'archivio analitico
Creare e configurare un'origine per il flusso dei dati dall'archivio analitico dell'account Azure Cosmos DB.
Annotazioni
Il connettore Collegamento a Synapse Spark di Azure Cosmos DB non supporta l'identità gestita.
Selezionare Aggiungi origine.
Nel campo Nome flusso di output immettere cosmos.
Nella sezione Tipo di origine selezionare Inline.
Nel campo Dataset selezionare Azure - Azure Cosmos DB per NoSQL.
Creare un nuovo servizio collegato per l'account denominato cosmoslinkedservice. Selezionare il Azure Cosmos DB esistente per NoSQL account nella finestra di dialogo popup Nuovo servizio collegato quindi selezionare Ok. In questo esempio viene selezionato un Azure Cosmos DB preesistente per NoSQL account denominato
msdocs-cosmos-sourcee un database denominatocosmicworks.
Selezionare Analitico per il tipo di archivio.
Selezionare la scheda Opzioni origine.
In Opzioni origine selezionare il contenitore di destinazione e abilitare Debug del flusso di dati. In questo esempio il contenitore è denominato
products.
Selezionare Debug flusso di dati. Nella finestra di dialogo popup Attiva il debug del flusso di dati mantenere le opzioni predefinite e quindi scegliere OK.
La scheda Opzioni origine contiene anche altre opzioni che è possibile abilitare. Questa tabella descrive tali opzioni:
| Opzione | Description |
|---|---|
| Acquisisci aggiornamenti intermedi | Abilitare questa opzione se si vuole acquisire la cronologia delle modifiche apportate agli elementi, incluse le modifiche intermedie tra le letture di Change Data Capture. |
| Acquisisci eliminazioni | Abilitare questa opzione per acquisire i record eliminati dall'utente e applicarli nel sink. Le operazioni di eliminazione non possono essere applicate ai sink di Esplora dati di Azure e Azure Cosmos DB. |
| Acquisisci record TTL dell'archivio transazionale | Abilitare questa opzione per acquisire i record eliminati dell'archivio transazionale di Azure Cosmos DB con TTL (time-to-live) e applicarli alla destinazione (sink). Le eliminazioni TTL non possono essere applicate ai sink di Esplora dati di Azure e di Azure Cosmos DB. |
| Dimensioni batch in byte | Questa impostazione è infatti espressa in gigabyte. Specificare le dimensioni in gigabyte se si vuole raggruppare in batch i feed di Change Data Capture |
| Configurazioni aggiuntive | Altre Azure Cosmos DB configurazioni dell'archivio analitico e i relativi valori. (Ad esempio: spark.cosmos.allowWhiteSpaceInFieldNames -> true) |
Utilizzo delle opzioni origine
Quando si seleziona una delle opzioni Capture intermediate updates, Capture Deletes e Capture Transactional store TTLs, il processo CDC creerà e popola il campo __usr_opType nel sink con i valori seguenti:
| Value | Description | Opzione |
|---|---|---|
| 1 | Aggiornamento | Acquisisci aggiornamenti intermedi |
| 2 | INSERT | Non esiste un'opzione per gli inserimenti, è attivata per impostazione predefinita |
| 3 | USER_DELETE | Acquisisci eliminazioni |
| 4 | TTL_DELETE | Acquisisci record TTL dell'archivio transazionale |
Se è necessario distinguere i record TTL eliminati dai documenti eliminati da utenti o applicazioni, è possibile controllare entrambe le Capture intermediate updates opzioni e Capture Transactional store TTLs . È quindi necessario adattare i processi, le applicazioni o le query CDC per usare __usr_opType in base alle esigenze aziendali.
Suggerimento
Se è necessario che i consumer downstream ripristinino l'ordine degli aggiornamenti con l'opzione "Acquisisci aggiornamenti intermedi" selezionata, il campo timestamp _ts di sistema può essere usato come campo di ordinamento.
Creare e configurare le impostazioni del sink per le operazioni di aggiornamento ed eliminazione
Prima di tutto, creare un sink Archiviazione BLOB di Azure e quindi configurare il sink per filtrare i dati solo per operazioni specifiche.
Creare un account e un contenitore Archiviazione BLOB di Azure, se non ne è già disponibile uno. Per gli esempi successivi si userà un account denominato
msdocsblobstoragee un contenitore denominatooutput.Suggerimento
Se possibile, creare l'account di archiviazione nella stessa area in cui risiede l'account Azure Cosmos DB.
Tornare in Azure Data Factory e creare un nuovo sink per i dati delle modifiche acquisiti dall'origine
cosmos.
Assegnare al sink un nome univoco. In questo esempio il sink è denominato
storage.
Nella sezione Tipo di sink selezionare Inline. Nel campo Set di dati selezionare Delta.
Creare un nuovo servizio collegato per l'account usando Archiviazione BLOB di Azure denominato storagelinkedservice. Selezionare l'account di Archiviazione BLOB di Azure esistente nella finestra di dialogo popup Nuovo servizio collegato quindi selezionare Ok. In questo esempio viene selezionato un account Archiviazione BLOB di Azure preesistente denominato
msdocsblobstorage.
Selezionare la scheda Impostazioni.
In Impostazioni impostare Percorso cartella sul nome del contenitore BLOB. In questo esempio il nome del contenitore è
output.
Individuare la sezione Metodo aggiornamento e modificare le selezioni in modo da consentire solo le operazioni di eliminazione e aggiornamento. Specificare anche le Colonne chiave come Elenco di colonne utilizzando il campo
{_rid}come identificatore univoco.
Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare il flusso di dati.
Pianificare l'esecuzione di Change Data Capture
Dopo la pubblicazione di un flusso di dati, è possibile aggiungere una nuova pipeline per spostare e trasformare i dati.
Crea una nuova pipeline. Assegnare alla pipeline un nome univoco. In questo esempio la pipeline è denominata
cosmoscdcpipeline.
Nella sezione Attività espandere l'opzione Sposta e trasforma e quindi selezionare Flusso di dati.
Assegnare un nome univoco all'attività del flusso di dati. In questo esempio l'attività è denominata
cosmoscdcactivity.Nella scheda Impostazioni selezionare il flusso di dati denominato
cosmoscdccreato in precedenza in questa guida. Selezionare quindi una dimensione di calcolo in base al volume dei dati e alla latenza necessaria per il carico di lavoro.
Suggerimento
Per dimensioni incrementali dei dati maggiori di 100 GB, è consigliabile usare le dimensioni Personalizzate con un numero di core pari a 32 (+16 core driver).
Selezionare + Aggiungi trigger. Pianificare l'esecuzione di questa pipeline alla cadenza appropriata per il carico di lavoro. In questo esempio la pipeline è configurata per l'esecuzione ogni cinque minuti.
Annotazioni
La finestra di ricorrenza minima per le esecuzioni di Change Data Capture è di un minuto.
Selezionare Convalida per assicurarsi che non siano presenti errori o omissioni. Selezionare quindi Pubblica per pubblicare la pipeline.
Osservare i dati inseriti nel contenitore di Archiviazione BLOB di Azure come output del flusso di dati usando la cattura dei dati delle modifiche nello store analitico di Azure Cosmos DB.
Annotazioni
Il tempo di avvio iniziale del cluster può richiedere fino a tre minuti. Per evitare il tempo di avvio del cluster nelle esecuzioni successive di Change Data Capture, configurare il valore Durata (TTL) del cluster del flusso di dati. Per ulteriori informazioni sul runtime di integrazione e sul tempo di vita (TTL), vedere integration runtime in Azure Data Factory.
Processi simultanei
Le dimensioni del batch nelle opzioni per l'origine o i casi in cui il sink è lento nell'inserimento del flusso delle modifiche, possono causare l'esecuzione di più processi contemporaneamente. Per evitare questa situazione, impostare l'opzione Concorrenza su 1 nelle impostazioni della pipeline per assicurarsi che le nuove esecuzioni non vengano attivate fino al completamento dell'esecuzione corrente.