Condividi tramite


Cattura delle Modifiche ai Dati (CDC) di SQL Server con Apache Flink®

Importante

Azure HDInsight su Azure Kubernetes Service è stato ritirato il 31 gennaio 2025. Scopri di più con questo annuncio.

È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.

Importante

Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguireci per ulteriori aggiornamenti sulla comunità di Azure HDInsight.

Change Data Capture (CDC) è una tecnica che consente di tenere traccia delle modifiche a livello di riga nelle tabelle di database in risposta a operazioni di creazione, aggiornamento ed eliminazione. In questo articolo vengono usati CONNETTORI CDC per Apache Flink®, che offrono un set di connettori di origine per Apache Flink. I connettori integrano Debezium® come motore per il rilevamento delle modifiche ai dati.

Apache Flink supporta l'interpretazione dei messaggi JSON e Avro Debezium come messaggi di tipo INSERT/UPDATE/DELETE all'interno del sistema SQL Flink.

Questo supporto è utile in molti casi per:

  • Sincronizzare i dati incrementali dai database ad altri sistemi
  • Registri di audit
  • Creare viste materializzate in tempo reale nei database
  • Visualizzare la cronologia delle modifiche del join temporale di una tabella di database

A questo punto si apprenderà come usare Change Data Capture (CDC) di SQL Server usando Flink SQL. Il connettore SQLServer CDC consente di leggere i dati degli snapshot e i dati incrementali dal database SQLServer.

Prerequisiti

Connettore SQL Server CDC

Il connettore SQLServer CDC è un connettore di origine per Flink, che legge inizialmente lo snapshot del database e poi continua a leggere gli eventi di modifica garantendo una semantica di elaborazione esattamente una volta, anche in caso di errori. Questo esempio usa Flink CDC per creare una tabella SQLServerCDC in FLINK SQL

Questa sezione è già stata illustrata in dettaglio su come usare shell sicura con Flink.

Preparare la tabella e abilitare la funzionalità CDC in SQL Server SQLDB

Per preparare una tabella e abilitare CDC, è possibile fare riferimento ai passaggi dettagliati elencati in documentazione SQL

Creare un database

CREATE DATABASE inventory;
GO

Abilitare CDC nel database di SQL Server

USE inventory;
EXEC sys.sp_cdc_enable_db;  
GO

Verificare che l'utente abbia accesso alla tabella CDC

USE inventory
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

Nota

La query restituisce informazioni di configurazione per ogni tabella del database (abilitata per CDC). Se il risultato è vuoto, verificare che l'utente disponga dei privilegi per accedere sia all'istanza di acquisizione che alle tabelle CDC.

Creare e popolare i nostri prodotti usando un'unica inserzione con molte righe

CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);

INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistant black wind breaker',0.1);
INSERT INTO products(name,description,weight)
VALUES ('spare tire','24 inch spare tire',22.2);

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;

-- Creating simple orders on SQL Table

CREATE TABLE orders (
id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);

INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('21-FEB-2016', 1003, 1, 107);

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO

Scaricare il connettore SQLServer CDC in SSH

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4.1/flink-sql-connector-sqlserver-cdc-2.4.1.jar
bin/sql-client.sh -j flink-sql-connector-sqlserver-cdc-2.4.1.jar

Creare una tabella SQLServer CDC

SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE orders (
    id INT,
    order_date DATE,
    purchaser INT,
    quantity INT,
    product_id INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = '<updatehostname>.database.windows.net', //update with the host name
    'port' = '1433',
    'username' = '<update-username>', //update with the user name
    'password' = '<update-password>', //update with the password
    'database-name' = 'inventory',
    'table-name' = 'dbo.orders'
);

select * from orders;

Screenshot che mostra la creazione della tabella CDC Flink.

Eseguire modifiche sulla tabella dal lato SQLServer

Screenshot che mostra le modifiche apportate alla tabella SQL.

Validazione

Monitorare la tabella in Flink SQL

Screenshot che mostra come monitorare CDC in Flink SQL.

Riferimento