Hinzufügen von Postgre SQL Datenbank CDC als Quelle im Echtzeithub

Dieser Artikel beschreibt die Vorgehensweise zum Hinzufügen von Postgre SQL-Datenbank Change Data Capture (CDC) als Ereignisquelle im Fabric-Echtzeithub.

Der PostgreSQL-Datenbank-Change Data Capture (CDC)-Quellconnector für Microsoft Fabric-Eventstreams ermöglicht es, eine Momentaufnahme der aktuellen Daten in einer PostgreSQL-Datenbank zu erfassen. Derzeit wird PostgreSQL Database Change Data Capture (CDC) von den folgenden Diensten unterstützt, auf die öffentlich zugegriffen werden kann, um die Datenbanken abzurufen:

  • Azure-Datenbank für PostgreSQL
  • Amazon RDS für PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL für PostgreSQL

Sobald die CdC-Quelle der PostgreSQL-Datenbank dem Eventstream hinzugefügt wurde, erfasst sie Änderungen auf Zeilenebene an den angegebenen Tabellen. Diese Änderungen können dann in Echtzeit verarbeitet und zur weiteren Analyse an verschiedene Ziele gesendet werden.

Hinweis

Mit DeltaFlow (Vorschau) können Sie unformatierte Debezium-CDC-Ereignisse in analysefähige Datenströme umwandeln, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow automatisiert schemaregistrierung, Zieltabellenverwaltung und Schemaentwicklungsbehandlung. Um DeltaFlow zu verwenden, wählen Sie während des Schemabehandlungsschritts analysebereite Ereignisse und automatisch aktualisiertes Schema aus.

Voraussetzungen

Aktivieren von CDC in Ihrer PostgreSQL-Datenbank

In diesem Abschnitt wird Azure Database für PostgreSQL als Beispiel verwendet.

Gehen Sie wie folgt vor, um CDC auf Ihrer Instanz von Azure Database for PostgreSQL – Flexibler Server zu aktivieren:

  1. Wählen Sie im Azure-Portal auf der Seite „Azure Database for PostgreSQL – Flexibler Server“ im Navigationsmenü die Option Serverparameter aus.

  2. Auf der Seite Serverparameter:

    • Setzen Sie wal_level auf logisch.
    • Aktualisieren Sie max_worker_processes auf mindestens 16.

    Screenshot der Aktivierung von CDC für die Bereitstellung eines flexiblen Servers.

  3. Speichern Sie die Änderungen, und starten Sie den Server neu.

  4. Vergewissern Sie sich, dass Ihre Instanz von Azure Database for PostgreSQL – Flexibler Server den öffentlichen Netzwerkdatenverkehr zulässt.

  5. Erteilen Sie dem Administratorbenutzer Replikationsberechtigungen durch Ausführen der folgenden SQL-Anweisung. Wenn Sie ein anderes Benutzerkonto verwenden möchten, um Ihre PostgreSQL-Datenbank (DB) zum Abrufen von CDC herzustellen, stellen Sie sicher, dass der Benutzer der Tabellenbesitzer ist.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    
  1. Melden Sie sich bei Microsoft Fabric an.

  2. Wenn unten links auf der Seite Power BI angezeigt wird, wechseln Sie zur Fabric-Workload, indem Sie Power BI und dann Fabric auswählen.

    Screenshot: Wechseln zur Fabric-Workload

  3. Wählen Sie in der linken Navigationsleiste Echtzeit aus.

    Ein Screenshot, der zeigt, wie man die Funktion 'Mit Datenquelle verbinden' startet.

  4. Die Streamingdatenseite wird standardmäßig geöffnet. Klicken Sie auf die Schaltfläche " Daten hinzufügen ", um zur Seite " Datenquellen " zu gelangen.

    Screenshot der Seite

    Sie können auch direkt zur Seite " Datenquellen " gelangen, indem Sie in der linken Navigationsleiste die Option " Daten hinzufügen " auswählen.

    Screenshot der Schaltfläche

Wählen Sie PostgreSQL Database CDC als Quelltyp

Wählen Sie auf der Seite Datenquellen die Kategorie Microsoft-Quellen oben aus, und wählen Sie dann Verbinden auf der Kachel Azure DB für PostgreSQL (CDC) aus.

Screenshot der Auswahl von Azure Database (DB) für PostgreSQL (CDC) als Quelltyp auf der Seite Datenquellen.

Konfigurieren Sie Azure Database for PostgreSQL als CDC-Quelle

Erfassen von Änderungsdaten aus PostgreSQL-Datenbanken mit automatischer Tabellenschemaregistrierung über CDC in Eventstream.

Hinweis

DeltaFlow (Vorschau):Wenn Sie Analysefähige Ereignisse und automatisch aktualisiertes Schema im Schemabehandlungsschritt auswählen, transformiert DeltaFlow rohe Debezium CDC-Ereignisse in analysefähige Datenströme, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow automatisiert auch die Erstellung von Zieltabellen und die Behandlung der Schemaentwicklung.

  1. Wählen Sie auf der Seite Verbinden die Option Neue Verbindung aus.

    Screenshot der Seite

  2. Geben Sie im Abschnitt Verbindungseinstellungen die folgende Informationen ein.

    • Server: Die Serveradresse Ihrer PostgreSQL-Datenbank, z. B. my-pgsql-server.postgres.database.azure.com.

    • Datenbank: Der Datenbankname, z. B. my_database.

      Screenshot des Abschnitts

    • Verbindungsname: Geben Sie einen Namen für die Verbindung ein.

    • Authentifizierungsart, Wählen Sie "Einfach " aus, und geben Sie Ihren Benutzernamen und Ihr Kennwort für die Datenbank ein.

      Hinweis

      Derzeit unterstützen Fabric-Ereignisstreams nur die Standardauthentifizierung .

    • Wählen Sie "Verbinden" aus, um die Verbindungseinstellungen abzuschließen. Screenshot des Abschnitts

  3. Port: Geben Sie die Portnummer Ihres Servers ein. Der Standardwert ist 5432. Wenn Ihre ausgewählte Cloudverbindung in "Verbindungen und Gateways verwalten" konfiguriert ist, stellen Sie sicher, dass die Portnummer dem dort festgelegten Entspricht. Wenn sie nicht übereinstimmen, haben die Portnummer in der Cloudverbindung unter "Verbindungen und Gateways verwalten " Vorrang.

  4. Sie können zwischen zwei Optionen beim Erfassen von Änderungen aus Datenbanktabellen wählen:

    • Alle Tabellen: Erfassen Sie Änderungen aus jeder Tabelle in der Datenbank.
    • Geben Sie Tabellennamen ein: Ermöglicht ihnen, eine Teilmenge von Tabellen mithilfe einer durch Trennzeichen getrennten Liste anzugeben. Sie können entweder: vollständige Tabellenbezeichner im Format schemaName.tableName oder gültige reguläre Ausdrücke verwenden. Beispiele:
    • dbo.test.*: Wählen Sie alle Tabellen aus, deren Namen mit test im dbo Schema beginnen.
    • dbo\.(test1|test2): Auswählen dbo.test1 und dbo.test2.

    Sie können beide Formate in der Liste kombinieren. Die Gesamtzeichenbeschränkung für den gesamten Eintrag beträgt 102.400 Zeichen.

  5. Slotname (optional): Geben Sie den Namen des logischen Dekodierungsslots in PostgreSQL ein, der für die Streaming-Änderungen eines bestimmten Plug-ins für eine bestimmte Datenbank oder ein bestimmtes Schema erstellt wurde. Der Server verwendet diesen Steckplatz, um Ereignisse zum Eventstream-Streamingconnector zu streamen. Sie darf nur Kleinbuchstaben, Zahlen und Unterstriche enthalten.

    • Wenn nicht angegeben, wird eine GUID verwendet, um den Steckplatz zu erstellen, der die entsprechenden Datenbankberechtigungen erfordert.
    • Wenn ein angegebener Steckplatzname vorhanden ist, verwendet der Verbinder ihn direkt.
  6. Erweitern Sie erweiterte Einstellungen , um auf weitere Konfigurationsoptionen für die CdC-Quelle der PostgreSQL-Datenbank zuzugreifen:

    • Publikationsname: Gibt den Namen der zu verwendenden logischen Replikationsveröffentlichung von PostgreSQL an. Dieser Wert muss mit einer vorhandenen Publikation in der Datenbank übereinstimmen, oder er wird abhängig vom automatischen Erstellungsmodus automatisch erstellt. Standardwert. dbz_publication.

      Hinweis

      Der Connectorbenutzer muss über Superuserberechtigungen verfügen, um die Publikation zu erstellen. Es wird empfohlen, die Publikation manuell zu erstellen, bevor Sie den Connector zum ersten Mal starten, um Berechtigungsbezogene Probleme zu vermeiden.

    • Automatischer Publikations-Erstellungsmodus: Steuert, ob und wie die Publikation automatisch erstellt wird. Zu den Optionen gehören:

      • Filtered (Standard): Wenn die angegebene Publikation nicht vorhanden ist, erstellt der Connector eine, die nur die ausgewählten Tabellen enthält, wie in der Tabellen-Auswahlliste angegeben.
      • AllTables: Wenn die angegebene Publikation vorhanden ist, verwendet der Connector sie. Wenn er nicht vorhanden ist, erstellt der Connector einen, der alle Tabellen in der Datenbank enthält.
      • Disabled: Der Connector erstellt keine Publikation. Wenn die angegebene Publikation fehlt, löst der Connector eine Ausnahme aus und stoppt. In diesem Fall muss die Publikation manuell in der Datenbank erstellt werden.

      Weitere Informationen finden Sie in der Debezium-Dokumentation zum automatischen Erstellen im Publikationsmodus.

    • Dezimalbehandlungsmodus: Gibt an, wie der Verbinder PostgreSQL DECIMAL und NUMERIC Spaltenwerte verarbeitet:

      • Precise: Stellt Werte mithilfe exakter Dezimaltypen dar (z. B. Java BigDecimal), um die volle Genauigkeit und Genauigkeit in der Datendarstellung sicherzustellen.
      • Double: Wandelt Werte in Gleitkommazahlen mit doppelter Genauigkeit um. Diese Option verbessert die Benutzerfreundlichkeit und Leistung, kann aber zu einem Genauigkeitsverlust führen.
      • String: Codiert Werte als formatierte Zeichenfolgen. Diese Option erleichtert die Nutzung in nachgelagerten Systemen, verliert jedoch semantische Informationen über den ursprünglichen numerischen Typ.
    • Momentaufnahmemodus: Geben Sie die Kriterien für das Ausführen einer Momentaufnahme an, wenn der Connector gestartet wird:

      • Initial: Der Connector führt eine Momentaufnahme nur aus, wenn keine Offsets für den logischen Servernamen aufgezeichnet wurden oder wenn erkannt wird, dass eine frühere Momentaufnahme nicht abgeschlossen werden konnte. Nach Abschluss der Momentaufnahme beginnt der Connector, Ereignisdatensätze für nachfolgende Datenbankänderungen zu streamen.
      • InitialOnly: Der Connector führt einen Snapshot nur dann aus, wenn für den logischen Servernamen keine Offsets aufgezeichnet wurden. Nach Abschluss der Momentaufnahme stoppt der Verbinder. Es wechselt nicht zum Streaming, um Änderungsereignisse aus dem Binlog zu lesen.
      • NoData: Der Connector führt eine Momentaufnahme aus, die nur das Schema, aber keine Tabellendaten erfasst. Legen Sie diese Option fest, wenn Sie keine konsistente Momentaufnahme der Daten benötigen, aber sie benötigen nur die Änderungen, die seit dem Start des Connectors vorgenommen werden.
    • Heartbeat-Abfrage: Gibt eine Abfrage an, die der Connector in der Quelldatenbank ausführt, wenn er eine Heartbeat-Nachricht sendet.

    • Außerkraftsetzung der Snapshot select-Anweisung: Gibt die Tabellenzeilen an, die in eine Momentaufnahme eingeschlossen werden sollen. Verwenden Sie die Eigenschaft, wenn eine Momentaufnahme nur eine Teilmenge der Zeilen in einer Tabelle enthalten soll. Diese Eigenschaft wirkt sich nur auf Momentaufnahmen aus. Sie gilt nicht für Ereignisse, die der Connector aus dem Protokoll liest.

Stream- oder Quelldetails

  1. Führen Sie auf der Seite "Verbinden " einen dieser Schritte aus, je nachdem, ob Sie Eventstream oder Real-Time Hub verwenden.

    • Eventstream:

      Führen Sie im Bereich " Quelldetails " rechts die folgenden Schritte aus:

      1. Um den Quellnamen zu ändern, wählen Sie das Stiftsymbol aus.

      2. Beachten Sie, dass der Eventstream-Name und der Stream-Name schreibgeschützt sind.

    • Real-Time Hub:

      Führen Sie im Abschnitt Datenstromdetails die folgenden Schritte aus:

      1. Wählen Sie den Fabric-Arbeitsbereich aus, in dem Sie den Eventstream erstellen möchten.

      2. Wählen Sie für den Eventstream-Namen die Stiftschaltfläche aus, und geben Sie einen Namen für den Eventstream ein.

      3. Der Wert für den Stream-Namen wird automatisch generiert, indem "-stream " an den Namen des Ereignisstreams angefügt wird. Dieser Datenstrom wird auf der Seite "Alle Datenströme" des Echtzeithubs angezeigt, wenn der Assistent beendet wird.

  2. Wählen Sie unten auf der Seite "Konfigurieren" die Option "Weiter" aus.

Überprüfen und Verbinden

Überprüfen Sie auf dem Bildschirm "Überprüfen + Verbinden " die Zusammenfassung, und wählen Sie "Hinzufügen " (Eventstream) oder "Verbinden " (Real-Time Hub) aus.

Schemabehandlungsseite

  1. Wählen Sie im Schemabehandlungsschritt eine der folgenden Optionen aus:

    • Analysefähige Ereignisse und automatisch aktualisiertes Schema (DeltaFlow Preview): Der Connector wandelt unformatierte CDC-Ereignisse in analysefähige Datenströme um, die Ihre Quelltabellenstruktur spiegeln. DeltaFlow erweitert Ereignisse mit Metadaten wie Änderungstyp (Einfügen, Aktualisieren oder Löschen) und Zeitstempeln und verwaltet automatisch Zieltabellen und Schemaentwicklung.
    • Roh-CDC-Ereignisse: Der Connector importiert und stellt die Roh-CDC-Ereignisse zur Verfügung. Optional kann der Connector Tabellenschemas automatisch bestimmen und in der Schemaregistrierung registrieren. Verwenden Sie diese Option, wenn Sie die Schemasensibilisierung ohne DeltaFlow-Transformation wünschen.

    Hinweis

    Der folgende Screenshot zeigt CDC in Azure SQL-Datenbank. Die Schemabehandlungsoptionen sind für alle unterstützten CDC-Quellconnectors identisch.

    Screenshot des Schemabehandlungsschritts mit DeltaFlow- und Raw CDC-Ereignisoptionen für einen CDC-Quellconnector.

  2. Aktivieren Sie die Ereignisschemazuordnung.

  3. Wählen Sie einen Fabric-Arbeitsbereich für den Schemasatz aus.

  4. Bei Schemasatz ist +Erstellen standardmäßig ausgewählt, wodurch ein neuer Schemasatz erstellt wird. Sie können ihn ändern, um einen vorhandenen Ereignisschemasatz auszuwählen.

  5. Wenn Sie im vorherigen Schritt die Option +Erstellen ausgewählt haben, geben Sie einen Namen für den Schemasatz ein.

  6. Überprüfen Sie auf der Seite "Überprüfen + Verbinden " die Zusammenfassung, und wählen Sie dann "Hinzufügen " (Eventstream) oder "Verbinden " (Real-Time Hub) aus.

    Screenshot der Seite

    Für alle Tabellen oder ausgewählte Tabellen in der PostgreSQL-Datenbank erkennt der Connector automatisch und erstellt Schemas, die mit dem Schemaregister registriert werden.

DeltaFlow: Analysebereite Ereignistransformation (Vorschau)

Wenn Sie Analysefähige Ereignisse und automatisch aktualisiertes Schema (DeltaFlow) aktivieren, bietet der Connector die folgenden Funktionen:

  • Für Analysen geeignete Ereignisstruktur: Rohe Debezium CDC-Ereignisse werden in ein tabellarisches Format umgewandelt, das die Struktur der Quelltabelle widerspiegelt. Ereignisse werden mit Metadatenspalten erweitert, einschließlich des Änderungstyps (insert, updateoder delete) und des Ereigniszeitstempels.
  • Automatische Verwaltung der Zieltabelle: Wenn Sie DeltaFlow-fähige Datenströme an ein unterstütztes Ziel wie ein Eventhouse weiterleiten, werden Zieltabellen automatisch erstellt, um dem Quelltabellenschema zu entsprechen. Sie müssen keine Zieltabellen manuell erstellen oder konfigurieren.
  • Schemaentwicklungsbehandlung: Wenn sich Quelldatenbanktabellen ändern (z. B. werden neue Spalten hinzugefügt oder Tabellen erstellt), erkennt DeltaFlow automatisch die Änderungen, aktualisiert die registrierten Schemas und passt die Zieltabellen entsprechend an. Dieses Verhalten minimiert den manuellen Eingriff durch Schemaänderungen.

Hinweis

DeltaFlow (Vorschau) wird derzeit mit Azure SQL-Datenbank-CDC, Azure SQL Managed Instance CDC, SQL Server auf virtuellem Computer CDC und PostgreSQL CDC-Quellconnectors unterstützt.

Ausführliche Informationen dazu, wie DeltaFlow rohe CDC-Ereignisse in analysebereite Ausgaben transformiert, einschließlich Vorgangstypen und Metadatenspalten, finden Sie unter DeltaFlow-Ausgabetransformation.

Datenstrom-Details anzeigen

  1. Wenn Sie auf der Seite Überprüfen + Verbinden den Ereignisstream öffnen, öffnet der Assistent den Ereignisstream, den er für Sie mit dem ausgewählten PostgreSQL-Datenbank-CDC als Quelle erstellt hat. Um den Assistenten zu schließen, wählen Sie unten auf der Seite Fertig stellen aus.

    Screenshot, der die Erfolgsseite Überprüfung und Verbindung zeigt.

  2. Sie sollten den Stream im Abschnitt "Aktuelle Streamingdaten" der Startseite desReal-Time-Hubs sehen. Ausführliche Schritte finden Sie unter Details von Datenströmen im Fabric Real-Time Hub anzeigen.

Weitere Informationen zum Thema Konsumieren von Datenströmen finden Sie in den folgenden Artikeln: