Tutorial: Extrahieren, Transformieren und Laden von Daten mit Azure HDInsight

In diesem Tutorial führen Sie einen ETL-Vorgang durch: Daten extrahieren, transformieren und laden. Sie verwenden eine unformatierte CSV-Datendatei, importieren diese Datei in einen Azure HDInsight-Cluster, transformieren sie mit Apache Hive und laden sie mit Apache Sqoop in Azure SQL-Datenbank.

In diesem Tutorial lernen Sie Folgendes:

  • Extrahieren und Hochladen der Daten in einen HDInsight-Cluster
  • Transformieren der Daten mithilfe von Apache Hive
  • Laden der Daten in Azure SQL-Datenbank mithilfe von Sqoop

Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.

Voraussetzungen

Herunterladen, Extrahieren und anschließendes Hochladen der Daten

In diesem Abschnitt laden Sie Beispielflugdaten herunter. Anschließend laden Sie diese Daten in Ihren HDInsight-Cluster hoch, und kopieren sie dann in Ihr Data Lake Storage Gen2-Konto.

  1. Laden Sie die Datei On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip herunter. Diese Datei enthält die Flugdaten.

  2. Öffnen Sie eine Eingabeaufforderung, und verwenden Sie den folgenden SCP-Befehl (Secure Copy), um die ZIP-Datei in den Hauptknoten des HDInsight-Clusters hochzuladen:

    scp On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net:
    
    • Ersetzen Sie den Platzhalter <ssh-user-name> durch den SSH-Benutzernamen für den HDInsight-Cluster.
    • Ersetzen Sie den Platzhalter <cluster-name> durch den Namen des HDInsight-Clusters.

    Wenn Sie für die Authentifizierung Ihres SSH-Benutzernamens ein Kennwort verwenden, werden Sie zur Eingabe dieses Kennworts aufgefordert.

    Wenn Sie einen öffentlichen Schlüssel verwendet haben, müssen Sie möglicherweise den Parameter -i verwenden und den Pfad zum passenden privaten Schlüssel angeben. Beispiel: scp -i ~/.ssh/id_rsa <file_name>.zip <user-name>@<cluster-name>-ssh.azurehdinsight.net:.

  3. Stellen Sie nach Abschluss des Uploadvorgangs eine SSH-Verbindung mit dem Cluster her. Geben Sie an der Eingabeaufforderung den folgenden Befehl ein:

    ssh <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net
    
  4. Extrahieren Sie die ZIP-Datei mit folgendem Befehl:

    unzip <file-name>.zip
    

    Dieser Befehl extrahiert eine CSV-Datei.

  5. Verwenden Sie den folgenden Befehl, um den Data Lake Storage Gen2-Container zu erstellen.

    hadoop fs -D "fs.azure.createRemoteFileSystemDuringInitialization=true" -ls abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/
    

    Ersetzen Sie den Platzhalter <container-name> durch den Namen, den Sie für Ihren Container verwenden möchten.

    Ersetzen Sie den Platzhalter <storage-account-name> durch den Namen Ihres Speicherkontos.

  6. Verwenden Sie den folgenden Befehl, um ein Verzeichnis zu erstellen.

    hdfs dfs -mkdir -p abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data
    
  7. Kopieren Sie die CSV-Datei mit dem folgenden Befehl in das Verzeichnis:

    hdfs dfs -put "On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2016_1.csv" abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data/
    

    Setzen Sie den Dateinamen in Anführungszeichen, wenn dieser Leerzeichen oder Sonderzeichen enthält.

Transformieren der Daten

In diesem Abschnitt verwenden Sie Beeline, um einen Apache Hive-Auftrag auszuführen.

Im Rahmen des Apache Hive-Auftrags importieren Sie die Daten aus der CSV-Datei in eine Apache Hive-Tabelle namens delays.

  1. Verwenden Sie an der SSH-Eingabeaufforderung, die bereits für den HDInsight-Cluster vorhanden ist, den folgenden Befehl, um eine neue Datei namens flightdelays.hql zu erstellen und zu bearbeiten:

    nano flightdelays.hql
    
  2. Ändern Sie den folgenden Text, indem Sie die Platzhalter <container-name> und <storage-account-name> durch den Namen Ihres Containers und Ihres Speicherkontos ersetzen. Kopieren Sie den Text anschließend, und fügen Sie ihn in die Nano-Konsole ein, indem Sie die UMSCHALTTASTE drücken und gleichzeitig mit der rechten Maustaste klicken.

      DROP TABLE delays_raw;
      -- Creates an external table over the csv file
      CREATE EXTERNAL TABLE delays_raw (
         YEAR string,
         FL_DATE string,
         UNIQUE_CARRIER string,
         CARRIER string,
         FL_NUM string,
         ORIGIN_AIRPORT_ID string,
         ORIGIN string,
         ORIGIN_CITY_NAME string,
         ORIGIN_CITY_NAME_TEMP string,
         ORIGIN_STATE_ABR string,
         DEST_AIRPORT_ID string,
         DEST string,
         DEST_CITY_NAME string,
         DEST_CITY_NAME_TEMP string,
         DEST_STATE_ABR string,
         DEP_DELAY_NEW float,
         ARR_DELAY_NEW float,
         CARRIER_DELAY float,
         WEATHER_DELAY float,
         NAS_DELAY float,
         SECURITY_DELAY float,
         LATE_AIRCRAFT_DELAY float)
      -- The following lines describe the format and location of the file
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data';
    
      -- Drop the delays table if it exists
      DROP TABLE delays;
      -- Create the delays table and populate it with data
      -- pulled in from the CSV file (via the external table defined previously)
      CREATE TABLE delays
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/processed'
      AS
      SELECT YEAR AS year,
         FL_DATE AS FlightDate, 
         substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS IATA_CODE_Reporting_Airline,
         substring(CARRIER, 2, length(CARRIER) -1) AS Reporting_Airline, 
         substring(FL_NUM, 2, length(FL_NUM) -1) AS Flight_Number_Reporting_Airline,
         ORIGIN_AIRPORT_ID AS OriginAirportID,
         substring(ORIGIN, 2, length(ORIGIN) -1) AS OriginAirportSeqID,
         substring(ORIGIN_CITY_NAME, 2) AS OriginCityName,
         substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1)  AS OriginState,
         DEST_AIRPORT_ID AS DestAirportID,
         substring(DEST, 2, length(DEST) -1) AS DestAirportSeqID,
         substring(DEST_CITY_NAME,2) AS DestCityName,
         substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS DestState,
         DEP_DELAY_NEW AS DepDelay,
         ARR_DELAY_NEW AS ArrDelay,
         CARRIER_DELAY AS CarrierDelay,
         WEATHER_DELAY AS WeatherDelay,
         NAS_DELAY AS NASDelay,
         SECURITY_DELAY AS SecurityDelay,
         LATE_AIRCRAFT_DELAY AS LateAircraftDelay
      FROM delays_raw;
    
  3. Speichern Sie die Datei mit STRG+X, und geben Sie dann Y ein, wenn Sie zur Eingabe aufgefordert werden.

  4. Verwenden Sie den folgenden Befehl, um Hive zu starten und die Datei flightdelays.hql auszuführen:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
    
  5. Öffnen Sie nach der Ausführung des Skripts flightdelays.hql mithilfe des folgenden Befehls eine interaktive Beeline-Sitzung:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
    
  6. Wenn Sie die jdbc:hive2://localhost:10001/>-Eingabeaufforderung erhalten, rufen Sie die Daten mit der folgenden Abfrage aus den importierten Flugverspätungsdaten ab:

    INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    SELECT regexp_replace(OriginCityName, '''', ''),
      avg(WeatherDelay)
    FROM delays
    WHERE WeatherDelay IS NOT NULL
    GROUP BY OriginCityName;
    

    Mit dieser Abfrage werden eine Liste von Orten, in denen Verspätungen infolge des Wetters auftraten, sowie die durchschnittliche Verspätung abgerufen. Die Liste wird anschließend in abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output gespeichert. Sqoop liest später die Daten an diesem Speicherort und exportiert sie in die Azure SQL-Datenbank.

  7. Geben Sie zum Beenden von Beeline !quit an der Eingabeaufforderung ein.

Erstellen einer SQL-Datenbanktabelle

Für diesen Vorgang benötigen Sie den Servernamen aus SQL-Datenbank. Führen Sie die folgenden Schritte aus, um den Servernamen festzustellen.

  1. Öffnen Sie das Azure-Portal.

  2. Wählen Sie SQL-Datenbanken aus.

  3. Filtern Sie nach dem Namen der Datenbank, die Sie verwenden möchten. Der Servername befindet sich in der Spalte Servername.

  4. Filtern Sie nach dem Namen der Datenbank, die Sie verwenden möchten. Der Servername befindet sich in der Spalte Servername.

    Abrufen von Details zum Azure SQL-Server

    Es gibt viele Möglichkeiten, eine Verbindung mit der SQL-Datenbank herzustellen und eine Tabelle zu erstellen. Die folgenden Schritte verwenden FreeTDS aus dem HDInsight-Cluster.

  5. Verwenden Sie zum Installieren von FreeTDS den folgenden Befehl über eine SSH-Verbindung mit dem Cluster:

    sudo apt-get --assume-yes install freetds-dev freetds-bin
    
  6. Nachdem die Installation abgeschlossen ist, verwenden Sie den folgenden Befehl, um eine Verbindung mit SQL-Datenbank herzustellen.

    TDSVER=8.0 tsql -H '<server-name>.database.windows.net' -U '<admin-login>' -p 1433 -D '<database-name>'
    
    • Ersetzen Sie den Platzhalter <server-name> durch den logischen SQL-Servernamen.

    • Ersetzen Sie den Platzhalter <admin-login> durch den Administratorbenutzernamen für SQL-Datenbank.

    • Ersetzen Sie den Platzhalter <database-name> durch den Namen der Datenbank.

    Geben Sie das Kennwort für den SQL-Datenbank-Administratorbenutzernamen ein, wenn Sie dazu aufgefordert werden.

    Eine Ausgabe ähnlich folgendem Text wird angezeigt:

    locale is "en_US.UTF-8"
    locale charset is "UTF-8"
    using default charset "UTF-8"
    Default database being set to sqooptest
    1>
    
  7. Geben Sie an der Eingabeaufforderung 1> die folgenden Anweisungen ein:

    CREATE TABLE [dbo].[delays](
    [OriginCityName] [nvarchar](50) NOT NULL,
    [WeatherDelay] float,
    CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED
    ([OriginCityName] ASC))
    GO
    
  8. Nach Eingabe der Anweisung GO werden die vorherigen Anweisungen ausgewertet.

    Diese Abfrage erstellt eine Tabelle namens delays, die über einen gruppierten Index verfügt.

  9. Stellen Sie mithilfe der folgenden Abfrage sicher, dass die Tabelle erstellt wurde:

    SELECT * FROM information_schema.tables
    GO
    

    Die Ausgabe sieht in etwa wie folgender Text aus:

    TABLE_CATALOG   TABLE_SCHEMA    TABLE_NAME      TABLE_TYPE
    databaseName       dbo             delays        BASE TABLE
    
  10. EINGABE exit at the 1> ein.

Exportieren und Laden der Daten

In den vorherigen Abschnitten haben Sie die transformierten Daten unter abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output kopiert. In diesem Abschnitt verwenden Sie Sqoop, um die Daten aus abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output in die Tabelle zu exportieren, die Sie in Azure SQL-Datenbank erstellt haben.

  1. Verwenden Sie den folgenden Befehl, um zu überprüfen, ob Sqoop Ihre SQL-Datenbank erreichen kann:

    sqoop list-databases --connect jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433 --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD>
    

    Der Befehl gibt eine Liste von Datenbanken zurück, in der auch die Datenbank enthalten ist, in der Sie die Tabelle delays erstellt haben.

  2. Verwenden Sie den folgenden Befehl, um Daten aus der Tabelle hivesampletable in die Tabelle delays zu exportieren:

    sqoop export --connect 'jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433;database=<DATABASE_NAME>' --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD> --table 'delays' --export-dir 'abfs://<container-name>@.dfs.core.windows.net/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
    

    Sqoop stellt eine Verbindung mit der Datenbank her, die die Tabelle delays enthält, und exportiert Daten aus dem Verzeichnis /tutorials/flightdelays/output in die Tabelle delays.

  3. Nach Abschluss des sqoop-Befehls stellen Sie über das Hilfsprogramm tsql eine Verbindung mit der Datenbank her:

    TDSVER=8.0 tsql -H <SERVER_NAME>.database.windows.net -U <ADMIN_LOGIN> -P <ADMIN_PASSWORD> -p 1433 -D <DATABASE_NAME>
    
  4. Überprüfen Sie mithilfe der folgenden Anweisungen, ob die Daten in die Tabelle delays exportiert wurden:

    SELECT * FROM delays
    GO
    

    Es sollte eine Liste der Tabellendaten angezeigt werden. Die Tabelle enthält den Namen der Stadt und die durchschnittliche Flugverspätung für diese Stadt.

  5. Geben Sie exit ein, um das Hilfsprogramm tsql zu beenden.

Bereinigen von Ressourcen

In diesem Tutorial wurden nur Ressourcen verwendet, die bereits vorhanden waren. Eine Bereinigung ist nicht erforderlich.

Nächste Schritte

Weitere Informationen zum Arbeiten mit Daten in HDInsight finden Sie im folgenden Artikel: