Carregar dados no Azure Cosmos DB para tabelas distribuídas PostgreSQL

Concluído

O Woodgrove Bank forneceu requisitos de carregamento de dados que ditam o uso de vários métodos de ingestão diferentes. Os eventos de transações individuais chegam rapidamente da aplicação de pagamentos Contactless ao longo do dia e devem ser inseridos o mais rapidamente possível. O banco também forneceu arquivos CSV contendo eventos históricos de transações e dados de usuários que devem ser carregados em massa no banco de dados da forma mais eficiente possível. Além disso, eles pediram que algumas novas tabelas fossem preenchidas a partir de seus dados históricos depois de carregadas.

No Azure Cosmos DB para PostgreSQL, há várias abordagens que você pode usar para ingerir dados de forma eficiente em um banco de dados distribuído. O carregamento de dados em tabelas distribuídas é realizado da mesma maneira que ao carregar dados em tabelas não distribuídas. A principal diferença é que você deve fornecer um valor para a coluna de distribuição atribuída da tabela para cada linha que está sendo inserida.

Cada linha de uma tabela distribuída é gravada em um fragmento com base no valor de sua coluna de distribuição. Para identificar corretamente o fragmento no qual os dados devem ser inseridos, o coordenador faz hash na coluna de distribuição da linha. Em seguida, o coordenador compara o valor de hash com o intervalo de hash atribuído para cada fragmento. Uma vez identificado o fragmento correto, o coordenador encaminha a consulta para ele, onde o comando remote insert é executado em todas as réplicas desse fragmento.

Carregar linhas individuais usando o comando INSERT

O Woodgrove Bank requer a capacidade de inserir registros de transações individuais no banco de dados quando eles são recebidos do aplicativo de pagamentos sem contato. Para inserir linhas individuais em tabelas distribuídas, você pode usar o comando padrão PostgreSQL INSERT . As tabelas distribuídas aparecem como tabelas padrão para SQL, mas a forma como as consultas são executadas difere porque as tabelas subjacentes são particionadas horizontalmente entre nós de trabalho.

Para garantir que o coordenador possa retransmitir consultas com precisão para os fragmentos corretos, você deve especificar um valor para a coluna de distribuição ao carregar dados em tabelas distribuídas. Em outras palavras, cada instrução INSERT precisa incluir um valor não nulo para a coluna de distribuição da linha.

Você pode encontrar a coluna de distribuição atribuída de uma tabela usando a exibição de tabelas distribuídas no nó coordenador. Por exemplo, a execução da consulta SELECT distribution_column FROM citus_tables WHERE table_name = 'payment_events'::regclass; revela que a coluna de distribuição da tabela é o payment_eventsuser_id campo.

No exemplo abaixo, duas transações estão sendo adicionadas payment_events à tabela no banco de dados do Woodgrove Bank usando comandos INSERT individuais.

/*
-- Table schema, for reference
CREATE TABLE payment_events
(
  event_id bigint,
  event_type text,
  user_id bigint,
  merchant_id bigint,
  event_details jsonb,
  created_at timestamp
);
*/

INSERT INTO payment_events VALUES (4951447424,'SendFunds',1159138,4951447330,'{"code": 4951447330, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22');

INSERT INTO payment_events VALUES (4951447488,'RequestFunds',1171503,4951447340,'{"code": 4951447340, "particulars": "vue", "reference": "vuejs"}','1/12/16 5:22');

Como cada linha contém um valor válido para a user_id coluna, as duas linhas são inseridas com êxito na payment_events tabela. Agora, veja o que acontece se você tentar inserir uma linha onde o valor da coluna de distribuição é null:

INSERT INTO payment_events VALUES (4951447499,'GiftFunds',null,4951447350,'{"code": 4951447350, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22');

Esta INSERT instrução resulta em um erro:

ERROR: cannot perform an INSERT with NULL in the partition column

Ao executar o INSERT comando para adicionar dados a uma tabela distribuída, é essencial lembrar que a coluna de distribuição deve ser especificada. Isso permite que o coordenador determine a qual fragmento ele deve adicionar a linha no cluster.

Combine instruções INSERT para melhorar a eficiência

Através da sua experiência com o aplicativo de pagamento sem contato do Woodgrove Bank, você sabe que eles recebem um alto número de transações, muitas das quais chegam simultaneamente ao longo do dia. Uma técnica que você pode usar para melhorar a eficiência da inserção de linhas individuais é processar transações em lotes, combinando várias instruções insert em uma única instrução contendo várias linhas. Essa abordagem elimina a necessidade de fazer consultas repetidas ao banco de dados. Por exemplo, você pode inserir várias linhas de transação de uma só vez, da seguinte forma:

INSERT INTO payment_events VALUES 
  (4951447425,'GiftFunds',1159138,4951447350,'{"code": 4951447350, "particulars": "twofactorauth", "reference": "2factorauth"}','1/12/16 5:22'),
  (4951447489,'SendFunds',1171503,4951447360,'{"code": 4951447360, "particulars": "vue", "reference": "vuejs"}','1/12/16 5:22'),
  (4961447699,'RequestFunds',1171503,4951447370,'{"code": 4951447370, "reference": "Lombiq", "particulars": "Orchard-User-Notifications"}','1/12/16 5:22');

Carregar dados em massa com o comando COPY

Quando você precisa de taxas de ingestão mais altas, o COPY comando permite que você carregue dados em massa. Você pode usar o COPY comando para carregar dados diretamente em tabelas distribuídas de um aplicativo usando a FROM STDIN opção, arquivos e outras fontes. Quando você usa o COPY comando para gravar dados em tabelas distribuídas, ele copia dados de forma assíncrona para nós de trabalho usando uma conexão paralela para cada posicionamento de estilhaço. O coordenador encaminha os dados para os nós de trabalho apropriados, permitindo que os dados sejam ingeridos usando vários trabalhadores e núcleos em paralelo.

O Woodgrove Bank solicitou que você fornecesse um mecanismo para carregar em massa seus dados históricos. Os dados fornecidos são armazenados em arquivos CSV (valores separados por vírgula). Os comandos a seguir mostram como você pode baixar o events.csv arquivo no nó coordenador e, em seguida, carregar em massa os dados do payment_events arquivo na tabela:

-- Download the events.csv file
\! curl -O https://raw.githubusercontent.com/MicrosoftDocs/mslearn-create-connect-postgresHyperscale/main/events.csv

-- Bulk load the data from the file into the payment_events table
\COPY payment_events FROM 'events.csv' WITH (format CSV)

Você também pode combinar as instruções acima em um único comando usando a FROM PROGRAM cláusula para informar o coordenador para recuperar os arquivos de dados de um aplicativo em execução no coordenador. Nesse caso, você instrui o coordenador a usar o curl aplicativo para baixar um arquivo da URL especificada. A WITH CSV opção fornece informações sobre o formato do arquivo que está sendo ingerido.

\COPY payment_events FROM PROGRAM 'curl https://raw.githubusercontent.com/MicrosoftDocs/mslearn-create-connect-postgresHyperscale/main/events.csv' WITH CSV

O COPY comando fornece uma maneira prática e rápida de carregar dados diretamente de arquivos. No entanto, observe que, ao direcionar uma tabela distribuída, cada linha no arquivo de origem deve conter um valor para a coluna de distribuição.

Carregue dados de arquivos no armazenamento de blob usando a extensão pg_azure_storage

Um método alternativo para carregar dados de arquivo em massa é usar a pg_azure_storage extensão. pg_azure_storage é uma nova extensão PostgreSQL desenvolvida pela Microsoft que permite trabalhar com e carregar em massa dados armazenados em arquivos hospedados no Armazenamento de Blob do Azure.

Os arquivos precisam ser adicionados ou migrados para uma conta de Armazenamento do Azure antes que você possa aproveitar os recursos dessa extensão. Mover arquivos para o Armazenamento de Blobs do Azure permite que você use um serviço de armazenamento seguro nativo da nuvem.

Para começar, você precisa carregar a extensão:

-- Install the extension in the database
SELECT create_extension('azure_storage');

A pg_azure_storage extensão inclui o account_add() método. Esse método conecta uma conta de armazenamento ao seu banco de dados e permite o acesso aos arquivos dentro da conta. Para se conectar, você deve fornecer o nome da conta de armazenamento e a chave de acesso usando a seguinte sintaxe:

-- Provide the storage account credentials
SELECT azure_storage.account_add('{STORAGE_ACCOUNT_NAME}', '{STORAGE_ACCOUNT_ACCESS_KEY}');

O fornecimento de uma chave de acesso só é necessário ao trabalhar com dados em contêineres com um nível de acesso "Privado (sem acesso anônimo)". Suponha que você deseje extrair dados de um contêiner cujo nível de acesso esteja definido como "Contêiner (acesso de leitura anônimo para contêineres e blobs)". Nesse caso, você pode ingerir arquivos do Armazenamento de Blobs do Azure usando suas URLs públicas e enumerar o conteúdo do contêiner sem configurar a pg_azure_storage chave da conta na extensão.

Por exemplo, você criou uma conta de armazenamento chamada stlearnpostgresql Woodgrove Bank e adicionou alguns arquivos em um contêiner privado chamado historical-data. Para fornecer acesso a essa conta a partir do seu banco de dados, você precisa executar a seguinte consulta, fornecendo a chave de acesso da conta de armazenamento recuperada da página Chaves de acesso da Conta de Armazenamento no portal do Azure:

SELECT azure_storage.account_add('stlearnpostgresql', '4zzZGx4sUk8TBVnHnoPYt0G7A3w9/DJJBCfdxeeb+VDrR8P3bSwWA3lYsTvO1HwYzrt4XQ37iIEL+AStWuQ/uA==');

Agora que você está conectado à conta de armazenamento, pode listar os blobs dentro de um contêiner nomeado. Para exibir os arquivos no historical-data contêiner, execute a seguinte consulta:

SELECT path, content_type, pg_size_pretty(bytes)
FROM azure_storage.blob_list('stlearnpostgresql', 'historical-data');

A blob_list() função produz todos os blobs dentro do contêiner especificado:

    path    | content_type | pg_size_pretty 
------------+--------------+----------------
 events.csv | text/csv     | 17 MB
 users.csv  | text/csv     | 29 MB

O Woodgrove Bank forneceu a lista de colunas para os dados no arquivo, com base no users.csv seguinte esquema de tabela:

/*
-- Table structure and distribution details provided for reference
CREATE TABLE payment_users
(
    user_id bigint PRIMARY KEY,
    url text,
    login text,
    avatar_url text
);

SELECT created_distributed_table('payment_users', 'user_id');
*/

Eles também informaram que o arquivo CSV não contém uma linha de cabeçalho.

Suponha que você não tenha recebido detalhes sobre os dados dentro do arquivo. Nesse caso, você pode usar o portal do Azure para visualizar arquivos menores que 2,1 MB ou pode baixar uma cópia do arquivo e abri-lo rapidamente para explorar a estrutura do arquivo.

Agora que você entende os dados contidos no arquivo, pode atender à solicitação do Woodgrove Bank para carregar em massa dados históricos de arquivos. Para carregar dados em massa de arquivos no armazenamento de blob, pg_azure_storage estende o comando nativo do PostgreSQL COPY para torná-lo capaz de lidar com URLs de recursos do Armazenamento de Blob do Azure. Esse recurso é habilitado por padrão e você pode gerenciá-lo usando a azure_storage.enable_copy_command configuração. Usando o comando extend COPY , execute o seguinte comando para ingerir dados do usuário na payment_users tabela:

-- Bulk load data from the user.csv file in Blob Storage into the payment_users table
COPY payment_users FROM 'https://stlearnpostgresql.blob.core.windows.net/historical-data/users.csv';

A saída do COPY comando especificará o número de linhas copiadas na tabela, como COPY 264197.

Para saber mais sobre a extensão, leia a documentação e conclua o módulo Estender a pg_azure_storage funcionalidade do Azure Cosmos DB para PostgreSQL usando extensões.

Preencher tabelas usando a cláusula FROM SELECT

Como parte do carregamento em massa dos dados históricos do Woodgrove Bank, eles solicitaram que você criasse algumas novas tabelas a partir de seus dados existentes. Primeiro, eles querem uma tabela agregada contendo a contagem de eventos por tipo por usuário, que eles usarão para preencher um painel. Em segundo lugar, eles gostariam que você extraísse registros exclusivos de comerciantes da payment_events tabela em uma nova tabela de comerciantes para análise futura da atividade do comerciante.

Eles forneceram os seguintes esquemas para as novas tabelas, mas pediram que você cuidasse da configuração da distribuição de tabelas e da gravação das consultas necessárias para carregar dados nas tabelas.

CREATE TABLE user_events
(
  user_id bigint,
  user_login text,
  event_type text,
  event_count bigint
);

CREATE TABLE payment_merchants
(
  merchant_id bigint PRIMARY KEY,
  name text,
  url text
);

O carregamento de dados em tabelas distribuídas de outras tabelas pode ser realizado usando os resultados de uma SELECT consulta usando INSERT … SELECT instruções. Assim como na execução de instruções padrão INSERT , os SELECT resultados da consulta devem incluir valores para a coluna de distribuição para cada linha.

Esse método também permite que você use a cláusula para direcionar a instrução para executar um "UPSERT", atualizando uma linha existente que entra em conflito com a INSERTON CONFLICT DO UPDATE linha proposta para inserção. Os UPSERTs fornecem a maneira mais fácil de calcular e salvar agregados com antecedência, conhecidos como pacotes cumulativos distribuídos.

Usando tabelas de origem e destino colocalizadas

Os dados necessários para preencher a user_events tabela estão contidos nas payment_events tabelas e payment_users . A coluna de distribuição mais eficiente com base no esquema de tabela proposto será o mesmo campo usado pelas payment_events tabelas user_ide payment_users , porque essa coluna atende melhor aos quatro critérios para escolher uma coluna de distribuição ideal.

SELECT create_distributed_table('user_events', 'user_id');

Com a tabela agora distribuída, as duas tabelas de origem e a tabela de destino são implicitamente colocalizadas porque compartilham a user_events mesma coluna de distribuição. Você criou a seguinte consulta para carregar a tabela, mas deseja verificar se a consulta encaminhará a instrução para nós de trabalho a INSERT ... SELECT serem executados em paralelo.

INSERT INTO user_events
SELECT e.user_id, login, event_type, COUNT(event_id)
FROM payment_events AS e
INNER JOIN payment_users AS u on e.user_id = u.user_id
GROUP BY e.user_id, login, event_type;

Antes de executar a consulta, você pode usar a EXPLAIN instrução para exibir o plano de execução.

Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
  Task Count: 32
  Tasks Shown: One of 32
  ->  Task
        Node: host=private-w0.learn-cosmosdb-postgresql.postgres.database.azure.com port=5432 dbname=citus
        ->  Insert on user_events_102394 citus_table_alias  (cost=530.35..619.99 rows=0 width=0)
              ->  HashAggregate  (cost=530.35..575.17 rows=4482 width=37)
                    Group Key: e.user_id, u.login, e.event_type
                    ->  Hash Join  (cost=334.93..485.53 rows=4482 width=37)
                          Hash Cond: (e.user_id = u.user_id)
                          ->  Seq Scan on payment_events_102232 e  (cost=0.00..138.82 rows=4482 width=27)
                                Filter: (user_id IS NOT NULL)
                          ->  Hash  (cost=231.08..231.08 rows=8308 width=18)
                                ->  Seq Scan on payment_users_102264 u  (cost=0.00..231.08 rows=8308 width=18)

As Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) linhas e Task Count: 32 mostram que a consulta será executada em paralelo em cada um dos 32 fragmentos. A execução da consulta preenche a nova tabela e você pode executar o seguinte para exibir os resultados:

SELECT * FROM user_events ORDER BY user_id LIMIT 5;
 user_id | user_login |  event_type   | event_count 
---------+------------+---------------+-------------
      45 | mojodna    | RequestFunds  |           2
      45 | mojodna    | RequestFunds  |           2
      87 | tmornini   | IssuesEvent   |           1
      87 | tmornini   | DisputeCharge |           1
      87 | tmornini   | DisputeCharge |           1

Quando as SELECT instruções and incluem tabelas com a mesma coluna de distribuição e a coluna de distribuição aparece nas INSERT instruções and SELECTINSERT, o coordenador encaminhará a INSERT ... SELECT instrução para os nós de trabalho para execução paralela. Esta técnica é a abordagem mais eficiente e recomendada quando possível.

Extraindo dados para o coordenador

Extrair os IDs de comerciante distintos na payment_events tabela para a tabela de comerciante proposta não permitirá que a mesma coluna de distribuição seja compartilhada entre as duas tabelas. O esquema fornecido para a tabela não contém a coluna, portanto, você deve escolher a payment_merchantsuser_id melhor coluna disponível, merchant_id.

Você escreve a seguinte consulta para manipular a payment_merchants população da tabela:

INSERT INTO payment_merchants SELECT DISTINCT merchant_id, CONCAT('merchant', '_', merchant_id), CONCAT('https://api.woodgrove.com/merchants/', merchant_id) FROM payment_events;

Sabendo que as tabelas não compartilham a mesma coluna de distribuição e não estão colocalizadas, você deseja entender melhor como a consulta será executada antes de executá-la, para que você use novamente a EXPLAIN instrução para exibir o plano de execução de consulta.

Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
  INSERT/SELECT method: pull to coordinator
  ->  HashAggregate  (cost=750.00..752.00 rows=200 width=72)
        Output: remote_scan.merchant_id, remote_scan.name, remote_scan.url
        Group Key: remote_scan.merchant_id, remote_scan.name, remote_scan.url
        ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=72)
              Output: remote_scan.merchant_id, remote_scan.name, remote_scan.url
              Task Count: 32
              Tasks Shown: One of 32
              ->  Task
                    Query: SELECT DISTINCT merchant_id, concat('merchant', '_', merchant_id) AS name, concat('https://api.woodgrove.com/merchants/', merchant_id) AS url FROM public.payment_events_102040 payment_events WHERE true
                    Node: host=private-w0.learn-cosmosdb-postgresql.postgres.database.azure.com port=5432 dbname=citus
                    ->  HashAggregate  (cost=194.84..221.80 rows=1797 width=72)
                          Output: merchant_id, (concat('merchant', '_', merchant_id)), (concat('https://api.woodgrove.com/merchants/', merchant_id))
                          Group Key: payment_events.merchant_id, concat('merchant', '_', payment_events.merchant_id), concat('https://api.woodgrove.com/merchants/', payment_events.merchant_id)
                          ->  Seq Scan on public.payment_events_102040 payment_events  (cost=0.00..161.23 rows=4482 width=72)
                                Output: merchant_id, concat('merchant', '_', merchant_id), concat('https://api.woodgrove.com/merchants/', merchant_id)

A INSERT/SELECT method linha revela que o pull to coordinator método será usado para executar essa consulta. Quando as tabelas de origem e destino não estiverem colocalizadas e a otimização de reparticionamento não for possível, o coordenador recuperará os resultados das SELECT consultas executadas em cada nó de trabalho e puxará os dados para cima para executar a consulta localmente. Em seguida, o coordenador usa a coluna de distribuição para rotear linhas de volta para os nós de trabalho para inserção no fragmento apropriado. Esse método é o menos eficiente das três técnicas, porque todos os dados são forçados a passar por um único nó e o processamento não pode ser paralelo entre trabalhadores.

Dado que o método de ingestão de dados necessário para carregar esta tabela é a técnica menos eficiente, e sabendo que a tabela será frequentemente unida à tabela para consultas analíticas, é uma boa ideia reavaliar como a tabela é distribuída payment_events .

Depois de considerar as opções e os padrões de consulta comuns do Woodgrove Bank, você decide que a payment_merchants tabela seria melhor definida como uma tabela de referência. Alterar uma tabela distribuída para uma tabela de referência envolve executar a undistribution_table() função na tabela e, em seguida, redefini-la como uma tabela de referência, da seguinte maneira:

SELECT create_reference_table('payment_merchants');

Como tabela de referência, todo o conteúdo da tabela é concentrado em um único fragmento payment_merchants , que é replicado em cada trabalhador.

A execução EXPLAIN na INSERT ... SELECT instrução depois de converter a tabela em uma tabela de referência mostra que o carregamento dos dados ainda será realizado usando o pull to coordinator método. Como tal, esta alteração não proporcionará benefícios de desempenho durante a ingestão de dados. No entanto, isso afetará consultas futuras, como junções entre as payment_merchants tabelas e payment_events .

Ingerir dados usando os serviços do Azure

Além de ingerir dados usando comandos SQL e extensões PostgreSQL, você também pode aplicar outros serviços do Azure para carregar dados em seu banco de dados.

Azure Data Factory

O Azure Data Factory (ADF) é um serviço de integração de dados baseado na nuvem que oferece um ambiente visual livre de código para orquestrar e automatizar a movimentação de dados. Você pode usar o ADF para copiar dados de mais de 85 fontes possíveis para o Azure Cosmos DB para PostgreSQL.

Os requisitos do Woodgrove Bank solicitavam um mecanismo para realizar uma carga única em massa de seus dados históricos. O ADF é outra solução alternativa que poderia ser considerada, mas essa solução é mais apropriada se houver um requisito para movimentação de dados recorrente e repetível e pipelines de orquestração.

Azure Stream Analytics

O Azure Cosmos DB para PostgreSQL brilha ao lidar com cargas de trabalho de alta taxa de transferência e em tempo real, como hospedagem de entrada de dispositivo IoT . O Azure Stream Analytics (ASA) pode atuar como um mecanismo sem código, eficiente e escalável para inserir dados no Azure Cosmos DB para PostgreSQL.

Este método de ingestão não é necessário, dados os requisitos atuais do Woodgrove Bank, mas seria útil se eles decidissem começar a ingerir dados de dispositivos como caixas eletrônicos.