Dodaj bazę danych PostgreSQL z CDC jako źródło w centrum przetwarzania w czasie rzeczywistym

W tym artykule opisano, jak dodać PostgreSQL Database Change Data Capture (CDC) jako źródło zdarzeń w hubie Fabric Real-Time.

Łącznik źródłowy wykrywania zmian danych w bazie danych PostgreSQL (CDC) dla strumieni zdarzeń usługi Microsoft Fabric umożliwia wykonanie migawki aktualnych danych w bazie danych PostgreSQL. Obecnie funkcja przechwytywania zmian danych bazy danych PostgreSQL (CDC) jest obsługiwana z następujących usług, do których można publicznie uzyskać dostęp do baz danych:

  • Azure Database for PostgreSQL
  • Amazon RDS for PostgreSQL
  • Amazon Aurora PostgreSQL
  • Google Cloud SQL for PostgreSQL

Po dodaniu źródła CDC bazy danych PostgreSQL do strumieniu zdarzeń, przechwytuje zmiany na poziomie wiersza w określonych tabelach. Te zmiany można następnie przetwarzać w czasie rzeczywistym i wysyłać do różnych miejsc docelowych w celu dalszej analizy.

Uwaga

Za pomocą funkcji DeltaFlow (wersja zapoznawcza) można przekształcić nieprzetworzone zdarzenia CDC Debezium na strumienie gotowe do analizy, które odzwierciedlają strukturę tabeli źródłowej. Usługa DeltaFlow automatyzuje rejestrację schematu, zarządzanie tabelami docelowymi i obsługę ewolucji schematu. Aby użyć funkcji DeltaFlow, wybierz pozycję Zdarzenia gotowe do analizy i automatycznie zaktualizowany schemat podczas kroku obsługi schematu.

Wymagania wstępne

Włączanie usługi CDC w bazie danych PostgreSQL

W tej sekcji użyto Azure Database for PostgreSQL jako przykład.

Aby włączyć CDC w elastycznym serwerze Azure Database for PostgreSQL, wykonaj następujące kroki:

  1. Na stronie elastycznego serwera PostgreSQL w portalu Azure wybierz pozycję Parametry serwera w menu nawigacji.

  2. Na stronie Parametry serwera:

    • Ustaw wal_level na wartość logiczną.
    • Zaktualizuj max_worker_processes do co najmniej 16.

    Zrzut ekranu przedstawiający włączanie usługi CDC dla wdrożenia serwera elastycznego.

  3. Zapisz zmiany i uruchom ponownie serwer.

  4. Potwierdź, że wystąpienie elastycznego serwera Azure Database for PostgreSQL zezwala na publiczny ruch sieciowy.

  5. Udziel uprawnień replikacji użytkownika administratora, uruchamiając następującą instrukcję SQL. Jeśli chcesz użyć innego konta użytkownika, aby połączyć bazę danych PostgreSQL (DB) w celu pobrania usługi CDC, upewnij się, że użytkownik jest właścicielem tabeli.

    ALTER ROLE <admin_user_or_table_owner_user> WITH REPLICATION;
    
  1. Zaloguj się do usługi Microsoft Fabric.

  2. Jeśli w lewym dolnym rogu strony wyświetla się Power BI, przejdź do obciążenia Fabric, wybierając pozycję Power BI, a następnie wybierając pozycję Fabric.

    Zrzut ekranu przedstawiający sposób przełączania się do obciążenia typu Fabric.

  3. Wybierz pozycję Czas rzeczywisty na lewym pasku nawigacyjnym.

    zrzut ekranu pokazujący sposób uruchamiania programu Connect to data source experience (Łączenie ze źródłem danych).

  4. Domyślnie zostanie otwarta strona Dane przesyłane strumieniowo . Kliknij przycisk Dodaj dane , aby przejść do strony Źródła danych .

    Zrzut ekranu przedstawiający stronę Źródła danych w centrum Real-Time.

    Możesz również przejść do strony Źródła danych bezpośrednio, wybierając opcję Dodaj dane na pasku nawigacyjnym po lewej stronie.

    Zrzut ekranu przedstawiający przycisk Połącz źródło danych.

Wybierz bazę danych PostgreSQL CDC jako typ źródła

Na stronie Źródła danych wybierz kategorię Źródła firmy Microsoft u góry, a następnie wybierz pozycję Połącz na kafelku Azure DB for PostgreSQL (CDC).

Zrzut ekranu przedstawiający wybór usługi Azure Database (DB) for PostgreSQL (CDC) jako typu źródła na stronie Źródła danych.

Konfigurowanie źródła cdC usługi Azure Database for PostgreSQL

Pobieranie danych o zmianach z baz danych PostgreSQL z automatyczną rejestracją schematu tabeli z wykorzystaniem mechanizmu CDC do Eventstream.

Uwaga

DeltaFlow (wersja zapoznawcza): po wybraniu zdarzeń gotowych do analizy i automatycznie zaktualizowanego schematu w kroku obsługi schematu funkcja DeltaFlow przekształca nieprzetworzone zdarzenia CDC Debezium na strumienie gotowe do analizy, które odzwierciedlają strukturę tabeli źródłowej. Usługa DeltaFlow automatyzuje również tworzenie tabel docelowych i obsługę ewolucji schematu.

  1. Na stronie Łączenie wybierz pozycję Nowe połączenie.

    Zrzut ekranu przedstawiający stronę połączenia bazy danych PostgreSQL z wyróżnionym linkiem Nowe połączenie.

  2. W sekcji Ustawienia połączenia wprowadź następujące informacje.

    • Serwer: adres serwera bazy danych PostgreSQL, na przykład my-pgsql-server.postgres.database.azure.com.

    • Baza danych: nazwa bazy danych, na przykład my_database.

      Zrzut ekranu przedstawiający sekcję Ustawienia połączenia dla łącznika bazy danych PostgreSQL.

    • Nazwa połączenia: wprowadź nazwę połączenia.

    • Rodzaj uwierzytelniania, wybierz pozycję Podstawowa i wprowadź nazwę użytkownika i hasło dla bazy danych.

      Uwaga

      Obecnie strumienie zdarzeń Fabric obsługują tylko uwierzytelnianie podstawowe.

    • Wybierz pozycję Połącz , aby ukończyć ustawienia połączenia. Zrzut ekranu przedstawiający sekcję Poświadczenia połączenia dla łącznika bazy danych PostgreSQL.

  3. Port: wprowadź numer portu serwera. Wartość domyślna to 5432. Jeśli wybrane połączenie w chmurze jest skonfigurowane w Zarządzanie połączeniami i bramami, upewnij się, że numer portu jest zgodny z tym ustawionym. Jeśli nie są one zgodne, numer portu w połączeniu z chmurą w Zarządzanie połączeniami i bramami ma pierwszeństwo.

  4. Podczas przechwytywania zmian z tabel bazy danych można wybrać dwie opcje:

    • Wszystkie tabele: przechwyć zmiany z każdej tabeli w bazie danych.
    • Wprowadź nazwy tabel: umożliwia określenie podzbioru tabel przy użyciu listy rozdzielanej przecinkami. Można użyć jednego z następujących typów: pełnych identyfikatorów tabeli w formacie schemaName.tableName lub prawidłowych wyrażeniach regularnych. Przykłady:
    • dbo.test.*: Wybierz wszystkie tabele, których nazwy zaczynają się od test w schemacie dbo .
    • dbo\.(test1|test2): Wybierz dbo.test1 i dbo.test2.

    Oba formaty można połączyć na liście. Całkowity limit znaków dla całego wpisu wynosi 102 400 znaków.

  5. Nazwa miejsca (opcjonalnie): wprowadź nazwę gniazda dekodowania logicznego PostgreSQL, które zostało utworzone na potrzeby przesyłania strumieniowego zmian z konkretnej wtyczki dla określonej bazy danych/schematu. Serwer używa tego miejsca do przesyłania zdarzeń do łącznika strumieniowego Eventstream. Musi zawierać tylko małe litery, cyfry i podkreślenia.

    • Jeśli nie zostanie określony żaden identyfikator GUID, używany jest do stworzenia slotu, co wymaga posiadania odpowiednich uprawnień bazy danych.
    • Jeśli istnieje określona nazwa miejsca, łącznik używa jej wprost.
  6. Rozwiń pozycję Ustawienia zaawansowane, aby uzyskać dostęp do większej liczby opcji konfiguracji źródła CDC PostgreSQL Database:

    • Nazwa publikacji: określa nazwę publikacji replikacji logicznej PostgreSQL, którą należy użyć. Ta wartość musi być zgodna z istniejącą publikacją w bazie danych lub jest tworzona automatycznie w zależności od trybu automatycznego tworzenia. Wartość domyślna: dbz_publication.

      Uwaga

      Aby utworzyć publikację, użytkownik łącznika musi mieć uprawnienia administratora. Zalecamy ręczne utworzenie publikacji przed uruchomieniem łącznika po raz pierwszy, aby uniknąć problemów związanych z uprawnieniami.

    • Tryb automatycznego tworzenia publikacji: określa, czy i jak publikacja jest tworzona automatycznie. Dostępne opcje:

      • Filtered (ustawienie domyślne): Jeśli określona publikacja nie istnieje, łącznik tworzy ten, który zawiera tylko wybrane tabele (jak określono na liście dołączanej do tabeli).
      • AllTables: Jeśli określona publikacja istnieje, łącznik jej używa. Jeśli nie istnieje, łącznik tworzy jeden, który zawiera wszystkie tabele w bazie danych.
      • Disabled: Łącznik nie tworzy publikacji. Jeśli brakuje określonej publikacji, łącznik zgłasza wyjątek i zatrzymuje się. W takim przypadku publikacja musi zostać utworzona ręcznie w bazie danych.

      Aby uzyskać więcej informacji, zobacz dokumentację debezium dotyczącą trybu automatycznego tworzenia publikacji

    • Tryb obsługi dziesiętnej: określa, jak łącznik obsługuje wartości kolumn PostgreSQL DECIMAL i NUMERIC:

      • Precise: reprezentuje wartości przy użyciu dokładnych typów dziesiętnych (na przykład Java BigDecimal), aby zapewnić pełną precyzję i dokładność w reprezentacji danych.
      • Double: konwertuje wartości na liczby zmiennoprzecinkowe o podwójnej precyzji. Ta opcja zwiększa użyteczność i wydajność, ale może spowodować utratę precyzji.
      • String: koduje wartości jako sformatowane ciągi. Ta opcja ułatwia korzystanie z nich w systemach podrzędnych, ale traci semantyczne informacje o oryginalnym typie liczbowym.
    • Tryb migawki: Wskaż kryteria dotyczące sposobu wykonywania migawki po uruchomieniu łącznika:

      • Initial: Łącznik uruchamia migawkę tylko wtedy, gdy nie zarejestrowano żadnych przesunięć dla nazwy serwera logicznego lub jeśli wykryje, że wcześniejsza migawka nie powiodła się. Po zakończeniu tworzenia migawki łącznik zaczyna przesyłać strumieniowo rekordy zdarzeń dla kolejnych zmian bazy danych.
      • InitialOnly: Konektor wykonuje migawkę tylko wtedy, gdy nie zarejestrowano żadnych przesunięć dla nazwy serwera logicznego. Po zakończeniu tworzenia migawki łącznik przestaje działać. Nie przechodzi do przesyłania strumieniowego w celu odczytu zdarzeń zmiany z binloga.
      • NoData: Łącznik uruchamia migawkę, która przechwytuje tylko schemat, ale nie żadne dane tabeli. Ustaw tę opcję, jeśli nie potrzebujesz spójnej migawki danych, ale potrzebne są tylko zmiany wykonywane od momentu uruchomienia łącznika.
    • Zapytanie akcji pulsu: określa zapytanie, które łącznik wykonuje w źródłowej bazie danych, gdy łącznik wysyła komunikat pulsu.

    • Zastępowanie instrukcji SELECT migawki: określa wiersze tabeli do uwzględnienia w migawce. Użyj właściwości , jeśli chcesz, aby migawka zawierała tylko podzbiór wierszy w tabeli. Ta właściwość ma wpływ tylko na migawki. Nie ma zastosowania do zdarzeń odczytanych przez łącznik z dziennika.

Szczegóły strumienia lub źródła

  1. Na stronie Łączenie wykonaj jedną z tych czynności w zależności od tego, czy używasz usługi Eventstream, czy Real-Time Hub.

    • Strumień zdarzeń:

      W okienku Szczegóły źródła po prawej stronie wykonaj następujące kroki:

      1. W polu Nazwa źródła wybierz przycisk Ołówek , aby zmienić nazwę.

      2. Zwróć uwagę, że nazwa strumienia zdarzeń i nazwa strumienia są tylko do odczytu.

    • Centrum czasu rzeczywistego:

      W sekcji Szczegóły usługi Stream po prawej stronie wykonaj następujące kroki:

      1. Wybierz obszar roboczy Fabric, w którym chcesz utworzyć strumień zdarzeń.

      2. W polu Nazwa strumienia zdarzeń wybierz przycisk Ołówek i wprowadź nazwę strumienia zdarzeń.

      3. Wartość nazwy strumienia jest automatycznie generowana przez dodanie -stream do nazwy eventstream. Strumień ten pojawia się na stronie Wszystkie strumienie danych na centrum czasu rzeczywistego, gdy kreator zakończy działanie.

  2. Wybierz pozycję Dalej w dolnej części strony Konfigurowanie .

Przeglądanie i nawiązywanie połączenia

Na ekranie Przeglądanie i łączenie przejrzyj podsumowanie, a następnie wybierz pozycję Dodaj (Eventstream) lub Połącz (Real-Time hub).

Strona obsługi schematu

  1. W kroku Obsługa schematu wybierz jedną z następujących opcji:

    • Zdarzenia gotowe do analizy i automatycznie zaktualizowany schemat (wersja zapoznawcza usługi DeltaFlow): łącznik przekształca nieprzetworzone zdarzenia CDC w strumienie gotowe do analizy, które odzwierciedlają strukturę tabeli źródłowej. Funkcja DeltaFlow wzbogaca zdarzenia o metadane, takie jak typ zmiany (wstawianie, aktualizowanie lub usuwanie) i znaczniki czasu, a także automatycznie zarządza tabelami docelowymi i ewolucją schematu.
    • Nieprzetworzone zdarzenia CDC: łącznik pozyskuje i udostępnia nieprzetworzone zdarzenia CDC. Opcjonalnie łącznik może automatycznie odnajdywać schematy tabel i rejestrować je w rejestrze schematów. Użyj tej opcji, jeśli chcesz mieć świadomość schematu bez przekształcenia DeltaFlow.

    Uwaga

    Poniższy zrzut ekranu przedstawia usługę Azure SQL Database CDC. Opcje obsługi schematu są takie same dla wszystkich obsługiwanych łączników źródłowych CDC.

    Zrzut ekranu przedstawiający krok obsługi schematu z opcjami zdarzeń DeltaFlow i Raw CDC dla łącznika źródła CDC.

  2. Włącz skojarzenie schematu zdarzeń.

  3. Wybierz obszar roboczy typu Fabric dla zestawu schematów workspace.

  4. W obszarze Zestaw schematówopcja + Utwórz jest domyślnie wybierana, co powoduje utworzenie nowego zestawu schematów. Można go zmienić, aby wybrać istniejący zestaw schematów zdarzeń.

  5. Jeśli w poprzednim kroku wybrano opcję + Utwórz , wprowadź nazwę zestawu schematów.

  6. Na stronie Przeglądanie + łączenie przejrzyj podsumowanie, a następnie wybierz pozycję Dodaj (Eventstream) lub Połącz (Real-Time hub).

    Zrzut ekranu przedstawiający stronę Przeglądanie i tworzenie łącznika bazy danych PostgreSQL z rozszerzonymi funkcjami.

    W przypadku wszystkich tabel lub wybranych tabel w bazie danych PostgreSQL łącznik automatycznie odnajduje i tworzy schematy oraz rejestruje je w rejestrze schematów.

DeltaFlow: przekształcanie zdarzeń gotowych do analizy (wersja zapoznawcza)

Po włączeniu funkcji zdarzeń gotowych do analizy i automatycznego aktualizowania schematu (DeltaFlow) łącznik oferuje następujące możliwości:

  • Kształt zdarzenia gotowego do analityki: Surowe zdarzenia CDC Debezium są przekształcane w format tabelaryczny, który odzwierciedla strukturę tabeli źródłowej. Zdarzenia są wzbogacone o kolumny metadanych, w tym typ zmiany (insert, updatelub delete) i znacznik czasu zdarzenia.
  • Automatyczne zarządzanie tabelami docelowymi: w przypadku kierowania strumieni z obsługą funkcji DeltaFlow do obsługiwanego miejsca docelowego, takiego jak magazyn zdarzeń, tabele docelowe są tworzone automatycznie w celu dopasowania do schematu tabeli źródłowej. Nie musisz ręcznie tworzyć ani konfigurować tabel docelowych.
  • Obsługa ewolucji schematu: gdy tabele źródłowej bazy danych zmieniają się (na przykład dodawane są nowe kolumny lub tworzone tabele), funkcja DeltaFlow automatycznie wykrywa zmiany, aktualizuje zarejestrowane schematy i odpowiednio dostosowuje tabele docelowe. To zachowanie minimalizuje interwencję ręczną spowodowaną zmianami schematu.

Uwaga

Usługa DeltaFlow (wersja zapoznawcza) jest obecnie obsługiwana w przypadku łączników źródłowych cdC usługi Azure SQL Database, usługi Azure SQL Managed Instance CDC, programu SQL Server na maszynie wirtualnej i łączników źródłowych usługi CdC postgreSQL.

Aby uzyskać szczegółowe informacje na temat przekształcania surowych zdarzeń CDC w dane wyjściowe gotowe do analizy, w tym typy operacji i kolumny metadanych, zobacz DeltaFlow output transformation.

Wyświetlanie szczegółów strumienia danych

  1. Na stronie Przeglądanie i łączenie, jeśli wybierzesz Otwórz strumień zdarzeń, kreator otworzy utworzony dla Ciebie strumień zdarzeń z wybraną bazą danych PostgreSQL z CDC jako źródłem. Aby zamknąć kreatora, wybierz opcję Zakończ w dolnej części strony.

    Zrzut ekranu pokazuje stronę sukcesu Przegląd + Połącz.

  2. Strumień powinien zostać wyświetlony w sekcji Ostatnie dane przesyłane strumieniowo na stronie głównej centrumReal-Time . Aby uzyskać szczegółowe instrukcje, zobacz Wyświetlanie szczegółów strumieni danych w Fabric Real-Time Hub.

Aby dowiedzieć się więcej o używaniu strumieni danych, zobacz następujące artykuły: