Eseguire query sulle tabelle distribuite in Azure Cosmos DB per PostgreSQL

Completato

Woodgrove Bank ha chiesto di ottimizzare le prestazioni dei carichi di lavoro e delle query di analisi utente dell'applicazione di pagamento senza contatto. Applicando le informazioni apprese nell'unità 6, si è pronti per esaminare come eseguire query sui dati distribuiti.

In genere si eseguono query usando query PostgreSQL SELECT standard sul nodo coordinatore in Azure Cosmos DB per PostgreSQL. Gestisce la parallelizzazione SELECT delle query che coinvolgono selezioni complesse, raggruppamenti, ordinamenti e JOIN per ottimizzare e velocizzare le prestazioni delle query.

Eseguire query sulle tabelle distribuite

A livello generale, il coordinatore partiziona ogni SELECT query in frammenti di query più piccoli, assegna frammenti di query ai ruoli di lavoro, supervisiona l'esecuzione, unisce i risultati e restituisce il risultato finale all'utente. Nella maggior parte dei casi, sapere come o dove vengono archiviati i dati in un cluster non è necessario. Il database usa un executor di query distribuito per suddividere automaticamente le normali query SQL ed eseguirle in parallelo sui nodi di lavoro vicini ai dati.

Per le query semplici, ad esempio SELECT COUNT(*) FROM payment_users;, il coordinatore esegue automaticamente il conteggio su tutte le partizioni in parallelo e combina i risultati.

Eseguire funzioni di aggregazione

Per le aggregazioni e le query più complesse associate ai carichi di lavoro analitici di Woodgrove Bank, Azure Cosmos DB per PostgreSQL supporta e parallelizza la maggior parte delle funzioni di aggregazione supportate da PostgreSQL, incluse le aggregazioni personalizzate definite dall'utente.

Animation of the flow of a query arriving from an application is displayed.

Viene visualizzata l'animazione del flusso di una query in arrivo da un'applicazione. Nell'animazione la query elaborata dal nodo coordinatore tramite tabelle di metadati. I frammenti di query vengono quindi inviati ai nodi di lavoro per l'esecuzione. I risultati dell'esecuzione di query nei nodi di lavoro vengono passati di nuovo al coordinatore, aggregato e restituito all'applicazione.

Le aggregazioni vengono eseguite usando uno dei tre metodi, in questo ordine di preferenza:

  1. Quando l'aggregazione viene raggruppata in base alla colonna di distribuzione di una tabella, il coordinatore può eseguire il push dell'esecuzione dell'intera query in ogni ruolo di lavoro. Tutte le aggregazioni sono supportate in questa situazione ed eseguite in parallelo sui nodi di lavoro. Ad esempio, è possibile contare il numero di eventi, per tipo, per utente nell'app contactless-payments usando la query seguente:

    SELECT user_id, event_type, count(*) FROM payment_events GROUP By user_id, event_type;
    

    Questa query viene eseguita rapidamente perché il coordinatore può eseguire il push dell'esecuzione delle query in ogni nodo di lavoro. L'esecuzione push-down è possibile perché la GROUP BY clausola contiene la colonna di distribuzione della tabella. L'esecuzione della stessa query tramite l'istruzione EXPLAIN VERBOSE consente di visualizzare il piano di esecuzione della query e di come viene distribuito tra i nodi di lavoro per parallelizzare l'esecuzione di query.

  2. Quando l'aggregazione non è raggruppata in base alla colonna di distribuzione di una tabella, il coordinatore può comunque ottimizzare caso per caso. Le regole interne per aggregazioni specifiche, ad esempio sum(), avg()e count(distinct) consentono di riscrivere le query per l'aggregazione parziale nei ruoli di lavoro. Ad esempio, per calcolare una media, il coordinatore ottiene una somma e un conteggio da ogni lavoratore e quindi il nodo coordinatore calcola la media finale.

    SELECT merchant_id, event_type, COUNT(*) FROM payment_events GROUP BY merchant_id, event_type;
    

    Questa query genera un'aggregazione parziale sui ruoli di lavoro, che è leggermente meno efficiente del metodo precedente. L'istruzione EXPLAIN VERBOSE fornisce i dettagli dell'oggetto HashAggregate eseguito per recuperare il conteggio di ogni ruolo di lavoro. Il coordinatore calcola quindi il conteggio finale.

  3. Per tutte le altre funzioni di aggregazione, il coordinatore esegue il pull di tutte le righe dai ruoli di lavoro ed esegue l'aggregazione stessa. Se i due metodi precedenti non coprono l'aggregazione, il coordinatore rientra in questo approccio. Tuttavia, è fondamentale notare che questo metodo può causare un sovraccarico di rete ed esaurire le risorse del coordinatore se il set di dati da aggregare è troppo grande.

Join

Sono supportati join tra un numero qualsiasi di tabelle, indipendentemente dalle dimensioni e dal metodo di distribuzione. Query Planner seleziona il metodo di join ottimale e l'ordine in base alla modalità di distribuzione delle tabelle. Valuta diversi ordini di join possibili e crea un piano di join che richiede il trasferimento della quantità minima di dati in rete.

Dato che i carichi di lavoro analitici pesanti che Woodgrove Bank esegue sul database, è necessario supportare ed eseguire join in più tabelle. Quando il database è costruito, Woodgrove avrà più di tutti gli utenti, gli eventi e le tabelle dei commercianti, alcuni dei quali possono essere raggruppati e alcuni dei quali non possono.

Join con colocating

Quando due tabelle vengono raggruppate, possono essere unite in join in modo efficiente sulle colonne di distribuzione comuni. Un join condiviso è il modo più efficiente per unire due tabelle distribuite di grandi dimensioni.

Il coordinatore usa le tabelle di metadati per determinare quali partizioni delle tabelle con percorso condiviso possono corrispondere alle partizioni dell'altra tabella. Questo processo consente al coordinatore di eliminare le coppie di partizioni che non possono produrre chiavi join corrispondenti. I join tra le coppie di partizioni rimanenti vengono eseguiti in parallelo sui nodi di lavoro e i risultati vengono restituiti al coordinatore.

Nota

Assicurarsi che le tabelle siano distribuite nello stesso numero di partizioni e che le colonne di distribuzione di ogni tabella abbiano esattamente tipi corrispondenti. Il tentativo di join su colonne di tipi leggermente diversi, ad int esempio e bigint può causare problemi.

Per l'app di pagamento senza contatto, sono stati raggruppati i payment_events dati e payment_users usando la stessa colonna di distribuzione, user_id. L'esecuzione di un join tra queste due tabelle nella user_id colonna consente al coordinatore di usare le tabelle di metadati per determinare in modo efficiente le righe con chiavi corrispondenti e parallelizzare l'esecuzione di query tra i nodi di lavoro.

SELECT u.user_id, login, event_type, merchant_id, event_details FROM payment_events e INNER JOIN payment_users u ON e.user_id = u.user_id LIMIT 5;

Join di ripartizione

In alcuni casi, potrebbe essere necessario unire due tabelle in colonne diverse dalla colonna di distribuzione. Per questi casi, Azure Cosmos DB per PostgreSQL consente di unire colonne chiave non di distribuzione ripartizionando dinamicamente le tabelle per la query. I join di repartizione richiedono la ripartizione casuale tra partizioni dei dati, quindi sono meno efficienti rispetto ai join con percorso condiviso. Sarebbe preferibile distribuire le tabelle tramite chiavi di join comuni, quando possibile.

In questi casi, Query Optimizer determina le tabelle da partizionare in base alle colonne di distribuzione, alle chiavi di join e alle dimensioni. Con le tabelle ripartizionate, solo le coppie di partizioni rilevanti vengono unite tra loro, riducendo drasticamente la quantità di dati trasferiti in rete.

Nei carichi di lavoro di analisi sono presenti numerose query usate da Woodgrove Bank che richiederanno query di ripartizione, quindi è necessario configurare il database per consentire questi tipi di query. È possibile abilitare l'esecuzione di query di ripartizione eseguendo il comando seguente:

SET citus.enable_repartition_joins TO on;

Si supponga che i join di ripartizione non siano stati abilitati nel database. L'esecuzione di un join che richiede la ripartizione comporta il messaggio di errore: ERROR: the query contains a join that requires repartitioning.

Si supponga di non aver convertito la tabella di payment_merchants Woodgrove Bank in una tabella di riferimento, lasciandola distribuita sulla merchant_id colonna. Per unire le payment_merchants tabelle e payment_events sarebbe necessario un join di ripartizione. Dopo aver abilitato i join di ripartizione, è possibile eseguire il join seguente in una colonna non di distribuzione:

SELECT m.merchant_id, name, event_type, count(*) as event_count
FROM payment_events e
JOIN payment_merchants m ON e.merchant_id = m.merchant_id
WHERE event_type = 'SendFunds'
GROUP BY m.merchant_id, name, event_type
ORDER BY event_count DESC
LIMIT 5;

Join di tabella di riferimento

È possibile usare tabelle di riferimento come tabelle "dimension" per unire in modo efficiente tabelle di tipo "fact". Le tabelle di riferimento vengono replicate in tutti i nodi di lavoro, consentendo la scomposizione di un join di riferimento in join locali in ogni ruolo di lavoro e eseguite in parallelo. Un join di riferimento è simile a una versione più flessibile di un join con colocating perché le tabelle di riferimento non vengono distribuite in una determinata colonna e sono gratuite per il join in una delle relative colonne.

Per popolare un dashboard per l'applicazione di pagamento senza contatto di Woodgrove Bank, è stato chiesto di scrivere una query per contare il numero di transazioni per tipo per ogni commerciante. Per questa query è necessario unire la payment_events tabella distribuita alla tabella di payment_merchants riferimento nella merchant_id colonna .

SELECT m.merchant_id, name, event_type, count(*) as event_count
FROM payment_events e
JOIN payment_merchants m ON e.merchant_id = m.merchant_id
WHERE event_type = 'SendFunds'
GROUP BY m.merchant_id, name, event_type
ORDER BY event_count DESC
LIMIT 5;

L'esecuzione della query con EXPLAIN VERBOSE mostra come il coordinatore può generare un piano che esegue il push dell'esecuzione delle query in ognuna delle partizioni 32, in cui la tabella di riferimento viene unita localmente nei nodi di lavoro. In questo caso, la modifica della payment_merchants tabella in una tabella di riferimento offre miglioramenti significativi delle prestazioni rispetto alla stessa query eseguita su una tabella distribuita non con percorso condiviso.

Le tabelle di riferimento possono anche essere unite a tabelle locali nel nodo coordinatore.

Modificare i dati nelle tabelle distribuite

L'esecuzione di UPDATE comandi e DELETE su tabelle distribuite viene eseguita usando i comandi e DELETE PostgreSQL UPDATE standard. Possono essere completate senza specificare la colonna di distribuzione in una WHERE clausola , ma verranno eseguite in modo più efficiente se è incluso.

Aggiornare le righe in una tabella distribuita

Usare il comando PostgreSQL UPDATE standard per aggiornare i record archiviati nelle tabelle distribuite. Ad esempio, Woodgrove Bank ha chiesto di modificare il event_details campo per ogni record nel database in modo da includere il user_id valore nella stringa JSONB.

UPDATE payment_events
SET event_details = jsonb_set(event_details, '{user_id}', CAST(user_id as text)::jsonb);

Quando gli aggiornamenti influiscono su più partizioni, come nell'esempio precedente, il comportamento predefinito consiste nell'usare un protocollo di commit monofase. Questo comportamento significa che ogni lavoratore invia un messaggio "completato" al coordinatore e quindi attende un messaggio di commit o interruzione dal coordinatore. Dopo che tutti i ruoli di lavoro hanno terminato l'esecuzione della query e inviato un messaggio "completato", il coordinatore decide se eseguire il commit o interrompere la transazione.

Per una maggiore sicurezza, è possibile abilitare il commit in due fasi impostando il citus.multi_shard_commit_protocol valore su 2pc, come indicato di seguito:

SET citus.multi_shard_commit_protocol = '2pc';

UPDATE Se interessa solo una singola partizione, viene eseguita all'interno di un singolo nodo di lavoro e l'abilitazione di 2PC non è necessaria. Questo scenario si verifica spesso quando gli aggiornamenti o eliminano il filtro in base alla colonna di distribuzione di una tabella, come nella query seguente:

UPDATE payment_events
SET event_details = jsonb_set(event_details, '{user_id}', CAST(user_id as text)::jsonb)
WHERE user_id = 796958;

Eliminare record da una tabella distribuita

L'eliminazione di righe da tabelle distribuite usa anche il comando PostgreSQL DELETE standard. Nell'app pagamenti, ad esempio, Woodgrove occasionalmente deve eseguire un'operazione per eliminare record di transazioni duplicati causati da un utente facendo doppio clic sul pulsante di invio della transazione. L'attività potrebbe essere eseguita eliminando il record di transazione più recente dalla payment_events tabella, come indicato di seguito:

DELETE FROM payment_events
WHERE user_id = 796958
AND created_at = (SELECT MAX(created_at) FROM payment_events WHERE user_id = 796958);

Come gli aggiornamenti, le operazioni di eliminazione useranno un protocollo di commit monofase per impostazione predefinita.

Ottimizzazione delle prestazioni di scrittura

Per altre informazioni su come ottimizzare le prestazioni di scrittura, vedere la sezione Aumento del numero di istanze dei dati nella documentazione di Citus.