Caricare dati in tabelle distribuite di Azure Cosmos DB per PostgreSQL

Completato

Woodgrove Bank ha fornito requisiti di caricamento dei dati che determinano l'uso di diversi metodi di inserimento. I singoli eventi di transazione arrivano rapidamente dall'applicazione contactless-payments durante il giorno e devono essere inseriti il più rapidamente possibile. La banca ha inoltre fornito file CSV contenenti eventi di transazione cronologici e dati utente che devono essere caricati in blocco nel database nel modo più efficiente possibile. Inoltre, hanno chiesto di popolare un paio di nuove tabelle dai dati cronologici dopo il caricamento.

In Azure Cosmos DB per PostgreSQL sono disponibili più approcci che è possibile usare per inserire i dati in modo efficiente in un database distribuito. Il caricamento dei dati in tabelle distribuite viene eseguito nello stesso modo in cui si caricano dati in tabelle non distribuite. La differenza principale è che è necessario specificare un valore per la colonna di distribuzione assegnata dalla tabella per ogni riga inserita.

Ogni riga di una tabella distribuita viene scritta in una partizione in base al valore della relativa colonna di distribuzione. Per identificare correttamente la partizione in cui inserire i dati, il coordinatore esegue l'hashing della colonna di distribuzione della riga. Il coordinatore confronta quindi il valore con hash con l'intervallo hash assegnato per ogni partizione. Dopo aver identificato la partizione corretta, il coordinatore instrada la query a essa, in cui viene eseguito il comando di inserimento remoto in tutte le repliche di tale partizione.

Caricare singole righe usando il comando IN edizione Standard RT

Woodgrove Bank richiede la possibilità di inserire singoli record di transazione nel database quando vengono ricevuti dall'app pagamenti senza contatto. Per inserire singole righe in tabelle distribuite, è possibile usare il comando PostgreSQL IN edizione Standard RT standard. Le tabelle distribuite vengono visualizzate come tabelle standard in SQL, ma il modo in cui le query vengono eseguite differiscono perché le tabelle sottostanti vengono partizionate orizzontalmente tra i nodi di lavoro.

Per garantire che il coordinatore possa inoltrare accuratamente le query alle partizioni corrette, è necessario specificare un valore per la colonna di distribuzione durante il caricamento dei dati nelle tabelle distribuite. In altre parole, ogni istruzione IN edizione Standard RT deve includere un valore non Null per la colonna di distribuzione della riga.

È possibile trovare la colonna di distribuzione assegnata di una tabella usando la vista tabelle distribuite nel nodo coordinatore. Ad esempio, l'esecuzione della query SELECT distribution_column FROM citus_tables WHERE table_name = 'payment_events'::regclass; rivela che la colonna di distribuzione per la payment_events tabella è il user_id campo .

Nell'esempio seguente vengono aggiunte due transazioni alla payment_events tabella nel database di Woodgrove Bank usando singoli comandi IN edizione Standard RT.

/*
-- Table schema, for reference
CREATE TABLE payment_events
(
  event_id bigint,
  event_type text,
  user_id bigint,
  merchant_id bigint,
  event_details jsonb,
  created_at timestamp
);
*/

INSERT INTO payment_events VALUES (4951447424,'SendFunds',1159138,4951447330,'{"code": 4951447330, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22');

INSERT INTO payment_events VALUES (4951447488,'RequestFunds',1171503,4951447340,'{"code": 4951447340, "particulars": "vue", "reference": "vuejs"}','1/12/16 5:22');

Poiché ogni riga contiene un valore valido per la user_id colonna, le due righe vengono inserite correttamente nella payment_events tabella. Esaminare ora cosa accade se si tenta di inserire una riga in cui il valore della colonna di distribuzione è null:

INSERT INTO payment_events VALUES (4951447499,'GiftFunds',null,4951447350,'{"code": 4951447350, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22');

Questa INSERT istruzione genera un errore:

ERROR: cannot perform an INSERT with NULL in the partition column

Quando si esegue il INSERT comando per aggiungere dati a una tabella distribuita, è essenziale ricordare che è necessario specificare la colonna di distribuzione. In questo modo il coordinatore può determinare la partizione a cui aggiungere la riga nel cluster.

Combinare istruzioni IN edizione Standard RT per migliorare l'efficienza

Grazie alla tua esperienza con l'app di pagamento senza contatto di Woodgrove Bank, sai che ricevono un numero elevato di transazioni, molte delle quali arrivano simultaneamente durante il giorno. Una tecnica che è possibile usare per migliorare l'efficienza dell'inserimento di singole righe consiste nell'elaborare le transazioni in batch, combinando più istruzioni insert in un'unica istruzione contenente numerose righe. Questo approccio elimina la necessità di eseguire query di database ripetute. Ad esempio, è possibile inserire più righe di transazione contemporaneamente come segue:

INSERT INTO payment_events VALUES 
  (4951447425,'GiftFunds',1159138,4951447350,'{"code": 4951447350, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22'),
  (4951447489,'SendFunds',1171503,4951447360,'{"code": 4951447360, "particulars": "vue", "reference": "vuejs"}','1/12/16 5:22'),
  (4961447699,'RequestFunds',1171503,4951447370,'{"code": 4951447370, "reference": "Lombiq", "particulars": "Orchard-User-Notifications"}','1/12/16 5:22');

Caricare in blocco i dati con il comando COPY

Quando sono necessarie frequenze di inserimento più elevate, il COPY comando consente di caricare i dati in blocco. È possibile usare il COPY comando per caricare i dati direttamente in tabelle distribuite da un'applicazione usando l'opzione FROM STDIN , i file e altre origini. Quando si usa il COPY comando per scrivere dati in tabelle distribuite, i dati vengono copiati in modo asincrono nei nodi di lavoro usando una connessione parallela per ogni posizionamento di partizioni. Il coordinatore instrada i dati ai nodi di lavoro appropriati, consentendo l'inserimento dei dati usando più ruoli di lavoro e core in parallelo.

Woodgrove Bank ha richiesto di fornire un meccanismo per il caricamento bulk dei dati cronologici. I dati forniti vengono archiviati in file con valori delimitati da virgole (CSV). I comandi seguenti illustrano come scaricare il events.csv file nel nodo coordinatore e quindi caricare in blocco i dati del file nella payment_events tabella:

-- Download the events.csv file
\! curl -O https://raw.githubusercontent.com/MicrosoftDocs/mslearn-create-connect-postgresHyperscale/main/events.csv

-- Bulk load the data from the file into the payment_events table
\COPY payment_events FROM 'events.csv' WITH (format CSV)

È anche possibile combinare le istruzioni precedenti in un singolo comando usando la FROM PROGRAM clausola per informare il coordinatore di recuperare i file di dati da un'applicazione in esecuzione nel coordinatore. In questo caso, si indica al coordinatore di usare l'applicazione curl per scaricare un file dall'URL specificato. L'opzione WITH CSV fornisce informazioni sul formato del file inserito.

\COPY payment_events FROM PROGRAM 'curl https://raw.githubusercontent.com/MicrosoftDocs/mslearn-create-connect-postgresHyperscale/main/events.csv' WITH CSV

Il COPY comando offre un modo pratico e rapido di caricare i dati direttamente dai file. Si noti, tuttavia, che quando la destinazione è una tabella distribuita, ogni riga nel file di origine deve contenere un valore per la colonna di distribuzione.

Caricare dati da file nell'archivio BLOB usando l'estensione pg_azure_storage

Un metodo alternativo per il caricamento bulk dei dati dei file consiste nell'usare l'estensione pg_azure_storage . pg_azure_storageè una nuova estensione PostgreSQL sviluppata da Microsoft che consente di usare e caricare in blocco i dati archiviati nei file ospitati in Archiviazione BLOB di Azure.

I file devono essere aggiunti o migrati a un account Archiviazione di Azure prima di poter sfruttare le funzionalità di questa estensione. Lo spostamento di file in Archiviazione BLOB di Azure consente di usare un servizio di archiviazione sicuro nativo del cloud.

Per iniziare, è necessario caricare l'estensione:

-- Install the extension in the database
SELECT create_extension('azure_storage');

L'estensione pg_azure_storage include il account_add() metodo . Questo metodo connette un account di archiviazione al database e consente l'accesso ai file all'interno dell'account. Per connettersi, è necessario specificare il nome e la chiave di accesso dell'account di archiviazione usando la sintassi seguente:

-- Provide the storage account credentials
SELECT azure_storage.account_add('{STORAGE_ACCOUNT_NAME}', '{STORAGE_ACCOUNT_ACCESS_KEY}');

La fornitura di una chiave di accesso è necessaria solo quando si lavora con i dati nei contenitori con un livello di accesso "Privato (nessun accesso anonimo)". Si supponga di voler eseguire il pull dei dati da un contenitore il cui livello di accesso è impostato su "Contenitore (accesso in lettura anonimo per contenitori e BLOB)." In tal caso, è possibile inserire file da Archiviazione BLOB di Azure usando gli URL pubblici ed enumerare il contenuto del contenitore senza configurare la chiave dell'account nell'estensionepg_azure_storage.

Ad esempio, è stato creato un account di archiviazione denominato stlearnpostgresql per Woodgrove Bank e sono stati aggiunti un paio di file in un contenitore privato denominato historical-data. Per fornire l'accesso a tale account dal database, è necessario eseguire la query seguente, specificando la chiave di accesso dell'account di archiviazione recuperata dalla pagina chiavi di accesso dell'account di Archiviazione nella portale di Azure:

SELECT azure_storage.account_add('stlearnpostgresql', '4zzZGx4sUk8TBVnHnoPYt0G7A3w9/DJJBCfdxeeb+VDrR8P3bSwWA3lYsTvO1HwYzrt4XQ37iIEL+AStWuQ/uA==');

Ora che si è connessi all'account di archiviazione, è possibile elencare i BLOB all'interno di un contenitore denominato. Per visualizzare i file nel historical-data contenitore, eseguire la query seguente:

SELECT path, content_type, pg_size_pretty(bytes)
FROM azure_storage.blob_list('stlearnpostgresql', 'historical-data');

La blob_list() funzione restituisce tutti i BLOB all'interno del contenitore specificato:

    path    | content_type | pg_size_pretty 
------------+--------------+----------------
 events.csv | text/csv     | 17 MB
 users.csv  | text/csv     | 29 MB

Woodgrove Bank ha fornito l'elenco di colonne per i dati nel users.csv file, in base allo schema di tabella seguente:

/*
-- Table structure and distribution details provided for reference
CREATE TABLE payment_users
(
    user_id bigint PRIMARY KEY,
    url text,
    login text,
    avatar_url text
);

SELECT created_distributed_table('payment_users', 'user_id');
*/

Informano anche che il file CSV non contiene una riga di intestazione.

Si supponga di non aver fornito dettagli sui dati all'interno del file. In tal caso, è possibile usare il portale di Azure per visualizzare in anteprima i file di dimensioni inferiori a 2,1 MB oppure scaricare una copia del file e aprirla rapidamente per esplorare la struttura del file.

Ora che si conoscono i dati contenuti nel file, è possibile soddisfare la richiesta di Woodgrove Bank di caricare in blocco i dati cronologici dai file. Per caricare in blocco i dati dai file nell'archiviazione BLOB, pg_azure_storage estende il comando PostgreSQL COPY nativo per renderlo in grado di gestire Archiviazione BLOB di Azure URL di risorse. Questa funzionalità è abilitata per impostazione predefinita ed è possibile gestirla usando l'impostazione azure_storage.enable_copy_command . Usando il comando esteso COPY , eseguire il comando seguente per inserire i dati utente nella payment_users tabella:

-- Bulk load data from the user.csv file in Blob Storage into the payment_users table
COPY payment_users FROM 'https://stlearnpostgresql.blob.core.windows.net/historical-data/users.csv';

L'output del COPY comando specifica il numero di righe copiate nella tabella, ad esempio COPY 264197.

Per altre informazioni sull'estensione pg_azure_storage , leggere la documentazione e completare il modulo Estendere la funzionalità di Azure Cosmos DB per PostgreSQL usando le estensioni .

Popolare le tabelle usando la clausola FROM edizione Standard LECT

Nell'ambito del caricamento bulk dei dati cronologici di Woodgrove Bank, hanno richiesto di creare un paio di nuove tabelle dai dati esistenti. In primo luogo, vogliono una tabella di aggregazione contenente il numero di eventi per tipo per utente, che useranno per popolare un dashboard. In secondo luogo, vorrebbero estrarre record commercianti univoci dalla payment_events tabella in una nuova tabella commercianti per un'analisi futura dell'attività dei commercianti.

Sono stati forniti gli schemi seguenti per le nuove tabelle, ma è stato chiesto di gestire la configurazione della distribuzione delle tabelle e la scrittura delle query necessarie per caricare i dati nelle tabelle.

CREATE TABLE user_events
(
  user_id bigint,
  user_login text,
  event_type text,
  event_count bigint
);

CREATE TABLE payment_merchants
(
  merchant_id bigint PRIMARY KEY,
  name text,
  url text
);

È possibile caricare dati in tabelle distribuite da altre tabelle usando i risultati di una SELECT query usando INSERT … SELECT istruzioni . Come per l'esecuzione di istruzioni standard INSERT , i risultati della SELECT query devono includere valori per la colonna di distribuzione per ogni riga.

Questo metodo consente inoltre di utilizzare la ON CONFLICT DO UPDATE clausola per indirizzare l'istruzione INSERT a eseguire un "UP edizione Standard RT", aggiornando una riga esistente in conflitto con la riga proposta per l'inserimento. UP edizione Standard RT offrono il modo più semplice per calcolare e salvare le aggregazioni in anticipo, note come rollup distribuiti.

Uso di tabelle di origine e di destinazione con percorso condiviso

I dati necessari per popolare la user_events tabella sono contenuti nelle payment_events tabelle e payment_users . La colonna di distribuzione più efficiente basata sullo schema di tabella proposto sarà lo stesso campo usato dalle payment_events tabelle e payment_users , user_idperché tale colonna soddisfa meglio i quattro criteri per la scelta di una colonna di distribuzione ideale.

SELECT create_distributed_table('user_events', 'user_id');

Con la user_events tabella ora distribuita, le due tabelle di origine e la tabella di destinazione vengono raggruppate in modo implicito perché condividono la stessa colonna di distribuzione. È stata creata la query seguente per il caricamento della tabella, ma si vuole verificare che la query instrada l'istruzione INSERT ... SELECT ai nodi di lavoro da eseguire in parallelo.

INSERT INTO user_events
SELECT e.user_id, login, event_type, COUNT(event_id)
FROM payment_events AS e
INNER JOIN payment_users AS u on e.user_id = u.user_id
GROUP BY e.user_id, login, event_type;

Prima di eseguire la query, è possibile usare l'istruzione EXPLAIN per visualizzare il piano di esecuzione.

Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
  Task Count: 32
  Tasks Shown: One of 32
  ->  Task
        Node: host=private-w0.learn-cosmosdb-postgresql.postgres.database.azure.com port=5432 dbname=citus
        ->  Insert on user_events_102394 citus_table_alias  (cost=530.35..619.99 rows=0 width=0)
              ->  HashAggregate  (cost=530.35..575.17 rows=4482 width=37)
                    Group Key: e.user_id, u.login, e.event_type
                    ->  Hash Join  (cost=334.93..485.53 rows=4482 width=37)
                          Hash Cond: (e.user_id = u.user_id)
                          ->  Seq Scan on payment_events_102232 e  (cost=0.00..138.82 rows=4482 width=27)
                                Filter: (user_id IS NOT NULL)
                          ->  Hash  (cost=231.08..231.08 rows=8308 width=18)
                                ->  Seq Scan on payment_users_102264 u  (cost=0.00..231.08 rows=8308 width=18)

Le Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) righe e Task Count: 32 mostrano che la query verrà eseguita in parallelo su ognuna delle 32 partizioni. L'esecuzione della query popola la nuova tabella ed è possibile eseguire le operazioni seguenti per visualizzare i risultati:

SELECT * FROM user_events ORDER BY user_id LIMIT 5;
 user_id | user_login |  event_type   | event_count 
---------+------------+---------------+-------------
      45 | mojodna    | RequestFunds  |           2
      45 | mojodna    | RequestFunds  |           2
      87 | tmornini   | IssuesEvent   |           1
      87 | tmornini   | DisputeCharge |           1
      87 | tmornini   | DisputeCharge |           1

Quando le SELECT istruzioni e INSERT includono tabelle con la stessa colonna di distribuzione e la colonna di distribuzione viene visualizzata in entrambe le istruzioni e SELECT , il coordinatore instrada l'istruzione INSERT ... SELECT ai nodi di lavoro per l'esecuzione INSERT parallela. Questa tecnica è l'approccio più efficiente e consigliato quando possibile.

Pull dei dati al coordinatore

L'estrazione degli ID commercianti distinti nella payment_events tabella nella tabella commerciante proposta non consentirà la condivisione della stessa colonna di distribuzione tra le due tabelle. Lo schema fornito per la payment_merchants tabella non contiene la user_id colonna, quindi è necessario scegliere la colonna migliore disponibile, merchant_id.

Per gestire il popolamento della payment_merchants tabella, scrivere la query seguente:

INSERT INTO payment_merchants SELECT DISTINCT merchant_id, CONCAT('merchant', '_', merchant_id), CONCAT('https://api.woodgrove.com/merchants/', merchant_id) FROM payment_events;

Conoscere le tabelle che non condividono la stessa colonna di distribuzione e non sono raggruppate, si vuole comprendere meglio come verrà eseguita la query prima di eseguirla, quindi si userà di nuovo l'istruzione EXPLAIN per visualizzare il piano di esecuzione delle query.

Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
  INSERT/SELECT method: pull to coordinator
  ->  HashAggregate  (cost=750.00..752.00 rows=200 width=72)
        Output: remote_scan.merchant_id, remote_scan.name, remote_scan.url
        Group Key: remote_scan.merchant_id, remote_scan.name, remote_scan.url
        ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=72)
              Output: remote_scan.merchant_id, remote_scan.name, remote_scan.url
              Task Count: 32
              Tasks Shown: One of 32
              ->  Task
                    Query: SELECT DISTINCT merchant_id, concat('merchant', '_', merchant_id) AS name, concat('https://api.woodgrove.com/merchants/', merchant_id) AS url FROM public.payment_events_102040 payment_events WHERE true
                    Node: host=private-w0.learn-cosmosdb-postgresql.postgres.database.azure.com port=5432 dbname=citus
                    ->  HashAggregate  (cost=194.84..221.80 rows=1797 width=72)
                          Output: merchant_id, (concat('merchant', '_', merchant_id)), (concat('https://api.woodgrove.com/merchants/', merchant_id))
                          Group Key: payment_events.merchant_id, concat('merchant', '_', payment_events.merchant_id), concat('https://api.woodgrove.com/merchants/', payment_events.merchant_id)
                          ->  Seq Scan on public.payment_events_102040 payment_events  (cost=0.00..161.23 rows=4482 width=72)
                                Output: merchant_id, concat('merchant', '_', merchant_id), concat('https://api.woodgrove.com/merchants/', merchant_id)

La INSERT/SELECT method riga rivela che il pull to coordinator metodo verrà usato per eseguire questa query. Quando le tabelle di origine e di destinazione non sono raggruppate e l'ottimizzazione della ripartizione non è possibile, il coordinatore recupererà i risultati dalle SELECT query eseguite in ogni nodo di lavoro ed eseguirà il pull dei dati per eseguire la query in locale. Il coordinatore usa quindi la colonna di distribuzione per instradare le righe verso il basso ai nodi di lavoro per l'inserimento nella partizione appropriata. Questo metodo è il meno efficiente delle tre tecniche, perché tutti i dati vengono forzati a passare attraverso un singolo nodo e l'elaborazione non può essere parallelizzata tra i ruoli di lavoro.

Dato che il metodo di inserimento dati necessario per caricare questa tabella è la tecnica meno efficiente e sapere che la tabella verrà spesso unita alla payment_events tabella per le query analitiche, è consigliabile rivalutare la modalità di distribuzione della tabella.

Dopo aver considerato le opzioni e i modelli di query comuni di Woodgrove Bank, si decide che la payment_merchants tabella sarà definita meglio come tabella di riferimento. Modificare una tabella distribuita in una tabella di riferimento comporta l'esecuzione della undistribution_table() funzione sulla tabella e quindi la ridefinizione come tabella di riferimento, come indicato di seguito:

SELECT create_reference_table('payment_merchants');

Come tabella di riferimento, l'intero contenuto della payment_merchants tabella viene concentrato in una singola partizione, che viene quindi replicata in ogni ruolo di lavoro.

L'esecuzione EXPLAIN nell'istruzione INSERT ... SELECT dopo la conversione della tabella in una tabella di riferimento indica che il caricamento dei dati verrà comunque eseguito usando il pull to coordinator metodo . Di conseguenza, questa modifica non offrirà vantaggi in termini di prestazioni durante l'inserimento dei dati. Tuttavia, influirà sulle query future, ad esempio i join tra le payment_merchants tabelle e payment_events .

Inserire dati con i servizi di Azure

Oltre a inserire dati usando i comandi SQL e le estensioni PostgreSQL, è anche possibile applicare altri servizi di Azure per caricare i dati nel database.

Azure Data Factory

Azure Data Factory (ADF) è un servizio di integrazione dei dati basato sul cloud che offre un ambiente visivo senza codice per orchestrare e automatizzare lo spostamento dei dati. È possibile usare Azure Data Factory per copiare dati da più di 85 origini possibili in Azure Cosmos DB per PostgreSQL.

I requisiti di Woodgrove Bank hanno chiesto un meccanismo per l'esecuzione di un carico bulk monouso dei dati cronologici. ADF è un'altra soluzione alternativa che può essere considerata, ma questa soluzione è più appropriata se esiste un requisito per gli spostamenti di dati ricorrenti e ripetibili e le pipeline di orchestrazione.

Analisi di flusso di Azure

Azure Cosmos DB per PostgreSQL si distingue per la gestione di carichi di lavoro ad alta velocità effettiva, in tempo reale, ad esempio l'hosting dell'input del dispositivo IoT . Analisi di flusso di Azure può fungere da meccanismo senza codice, efficiente e scalabile per l'inserimento di dati in Azure Cosmos DB per PostgreSQL.

Questo metodo di inserimento non è necessario in base ai requisiti correnti di Woodgrove Bank, ma sarebbe utile se decidessero di iniziare a inserire dati da dispositivi come gli ATM.