Kopieren von Daten aus Azure Blob Storage nach Azure SQL-Datenbank mithilfe von Azure Data Factory

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Tutorial erstellen Sie eine Data Factory-Pipeline, die Daten aus Azure Blob Storage nach Azure SQL-Datenbank kopiert. Das Konfigurationsmuster in diesem Tutorial gilt für Kopiervorgänge aus einem dateibasierten Datenspeicher in einen relationalen Datenspeicher. Eine Liste der Datenspeicher, die als Quellen und Senken unterstützt werden, finden Sie unter Unterstützte Datenspeicher und Formate.

In diesem Tutorial führen Sie die folgenden Schritte aus:

  • Erstellen einer Data Factory.
  • Erstellen verknüpfter Azure Storage- und Azure SQL-Datenbank-Dienste.
  • Erstellen von Azure Blob- und Azure SQL-Datenbank-Datasets.
  • Erstellen einer Pipeline mit einer Kopieraktivität.
  • Starten einer Pipelineausführung
  • Überwachen der Pipeline- und Aktivitätsausführungen.

Dieses Tutorial verwendet .NET SDK. Sie können andere Mechanismen zur Interaktion mit Azure Data Factory verwenden. Beispiele finden Sie unter Schnellstarts.

Wenn Sie über kein Azure-Abonnement verfügen, können Sie ein kostenloses Azure-Konto erstellen, bevor Sie beginnen.

Voraussetzungen

Erstellen eines Blobs und einer SQL-Tabelle

Bereiten Sie als Nächstes Ihr Azure-Blob und Ihre Azure SQL-Datenbank für das Tutorial vor, indem Sie ein Quellblob und eine SQL-Senkentabelle erstellen.

Erstellen eines Quellblobs

Erstellen Sie zunächst ein Quellblob, indem Sie einen Container erstellen und eine Eingabetextdatei in den Container hochladen:

  1. Öffnen Sie Notepad. Kopieren Sie den folgenden Text, und speichern Sie ihn lokal in einer Datei namens inputEmp.txt.

    John|Doe
    Jane|Doe
    
  2. Verwenden Sie ein Tool wie Azure Storage-Explorer, um den Container adfv2tutorial zu erstellen und die Datei inputEmp.txt in den Container hochzuladen.

Erstellen einer SQL-Senkentabelle

Erstellen Sie als Nächstes eine SQL-Senkentabelle:

  1. Verwenden Sie das folgende SQL-Skript, um die Tabelle dbo.emp in Ihrer Azure SQL-Datenbank zu erstellen.

    CREATE TABLE dbo.emp
    (
        ID int IDENTITY(1,1) NOT NULL,
        FirstName varchar(50),
        LastName varchar(50)
    )
    GO
    
    CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
    
  2. Gewähren Sie Azure-Diensten den Zugriff auf SQL-Datenbank. Lassen Sie für Ihren Server den Zugriff auf Azure-Dienste zu, damit der Data Factory-Dienst Daten in SQL-Datenbank schreiben kann. Führen Sie folgende Schritte aus, um diese Einstellung zu überprüfen und zu aktivieren:

    1. Navigieren Sie zum Azure-Portal, um Ihre SQL Server-Instanz zu verwalten. Suchen Sie nach SQL-Server, und wählen Sie die entsprechende Option aus.

    2. Wählen Sie Ihren Server aus.

    3. Wählen Sie unter der Überschrift Sicherheit des SQL Server-Menüs die Option Firewalls und virtuelle Netzwerke aus.

    4. Wählen Sie auf der Seite Firewall und virtuelle Netzwerke unter Anderen Azure-Diensten und -Ressourcen den Zugriff auf diesen Server gestatten die Option EIN aus.

Erstellen eines Visual Studio-Projekts

Erstellen Sie mithilfe von Visual Studio eine C# .NET-Konsolenanwendung.

  1. Öffnen Sie Visual Studio.
  2. Wählen Sie im Fenster Start die Option Neues Projekt erstellen aus.
  3. Wählen Sie im Fenster Neues Projekt erstellen in der Liste mit den Projekttypen die C#-Version von Konsolen-App (.NET Framework) aus. Wählen Sie Weiteraus.
  4. Geben Sie im Fenster Neues Projekt konfigurieren unter Projektname die Zeichenfolge ADFv2Tutorial ein. Navigieren Sie unter Speicherort zu dem Verzeichnis, in dem das Projekt gespeichert werden soll, oder erstellen Sie es. Klicken Sie anschließend auf Erstellen. Das neue Projekt wird in der Visual Studio-IDE angezeigt.

Installieren von NuGet-Paketen

Installieren Sie als Nächstes die erforderlichen Bibliothekspakete mithilfe des NuGet-Paket-Managers.

  1. Wählen Sie auf der Menüleiste Extras>NuGet-Paket-Manager>Paket-Manager-Konsole aus.

  2. Führen Sie im Bereich Paket-Manager-Konsole die folgenden Befehle zum Installieren von Paketen aus. Weitere Informationen zum Azure Data Factory-NuGet-Paket finden Sie unter Microsoft.Azure.Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Erstellen eines Data Factory-Clients

Führen Sie die folgenden Schritte aus, um einen Data Factory-Client zu erstellen:

  1. Öffnen Sie Program.cs, und überschreiben Sie die vorhandenen using-Anweisungen durch den folgenden Code, um Verweise auf Namespaces hinzuzufügen:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Fügen Sie der Main-Methode den folgenden Code hinzu, um Variablen festzulegen. Ersetzen Sie die 14 Platzhalter durch Ihre eigenen Werte.

    Eine Liste der Azure-Regionen, in denen Data Factory derzeit verfügbar ist, finden Sie unter Verfügbare Produkte nach Region. Wählen Sie in der Dropdownliste Produkte Folgendes aus: Durchsuchen>Analytics>Data Factory. Wählen Sie anschließend in der Dropdownliste Regionen die Regionen aus, die für Sie von Interesse sind. Daraufhin wird ein Raster mit dem Verfügbarkeitsstatus von Data Factory-Produkten für die ausgewählten Regionen angezeigt.

    Hinweis

    Von Data Factory verwendete Datenspeicher (etwa Azure Storage und Azure SQL-Datenbank) und Computedienste (etwa HDInsight) müssen sich nicht unbedingt in der für Data Factory ausgewählten Region befinden.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID to create the factory>";
    string resourceGroup = "<your resource group to create the factory>";
    
    string region = "<location to create the data factory in, such as East US>";
    string dataFactoryName = "<name of data factory to create (must be globally unique)>";
    
    // Specify the source Azure Blob information
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    string inputBlobPath = "adfv2tutorial/";
    string inputBlobName = "inputEmp.txt";
    
    // Specify the sink Azure SQL Database information
    string azureSqlConnString =
        "Server=tcp:<your server name>.database.windows.net,1433;" +
        "Database=<your database name>;" +
        "User ID=<your username>@<your server name>;" +
        "Password=<your password>;" +
        "Trusted_Connection=False;Encrypt=True;Connection Timeout=30";
    string azureSqlTableName = "dbo.emp";
    
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string sqlDbLinkedServiceName = "AzureSqlDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string sqlDatasetName = "SqlDataset";
    string pipelineName = "Adfv2TutorialBlobToSqlCopy";
    
  3. Fügen Sie der Main-Methode den folgenden Code hinzu, um eine Instanz der Klasse DataFactoryManagementClient zu erstellen. Sie verwenden dieses Objekt, um eine Data Factory, einen verknüpften Dienst, Datasets und eine Pipeline zu erstellen. Sie verwenden dieses Objekt ebenfalls zum Überwachen der Ausführungsdetails der Pipeline.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.azure.com/", cc
    ).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Erstellen einer Data Factory

Fügen Sie der Main-Methode den folgenden Code hinzu, um eine Data Factory zu erstellen.

// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};

client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);

while (
    client.Factories.Get(
        resourceGroup, dataFactoryName
    ).ProvisioningState == "PendingCreation"
)
{
    System.Threading.Thread.Sleep(1000);
}

Erstellen von verknüpften Diensten

In diesem Tutorial werden zwei verknüpfte Dienste erstellt: einer für die Quelle und einer für die Senke.

Erstellen eines verknüpften Azure Storage-Diensts

Fügen Sie der Main-Methode den folgenden Code hinzu, um einen verknüpften Azure Storage-Dienst zu erstellen. Informationen zu unterstützten Eigenschaften und Details finden Sie unter Eigenschaften des verknüpften Diensts.

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey
        )
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);

Erstellen eines verknüpften Azure SQL-Datenbank-Diensts

Fügen Sie der Main-Methode den folgenden Code hinzu, um einen verknüpften Azure SQL-Datenbank-Dienst zu erstellen. Informationen zu unterstützten Eigenschaften und Details finden Sie unter Eigenschaften des verknüpften Diensts.

// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");

LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
    new AzureSqlDatabaseLinkedService
    {
        ConnectionString = new SecureString(azureSqlConnString)
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);

Erstellen von Datasets

In diesem Abschnitt erstellen Sie zwei Datasets: eins für die Quelle und eins für die Senke.

Erstellen eines Datasets für das Azure-Quellblob

Fügen Sie der Main-Methode den folgenden Code hinzu, um ein Azure-Blobdataset zu erstellen. Informationen zu unterstützten Eigenschaften und Details finden Sie unter Dataset-Eigenschaften.

Sie definieren ein Dataset, das die Quelldaten im Azure-Blob darstellt. Dieses Blobdataset verweist auf den verknüpften Azure Storage-Dienst, den Sie im vorherigen Schritt erstellen und beschreibt:

  • Den Speicherort des Blobs, aus dem kopiert werden soll: FolderPath und FileName
  • Das Blobformat, das angibt, wie der Inhalt analysiert werden soll: TextFormat und die zugehörigen Einstellungen (beispielsweise das Spaltentrennzeichen)
  • Die Datenstruktur, einschließlich Spaltennamen und Datentypen (entspricht in diesem Beispiel der SQL-Senkentabelle)
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = "|" },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "FirstName", Type = "String" },
            new DatasetDataElement { Name = "LastName", Type = "String" }
        }
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);

Erstellen eines Datasets für eine Azure SQL-Senkendatenbank

Fügen Sie der Main-Methode den folgenden Code hinzu, um ein Azure SQL-Datenbank-Dataset zu erstellen. Informationen zu unterstützten Eigenschaften und Details finden Sie unter Dataset-Eigenschaften.

Sie definieren ein Dataset, das die Senkendaten in Azure SQL-Datenbank darstellt. Dieses Dataset verweist auf den verknüpften Azure SQL-Datenbank-Dienst, den Sie im vorherigen Schritt erstellt haben. Es gibt auch die SQL-Tabelle an, die die kopierten Daten enthält.

// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
    new AzureSqlTableDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = sqlDbLinkedServiceName
        },
        TableName = azureSqlTableName
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);

Erstellen einer Pipeline

Fügen Sie der Main-Methode den folgenden Code hinzu, um eine Pipeline mit einer Kopieraktivität zu erstellen. In diesem Tutorial enthält diese Pipeline eine einzelne Aktivität (CopyActivity), die das Blobdataset als Quelle und das SQL-Dataset als Senke verwendet. Ausführliche Informationen zur Kopieraktivität finden Sie unter Kopieraktivität in Azure Data Factory.

// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToSQL",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference() { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = sqlDatasetName }
            },
            Source = new BlobSource { },
            Sink = new SqlSink { }
        }
    }
};

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);

Erstellen einer Pipelineausführung

Fügen Sie der Main-Methode den folgenden Code hinzu, um eine Pipelineausführung auszulösen.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Überwachen einer Pipelineausführung

Fügen Sie als Nächstes Code ein, um Pipelineausführungszustände zu überprüfen und Details zur Ausführung der Kopieraktivität abzurufen.

  1. Fügen Sie der Main-Methode den folgenden Code hinzu, um kontinuierlich den Status der Pipelineausführung zu überprüfen, bis das Kopieren der Daten abgeschlossen ist.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId
        );
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Fügen Sie der Main-Methode den folgenden Code hinzu, um Ausführungsdetails zur Kopieraktivität abzurufen (etwa die Größe der gelesenen oder geschriebenen Daten).

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)
    );
    
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams
    );
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(queryResponse.Value.First().Output);
    }
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Ausführen des Codes

Wählen Sie Erstellen>Projektmappe erstellen aus, um die Anwendung zu erstellen. Starten Sie dann die Anwendung, indem Sie Debuggen>Debuggen starten auswählen, und überprüfen Sie die Pipelineausführung.

Die Konsole gibt den Status der Erstellung der Data Factory, des verknüpften Diensts, der Datasets, der Pipeline und der Pipelineausführung aus. Danach wird der Status der Pipelineausführung überprüft. Warten Sie, bis die Ausführungsdetails der Kopieraktivität mit der Größe der gelesenen/geschriebenen Daten angezeigt werden. Anschließend können Sie mithilfe von Tools wie SQL Server Management Studio (SSMS) oder Visual Studio eine Verbindung mit Ihrer Azure SQL-Zieldatenbank herstellen und überprüfen, ob die von Ihnen angegebene Tabelle die kopierten Daten enthält.

Beispielausgabe

Creating a data factory AdfV2Tutorial...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
      }
    }
  }
}
Creating linked service AzureSqlDbLinkedService...
{
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": "adfv2tutorial/",
      "fileName": "inputEmp.txt",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "|"
      }
    },
    "structure": [
      {
        "name": "FirstName",
        "type": "String"
      },
      {
        "name": "LastName",
        "type": "String"
      }
    ],
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureStorageLinkedService"
    }
  }
}
Creating dataset SqlDataset...
{
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "dbo.emp"
    },
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureSqlDbLinkedService"
    }
  }
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SqlDataset"
          }
        ],
        "name": "CopyFromBlobToSQL"
      }
    ]
  }
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 18,
  "dataWritten": 28,
  "rowsCopied": 2,
  "copyDuration": 2,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
  "usedDataIntegrationUnits": 2,
  "billedDuration": 2
}

Press any key to exit...

Die Pipeline in diesem Beispiel kopiert Daten in einem Azure Blob Storage von einem Speicherort in einen anderen. Sie haben Folgendes gelernt:

  • Erstellen einer Data Factory.
  • Erstellen verknüpfter Azure Storage- und Azure SQL-Datenbank-Dienste.
  • Erstellen von Azure Blob- und Azure SQL-Datenbank-Datasets.
  • Erstellen einer Pipeline mit einer Kopieraktivität
  • Starten einer Pipelineausführung
  • Überwachen der Pipeline- und Aktivitätsausführungen.

Fahren Sie mit dem folgenden Tutorial fort, um zu erfahren, wie Sie Daten von einem lokalen Speicherort in die Cloud kopieren: