Share via


Delta kopiëren uit een database met een besturingstabel

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

In dit artikel wordt een sjabloon beschreven die beschikbaar is voor het incrementeel laden van nieuwe of bijgewerkte rijen van een databasetabel naar Azure met behulp van een externe besturingstabel waarin een bovengrenswaarde wordt opgeslagen.

Voor deze sjabloon moet het schema van de brondatabase een tijdstempelkolom of incrementele sleutel bevatten om nieuwe of bijgewerkte rijen te identificeren.

Notitie

Als u een tijdstempelkolom in uw brondatabase hebt om nieuwe of bijgewerkte rijen te identificeren, maar u geen externe besturingstabel wilt maken voor deltakopie, kunt u in plaats daarvan het hulpprogramma Azure Data Factory Copy Data gebruiken om een pijplijn op te halen. Dit hulpprogramma maakt gebruik van een geplande tijd voor triggers als een variabele om nieuwe rijen uit de brondatabase te lezen.

Over deze oplossingssjabloon

Met deze sjabloon wordt eerst de oude grenswaarde opgehaald en vergeleken met de huidige grenswaarde. Daarna worden alleen de wijzigingen uit de brondatabase gekopieerd op basis van een vergelijking tussen de twee watermerkwaarden. Ten slotte wordt de nieuwe bovengrenswaarde opgeslagen in een externe besturingstabel voor het laden van deltagegevens de volgende keer.

De sjabloon bevat vier activiteiten:

  • Met opzoeken wordt de oude waarde voor hoog watermerk opgehaald, die is opgeslagen in een externe besturingstabel.
  • Met een andere opzoekactiviteit wordt de huidige bovengrenswaarde opgehaald uit de brondatabase.
  • Kopieer alleen wijzigingen van de brondatabase naar het doelarchief. De query die de wijzigingen in de brondatabase identificeert, is vergelijkbaar met SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > 'laatste hoog watermerk' en TIMESTAMP_Column <= 'huidig hoog watermerk'.
  • SqlServerStoredProcedure schrijft de huidige bovengrenswaarde naar een externe besturingstabel voor deltakopie de volgende keer.

De sjabloon definieert de volgende parameters:

  • Data_Source_Table_Name is de tabel in de brondatabase waaruit u gegevens wilt laden.
  • Data_Source_WaterMarkColumn is de naam van de kolom in de brontabel die wordt gebruikt om nieuwe of bijgewerkte rijen te identificeren. Het type van deze kolom is doorgaans datum/tijd, INT of vergelijkbaar.
  • Data_Destination_Container is het hoofdpad van de plaats waarnaar de gegevens worden gekopieerd in uw doelarchief.
  • Data_Destination_Directory is het mappad onder de hoofdmap van de locatie waarnaar de gegevens worden gekopieerd in uw doelarchief.
  • Data_Destination_Table_Name is de plaats waar de gegevens naar uw doelarchief worden gekopieerd (van toepassing wanneer Azure Synapse Analytics is geselecteerd als gegevensbestemming).
  • Data_Destination_Folder_Path is de plaats waar de gegevens naar uw doelarchief worden gekopieerd (van toepassing wanneer 'Bestandssysteem' of 'Azure Data Lake Storage Gen1' is geselecteerd als gegevensbestemming).
  • Control_Table_Table_Name is de tabel met externe besturingselementen waarin de waarde van het hoge watermerk wordt opgeslagen.
  • Control_Table_Column_Name is de kolom in de tabel met externe besturingselementen waarin de bovengrenswaarde wordt opgeslagen.

Deze oplossingssjabloon gebruiken

  1. Verken de brontabel die u wilt laden en definieer de kolom met hoog watermerk die kan worden gebruikt om nieuwe of bijgewerkte rijen te identificeren. Het type van deze kolom kan datum/tijd, INT of vergelijkbaar zijn. De waarde van deze kolom neemt toe naarmate er nieuwe rijen worden toegevoegd. In de volgende voorbeeldbrontabel (data_source_table) kunnen we de kolom LastModifytime gebruiken als de kolom met hoog watermerk.

    PersonID	Name            LastModifytime
    1           aaaa            2017-09-01 00:56:00.000
    2           bbbb            2017-09-02 05:23:00.000
    3           cccc            2017-09-03 02:36:00.000
    4           dddd            2017-09-04 03:21:00.000
    5           eeee            2017-09-05 08:06:00.000
    6           fffffff         2017-09-06 02:23:00.000
    7           gggg            2017-09-07 09:01:00.000
    8           hhhh            2017-09-08 09:01:00.000
    9           iiiiiiiii       2017-09-09 09:01:00.000
    
  2. Maak een besturingstabel in SQL Server of Azure SQL Database om de bovengrenswaarde voor het laden van deltagegevens op te slaan. In het volgende voorbeeld is de naam van de besturingstabel watermerktabel. In deze tabel is WatermarkValue de kolom waarin de waarde van het hoge watermerk wordt opgeslagen en het bijbehorende type datum/tijd is.

    create table watermarktable
    (
    WatermarkValue datetime,
    );
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM')
    
  3. Maak een opgeslagen procedure in dezelfde SQL Server- of Azure SQL Database-instantie die u hebt gebruikt om de besturingstabel te maken. De opgeslagen procedure wordt gebruikt om de nieuwe hoge grenswaarde naar de externe besturingstabel te schrijven voor het laden van deltagegevens de volgende keer.

    CREATE PROCEDURE update_watermark @LastModifiedtime datetime
    AS
    
    BEGIN
    
        UPDATE watermarktable
        SET [WatermarkValue] = @LastModifiedtime 
    
    END
    
  4. Ga naar de Delta-kopie van de databasesjabloon . Maak een nieuwe verbinding met de brondatabase waaruit u gegevens wilt kopiëren.

    Screenshot showing the creation of a new connection to the source table.

  5. Maak een nieuwe verbinding met het doelgegevensarchief waarnaar u de gegevens wilt kopiëren.

    Screenshot showing the creation of a new connection to the destination table.

  6. Maak een nieuwe verbinding met de externe besturingstabel en opgeslagen procedure die u in stap 2 en 3 hebt gemaakt.

    Screenshot showing the creation of a new connection to the control table data store.

  7. Selecteer Deze sjabloon gebruiken.

  8. U ziet de beschikbare pijplijn, zoals wordt weergegeven in het volgende voorbeeld:

    Screenshot showing the pipeline.

  9. Selecteer Opgeslagen procedure. Kies [dbo] voor de naam van de opgeslagen procedure. update_watermark]. Selecteer de parameter Importeren en selecteer vervolgens Dynamische inhoud toevoegen.

    Screenshot showing where to set the stored procedure activity.

  10. Schrijf de inhoud @{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue} en selecteer Voltooien.

    Screenshot showing where to write the content for the parameters of the stored procedure.

  11. Selecteer Fouten opsporen, voer de parameters in en selecteer Voltooien.

    Screenshot showing the Debug button.

  12. Resultaten die vergelijkbaar zijn met het volgende voorbeeld, worden weergegeven:

    Sreenshot showing the result of the pipeline run.

  13. U kunt nieuwe rijen maken in de brontabel. Hier volgt een voorbeeldtaal van SQL voor het maken van nieuwe rijen:

    INSERT INTO data_source_table
    VALUES (10, 'newdata','9/10/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
    
  14. Als u de pijplijn opnieuw wilt uitvoeren, selecteert u Foutopsporing, voert u de parameters in en selecteert u Voltooien.

    U ziet dat alleen nieuwe rijen naar het doel zijn gekopieerd.

  15. (Optioneel:) Als u Azure Synapse Analytics als de gegevensbestemming selecteert, moet u ook een verbinding opgeven met Azure Blob Storage voor fasering. Dit is vereist voor Azure Synapse Analytics Polybase. Met de sjabloon wordt een containerpad voor u gegenereerd. Nadat de pijplijn is uitgevoerd, controleert u of de container is gemaakt in Blob Storage.

    Screenshot showing where to configure Polybase.