Sdílet prostřednictvím


Přírůstkové načítání dat z více tabulek v SQL Serveru do Azure SQL Database pomocí PowerShellu

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přenosu dat až po datovou vědu, analýzy v reálném čase, business intelligence a tvorbu reportů. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

V tomto kurzu vytvoříte službu Azure Data Factory pomocí kanálu, který načítá delta data z více tabulek v databázi SQL Serveru do Azure SQL Database.

V tomto kurzu provedete následující kroky:

  • Příprava zdrojového a cílového datového úložiště
  • Vytvoření datové továrny
  • Vytvořte místní prostředí Integration Runtime.
  • Instalace prostředí Integration Runtime
  • Vytvoření propojených služeb
  • Vytvořte zdroj, úložiště a datové sady s vodoznakem.
  • Vytvoření a spuštění kanálu a jeho monitorování
  • Zkontrolujte výsledky.
  • Přidání nebo aktualizace dat ve zdrojových tabulkách
  • Opakované spuštění kanálu a jeho monitorování
  • Kontrola konečných výsledků

Přehled

Tady jsou důležité kroky pro vytvoření tohoto řešení:

  1. Vyberte sloupec vodoznaku.

    Vyberte jeden sloupec pro každou tabulku ve zdrojovém úložišti dat, které můžete identifikovat nové nebo aktualizované záznamy pro každé spuštění. Data v tomto vybraném sloupci (například čas_poslední_změny nebo ID) se při vytváření nebo aktualizaci řádků obvykle zvyšují. Maximální hodnota v tomto sloupci se používá jako horní mez.

  2. Připravte úložiště dat pro uložení hodnoty vodotisku.

    V tomto kurzu uložíte hodnotu vodoznaku do databáze SQL.

  3. Vytvořte kanál s následujícími aktivitami:

    1. Vytvořte aktivitu ForEach, která prochází seznam názvů zdrojových tabulek, který je předán jako parametr do pipeline. Pro každou zdrojovou tabulku vyvolá následující aktivity, aby pro tabulku provedl nahrání změn.

    2. Vytvořte dvě aktivity vyhledávání. První aktivitu vyhledávání použijte k načtení poslední hodnoty vodotisku. Druhou aktivitu vyhledávání použijte k načtení nové hodnoty vodítka. Tyto hodnoty vodoznaku se předají aktivitě kopírování.

    3. Vytvořte aktivitu Copy, která kopíruje řádky ze zdrojového úložiště dat s hodnotou sloupce vodoznaku, která je větší než stará hodnota vodoznaku a menší nebo rovna nové hodnotě vodoznaku. Poté zkopíruje rozdílová data ze zdrojového úložiště dat do úložiště Azure Blob jako nový soubor.

    4. Vytvořte aktivitu typu StoredProcedure, která aktualizuje hodnotu ukazatele pro další spuštění pipeline.

    Tady je souhrnný diagram tohoto řešení:

    Přírůstkové načtení dat

Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.

Požadavky

  • SQL Server. Jako zdrojové úložiště dat v tomto kurzu použijete databázi SQL Serveru.
  • Azure SQL Database Jako úložiště dat jímky použijete databázi ve službě Azure SQL Database. Pokud databázi SQL nemáte, přečtěte si téma Vytvoření databáze ve službě Azure SQL Database , kde najdete postup jeho vytvoření.

Vytvoření zdrojových tabulek v databázi SQL Serveru

  1. Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.

  2. V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.

  3. Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem customer_table a project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    
     INSERT INTO customer_table
     (PersonID, Name, LastModifytime)
     VALUES
     (1, 'John','9/1/2017 12:56:00 AM'),
     (2, 'Mike','9/2/2017 5:23:00 AM'),
     (3, 'Alice','9/3/2017 2:36:00 AM'),
     (4, 'Andy','9/4/2017 3:21:00 AM'),
     (5, 'Anny','9/5/2017 8:06:00 AM');
    
     INSERT INTO project_table
     (Project, Creationtime)
     VALUES
     ('project1','1/1/2015 0:00:00 AM'),
     ('project2','2/2/2016 1:23:00 AM'),
     ('project3','3/4/2017 5:16:00 AM');
    

Vytvoření cílových tabulek ve službě Azure SQL Database

  1. Otevřete SQL Server Management Studio (SSMS) nebo Azure Data Studio a připojte se k databázi SQL Serveru.

  2. V Průzkumníku serveru (SSMS) nebo v podokně Připojení (Azure Data Studio) klikněte pravým tlačítkem myši na databázi a zvolte Nový dotaz.

  3. Spusťte na databázi následující příkaz SQL, aby se vytvořily tabulky s názvem customer_table a project_table:

     create table customer_table
     (
         PersonID int,
         Name varchar(255),
         LastModifytime datetime
     );
    
     create table project_table
     (
         Project varchar(255),
         Creationtime datetime
     );
    

Vytvoření další tabulky ve službě Azure SQL Database pro uložení hodnoty horní meze

  1. Spuštěním následujícího příkazu SQL pro vaši databázi vytvořte tabulku s názvem watermarktable pro uložení hodnoty meze:

     create table watermarktable
     (
    
         TableName varchar(255),
         WatermarkValue datetime,
     );
    
  2. Do tabulky mezí vložte hodnoty počátečních mezí pro obě zdrojové tabulky.

     INSERT INTO watermarktable
     VALUES
     ('customer_table','1/1/2010 12:00:00 AM'),
     ('project_table','1/1/2010 12:00:00 AM');
    

Vytvoření uložené procedury ve službě Azure SQL Database

Spuštěním následujícího příkazu vytvořte uloženou proceduru v databázi. Tato uložená procedura aktualizuje hodnotu vodotisku po každém spuštění pipeline.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Vytvoření datových typů a dalších uložených procedur ve službě Azure SQL Database

Spuštěním následujícího dotazu vytvořte ve své databázi dvě uložené procedury a dva datové typy. Slouží ke slučování dat ze zdrojových tabulek do cílových tabulek.

Aby byla cesta co nejjednodušší již na začátku, přímo používáme tyto uložené procedury, které předávají rozdílová data prostřednictvím proměnné tabulky, a poté je sloučíme do cílového úložiště. Dávejte pozor, aby nebyl očekáván značný počet rozdílových řádků (více než 100) k uložení v tabulkové proměnné.

Pokud potřebujete sloučit velký počet rozdílových řádků do cílového úložiště, doporučujeme použít kopírovací aktivitu ke zkopírování všech rozdílových dat nejprve do dočasné "přípravné" tabulky v cílovém úložišti a pak vytvořte vlastní uložený postup bez použití proměnné tabulky ke sloučení z "přípravné" tabulky do "konečné" tabulky.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END

Azure PowerShell

Nainstalujte nejnovější moduly Azure PowerShellu podle pokynů v tématu Instalace a konfigurace Azure PowerShellu.

Vytvoření datové továrny

  1. Definujte proměnnou pro název skupiny prostředků, kterou použijete později v příkazech PowerShellu. Zkopírujte do PowerShellu následující text příkazu, zadejte název skupiny prostředků Azure v uvozovkách a pak příkaz spusťte. Příklad: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Pokud již skupina prostředků existuje, možná nebudete chtít ji přepsat. Přiřaďte proměnné $resourceGroupName jinou hodnotu a spusťte tento příkaz znovu.

  2. Definujte proměnnou pro umístění datové továrny.

    $location = "East US"
    
  3. Pokud chcete vytvořit skupinu prostředků Azure, spusťte následující příkaz:

    New-AzResourceGroup $resourceGroupName $location
    

    Pokud již skupina prostředků existuje, možná nebudete chtít ji přepsat. Přiřaďte proměnné $resourceGroupName jinou hodnotu a spusťte tento příkaz znovu.

  4. Definujte proměnnou název datové továrny.

    Důležité

    Aktualizujte název továrny dat tak, aby byl jedinečný na celém světě. Příklad: ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. Pokud chcete vytvořit datovou továrnu, spusťte následující rutinu Set-AzDataFactoryV2 :

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Mějte na paměti následující body:

  • Název datové továrny musí být globálně jedinečný. Pokud se zobrazí následující chyba, změňte název a zkuste to znovu:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • Pro vytvoření instancí služby Data Factory musí být uživatelský účet, který použijete pro přihlášení k Azure, členem rolí přispěvatel nebo vlastník nebo správcem předplatného Azure.

  • Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, SQL Database, SQL Managed Instance atd.) a výpočetní prostředí (Azure HDInsight atd.) používané datovou továrnou můžou být v jiných oblastech.

Vytvoření vlastního prostředí pro Integration Runtime

V této části vytvoříte místní prostředí Integration Runtime a přidružíte ho k místnímu počítači s databází SQL Serveru. Místní prostředí Integration Runtime je komponenta, která kopíruje data z SQL Serveru na vašem počítači do služby Azure SQL Database.

  1. Vytvořte proměnnou pro název prostředí Integration Runtime. Použijte jedinečný název a poznamenejte si ho. Použijete ho později v tomto kurzu.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Vytvořte místní prostředí Integration Runtime.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Tady je ukázkový výstup:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. Pokud chcete načíst stav vytvořeného prostředí Integration Runtime, spusťte následující příkaz. Potvrďte, že hodnota vlastnosti State je nastavena na NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Tady je ukázkový výstup:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {eu.frontend.clouddatahub.net}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. Spuštěním následujícího příkazu načtěte ověřovací klíče pro registraci místního prostředí Integration Runtime ve službě Azure Data Factory v cloudu:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Tady je ukázkový výstup:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Pro registraci místního prostředí Integration Runtime, které nainstalujete na počítači v dalších krocích, zkopírujte jeden z klíčů (bez uvozovek).

Instalace nástroje Integration Runtime

  1. Pokud již na počítači máte prostředí Integration Runtime, odinstalujte ho pomocí panelu Přidat nebo odebrat programy.

  2. Na místním počítači s Windows stáhněte místní prostředí Integration Runtime. Spusťte instalaci.

  3. Na stránce Vítá vás instalace prostředí Microsoft Integration Runtime vyberte Další.

  4. Na stránce Licenční smlouva s koncovým uživatelem (EULA) přijměte podmínky a licenční smlouvu a vyberte Další.

  5. Na stránce Cílová složka vyberte Další.

  6. Na stránce Připraveno k instalaci prostředí Microsoft Integration Runtime vyberte Nainstalovat.

  7. Na stránce Dokončení instalace prostředí Microsoft Integration Runtime vyberte Dokončit.

  8. Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vložte klíč, který jste uložili v předchozí části, a vyberte Zaregistrovat.

    Registrace prostředí Integration Runtime

  9. Na stránce nový uzel Integration Runtime (v místním prostředí) vyberte Dokončit.

  10. Po úspěšném dokončení registrace místního prostředí Integration Runtime se zobrazí následující zpráva:

    Úspěšně zaregistrováno

  11. Na stránce Registrace prostředí Integration Runtime (v místním prostředí) vyberte Spustit Správce konfigurace.

  12. Jakmile se uzel připojí ke cloudové službě, zobrazí se následující stránka:

    Stránka uzel je připojen

  13. Teď otestujte připojení k databázi SQL Serveru.

    Záložka Diagnostika

    a. Na stránce Správce konfigurace přejděte na kartu Diagnostika.

    b. Jako typ zdroje dat vyberte SqlServer.

    c. Zadejte název serveru.

    d. Zadejte název databáze.

    e. Vyberte režim ověřování.

    f. Zadejte uživatelské jméno.

    g. Zadejte heslo přidružené k uživatelskému jménu.

    h. Pokud chcete potvrdit, že se prostředí Integration Runtime může připojit k SQL Serveru, vyberte Test. Pokud je připojení úspěšné, zobrazí se zelená značka zaškrtnutí. Jestliže připojení není úspěšné, zobrazí se chybová zpráva. Opravte všechny problémy a ověřte, že se prostředí Integration Runtime může připojit k SQL Serveru.

    Poznámka:

    Poznamenejte si hodnoty pro typ ověřování, server, databázi, uživatele a heslo. Použijete je později v tomto kurzu.

Vytvoření propojených služeb

V datové továrně vytvoříte propojené služby, abyste svá úložiště dat a výpočetní služby spojili s datovou továrnou. V této části vytvoříte propojené služby s databází SQL Serveru a databází ve službě Azure SQL Database.

Vytvoření propojené služby SQL Serveru

V tomto kroku propojíte databázi SQL Serveru se službou Data Factory.

  1. Vytvořte soubor JSON s názvem SqlServerLinkedService.json ve složce C:\ADFTutorials\IncCopyMultiTableTutorial (vytvořte místní složky, pokud ještě neexistují) s následujícím obsahem. Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.

    Důležité

    Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.

    Pokud používáte ověřování SQL, zkopírujte následující definici JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Pokud používáte ověřování Windows, zkopírujte následující definici JSON:

     {
         "name":"SqlServerLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"SqlServer",
             "typeProperties":{
                 "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                 "userName":"<username> or <domain>\\<username>",
                 "password":{
                     "type":"SecureString",
                     "value":"<password>"
                 }
             },
             "connectVia":{
                 "referenceName":"<integration runtime name>",
                 "type":"IntegrationRuntimeReference"
             }
         }
     }
    

    Důležité

    • Vyberte správnou část na základě ověřování, které požíváte pro připojení k SQL Serveru.
    • Nahraďte <název prostředí Integration Runtime> názvem vašeho prostředí Integration Runtime.
    • Před uložením souboru nahraďte <název> serveru, <název databáze>, <uživatelské jméno> a <heslo> hodnotami databáze SQL Serveru.
    • Pokud v názvu účtu uživatele nebo serveru potřebujete použít lomítko (\), použijte escape znak (\). Příklad: mydomain\\myuser.
  2. V PowerShellu spusťte následující rutinu, která přepne do složky C:\ADFTutorials\IncCopyMultiTableTutorial.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Spuštěním rutiny Set-AzDataFactoryV2LinkedService vytvořte propojenou službu AzureStorageLinkedService. V následujícím příkladu předáte hodnoty pro parametry ResourceGroupName a DataFactoryName:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Tady je ukázkový výstup:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Vytvoření propojené služby SQL Database

  1. Ve složce C:\ADFTutorials\IncCopyMultiTableTutorial vytvořte soubor JSON s názvem AzureSQLDatabaseLinkedService.json s následujícím obsahem. (Pokud složka ADF ještě neexistuje, vytvořte ji.) Před uložením souboru nahraďte <název> serveru, <název> databáze, <uživatelské jméno> a <heslo> názvem databáze SQL Serveru, názvem databáze, uživatelským jménem a heslem.

     {
         "name":"AzureSQLDatabaseLinkedService",
         "properties":{
             "annotations":[
    
             ],
             "type":"AzureSqlDatabase",
             "typeProperties":{
                 "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.windows.net;initial catalog=<database name>;user id=<user name>;Password=<password>;"
             }
         }
     }
    
  2. V PowerShellu spusťte rutinu Set-AzDataFactoryV2LinkedService a vytvořte propojenou službu AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Tady je ukázkový výstup:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Vytvoření datových sad

V tomto kroku vytvoříte datové sady, které představují zdroj dat, cíl dat a místo pro uložení vodoznaku.

Vytvoření zdrojové datové sady

  1. Ve stejné složce vytvořte soubor JSON s názvem SourceDataset.json a s následujícím obsahem:

    {
         "name":"SourceDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"SqlServerLinkedService",
                 "type":"LinkedServiceReference"
             },
             "annotations":[
    
             ],
             "type":"SqlServerTable",
             "schema":[
    
             ]
         }
    }
    

    Kopírovací aktivita v rámci toku používá SQL dotaz, aby načetla data, místo načtení celé tabulky.

  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Toto je ukázkový výstup tohoto cmdletu:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Vytvoření datové sady jímky

  1. Ve stejné složce vytvořte soubor JSON s názvem SinkDataset.json s následujícím obsahem. Element tableName je nastaven potrubím dynamicky v době běhu. Aktivita ForEach v tomto kanálu iteruje přes seznam názvů tabulek a předává název tabulky této datové sadě při každé iteraci.

     {
         "name":"SinkDataset",
         "properties":{
             "linkedServiceName":{
                 "referenceName":"AzureSQLDatabaseLinkedService",
                 "type":"LinkedServiceReference"
             },
             "parameters":{
                 "SinkTableName":{
                     "type":"String"
                 }
             },
             "annotations":[
    
             ],
             "type":"AzureSqlTable",
             "typeProperties":{
                 "tableName":{
                     "value":"@dataset().SinkTableName",
                     "type":"Expression"
                 }
             }
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Toto je ukázkový výstup tohoto cmdletu:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Vytvořit datovou sadu pro vodoznak

V tomto kroku vytvoříte datovou sadu pro uložení hodnoty horní meze.

  1. Ve stejné složce vytvořte soubor JSON s názvem WatermarkDataset.json s následujícím obsahem:

     {
         "name": " WatermarkDataset ",
         "properties": {
             "type": "AzureSqlTable",
             "typeProperties": {
                 "tableName": "watermarktable"
             },
             "linkedServiceName": {
                 "referenceName": "AzureSQLDatabaseLinkedService",
                 "type": "LinkedServiceReference"
             }
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Dataset vytvořte datovou sadu WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Toto je ukázkový výstup tohoto cmdletu:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Vytvořit potrubí

Tento datový kanál dostává jako parametr seznam názvů tabulek. Aktivita ForEach prochází seznam názvů tabulek a provádí následující operace:

  1. Pomocí aktivity Vyhledávání můžete načíst starou hodnotu vodotisku (počáteční hodnotu nebo hodnotu použitou v poslední iteraci).

  2. Pomocí aktivity Vyhledávání můžete načíst novou hodnotu vodoznaku (což je maximální hodnota tohoto sloupce ve zdrojové tabulce).

  3. Pomocí aktivity Copy zkopírujte data mezi těmito dvěma hodnotami vodního znamení ze zdrojové databáze do cílové databáze.

  4. Pomocí aktivity StoredProcedure aktualizujte starou hodnotu značky, která má být použita v prvním kroku další iterace.

Vytvořte kanál

  1. Ve stejné složce vytvořte soubor JSON s názvem IncrementalCopyPipeline.json s následujícím obsahem:

     {
         "name":"IncrementalCopyPipeline",
         "properties":{
             "activities":[
                 {
                     "name":"IterateSQLTables",
                     "type":"ForEach",
                     "dependsOn":[
    
                     ],
                     "userProperties":[
    
                     ],
                     "typeProperties":{
                         "items":{
                             "value":"@pipeline().parameters.tableList",
                             "type":"Expression"
                         },
                         "isSequential":false,
                         "activities":[
                             {
                                 "name":"LookupOldWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"AzureSqlSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"WatermarkDataset",
                                         "type":"DatasetReference"
                                     }
                                 }
                             },
                             {
                                 "name":"LookupNewWaterMarkActivity",
                                 "type":"Lookup",
                                 "dependsOn":[
    
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                             "type":"Expression"
                                         }
                                     },
                                     "dataset":{
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     },
                                     "firstRowOnly":true
                                 }
                             },
                             {
                                 "name":"IncrementalCopyActivity",
                                 "type":"Copy",
                                 "dependsOn":[
                                     {
                                         "activity":"LookupOldWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     },
                                     {
                                         "activity":"LookupNewWaterMarkActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "source":{
                                         "type":"SqlServerSource",
                                         "sqlReaderQuery":{
                                             "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                             "type":"Expression"
                                         }
                                     },
                                     "sink":{
                                         "type":"AzureSqlSink",
                                         "sqlWriterStoredProcedureName":{
                                             "value":"@{item().StoredProcedureNameForMergeOperation}",
                                             "type":"Expression"
                                         },
                                         "sqlWriterTableType":{
                                             "value":"@{item().TableType}",
                                             "type":"Expression"
                                         },
                                         "storedProcedureTableTypeParameterName":{
                                             "value":"@{item().TABLE_NAME}",
                                             "type":"Expression"
                                         },
                                         "disableMetricsCollection":false
                                     },
                                     "enableStaging":false
                                 },
                                 "inputs":[
                                     {
                                         "referenceName":"SourceDataset",
                                         "type":"DatasetReference"
                                     }
                                 ],
                                 "outputs":[
                                     {
                                         "referenceName":"SinkDataset",
                                         "type":"DatasetReference",
                                         "parameters":{
                                             "SinkTableName":{
                                                 "value":"@{item().TABLE_NAME}",
                                                 "type":"Expression"
                                             }
                                         }
                                     }
                                 ]
                             },
                             {
                                 "name":"StoredProceduretoWriteWatermarkActivity",
                                 "type":"SqlServerStoredProcedure",
                                 "dependsOn":[
                                     {
                                         "activity":"IncrementalCopyActivity",
                                         "dependencyConditions":[
                                             "Succeeded"
                                         ]
                                     }
                                 ],
                                 "policy":{
                                     "timeout":"7.00:00:00",
                                     "retry":0,
                                     "retryIntervalInSeconds":30,
                                     "secureOutput":false,
                                     "secureInput":false
                                 },
                                 "userProperties":[
    
                                 ],
                                 "typeProperties":{
                                     "storedProcedureName":"[dbo].[usp_write_watermark]",
                                     "storedProcedureParameters":{
                                         "LastModifiedtime":{
                                             "value":{
                                                 "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                 "type":"Expression"
                                             },
                                             "type":"DateTime"
                                         },
                                         "TableName":{
                                             "value":{
                                                 "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                 "type":"Expression"
                                             },
                                             "type":"String"
                                         }
                                     }
                                 },
                                 "linkedServiceName":{
                                     "referenceName":"AzureSQLDatabaseLinkedService",
                                     "type":"LinkedServiceReference"
                                 }
                             }
                         ]
                     }
                 }
             ],
             "parameters":{
                 "tableList":{
                     "type":"array"
                 }
             },
             "annotations":[
    
             ]
         }
     }
    
  2. Spuštěním rutiny Set-AzDataFactoryV2Pipeline vytvořte kanál IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Tady je ukázkový výstup:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Spuštění kanálu

  1. Ve stejné složce vytvořte soubor parametrů s názvem Parameters.json s následujícím obsahem:

     {
         "tableList":
         [
             {
                 "TABLE_NAME": "customer_table",
                 "WaterMark_Column": "LastModifytime",
                 "TableType": "DataTypeforCustomerTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
             },
             {
                 "TABLE_NAME": "project_table",
                 "WaterMark_Column": "Creationtime",
                 "TableType": "DataTypeforProjectTable",
                 "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
             }
         ]
     }
    
  2. Spusťte kanál IncrementalCopyPipeline pomocí rutiny Invoke-AzDataFactoryV2Pipeline . Zástupné znaky nahraďte vlastním názvem skupiny prostředků a názvem datové továrny.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    

Sledujte potrubí

  1. Přihlaste se k portálu Azure.

  2. Vyberte Všechny služby, spusťte hledání pomocí klíčového slova Datové továrny a vyberte Datové továrny.

  3. V seznamu datových továren vyhledejte vaši datovou továrnu a vyberte ji. Otevře se stránka Datová továrna.

  4. Na stránce Data Factory vyberte Otevřít na dlaždici Otevřít Studio Azure Data Factory a spusťte Azure Data Factory v samostatné kartě.

  5. Na domovské stránce služby Azure Data Factory vyberte Na levé straně možnost Monitorování .

    Snímek obrazovky znázorňující domovskou stránku služby Azure Data Factory

  6. Můžete vidět všechna spuštění kanálu a jejich stavy. Všimněte si, že stav spuštění pipeline v následujícím příkladu je Úspěšný. Parametry předané kanálu můžete zkontrolovat kliknutím na odkaz ve sloupci Parametry. Pokud došlo k chybě, uvidíte odkaz ve sloupci Chyba.

    Snímek obrazovky zobrazuje spuštění datového kanálu pro Data Factory včetně vašeho kanálu.

  7. Když vyberete odkaz ve sloupci Akce, zobrazí se všechna spuštění aktivit datového kanálu.

  8. Pokud se chcete vrátit do zobrazení Pipeline Runs, vyberte Všechna Pipeline Runs.

Kontrola výsledků

V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi SQL a ověřte, že data byla ze zdrojových tabulek zkopírována do cílových tabulek:

Dotaz

select * from customer_table

Výstup

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            Alice    2017-09-03 02:36:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Dotaz

select * from project_table

Výstup

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Dotaz

select * from watermarktable

Výstup

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-05 08:06:00.000
project_table    2017-03-04 05:16:00.000

Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.

Přidání dalších dat do zdrojových tabulek

Spusťte následující dotaz na zdrojovou databázi SQL Serveru, aby se aktualizoval stávající řádek v tabulce customer_table. Vložte nový řádek do tabulky project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Opětovné spuštění potrubí

  1. Nyní spusťte znovu potrubí provedením následujícího příkazu PowerShell:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupname -dataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"
    
  2. Monitorujte běhy potrubí podle pokynů v části Sledování potrubí. Pokud je stav kanálu probíhající, zobrazí se v části Akce další odkaz na akci, která zruší spuštění kanálu.

  3. Kliknutím na Aktualizovat můžete aktualizovat seznam, dokud nebude spuštění kanálu úspěšné.

  4. Volitelně můžete vybrat odkaz Zobrazit spuštění aktivit v části Akce, abyste viděli všechna spuštění aktivit související s tímto spuštěním potrubí.

Kontrola konečných výsledků

V SQL Server Management Studiu spusťte následující dotazy na cílovou databázi a ověřte, že aktualizovaná/nová data byla ze zdrojových tabulek zkopírována do cílových tabulek.

Dotaz

select * from customer_table

Výstup

===========================================
PersonID    Name    LastModifytime
===========================================
1            John    2017-09-01 00:56:00.000
2            Mike    2017-09-02 05:23:00.000
3            NewName    2017-09-08 00:00:00.000
4            Andy    2017-09-04 03:21:00.000
5            Anny    2017-09-05 08:06:00.000

Všimněte si nových hodnot položek Name a LastModifytime pro PersonID pro číslo 3.

Dotaz

select * from project_table

Výstup

===================================
Project        Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject    2017-10-01 00:00:00.000

Všimněte si, že do tabulky project_table byla přidána položka NewProject.

Dotaz

select * from watermarktable

Výstup

======================================
TableName        WatermarkValue
======================================
customer_table    2017-09-08 00:00:00.000
project_table    2017-10-01 00:00:00.000

Všimněte si, že hodnoty mezí pro obě tabulky byly aktualizovány.

V tomto kurzu jste provedli následující kroky:

  • Příprava zdrojového a cílového datového úložiště
  • Vytvoření datové továrny
  • Vytvoření místního prostředí Integration Runtime (IR)
  • Instalace prostředí Integration Runtime
  • Vytvoření propojených služeb
  • Vytvořte zdroj, úložiště a datové sady s vodoznakem.
  • Vytvoření a spuštění kanálu a jeho monitorování
  • Zkontrolujte výsledky.
  • Přidání nebo aktualizace dat ve zdrojových tabulkách
  • Opakované spuštění kanálu a jeho monitorování
  • Kontrola konečných výsledků

Pokud se chcete dozvědět víc o transformaci dat pomocí clusteru Spark v Azure, přejděte k následujícímu kurzu: