Condividi tramite


API tabella e SQL nei cluster Apache Flink® in HDInsight nel servizio Azure Kubernetes

Importante

Questa funzionalità è attualmente disponibile solo in anteprima. Le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure disponibili in versione beta, in anteprima o non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire microsoft per altri aggiornamenti nella community di Azure HDInsight.

Apache Flink include due API relazionali, l'API Tabella e SQL, per l'elaborazione unificata di flussi e batch. L'API Table è un'API di query integrata nel linguaggio che consente la composizione di query da operatori relazionali, ad esempio selezione, filtro e join in modo intuitivo. Il supporto sql di Flink è basato su Apache Calcite, che implementa lo standard SQL.

L'API Table e le interfacce SQL si integrano perfettamente tra loro e l'API DataStream di Flink. È possibile passare facilmente tra tutte le API e le librerie, basate su di esse.

Analogamente ad altri motori SQL, le query Flink operano sulle tabelle. Differisce da un database tradizionale perché Flink non gestisce i dati inattivi localmente; Le query funzionano invece in modo continuo su tabelle esterne.

Le pipeline di elaborazione dei dati Flink iniziano con le tabelle di origine e terminano con le tabelle sink. Le tabelle di origine producono righe eseguite durante l'esecuzione della query; sono le tabelle a cui si fa riferimento nella clausola FROM di una query. Connessione ors possono essere di tipo HDInsight Kafka, HDInsight HBase, Hub eventi di Azure, database, file system o qualsiasi altro sistema il cui connettore si trova nel classpath.

È possibile fare riferimento a questo articolo su come usare l'interfaccia della riga di comando da Secure Shell in portale di Azure. Ecco alcuni esempi rapidi di come iniziare.

  • Per avviare il client SQL

    ./bin/sql-client.sh
    
  • Per passare un file sql di inizializzazione da eseguire insieme a sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Per impostare una configurazione in sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL supporta le istruzioni CREATE seguenti

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE CATALOG

Di seguito è riportata una sintassi di esempio per definire una tabella di origine usando il connettore jdbc per connettersi a MSSQL, con ID, nome come colonne in un'istruzione CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABA edizione Standard :

CREATE DATABASE students;

CREATE CATALOG:

CREATE CATALOG myhive WITH ('type'='hive');

È possibile eseguire query continue nella parte superiore di queste tabelle

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Scrivere nella tabella sink dalla tabella di origine:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Aggiunta di dipendenze

Le istruzioni JAR vengono usate per aggiungere file JAR utente nel classpath o rimuovere i file JAR utente dal classpath o visualizzare i file JAR aggiunti nel classpath nel runtime.

Flink SQL supporta le istruzioni JAR seguenti:

  • ADD JAR
  • SHOW JARS
  • REMOVE JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

I cataloghi forniscono metadati, ad esempio database, tabelle, partizioni, viste e funzioni e informazioni necessarie per accedere ai dati archiviati in un database o in altri sistemi esterni.

In HDInsight nel servizio Azure Kubernetes il collegamento Flink supporta due opzioni di catalogo:

GenericInMemoryCatalog

GenericInMemoryCatalog è un'implementazione in memoria di un catalogo. Tutti gli oggetti sono disponibili solo per la durata della sessione sql.

HiveCatalog

HiveCatalog serve due scopi: come archiviazione permanente per i metadati Flink puri e come interfaccia per la lettura e la scrittura di metadati Hive esistenti.

Nota

HDInsight nei cluster del servizio Azure Kubernetes include un'opzione integrata di Metastore Hive per Apache Flink. È possibile optare per Il metastore Hive durante la creazione del cluster

È possibile fare riferimento a questo articolo su come usare l'interfaccia della riga di comando e iniziare a usare Flink SQL Client da Secure Shell in portale di Azure.

  • Avviare sql-client.sh la sessione

    Screenshot che mostra il catalogo hive predefinito.

    Default_catalog è il catalogo in memoria predefinito

  • Controllare ora il database predefinito del catalogo in memoria Screenshot che mostra i cataloghi predefiniti in memoria.

  • Creare il catalogo Hive della versione 3.1.2 e usarlo

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Nota

    HDInsight nel servizio Azure Kubernetes supporta Hive 3.1.2 e Hadoop 3.3.2. l'oggetto hive-conf-dir è impostato sulla posizione /opt/hive-conf

  • Creare database nel catalogo hive e impostarlo come predefinito per la sessione (a meno che non venga modificato). Screenshot che mostra la creazione di database nel catalogo hive e la relativa impostazione predefinita per la sessione.

Come creare e registrare tabelle Hive nel catalogo Hive

  • Seguire le istruzioni su Come creare e registrare database Flink nel catalogo

  • Creare una tabella Flink di tipo connettore Hive senza Partition

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Inserisci dati in hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Leggere i dati da hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Nota

    La directory hive warehouse si trova nel contenitore designato dell'account di archiviazione scelto durante la creazione del cluster Apache Flink, disponibile nella directory hive/warehouse/

  • Consente di creare l'hive della tabella Flink del tipo di connettore con Partition

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Importante

Esiste una limitazione nota in Apache Flink. Le ultime colonne 'n' vengono scelte per le partizioni, indipendentemente dalla colonna di partizione definita dall'utente. FLINK-32596 La chiave di partizione non è corretta quando si usa il dialetto Flink per creare una tabella Hive.

Riferimento