Migrace dat z AmazonU S3 do Azure Data Lake Storage Gen2

PLATÍ PRO: Azure Data Factory Azure Synapse Analytics

Tip

Vyzkoušejte si službu Data Factory v Microsoft Fabric, řešení pro analýzy typu all-in-one pro podniky. Microsoft Fabric zahrnuje všechno od přesunu dat až po datové vědy, analýzy v reálném čase, business intelligence a vytváření sestav. Přečtěte si, jak začít používat novou zkušební verzi zdarma.

Pomocí šablon můžete migrovat petabajty dat sestávajících ze stovek milionů souborů z AmazonU S3 do Azure Data Lake Storage Gen2.

Poznámka:

Pokud chcete zkopírovat malý datový svazek z AWS S3 do Azure (například menší než 10 TB), je efektivnější a jednodušší použít nástroj Pro kopírování dat služby Azure Data Factory. Šablona popsaná v tomto článku je více než to, co potřebujete.

O šablonách řešení

Datový oddíl se doporučuje zejména při migraci více než 10 TB dat. Pokud chcete data rozdělit, pomocí nastavení předpony vyfiltrujte složky a soubory v AmazonU S3 podle názvu a pak každá úloha kopírování ADF může najednou kopírovat jeden oddíl. Pro zajištění lepší propustnosti můžete souběžně spouštět několik úloh kopírování ADF.

Migrace dat obvykle vyžaduje jednorázovou historickou migraci dat a pravidelně synchronizuje změny z AWS S3 do Azure. Níže jsou dvě šablony, kde jedna šablona pokrývá jednorázovou migraci historických dat a další šablona pokrývá synchronizaci změn z AWS S3 do Azure.

Šablona pro migraci historických dat z AmazonU S3 do Azure Data Lake Storage Gen2

Tato šablona (název šablony: Migrace historických dat z AWS S3 do Azure Data Lake Storage Gen2) předpokládá, že jste vytvořili seznam oddílů v tabulce externích ovládacích prvků ve službě Azure SQL Database. Proto použije aktivitu Vyhledávání k načtení seznamu oddílů z tabulky externího ovládacího prvku, iteraci jednotlivých oddílů a vytvoření kopie jednotlivých oddílů a kopírování jednotlivých oddílů do jednoho oddílu. Po dokončení jakékoli úlohy kopírování pomocí aktivity Uložená procedura aktualizuje stav kopírování každého oddílu v řídicí tabulce.

Šablona obsahuje pět aktivit:

  • Vyhledávání načte oddíly, které nebyly zkopírovány do Azure Data Lake Storage Gen2 z tabulky externích ovládacích prvků. Název tabulky je s3_partition_control_table a dotaz na načtení dat z tabulky je SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0.
  • ForEach získá seznam oddílů z aktivity Vyhledávání a iteruje každý oddíl na aktivitu TriggerCopy . BatchCount můžete nastavit tak, aby souběžně spouštět více úloh kopírování ADF. V této šabloně jsme nastavili hodnotu 2.
  • ExecutePipeline spustí kanál CopyFolderPartitionFromS3 . Důvodem, proč vytvoříme další kanál, aby každá úloha kopírování zkopírovala oddíl, je proto, že z AWS S3 snadno znovu spustíte neúspěšnou úlohu kopírování a znovu načte tento konkrétní oddíl. Všechny ostatní úlohy kopírování, které načítají další oddíly, nebudou ovlivněny.
  • Zkopírujte každý oddíl z AWS S3 do Azure Data Lake Storage Gen2.
  • SqlServerStoredProcedure aktualizuje stav kopírování jednotlivých oddílů v řídicí tabulce.

Šablona obsahuje dva parametry:

  • AWS_S3_bucketName je název kontejneru v AWS S3, ze kterého chcete migrovat data. Pokud chcete migrovat data z více kbelíků v AWS S3, můžete do tabulky externího ovládacího prvku přidat další sloupec, do kterého se uloží název kontejneru pro každý oddíl, a také aktualizovat kanál tak, aby načítal data z daného sloupce odpovídajícím způsobem.
  • Azure_Storage_fileSystem je název systému souborů v Azure Data Lake Storage Gen2, do kterého chcete migrovat data.

Aby šablona zkopírovala změněné soubory jenom z AmazonU S3 do Azure Data Lake Storage Gen2

Tato šablona (název šablony: kopírování rozdílových dat z AWS S3 do Azure Data Lake Storage Gen2) používá lastModifiedTime každého souboru ke kopírování nových nebo aktualizovaných souborů pouze z AWS S3 do Azure. Mějte na paměti, jestli už máte soubory nebo složky čas rozdělené na oddíly s informacemi o timeslice jako součást názvu souboru nebo složky v AWS S3 (například /yyyy/mm/dd/file.csv), můžete přejít k tomuto kurzu a získat výkonnější přístup pro přírůstkové načítání nových souborů. Tato šablona předpokládá, že jste napsali seznam oddílů v tabulce externích ovládacích prvků ve službě Azure SQL Database. Proto použije aktivitu Vyhledávání k načtení seznamu oddílů z tabulky externího ovládacího prvku, iteraci jednotlivých oddílů a vytvoření kopie jednotlivých oddílů a kopírování jednotlivých oddílů do jednoho oddílu. Když se každá úloha kopírování začne kopírovat soubory z AWS S3, spoléhá na vlastnost LastModifiedTime k identifikaci a zkopírování pouze nových nebo aktualizovaných souborů. Po dokončení jakékoli úlohy kopírování pomocí aktivity Uložená procedura aktualizuje stav kopírování každého oddílu v řídicí tabulce.

Šablona obsahuje sedm aktivit:

  • Vyhledávání načte oddíly z tabulky externího ovládacího prvku. Název tabulky je s3_partition_delta_control_table a dotaz pro načtení dat z tabulky je "select distinct PartitionPrefix from s3_partition_delta_control_table".
  • ForEach získá seznam oddílů z aktivity Vyhledávání a iteruje každý oddíl na aktivitu TriggerDeltaCopy . BatchCount můžete nastavit tak, aby souběžně spouštět více úloh kopírování ADF. V této šabloně jsme nastavili hodnotu 2.
  • ExecutePipeline spustí kanál DeltaCopyFolderPartitionFromS3 . Důvodem, proč vytvoříme další kanál, aby každá úloha kopírování zkopírovala oddíl, je proto, že z AWS S3 snadno znovu spustíte neúspěšnou úlohu kopírování a znovu načte tento konkrétní oddíl. Všechny ostatní úlohy kopírování, které načítají další oddíly, nebudou ovlivněny.
  • Vyhledávání načte čas posledního spuštění úlohy kopírování z tabulky externího ovládacího prvku, aby bylo možné nové nebo aktualizované soubory identifikovat prostřednictvím lastModifiedTime. Název tabulky je s3_partition_delta_control_table a dotaz pro načtení dat z tabulky je "select max(JobRunTime) as LastModifiedTime from s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr} and SuccessOrFailure = 1".
  • Zkopírujte nové nebo změněné soubory jenom pro každý oddíl z AWS S3 do Azure Data Lake Storage Gen2. Vlastnost modifiedDatetimeStart je nastavena na poslední čas spuštění úlohy kopírování. Vlastnost modifiedDatetimeEnd je nastavena na aktuální dobu spuštění úlohy kopírování. Mějte na paměti, že čas se vztahuje na časové pásmo UTC.
  • SqlServerStoredProcedure aktualizuje stav kopírování každého oddílu a doby spuštění kopírování v řídicí tabulce, když bude úspěšný. Sloupec SuccessOrFailure je nastaven na hodnotu 1.
  • SqlServerStoredProcedure aktualizuje stav kopírování jednotlivých oddílů a doby spuštění kopírování v řídicí tabulce v případě selhání. Sloupec SuccessOrFailure je nastavený na hodnotu 0.

Šablona obsahuje dva parametry:

  • AWS_S3_bucketName je název kontejneru v AWS S3, ze kterého chcete migrovat data. Pokud chcete migrovat data z více kbelíků v AWS S3, můžete do tabulky externího ovládacího prvku přidat další sloupec, do kterého se uloží název kontejneru pro každý oddíl, a také aktualizovat kanál tak, aby načítal data z daného sloupce odpovídajícím způsobem.
  • Azure_Storage_fileSystem je název systému souborů v Azure Data Lake Storage Gen2, do kterého chcete migrovat data.

Jak používat tyto dvě šablony řešení

Šablona pro migraci historických dat z AmazonU S3 do Azure Data Lake Storage Gen2

  1. Vytvořte v Azure SQL Database řídicí tabulku pro uložení seznamu oddílů AWS S3.

    Poznámka:

    Název tabulky je s3_partition_control_table. Schéma řídicí tabulky je PartitionPrefix a SuccessOrFailure, kde PartitionPrefix je nastavení předpony v S3 pro filtrování složek a souborů v AmazonU S3 podle názvu a SuccessOrFailure je stav kopírování každého oddílu: 0 znamená, že tento oddíl nebyl zkopírován do Azure a 1 znamená, že tento oddíl se úspěšně zkopíroval do Azure. V řídicí tabulce je definováno 5 oddílů a výchozí stav kopírování každého oddílu je 0.

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. Vytvořte uloženou proceduru ve stejné databázi Azure SQL Database pro řídicí tabulku.

    Poznámka:

    Název uložené procedury je sp_update_partition_success. Vyvolá se aktivitou SqlServerStoredProcedure v kanálu ADF.

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
    
  3. Přejděte na šablonu Migrace historických dat z AWS S3 do šablony Azure Data Lake Storage Gen2 . Zadejte připojení k tabulce externího ovládacího prvku AWS S3 jako úložiště zdrojů dat a jako cílové úložiště Azure Data Lake Storage Gen2. Mějte na paměti, že tabulka externích ovládacích prvků a uložená procedura odkazují na stejné připojení.

    Screenshot that shows the Migrate historical data from AWS S3 to Azure Data Lake Storage Gen2 template.

  4. Vyberte Použít tuto šablonu.

    Screenshot that highlights the Use this template button.

  5. Zobrazí se 2 kanály a 3 datové sady, jak je znázorněno v následujícím příkladu:

    Screenshot that shows the two pipelines and three datasets that were created by using the template.

  6. Přejděte do kanálu BulkCopyFromS3 a vyberte Ladit, zadejte parametry. Pak vyberte Dokončit.

    Screenshot that shows where to select Debug and enter the parameters before you select Finish.

  7. Zobrazí se výsledky podobné následujícímu příkladu:

    Screenshot that shows the returned results.

Aby šablona zkopírovala změněné soubory jenom z AmazonU S3 do Azure Data Lake Storage Gen2

  1. Vytvořte v Azure SQL Database řídicí tabulku pro uložení seznamu oddílů AWS S3.

    Poznámka:

    Název tabulky je s3_partition_delta_control_table. Schéma řídicí tabulky je PartitionPrefix, JobRunTime a SuccessOrFailure, kde PartitionPrefix je nastavení předpony v S3 pro filtrování složek a souborů v AmazonU S3 podle názvu, JobRunTime je hodnota data a čas při kopírování úloh a SuccessOrFailure je stav kopírování každého oddílu: 0 znamená, že tento oddíl nebyl zkopírován do Azure a 1 znamená, že tento oddíl se úspěšně zkopíroval do Azure. V řídicí tabulce je definováno 5 oddílů. Výchozí hodnota jobRunTime může být čas, kdy se spustí jednorázová migrace historických dat. Aktivita kopírování ADF zkopíruje soubory na AWS S3, které byly po této době naposledy změněny. Výchozí stav kopírování jednotlivých oddílů je 1.

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
        )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. Vytvořte uloženou proceduru ve stejné databázi Azure SQL Database pro řídicí tabulku.

    Poznámka:

    Název uložené procedury je sp_insert_partition_JobRunTime_success. Vyvolá se aktivitou SqlServerStoredProcedure v kanálu ADF.

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
            (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
    
  3. Přejděte na šablonu Kopírování rozdílových dat z AWS S3 do šablony Azure Data Lake Storage Gen2 . Zadejte připojení k tabulce externího ovládacího prvku AWS S3 jako úložiště zdrojů dat a jako cílové úložiště Azure Data Lake Storage Gen2. Mějte na paměti, že tabulka externích ovládacích prvků a uložená procedura odkazují na stejné připojení.

    Create a new connection

  4. Vyberte Použít tuto šablonu.

    Use this template

  5. Zobrazí se 2 kanály a 3 datové sady, jak je znázorněno v následujícím příkladu:

    Review the pipeline

  6. Přejděte kanál DeltaCopyFromS3 a vyberte Ladit a zadejte parametry. Pak vyberte Dokončit.

    Click **Debug**

  7. Zobrazí se výsledky podobné následujícímu příkladu:

    Review the result

  8. Výsledky z řídicí tabulky můžete také zkontrolovat pomocí dotazu "select * from s3_partition_delta_control_table", zobrazí se výstup podobný následujícímu příkladu:

    Screenshot that shows the results from the control table after you run the query.