Поделиться через


Массовая загрузка данных в Apache Phoenix с помощью psql

Apache Phoenix — это реляционная база данных на основе Apache HBase с открытым кодом и высоким уровнем параллелизма. Phoenix поддерживает запросы к HBase в стиле SQL. Phoenix использует драйверы JDBC. Это позволяет создавать, удалять и изменять SQL-таблицы, индексы, представления и последовательности, а также вставлять строки по одной или в пакетном режиме. Phoenix использует для сборки запросов компиляцию в машинный код noSQL, а не MapReduce, что позволяет создавать на основе HBase приложения с низким уровнем задержки. Кроме того, Phoenix поддерживает сопроцессоры для выполнения пользовательского кода в адресном пространстве сервера, то есть прямо в месте размещения данных. Это сводит к минимуму трафик между клиентом и сервером. Чтобы применить Phoenix для работы с данными в HDInsight, сначала создайте таблицы и загрузите в них данные.

Массовая загрузка с помощью Apache Phoenix

Существует несколько способов загрузки данных в HBase, включая использование клиентских API, задание MapReduce с TableOutputFormat или ручной ввод данных через оболочку HBase. Phoenix предлагает два метода загрузки данных в формате CSV в таблицы Phoenix: клиентское средство загрузки psql и средство массовой загрузки на основе MapReduce.

Инструмент psql является однопоточным и хорошо подходит для загрузки мегабайт или гигабайт данных. Все загружаемые CSV-файлы должны иметь расширение .csv. Также вы можете указать в командной строке psql SQL-файлы скриптов с расширением .sql.

Массовая загрузка с помощью MapReduce применяется для данных существенно большего объема, в том числе в промышленных средах, поскольку в MapReduce используется многопоточная схема работы.

Прежде чем загружать данные, убедитесь, что Phoenix включен и для запросов правильно настроены параметры времени ожидания. Откройте панель мониторинга Apache Ambari для кластера HDInsight, выберите HBase и перейдите на вкладку "Конфигурация". Прокрутите вниз и убедитесь, что для Apache Phoenix задано значение enabled, как показано ниже.

Параметры кластера Apache Phoenix HDInsight.

Использование psql для массовой загрузки таблиц

  1. Создайте файл с именем createCustomersTable.sql и скопируйте приведенный ниже код в файл. Сохраните и закройте файл.

    CREATE TABLE Customers (
        ID varchar NOT NULL PRIMARY KEY,
        Name varchar,
        Income decimal,
        Age INTEGER,
        Country varchar);
    
  2. Создайте файл с именем listCustomers.sql и скопируйте приведенный ниже код в файл. Сохраните и закройте файл.

    SELECT * from Customers;
    
  3. Создайте файл с именем customers.csv и скопируйте приведенный ниже код в файл. Сохраните и закройте файл.

    1,Samantha,260000.0,18,US
    2,Sam,10000.5,56,US
    3,Anton,550150.0,42,Norway
    
  4. Создайте файл с именем customers2.csv и скопируйте приведенный ниже код в файл. Сохраните и закройте файл.

    4,Nicolle,180000.0,22,US
    5,Kate,210000.5,24,Canada
    6,Ben,45000.0,32,Poland
    
  5. Откройте командную строку и измените каталог на расположение вновь созданных файлов. Замените CLUSTERNAME ниже на фактическое имя своего кластера HBase. Затем выполните код, чтобы передать файлы в головной узел кластера:

    scp customers.csv customers2.csv createCustomersTable.sql listCustomers.sql sshuser@CLUSTERNAME-ssh.azurehdinsight.net:/tmp
    
  6. С помощью команды ssh command подключитесь к кластеру. Измените приведенную ниже команду, заменив CLUSTERNAME именем своего кластера, а затем введите команду:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  7. В сеансе SSH измените каталог на расположение средства psql. Выполните команду:

    cd /usr/hdp/current/phoenix-client/bin
    
  8. Выполните массовую загрузку данных. Приведенный ниже код создаст таблицу Заказчики, а затем загрузит данные.

    python psql.py /tmp/createCustomersTable.sql /tmp/customers.csv
    

    Когда операция psql завершится, вы увидите аналогичное сообщение:

    csv columns from database.
    CSV Upsert complete. 3 rows upserted
    Time: 0.081 sec(s)
    
  9. Можно продолжить использовать psql для просмотра содержимого таблицы "Заказчики". Выполните приведенный ниже код.

    python psql.py /tmp/listCustomers.sql
    

    Кроме того, для запроса данных можно использовать оболочку HBaseили Apache Zeppelin.

  10. Загрузите дополнительные данные. Теперь, когда таблица уже существует, команда указывает таблицу. Выполните команду:

    python psql.py -t CUSTOMERS /tmp/customers2.csv
    

Использование MapReduce для массовой загрузки таблиц

Чтобы повысить пропускную способность и распределить нагрузку в кластере, используйте средство загрузки MapReduce. Этот загрузчик сначала преобразует все данные в HFiles, а затем предоставляет созданный HFiles в HBase.

  1. Далее в этом разделе описывается работа с сеансом SSH и созданными ранее объектами. При необходимости создайте таблицу Заказчики и файл customers.csv, выполнив действия, описанные выше. При необходимости восстановите SSH-подключение.

  2. Усеките содержимое таблицы Заказчики. В открытом сеансе SSH выполните приведенные ниже команды.

    hbase shell
    truncate 'CUSTOMERS'
    exit
    
  3. Скопируйте файл customers.csv из головного узла в службу хранилища Azure.

    hdfs dfs -put /tmp/customers.csv wasbs:///tmp/customers.csv
    
  4. Перейдите в каталог выполнения для команды массовой загрузки MapReduce:

    cd /usr/hdp/current/phoenix-client
    
  5. Запустите загрузчик CSV MapReduce с помощью команды hadoop, указав клиентский JAR-файл Phoenix:

    HADOOP_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/etc/hbase/conf hadoop jar phoenix-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool --table Customers --input /tmp/customers.csv
    

    После завершения загрузки вы увидите примерно следующее сообщение:

    19/12/18 18:30:57 INFO client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    19/12/18 18:30:57 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x26f15dcceff02c3
    19/12/18 18:30:57 INFO zookeeper.ZooKeeper: Session: 0x26f15dcceff02c3 closed
    19/12/18 18:30:57 INFO zookeeper.ClientCnxn: EventThread shut down
    19/12/18 18:30:57 INFO mapreduce.AbstractBulkLoadTool: Incremental load complete for table=CUSTOMERS
    19/12/18 18:30:57 INFO mapreduce.AbstractBulkLoadTool: Removing output directory /tmp/50254426-aba6-400e-88eb-8086d3dddb6
    
  6. Чтобы использовать MapReduce с Azure Data Lake Storage вы должны определить корневой каталог Data Lake Storage (значение hbase.rootdir в hbase-site.xml). В следующей команде корневой каталог Data Lake Storage имеет значение adl://hdinsightconf1.azuredatalakestore.net:443/hbase1. Передайте при помощи этой команды входной и выходной каталоги Data Lake Storage в качестве параметров.

    cd /usr/hdp/current/phoenix-client
    
    $HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar /usr/hdp/2.4.2.0-258/phoenix/phoenix-4.4.0.2.4.2.0-258-client.jar
    
    org.apache.phoenix.mapreduce.CsvBulkLoadTool --table Customers --input adl://hdinsightconf1.azuredatalakestore.net:443/hbase1/data/hbase/temp/input/customers.csv –zookeeper ZookeeperQuorum:2181:/hbase-unsecure --output  adl://hdinsightconf1.azuredatalakestore.net:443/hbase1/data/hbase/output1
    
  7. Для запроса и просмотра данных можно использовать psql, как описано выше. Можно также использовать оболочку HBase или Apache Zeppelin.

Рекомендации

  • Используйте одну среду хранения для входных и выходных папок Azure Storage (WASB) или Azure Data Lake Storage (ADL). Передать данные из службы хранилища Azure в Data Lake Storage можно с помощью команды distcp.

    hadoop distcp wasb://@.blob.core.windows.net/example/data/gutenberg adl://.azuredatalakestore.net:443/myfolder
    
  • Используйте рабочие узлы большего размера. Процессы картирования в массовой копии MapReduce создают большие объемы временных выходных данных, которые заполняют все доступное пространство вне DFS. Для массовой загрузки большого объема данных используйте значительное количество рабочих узлов большого размера. Число рабочих узлов, выделенных для кластера, прямо влияет на скорость обработки.

  • Разделите входные файлы на фрагменты объемом около 10 ГБ. Для массовой загрузки данных используются большие объемы хранилища. Поэтому разделение входных данных на небольшие фрагменты позволит повысить производительность.

  • Избегайте горячих точек на региональных серверах. Если ключ строки монотонно возрастает, последовательные записи в HBase могут вызвать перегрузку региональных серверов. Добавив соль к ключу строк, вы сможете избежать последовательных операций записи. Phoenix позволяет прозрачно добавлять соль к ключу строки, используя байт соли для конкретной таблицы, как указано ниже.

Следующие шаги