Eseguire query sulle tabelle distribuite in Azure Cosmos DB per PostgreSQL
Woodgrove Bank ha chiesto di ottimizzare le prestazioni dei carichi di lavoro e delle query di analisi utente dell'applicazione di pagamento senza contatto. Applicando le informazioni apprese nell'unità 6, si è pronti per esaminare come eseguire query sui dati distribuiti.
In genere si eseguono query usando query PostgreSQL SELECT
standard sul nodo coordinatore in Azure Cosmos DB per PostgreSQL. Gestisce la parallelizzazione SELECT
delle query che coinvolgono selezioni complesse, raggruppamenti, ordinamenti e JOIN per ottimizzare e velocizzare le prestazioni delle query.
Eseguire query sulle tabelle distribuite
A livello generale, il coordinatore partiziona ogni SELECT
query in frammenti di query più piccoli, assegna frammenti di query ai ruoli di lavoro, supervisiona l'esecuzione, unisce i risultati e restituisce il risultato finale all'utente. Nella maggior parte dei casi, sapere come o dove vengono archiviati i dati in un cluster non è necessario. Il database usa un executor di query distribuito per suddividere automaticamente le normali query SQL ed eseguirle in parallelo sui nodi di lavoro vicini ai dati.
Per le query semplici, ad esempio SELECT COUNT(*) FROM payment_users;
, il coordinatore esegue automaticamente il conteggio su tutte le partizioni in parallelo e combina i risultati.
Eseguire funzioni di aggregazione
Per le aggregazioni e le query più complesse associate ai carichi di lavoro analitici di Woodgrove Bank, Azure Cosmos DB per PostgreSQL supporta e parallelizza la maggior parte delle funzioni di aggregazione supportate da PostgreSQL, incluse le aggregazioni personalizzate definite dall'utente.
Viene visualizzata l'animazione del flusso di una query in arrivo da un'applicazione. Nell'animazione la query elaborata dal nodo coordinatore tramite tabelle di metadati. I frammenti di query vengono quindi inviati ai nodi di lavoro per l'esecuzione. I risultati dell'esecuzione di query nei nodi di lavoro vengono passati di nuovo al coordinatore, aggregato e restituito all'applicazione.
Le aggregazioni vengono eseguite usando uno dei tre metodi, in questo ordine di preferenza:
Quando l'aggregazione viene raggruppata in base alla colonna di distribuzione di una tabella, il coordinatore può eseguire il push dell'esecuzione dell'intera query in ogni ruolo di lavoro. Tutte le aggregazioni sono supportate in questa situazione ed eseguite in parallelo sui nodi di lavoro. Ad esempio, è possibile contare il numero di eventi, per tipo, per utente nell'app contactless-payments usando la query seguente:
SELECT user_id, event_type, count(*) FROM payment_events GROUP By user_id, event_type;
Questa query viene eseguita rapidamente perché il coordinatore può eseguire il push dell'esecuzione delle query in ogni nodo di lavoro. L'esecuzione push-down è possibile perché la
GROUP BY
clausola contiene la colonna di distribuzione della tabella. L'esecuzione della stessa query tramite l'istruzioneEXPLAIN VERBOSE
consente di visualizzare il piano di esecuzione della query e di come viene distribuito tra i nodi di lavoro per parallelizzare l'esecuzione di query.Quando l'aggregazione non è raggruppata in base alla colonna di distribuzione di una tabella, il coordinatore può comunque ottimizzare caso per caso. Le regole interne per aggregazioni specifiche, ad esempio
sum()
,avg()
ecount(distinct)
consentono di riscrivere le query per l'aggregazione parziale nei ruoli di lavoro. Ad esempio, per calcolare una media, il coordinatore ottiene una somma e un conteggio da ogni lavoratore e quindi il nodo coordinatore calcola la media finale.SELECT merchant_id, event_type, COUNT(*) FROM payment_events GROUP BY merchant_id, event_type;
Questa query genera un'aggregazione parziale sui ruoli di lavoro, che è leggermente meno efficiente del metodo precedente. L'istruzione
EXPLAIN VERBOSE
fornisce i dettagli dell'oggettoHashAggregate
eseguito per recuperare il conteggio di ogni ruolo di lavoro. Il coordinatore calcola quindi il conteggio finale.Per tutte le altre funzioni di aggregazione, il coordinatore esegue il pull di tutte le righe dai ruoli di lavoro ed esegue l'aggregazione stessa. Se i due metodi precedenti non coprono l'aggregazione, il coordinatore rientra in questo approccio. Tuttavia, è fondamentale notare che questo metodo può causare un sovraccarico di rete ed esaurire le risorse del coordinatore se il set di dati da aggregare è troppo grande.
Join
Sono supportati join tra un numero qualsiasi di tabelle, indipendentemente dalle dimensioni e dal metodo di distribuzione. Query Planner seleziona il metodo di join ottimale e l'ordine in base alla modalità di distribuzione delle tabelle. Valuta diversi ordini di join possibili e crea un piano di join che richiede il trasferimento della quantità minima di dati in rete.
Dato che i carichi di lavoro analitici pesanti che Woodgrove Bank esegue sul database, è necessario supportare ed eseguire join in più tabelle. Quando il database è costruito, Woodgrove avrà più di tutti gli utenti, gli eventi e le tabelle dei commercianti, alcuni dei quali possono essere raggruppati e alcuni dei quali non possono.
Join con colocating
Quando due tabelle vengono raggruppate, possono essere unite in join in modo efficiente sulle colonne di distribuzione comuni. Un join condiviso è il modo più efficiente per unire due tabelle distribuite di grandi dimensioni.
Il coordinatore usa le tabelle di metadati per determinare quali partizioni delle tabelle con percorso condiviso possono corrispondere alle partizioni dell'altra tabella. Questo processo consente al coordinatore di eliminare le coppie di partizioni che non possono produrre chiavi join corrispondenti. I join tra le coppie di partizioni rimanenti vengono eseguiti in parallelo sui nodi di lavoro e i risultati vengono restituiti al coordinatore.
Nota
Assicurarsi che le tabelle siano distribuite nello stesso numero di partizioni e che le colonne di distribuzione di ogni tabella abbiano esattamente tipi corrispondenti. Il tentativo di join su colonne di tipi leggermente diversi, ad int
esempio e bigint
può causare problemi.
Per l'app di pagamento senza contatto, sono stati raggruppati i payment_events
dati e payment_users
usando la stessa colonna di distribuzione, user_id
. L'esecuzione di un join tra queste due tabelle nella user_id
colonna consente al coordinatore di usare le tabelle di metadati per determinare in modo efficiente le righe con chiavi corrispondenti e parallelizzare l'esecuzione di query tra i nodi di lavoro.
SELECT u.user_id, login, event_type, merchant_id, event_details FROM payment_events e INNER JOIN payment_users u ON e.user_id = u.user_id LIMIT 5;
Join di ripartizione
In alcuni casi, potrebbe essere necessario unire due tabelle in colonne diverse dalla colonna di distribuzione. Per questi casi, Azure Cosmos DB per PostgreSQL consente di unire colonne chiave non di distribuzione ripartizionando dinamicamente le tabelle per la query. I join di repartizione richiedono la ripartizione casuale tra partizioni dei dati, quindi sono meno efficienti rispetto ai join con percorso condiviso. Sarebbe preferibile distribuire le tabelle tramite chiavi di join comuni, quando possibile.
In questi casi, Query Optimizer determina le tabelle da partizionare in base alle colonne di distribuzione, alle chiavi di join e alle dimensioni. Con le tabelle ripartizionate, solo le coppie di partizioni rilevanti vengono unite tra loro, riducendo drasticamente la quantità di dati trasferiti in rete.
Nei carichi di lavoro di analisi sono presenti numerose query usate da Woodgrove Bank che richiederanno query di ripartizione, quindi è necessario configurare il database per consentire questi tipi di query. È possibile abilitare l'esecuzione di query di ripartizione eseguendo il comando seguente:
SET citus.enable_repartition_joins TO on;
Si supponga che i join di ripartizione non siano stati abilitati nel database. L'esecuzione di un join che richiede la ripartizione comporta il messaggio di errore: ERROR: the query contains a join that requires repartitioning
.
Si supponga di non aver convertito la tabella di payment_merchants
Woodgrove Bank in una tabella di riferimento, lasciandola distribuita sulla merchant_id
colonna. Per unire le payment_merchants
tabelle e payment_events
sarebbe necessario un join di ripartizione. Dopo aver abilitato i join di ripartizione, è possibile eseguire il join seguente in una colonna non di distribuzione:
SELECT m.merchant_id, name, event_type, count(*) as event_count
FROM payment_events e
JOIN payment_merchants m ON e.merchant_id = m.merchant_id
WHERE event_type = 'SendFunds'
GROUP BY m.merchant_id, name, event_type
ORDER BY event_count DESC
LIMIT 5;
Join di tabella di riferimento
È possibile usare tabelle di riferimento come tabelle "dimension" per unire in modo efficiente tabelle di tipo "fact". Le tabelle di riferimento vengono replicate in tutti i nodi di lavoro, consentendo la scomposizione di un join di riferimento in join locali in ogni ruolo di lavoro e eseguite in parallelo. Un join di riferimento è simile a una versione più flessibile di un join con colocating perché le tabelle di riferimento non vengono distribuite in una determinata colonna e sono gratuite per il join in una delle relative colonne.
Per popolare un dashboard per l'applicazione di pagamento senza contatto di Woodgrove Bank, è stato chiesto di scrivere una query per contare il numero di transazioni per tipo per ogni commerciante. Per questa query è necessario unire la payment_events
tabella distribuita alla tabella di payment_merchants
riferimento nella merchant_id
colonna .
SELECT m.merchant_id, name, event_type, count(*) as event_count
FROM payment_events e
JOIN payment_merchants m ON e.merchant_id = m.merchant_id
WHERE event_type = 'SendFunds'
GROUP BY m.merchant_id, name, event_type
ORDER BY event_count DESC
LIMIT 5;
L'esecuzione della query con EXPLAIN VERBOSE
mostra come il coordinatore può generare un piano che esegue il push dell'esecuzione delle query in ognuna delle partizioni 32, in cui la tabella di riferimento viene unita localmente nei nodi di lavoro. In questo caso, la modifica della payment_merchants
tabella in una tabella di riferimento offre miglioramenti significativi delle prestazioni rispetto alla stessa query eseguita su una tabella distribuita non con percorso condiviso.
Le tabelle di riferimento possono anche essere unite a tabelle locali nel nodo coordinatore.
Modificare i dati nelle tabelle distribuite
L'esecuzione di UPDATE
comandi e DELETE
su tabelle distribuite viene eseguita usando i comandi e DELETE
PostgreSQL UPDATE
standard. Possono essere completate senza specificare la colonna di distribuzione in una WHERE
clausola , ma verranno eseguite in modo più efficiente se è incluso.
Aggiornare le righe in una tabella distribuita
Usare il comando PostgreSQL UPDATE
standard per aggiornare i record archiviati nelle tabelle distribuite. Ad esempio, Woodgrove Bank ha chiesto di modificare il event_details
campo per ogni record nel database in modo da includere il user_id
valore nella stringa JSONB.
UPDATE payment_events
SET event_details = jsonb_set(event_details, '{user_id}', CAST(user_id as text)::jsonb);
Quando gli aggiornamenti influiscono su più partizioni, come nell'esempio precedente, il comportamento predefinito consiste nell'usare un protocollo di commit monofase. Questo comportamento significa che ogni lavoratore invia un messaggio "completato" al coordinatore e quindi attende un messaggio di commit o interruzione dal coordinatore. Dopo che tutti i ruoli di lavoro hanno terminato l'esecuzione della query e inviato un messaggio "completato", il coordinatore decide se eseguire il commit o interrompere la transazione.
Per una maggiore sicurezza, è possibile abilitare il commit in due fasi impostando il citus.multi_shard_commit_protocol
valore su 2pc
, come indicato di seguito:
SET citus.multi_shard_commit_protocol = '2pc';
UPDATE
Se interessa solo una singola partizione, viene eseguita all'interno di un singolo nodo di lavoro e l'abilitazione di 2PC non è necessaria. Questo scenario si verifica spesso quando gli aggiornamenti o eliminano il filtro in base alla colonna di distribuzione di una tabella, come nella query seguente:
UPDATE payment_events
SET event_details = jsonb_set(event_details, '{user_id}', CAST(user_id as text)::jsonb)
WHERE user_id = 796958;
Eliminare record da una tabella distribuita
L'eliminazione di righe da tabelle distribuite usa anche il comando PostgreSQL DELETE
standard. Nell'app pagamenti, ad esempio, Woodgrove occasionalmente deve eseguire un'operazione per eliminare record di transazioni duplicati causati da un utente facendo doppio clic sul pulsante di invio della transazione. L'attività potrebbe essere eseguita eliminando il record di transazione più recente dalla payment_events
tabella, come indicato di seguito:
DELETE FROM payment_events
WHERE user_id = 796958
AND created_at = (SELECT MAX(created_at) FROM payment_events WHERE user_id = 796958);
Come gli aggiornamenti, le operazioni di eliminazione useranno un protocollo di commit monofase per impostazione predefinita.
Ottimizzazione delle prestazioni di scrittura
Per altre informazioni su come ottimizzare le prestazioni di scrittura, vedere la sezione Aumento del numero di istanze dei dati nella documentazione di Citus.