Ingesta y modificación de datos (DML)

En las secciones siguientes se describe cómo ingerir y modificar datos mediante el lenguaje de modificación de datos (DML) en Citus.

Inserción de datos

Para insertar datos en tablas distribuidas, puede usar los comandos INSERT de PostgreSQL estándar. Por ejemplo, se eligen dos filas aleatoriamente del conjunto de datos de archivo de GitHub.

/*
CREATE TABLE github_events
(
  event_id bigint,
  event_type text,
  event_public boolean,
  repo_id bigint,
  payload jsonb,
  repo jsonb,
  actor jsonb,
  org jsonb,
  created_at timestamp
);
*/

INSERT INTO github_events VALUES (2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13');

INSERT INTO github_events VALUES (2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24');

Al insertar filas en tablas distribuidas, se debe especificar la columna de distribución de la fila que se va a insertar. En función de la columna de distribución, Citus determina la partición derecha a la que se debe enrutar la inserción. A continuación, la consulta se reenvía a la partición derecha y el comando de inserción remota se ejecuta en todas las réplicas de esa partición.

A veces es conveniente colocar varias instrucciones insert juntas en una sola inserción de varias filas. También puede ser más eficaz que realizar consultas repetidas de base de datos. Por ejemplo, el ejemplo de la sección anterior se puede cargar todo a la vez de la siguiente manera:

INSERT INTO github_events VALUES
  (
    2489373118,'PublicEvent','t',24509048,'{}','{"id": 24509048, "url": "https://api.github.com/repos/SabinaS/csee6868", "name": "SabinaS/csee6868"}','{"id": 2955009, "url": "https://api.github.com/users/SabinaS", "login": "SabinaS", "avatar_url": "https://avatars.githubusercontent.com/u/2955009?", "gravatar_id": ""}',NULL,'2015-01-01 00:09:13'
  ), (
    2489368389,'WatchEvent','t',28229924,'{"action": "started"}','{"id": 28229924, "url": "https://api.github.com/repos/inf0rmer/blanket", "name": "inf0rmer/blanket"}','{"id": 1405427, "url": "https://api.github.com/users/tategakibunko", "login": "tategakibunko", "avatar_url": "https://avatars.githubusercontent.com/u/1405427?", "gravatar_id": ""}',NULL,'2015-01-01 00:00:24'
  );

Cláusula "From Select" (paquetes acumulativos distribuidos)

Citus también admite INSERT ... SELECT instrucciones que insertan filas en función de los resultados de una consulta de selección. Estas instrucciones son una manera cómoda de rellenar tablas y también permiten upserts con la ON CONFLICT cláusula , la manera más fácil de realizar paquetes acumulativos distribuidos.

En Citus, hay tres maneras de insertar desde una instrucción select. La primera es si las tablas de origen y la tabla de destino se colocan y las instrucciones select/insert incluyen la columna de distribución. En este caso, Citus puede insertar la INSERT ... SELECT instrucción para la ejecución en paralelo en todos los nodos.

La segunda forma de ejecutar una INSERT ... SELECT instrucción es volver a particionar los resultados del conjunto de resultados en fragmentos y enviar esos fragmentos entre trabajos a particiones de tabla de destino coincidentes. Cada nodo de trabajo puede insertar los valores en particiones de destino locales.

La optimización de la repartición puede producirse cuando la consulta SELECT no requiere un paso de combinación en el coordinador. No funciona con las siguientes características de SQL, que requieren un paso de combinación:

  • ORDENAR POR
  • LÍMITE
  • OFFSET
  • GROUP BY cuando la columna de distribución no forma parte de la clave de grupo
  • Las funciones de ventana al crear particiones mediante una columna sin distribución en las tablas de origen.
  • Combinaciones entre tablas no colocadas (es decir, combinaciones de repartición)

Cuando las tablas de origen y destino no están colocadas y no se puede aplicar la optimización de repartición, Citus usa la tercera forma de ejecutar INSERT ... SELECT. Selecciona los resultados de los nodos de trabajo y extrae los datos hasta el nodo de coordinación. El coordinador redirige las filas hacia abajo a la partición adecuada. Dado que todos los datos deben pasar por un solo nodo, este método no es tan eficaz.

Cuando tenga dudas sobre el método que está usando Citus, use el comando EXPLAIN. Cuando la tabla de destino tiene un recuento de particiones demasiado grande, puede ser aconsejable deshabilitar la repartición, consulte citus.enable_repartitioned_insert_select (boolean).

Comando COPY (carga masiva)

Para cargar datos de forma masiva desde un archivo, puede usar directamente el comando \COPY de PostgreSQL.

En primer lugar, descargue nuestro ejemplo github_events conjunto de datos mediante la ejecución de:

wget http://examples.citusdata.com/github_archive/github_events-2015-01-01-{0..5}.csv.gz
gzip -d github_events-2015-01-01-*.gz

A continuación, puede copiar los datos mediante psql (tenga en cuenta que estos datos requieren que la base de datos tenga codificación UTF8):

\COPY github_events FROM 'github_events-2015-01-01-0.csv' WITH (format CSV)

Nota:

No hay ninguna noción de aislamiento de instantáneas entre particiones, lo que significa que una select de varias particiones que se ejecuta simultáneamente con una COPIA podría verla confirmada en algunas particiones, pero no en otras. Si el usuario almacena datos de eventos, es posible que ocasionalmente observe pequeñas lagunas en los datos recientes. Es necesario que las aplicaciones se ocupen de esta situación si se trata de un problema. Por ejemplo, podría excluir los datos más recientes de las consultas o usar algún bloqueo.

Si COPY no puede abrir una conexión para una ubicación de partición, se comporta de la misma manera que INSERT, es decir, marcar las ubicaciones como inactivas a menos que no haya más ubicaciones activas. Si se produce algún otro error después de conectarse, la transacción se revierte y, por tanto, no se realizan cambios en los metadatos.

Almacenamiento en caché de agregaciones con paquetes acumulativos

Las aplicaciones como las canalizaciones de datos de eventos y los paneles en tiempo real requieren consultas subsegundas en grandes volúmenes de datos. Una manera de hacer que estas consultas sean rápidas es calcular y ahorrar agregados con antelación. Esta técnica se denomina puesta al día de los datos y evita el costo de procesar datos sin procesar en tiempo de ejecución. Como ventaja adicional, la implementación de datos de series temporales en estadísticas diarias o por hora también puede ahorrar espacio. Los datos antiguos se pueden eliminar cuando sus detalles completos ya no son necesarios y los agregados son suficientes.

Por ejemplo, esta es una tabla distribuida para realizar el seguimiento de vistas de página por dirección URL:

CREATE TABLE page_views (
  site_id int,
  url text,
  host_ip inet,
  view_time timestamp default now(),

  PRIMARY KEY (site_id, url)
);

SELECT create_distributed_table('page_views', 'site_id');

Una vez que la tabla se rellena con datos, podemos ejecutar una consulta de agregado para contar vistas de página por dirección URL al día, restringiendo a un sitio y año determinado.

-- how many views per url per day on site 5?
SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE site_id = 5 AND
    view_time >= date '2016-01-01' AND view_time < date '2017-01-01'
  GROUP BY view_time::date, site_id, url;

Esta configuración funciona, pero tiene dos inconvenientes. En primer lugar, cuando se ejecuta repetidamente la consulta de agregado, debe pasar por cada fila relacionada y volver a calcular los resultados de todo el conjunto de datos. Si usa esta consulta para representar un panel, es más rápido guardar los resultados agregados en una tabla de vistas de página diaria y consultar esa tabla. En segundo lugar, los costos de almacenamiento aumentan proporcionalmente con los volúmenes de datos y la longitud del historial consultable. En la práctica, es posible que desee mantener los eventos sin procesar durante un breve período de tiempo y examinar los gráficos históricos en un período de tiempo más largo.

Para recibir esas ventajas, podemos crear una daily_page_views tabla para almacenar las estadísticas diarias.

CREATE TABLE daily_page_views (
  site_id int,
  day date,
  url text,
  view_count bigint,
  PRIMARY KEY (site_id, day, url)
);

SELECT create_distributed_table('daily_page_views', 'site_id');

En este ejemplo, se distribuyen tanto como page_viewsdaily_page_views en la site_id columna . Esta distribución garantiza que los datos correspondientes a un sitio determinado se coloquen en el mismo nodo. Mantener juntas las dos filas de las dos tablas en cada nodo minimiza el tráfico de red entre los nodos y permite una ejecución muy paralela.

Una vez creada esta nueva tabla distribuida, podemos ejecutar INSERT INTO ... SELECT para acumular vistas de página sin procesar en la tabla agregada. En el ejemplo siguiente, agregamos vistas de página cada día. Los usuarios de Citus a menudo esperan un período de tiempo determinado después del final del día para ejecutar una consulta como esta, para dar cabida a los datos de llegada tardía.

-- roll up yesterday's data
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01' AND view_time < date '2017-01-02'
  GROUP BY view_time::date, site_id, url;

-- now the results are available right out of the table
SELECT day, site_id, url, view_count
  FROM daily_page_views
  WHERE site_id = 5 AND
    day >= date '2016-01-01' AND day < date '2017-01-01';

Esta consulta de acumulación agrega datos del día anterior e los inserta en daily_page_views. La ejecución de la consulta una vez al día significa que no es necesario actualizar las filas de la tabla de acumulación, ya que los datos del nuevo día no afectan a las filas anteriores.

La situación cambia cuando se trabaja con datos de llegada tardía o se ejecuta la consulta de acumulación más de una vez al día. Si alguna fila nueva coincide con días ya en la tabla de acumulación, los recuentos de coincidencias deben aumentar. PostgreSQL puede controlar esta situación con ON CONFLICT, que es su técnica para realizar upserts. Este es un ejemplo.

-- roll up from a given date onward,
-- updating daily page views when necessary
INSERT INTO daily_page_views (day, site_id, url, view_count)
  SELECT view_time::date AS day, site_id, url, count(*) AS view_count
  FROM page_views
  WHERE view_time >= date '2017-01-01'
  GROUP BY view_time::date, site_id, url
  ON CONFLICT (day, url, site_id) DO UPDATE SET
    view_count = daily_page_views.view_count + EXCLUDED.view_count;

Actualizaciones y eliminación

Puede actualizar o eliminar filas de las tablas distribuidas mediante los comandos ESTÁNDAR UPDATE y DELETE de PostgreSQL.

DELETE FROM github_events
WHERE repo_id IN (24509048, 24509049);

UPDATE github_events
SET event_public = TRUE
WHERE (org->>'id')::int = 5430905;

Cuando las actualizaciones o eliminaciones afectan a varias particiones como en el ejemplo anterior, Citus usa de forma predeterminada un protocolo de confirmación de una fase. Para mayor seguridad, puede habilitar confirmaciones en dos fases estableciendo:

SET citus.multi_shard_commit_protocol = '2pc';

Si una actualización o eliminación afecta solo a una sola partición, se ejecuta dentro de un único nodo de trabajo. En este caso, la habilitación de confirmaciones en dos fases (2PC) no es necesaria. Esta situación suele ocurrir cuando las actualizaciones o elimina el filtro por la columna de distribución de una tabla:

-- since github_events is distributed by repo_id,
-- this will execute in a single worker node

DELETE FROM github_events
WHERE repo_id = 206084;

Además, al tratar con una sola partición, Citus admite SELECT ... FOR UPDATE. A veces, esta técnica la usan los asignadores relacionales de objetos (ORM) para:

  1. Cargar filas.
  2. Realice un cálculo en el código de la aplicación.
  3. Actualice las filas en función del cálculo.

Al seleccionar las filas para actualizar, se coloca un bloqueo de escritura en ellas para evitar que otros procesos provoquen una anomalía de actualización perdida .

BEGIN;

  -- select events for a repo, but
  -- lock them for writing
  SELECT *
  FROM github_events
  WHERE repo_id = 206084
  FOR UPDATE;

  -- calculate a desired value event_public using
  -- application logic that uses those rows...

  -- now make the update
  UPDATE github_events
  SET event_public = :our_new_value
  WHERE repo_id = 206084;

COMMIT;

Esta característica solo se admite para tablas distribuidas y de referencia hash.

Maximizar el rendimiento de escritura

Las instrucciones INSERT y UPDATE/DELETE se pueden escalar verticalmente hasta aproximadamente 50 000 consultas por segundo en máquinas grandes. Sin embargo, para lograr esta velocidad, debe usar muchas conexiones paralelas y de larga duración y considerar cómo tratar con el bloqueo. Para más información, consulte la sección Ingesta de datos de escalabilidad horizontal de nuestra documentación de rendimiento.