Руководство по Извлечение, преобразование и загрузка данных с помощью Azure HDInsight

В этом руководстве рассматривается выполнение операций извлечения, преобразования и загрузки данных. Вы берете необработанный файл данных CSV, импортируете его в кластер Azure HDInsight, преобразовываете его с помощью Apache Hive и загружаете в базу данных SQL Azure с помощью Apache Sqoop.

В этом руководстве описано следующее:

  • Извлечение и отправка данных в кластер HDInsight.
  • Преобразование данных с помощью Apache Hive.
  • Загрузка данных в Базу данных SQL Azure с помощью Sqoop.

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.

Предварительные требования

Скачивание, извлечение и отправка данных

В этом разделе вы скачиваете примеры данных о тестировании. Затем вы отправляете эти данные в кластер HDInsight, а затем копируете их в учетную запись Data Lake Storage 2-го поколения.

  1. Скачайте файл On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Этот файл содержит данные о рейсах.

  2. Откройте командную строку и воспользуйтесь следующей командой безопасного копирования (SCP), чтобы передать ZIP-файл в головной узел кластера HDInsight:

    scp On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net:
    
    • Замените <ssh-user-name> заполнитель именем пользователя SSH для кластера HDInsight.
    • Замените заполнитель <cluster-name> именем кластера HDInsight.

    Если вы используете пароль для проверки подлинности имени пользователя SSH, вам будет предложено ввести пароль.

    Если используется открытый ключ, может потребоваться использовать параметр -i и указать путь к соответствующему закрытому ключу. Например, scp -i ~/.ssh/id_rsa <file_name>.zip <user-name>@<cluster-name>-ssh.azurehdinsight.net:.

  3. После завершения отправки можно подключиться к кластеру с помощью SSH. Введите приведенную ниже команду в окне командной строки.

    ssh <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net
    
  4. Чтобы распаковать ZIP-файл, используйте следующую команду:

    unzip <file-name>.zip
    

    Она извлекает CSV-файл.

  5. Используйте следующую команду для создания контейнера Data Lake Storage 2-го поколения.

    hadoop fs -D "fs.azure.createRemoteFileSystemDuringInitialization=true" -ls abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/
    

    Замените заполнитель <container-name> именем, которое хотите присвоить своему контейнеру.

    Замените заполнитель <storage-account-name> именем вашей учетной записи хранения.

  6. Создайте каталог с помощью следующей команды:

    hdfs dfs -mkdir -p abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data
    
  7. Используйте следующую команду для копирования CSV-файла в каталог:

    hdfs dfs -put "On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2016_1.csv" abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data/
    

    Возьмите имя файла в кавычки, если оно содержит пробелы или специальные символы.

Преобразование данных

В этом разделе вы используете клиент Beeline для выполнения задания Apache Hive.

Как часть задания Apache Hive вы импортируете данные из CSV-файла в таблицу Apache Hive с именем delays.

  1. В командной строке SSH, которую вы уже использовали для кластера HDInsight, выполните следующую команду для создания и редактирования нового файла flightdelays.hql.

    nano flightdelays.hql
    
  2. Измените следующий текст, заменив <container-name> заполнители и <storage-account-name> именем контейнера и учетной записи хранения. Затем скопируйте и вставьте текст в консоль nano, нажав клавишу SHIFT и кнопку выбора правой кнопки мыши.

      DROP TABLE delays_raw;
      -- Creates an external table over the csv file
      CREATE EXTERNAL TABLE delays_raw (
         YEAR string,
         FL_DATE string,
         UNIQUE_CARRIER string,
         CARRIER string,
         FL_NUM string,
         ORIGIN_AIRPORT_ID string,
         ORIGIN string,
         ORIGIN_CITY_NAME string,
         ORIGIN_CITY_NAME_TEMP string,
         ORIGIN_STATE_ABR string,
         DEST_AIRPORT_ID string,
         DEST string,
         DEST_CITY_NAME string,
         DEST_CITY_NAME_TEMP string,
         DEST_STATE_ABR string,
         DEP_DELAY_NEW float,
         ARR_DELAY_NEW float,
         CARRIER_DELAY float,
         WEATHER_DELAY float,
         NAS_DELAY float,
         SECURITY_DELAY float,
         LATE_AIRCRAFT_DELAY float)
      -- The following lines describe the format and location of the file
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data';
    
      -- Drop the delays table if it exists
      DROP TABLE delays;
      -- Create the delays table and populate it with data
      -- pulled in from the CSV file (via the external table defined previously)
      CREATE TABLE delays
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/processed'
      AS
      SELECT YEAR AS year,
         FL_DATE AS FlightDate, 
         substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS IATA_CODE_Reporting_Airline,
         substring(CARRIER, 2, length(CARRIER) -1) AS Reporting_Airline, 
         substring(FL_NUM, 2, length(FL_NUM) -1) AS Flight_Number_Reporting_Airline,
         ORIGIN_AIRPORT_ID AS OriginAirportID,
         substring(ORIGIN, 2, length(ORIGIN) -1) AS OriginAirportSeqID,
         substring(ORIGIN_CITY_NAME, 2) AS OriginCityName,
         substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1)  AS OriginState,
         DEST_AIRPORT_ID AS DestAirportID,
         substring(DEST, 2, length(DEST) -1) AS DestAirportSeqID,
         substring(DEST_CITY_NAME,2) AS DestCityName,
         substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS DestState,
         DEP_DELAY_NEW AS DepDelay,
         ARR_DELAY_NEW AS ArrDelay,
         CARRIER_DELAY AS CarrierDelay,
         WEATHER_DELAY AS WeatherDelay,
         NAS_DELAY AS NASDelay,
         SECURITY_DELAY AS SecurityDelay,
         LATE_AIRCRAFT_DELAY AS LateAircraftDelay
      FROM delays_raw;
    
  3. Сохраните файл, введя клавиши CTRL+X, а затем введя Y при появлении запроса.

  4. Для запуска Hive и выполнения файла flightdelays.hql используйте следующую команду:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
    
  5. По завершении выполнения сценария flightdelays.hql используйте следующую команду, чтобы открыть интерактивный сеанс Beeline:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
    
  6. При появлении командной строки jdbc:hive2://localhost:10001/> используйте приведенный ниже запрос, чтобы извлечь информацию из импортированных данных о задержке рейсов:

    INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    SELECT regexp_replace(OriginCityName, '''', ''),
      avg(WeatherDelay)
    FROM delays
    WHERE WeatherDelay IS NOT NULL
    GROUP BY OriginCityName;
    

    Вы получите список городов, рейсы в которых задержаны из-за погодных условий, а также среднее время задержки. Он будет сохранен в abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output. Позже Sqoop считает данные из этого расположения и экспортирует их в базу данных SQL Azure.

  7. Чтобы выйти из Beeline, введите !quit в командной строке.

Создание таблицы базы данных SQL

Вам требуется указать имя сервера из Базы данных SQL для этой операции. Выполните следующие действия, чтобы найти имя вашего сервера.

  1. Перейдите на портал Azure.

  2. Выберите Базы данных SQL.

  3. Выполните фильтрацию по имени базы данных, которую вы выбрали. Имя сервера указано в столбце Имя сервера.

  4. Выполните фильтрацию по имени базы данных, которую вы хотите использовать. Имя сервера указано в столбце Имя сервера.

    Получение сведений о сервере Azure SQL

    Существует множество способов подключения к базе данных SQL и создания таблицы. В приведенных ниже действиях используется FreeTDS из кластера HDInsight.

  5. Чтобы установить FreeTDS, выполните следующую команду с помощью SSH-подключения к кластеру:

    sudo apt-get --assume-yes install freetds-dev freetds-bin
    
  6. После завершения установки используйте следующую команду для подключения к Базе данных SQL.

    TDSVER=8.0 tsql -H '<server-name>.database.windows.net' -U '<admin-login>' -p 1433 -D '<database-name>'
    
    • Замените заполнитель <server-name> именем логического сервера SQL Server.

    • Замените <admin-login> заполнитель именем пользователя администратора для База данных SQL.

    • Замените заполнитель <database-name> именем базы данных.

    При появлении запроса введите пароль для имени пользователя администратора База данных SQL.

    Должен появиться результат, аналогичный приведенному ниже тексту.

    locale is "en_US.UTF-8"
    locale charset is "UTF-8"
    using default charset "UTF-8"
    Default database being set to sqooptest
    1>
    
  7. В окне запроса 1> введите следующие инструкции:

    CREATE TABLE [dbo].[delays](
    [OriginCityName] [nvarchar](50) NOT NULL,
    [WeatherDelay] float,
    CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED
    ([OriginCityName] ASC))
    GO
    
  8. Если вводится инструкция GO, то оцениваются предыдущие инструкции.

    Этот запрос создает таблицу delays с кластеризованным индексом.

  9. Используйте следующий запрос для проверки создания таблицы.

    SELECT * FROM information_schema.tables
    GO
    

    Результат будет аналогичен приведенному ниже:

    TABLE_CATALOG   TABLE_SCHEMA    TABLE_NAME      TABLE_TYPE
    databaseName       dbo             delays        BASE TABLE
    
  10. Enter exit at the 1> , чтобы выйти из служебной программы tsql.

Экспорт и загрузка данных

В предыдущих разделах вы скопировали преобразованные данные в расположение abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output. В этом разделе вы с помощью Sqoop экспортируете данные из папки abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output в созданную в Базе данных SQL Azure таблицу.

  1. Чтобы проверить, видно ли в Sqoop базу данных SQL, используйте следующую команду:

    sqoop list-databases --connect jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433 --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD>
    

    Эта команда выводит список баз данных, включая базу данных, в которой вы создали таблицу delays.

  2. Для экспорта данных из таблицы hivesampletable в таблицу delays используйте следующую команду:

    sqoop export --connect 'jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433;database=<DATABASE_NAME>' --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD> --table 'delays' --export-dir 'abfs://<container-name>@.dfs.core.windows.net/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
    

    Sqoop подключается к базе данных, которая содержит таблицу delays, и экспортирует данные из каталога /tutorials/flightdelays/output в таблицу delays.

  3. Когда команда sqoop будет выполнена, используйте служебную программу tsql для подключения к базе данных:

    TDSVER=8.0 tsql -H <SERVER_NAME>.database.windows.net -U <ADMIN_LOGIN> -P <ADMIN_PASSWORD> -p 1433 -D <DATABASE_NAME>
    
  4. Используйте следующие инструкции, чтобы проверить состояние экспорта данных в таблицу delays:

    SELECT * FROM delays
    GO
    

    Вы увидите список данных в таблице. Таблица содержит название города и среднее время задержки рейса для этого города.

  5. Введите exit для выхода из служебной программы tsql.

Очистка ресурсов

Все ресурсы, используемые в этом руководстве, предварительно созданы. Очистка не требуется.

Дальнейшие действия

Чтобы узнать дополнительные возможности работы с данными в HDInsight, ознакомьтесь со статьей по следующей ссылке: