Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Azure HDInsight su Azure Kubernetes Service è stato ritirato il 31 gennaio 2025. Scopri di più con questo annuncio.
È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.
Importante
Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguireci per ulteriori aggiornamenti sulla comunità di Azure HDInsight.
Change Data Capture (CDC) è una tecnica che consente di tenere traccia delle modifiche a livello di riga nelle tabelle di database in risposta a operazioni di creazione, aggiornamento ed eliminazione. In questo articolo vengono usati CONNETTORI CDC per Apache Flink®, che offrono un set di connettori di origine per Apache Flink. I connettori integrano Debezium® come motore per il rilevamento delle modifiche ai dati.
Apache Flink supporta l'interpretazione dei messaggi JSON e Avro Debezium come messaggi di tipo INSERT/UPDATE/DELETE all'interno del sistema SQL Flink.
Questo supporto è utile in molti casi per:
- Sincronizzare i dati incrementali dai database ad altri sistemi
- Registri di audit
- Creare viste materializzate in tempo reale nei database
- Visualizzare la cronologia delle modifiche del join temporale di una tabella di database
A questo punto si apprenderà come usare Change Data Capture (CDC) di SQL Server usando Flink SQL. Il connettore SQLServer CDC consente di leggere i dati degli snapshot e i dati incrementali dal database SQLServer.
Prerequisiti
Connettore SQL Server CDC
Il connettore SQLServer CDC è un connettore di origine per Flink, che legge inizialmente lo snapshot del database e poi continua a leggere gli eventi di modifica garantendo una semantica di elaborazione esattamente una volta, anche in caso di errori. Questo esempio usa Flink CDC per creare una tabella SQLServerCDC in FLINK SQL
Usare SSH per usare il client SQL Flink
Questa sezione è già stata illustrata in dettaglio su come usare shell sicura con Flink.
Preparare la tabella e abilitare la funzionalità CDC in SQL Server SQLDB
Per preparare una tabella e abilitare CDC, è possibile fare riferimento ai passaggi dettagliati elencati in documentazione SQL
Creare un database
CREATE DATABASE inventory;
GO
Abilitare CDC nel database di SQL Server
USE inventory;
EXEC sys.sp_cdc_enable_db;
GO
Verificare che l'utente abbia accesso alla tabella CDC
USE inventory
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
Nota
La query restituisce informazioni di configurazione per ogni tabella del database (abilitata per CDC). Se il risultato è vuoto, verificare che l'utente disponga dei privilegi per accedere sia all'istanza di acquisizione che alle tabelle CDC.
Creare e popolare i nostri prodotti usando un'unica inserzione con molte righe
CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistant black wind breaker',0.1);
INSERT INTO products(name,description,weight)
VALUES ('spare tire','24 inch spare tire',22.2);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
-- Creating simple orders on SQL Table
CREATE TABLE orders (
id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('21-FEB-2016', 1003, 1, 107);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO
Scaricare il connettore SQLServer CDC in SSH
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4.1/flink-sql-connector-sqlserver-cdc-2.4.1.jar
Aggiungere jar in sql-client.sh e connettersi a Flink SQL Client
bin/sql-client.sh -j flink-sql-connector-sqlserver-cdc-2.4.1.jar
Creare una tabella SQLServer CDC
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE orders (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '<updatehostname>.database.windows.net', //update with the host name
'port' = '1433',
'username' = '<update-username>', //update with the user name
'password' = '<update-password>', //update with the password
'database-name' = 'inventory',
'table-name' = 'dbo.orders'
);
select * from orders;
Eseguire modifiche sulla tabella dal lato SQLServer
Validazione
Monitorare la tabella in Flink SQL
Riferimento
- Il connettore SQLServer CDC è concesso in licenza sotto la licenza Apache 2.0
- Apache, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi del Apache Software Foundation (ASF).