Ingestão e modificação de dados (DML)

As seções a seguir descrevem como ingerir e modificar dados usando a linguagem de modificação de dados (DML) no Citus.

Inserindo dados

Para inserir dados em tabelas distribuídas, você pode usar os comandos INSERT do PostgreSQL padrão. Por exemplo, selecionamos duas linhas aleatoriamente no conjunto de dados do Arquivo GitHub.

/*
CREATE TABLE github_events
(
  event_id bigint,
  event_type text,
  event_public boolean,
  repo_id bigint,
  payload jsonb,
  repo jsonb,
  actor jsonb,
  org jsonb,
  created_at timestamp
);
*/

INSERT INTO github_events VALUES (2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13');

INSERT INTO github_events VALUES (2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24');

Quando você insere linhas em tabelas distribuídas, a coluna de distribuição da linha que está sendo inserida deve ser especificada. Com base na coluna de distribuição, o Citus determina o fragmento certo para o qual a inserção deve ser roteada. Em seguida, a consulta é encaminhada para o fragmento direito e o comando de inserção remota é executado em todas as réplicas desse fragmento.

Às vezes, é conveniente colocar várias instruções de inserção juntas em uma única inserção de várias linhas. Ele também pode ser mais eficiente do que fazer consultas de banco de dados repetidas. Por exemplo, o exemplo da seção anterior pode ser carregado de uma só vez, assim:

INSERT INTO github_events VALUES
  (
    2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13'
  ), (
    2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24'
  );

Cláusula "From Select" (rollups distribuídos)

O Citus também dá INSERT ... SELECT suporte a instruções que inserem linhas com base nos resultados de uma consulta selecionada. Essas instruções são uma maneira conveniente de preencher tabelas e também permitem upserts com a ON CONFLICT cláusula, a maneira mais fácil de fazer rollups distribuídos.

No Citus, há três maneiras de inserir a partir de uma instrução select. A primeira é se as tabelas de origem e a tabela de destino são colocadas e as instruções select/insert incluem a coluna de distribuição. Nesse caso, o Citus pode efetuar push da INSERT ... SELECT instrução para execução paralela em todos os nós.

A segunda maneira de executar uma instrução INSERT ... SELECT é reparticionando os resultados do conjunto de resultados em partes e enviando essas partes entre os trabalhadores para fragmentos de tabela de destino correspondentes. Cada nó de trabalho pode inserir os valores em fragmentos de destino locais.

A otimização de reparticionamento pode acontecer quando a consulta SELECT não exige uma etapa de mesclagem no coordenador. Ele não funciona com os seguintes recursos do SQL, que exigem uma etapa de mesclagem:

  • ORDENAR POR
  • LIMIT
  • DESLOCAMENTO
  • GROUP BY quando a coluna de distribuição não faz parte da chave de grupo
  • A janela funciona ao particionar por uma coluna de não distribuição nas tabelas de origem.
  • Junções entre tabelas não colocadas (ou seja, junções de repartição)

Quando as tabelas de origem e destino não são colocadas e a otimização de repartição não pode ser aplicada, o Citus usa a terceira maneira de executar INSERT ... SELECT. Ele seleciona os resultados de nós de trabalho e efetua pull dos dados para o nó coordenador. O coordenador redireciona as linhas de volta para o fragmento apropriado. Como todos os dados devem passar por um único nó, esse método não é tão eficiente.

Quando estiver em dúvida sobre qual método o Citus está usando, use o comando EXPLAIN. Quando a tabela de destino tiver uma contagem de fragmentos excessivamente grande, talvez seja sábio desabilitar o reparticionamento, consulte citus.enable_repartitioned_insert_select (boolean).

Comando COPY (carregamento em massa)

Para carregar dados em massa de um arquivo, você pode usar diretamente o comando \COPY do PostgreSQL.

Primeiro, baixe nosso exemplo github_events conjunto de dados executando:

wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz

Em seguida, você pode copiar os dados usando psql (observe que esses dados exigem que o banco de dados tenha codificação UTF8):

\COPY github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)

Observação

Não há nenhuma noção de isolamento de instantâneo entre fragmentos, o que significa que um SELECT multishard executado simultaneamente com uma CÓPIA pode vê-lo confirmado em alguns fragmentos, mas não em outros. Se o usuário estiver armazenando dados de eventos, ele poderá ocasionalmente observar pequenas lacunas nos dados recentes. Cabe aos aplicativos lidar com essa situação se for um problema. Por exemplo, você pode excluir os dados mais recentes de consultas ou usar algum bloqueio.

Se o COPY não abrir uma conexão para um posicionamento de fragmento, ele se comportará da mesma maneira que INSERT, ou seja, para marcar os posicionamentos como inativos, a menos que não haja mais posicionamentos ativos. Se qualquer outra falha ocorrer após a conexão, a transação será revertida e, portanto, nenhuma alteração de metadados será feita.

Agregações de cache com rollups

Aplicativos como pipelines de dados de eventos e dashboards em tempo real exigem consultas de subsecond em grandes volumes de dados. Uma maneira de tornar essas consultas rápidas é calcular e salvar agregações antecipadamente. Essa técnica é chamada de rolagem dos dados e evita o custo de processamento de dados brutos em tempo de execução. Como um benefício extra, a reversão de dados de série temporal em estatísticas diárias ou por hora também pode economizar espaço. Dados antigos podem ser excluídos quando seus detalhes completos não são mais necessários e agregações suficientes.

Por exemplo, aqui está uma tabela distribuída para acompanhar exibições de página por URL:

CREATE TABLE page_views (
  site_id int,
  url text,
  host_ip inet,
  view_time timestamp default now(),

  PRIMARY KEY (site_id, url)
);

SELECT create_distributed_table('page_views', 'site_id');

Depois que a tabela é preenchida com dados, podemos executar uma consulta agregada para contar exibições de página por URL por dia, restringindo-se a um determinado site e ano.

-- how many views per url per day on site 5?
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE site_id = 5 AND
    view_time >= date '2016-01-01' AND view_time < date '2017-01-01'
  GROUP BY view_time::date, site_id, url;

Essa configuração funciona, mas tem duas desvantagens. Primeiro, quando você executa repetidamente a consulta de agregação, ela deve passar por cada linha relacionada e recompor os resultados de todo o conjunto de dados. Se você estiver usando essa consulta para renderizar um painel, será mais rápido salvar os resultados agregados em uma tabela de exibições de página diária e consultar essa tabela. Em segundo lugar, os custos de armazenamento aumentam proporcionalmente com volumes de dados e o comprimento do histórico que pode ser consultado. Na prática, talvez você queira manter eventos brutos por um curto período de tempo e examinar grafos históricos em uma janela de tempo mais longa.

Para receber esses benefícios, podemos criar uma daily_page_views tabela para armazenar as estatísticas diárias.

CREATE TABLE daily_page_views (
  site_id int,
  day date,
  url text,
  view_count bigint,
  PRIMARY KEY (site_id, day, url)
);

SELECT create_distributed_table('daily_page_views', 'site_id');

Neste exemplo, distribuímos ambos page_views e daily_page_views na site_id coluna. Essa distribuição garante que os dados correspondentes a um determinado site sejam colocados no mesmo nó. Manter as linhas das duas tabelas juntas em cada nó minimiza o tráfego de rede entre nós e permite a execução altamente paralela.

Depois de criarmos essa nova tabela distribuída, podemos executar INSERT INTO ... SELECT para acumular exibições de página brutas na tabela agregada. No exemplo a seguir, agregamos exibições de página todos os dias. Os usuários do Citus geralmente esperam por um determinado período de tempo após o fim do dia para executar uma consulta como esta, para acomodar dados de chegada tardia.

-- roll up yesterday's data
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01' AND view_time < date '2017-01-02'
  GROUP BY view_time::date, site_id, url;

-- now the results are available right out of the table
SELECT day, site_id, url, view_count
  FROM daily_page_views
  WHERE site_id = 5 AND
    day >= date '2016-01-01' AND day < date '2017-01-01';

Essa consulta cumulativo agrega dados do dia anterior e os insere em daily_page_views. Executar a consulta uma vez por dia significa que você não precisa atualizar linhas de tabela cumulativo, pois os dados do novo dia não afetam as linhas anteriores.

A situação muda ao lidar com dados de chegada tardia ou executar a consulta cumulativa mais de uma vez por dia. Se novas linhas corresponderem a dias já na tabela cumulativo, as contagens de correspondência deverão aumentar. O PostgreSQL pode lidar com essa situação, ON CONFLICTque é sua técnica para fazer upserts. Veja um exemplo.

-- roll up from a given date onward,
-- updating daily page views when necessary
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01'
  GROUP BY view_time::date, site_id, url
  ON CONFLICT (day, url, site_id) DO UPDATE SET
    view_count = daily_page_views.view_count + EXCLUDED.view_count;

Atualizações e exclusão

Você pode atualizar ou excluir linhas de suas tabelas distribuídas usando os comandos padrão PostgreSQL UPDATE e DELETE .

DELETE FROM github_events
WHERE repo_id IN (24509048, 24509049);

UPDATE github_events
SET event_public = TRUE
WHERE (org->>'id')::int = 5430905;

Quando atualizações/exclusões afetam vários fragmentos como no exemplo anterior, o Citus usa um protocolo de confirmação de uma fase. Para maior segurança, você pode habilitar confirmações em duas fases definindo:

SET citus.multi_shard_commit_protocol = '2pc';

Se uma atualização ou exclusão afetar apenas um único fragmento, ela será executada em um único nó de trabalho. Nesse caso, a habilitação de confirmações em duas fases (2PC) é desnecessária. Essa situação geralmente ocorre quando atualizações ou exclusões são filtradas pela coluna de distribuição de uma tabela:

-- since github_events is distributed by repo_id,
-- this will execute in a single worker node

DELETE FROM github_events
WHERE repo_id = 206084;

Além disso, ao lidar com um único fragmento, o Citus dá SELECT ... FOR UPDATEsuporte a . Às vezes, essa técnica é usada por ORMs (mapeadores relacionais de objeto) para ter segurança:

  1. Carregar linhas.
  2. Faça um cálculo no código do aplicativo.
  3. Atualize as linhas com base no cálculo.

Selecionar as linhas para atualização coloca um bloqueio de gravação neles para impedir que outros processos causem uma anomalia de atualização perdida .

BEGIN;

  -- select events for a repo, but
  -- lock them for writing
  SELECT *
  FROM github_events
  WHERE repo_id = 206084
  FOR UPDATE;

  -- calculate a desired value event_public using
  -- application logic that uses those rows...

  -- now make the update
  UPDATE github_events
  SET event_public = :our_new_value
  WHERE repo_id = 206084;

COMMIT;

Esse recurso tem suporte apenas para tabelas de hash distribuídas e de referência.

Maximizando o desempenho de gravação

As instruções INSERT e UPDATE/DELETE podem ser dimensionadas até cerca de 50.000 consultas por segundo em computadores grandes. No entanto, para atingir essa taxa, você precisa usar muitas conexões paralelas e de longa duração e considerar como lidar com o bloqueio. Para obter mais informações, consulte a seção Ingestão de dados do Scale out de nossa documentação de desempenho.