分享方式:


使用 Apache Flink® DataStream API 將事件訊息寫入 Azure Data Lake 儲存體 Gen2

重要

此功能目前為預覽功能。 適用於 Microsoft Azure 預覽版的補充使用規定包含適用於 Beta 版、預覽版或尚未發行至正式運作之 Azure 功能的更合法條款。 如需此特定預覽的相關信息,請參閱 AKS 預覽資訊的 Azure HDInsight。 如需問題或功能建議,請在 AskHDInsight提交要求,並提供詳細數據,並遵循我們在 Azure HDInsight 社群取得更多更新。

Apache Flink 會使用檔案系統來取用並持續儲存數據,無論是應用程式的結果,還是容錯和復原。 在本文中,瞭解如何使用 DataStream API 將事件訊息寫入 Azure Data Lake 儲存體 Gen2。

必要條件

此文件系統連接器為 BATCH 和 STREAMING 提供相同的保證,其設計目的是為串流執行提供一次語意。 如需詳細資訊,請參閱 Flink DataStream 文件系統

Apache Kafka 連接器

Flink 提供 Apache Kafka 連接器,可讓您只保證一次保證,從 Kafka 主題讀取數據,並將數據寫入 Kafka 主題。 如需詳細資訊,請參閱 Apache Kafka 連線 or

IntelliJ IDEA 上的pom.xml

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

ADLS Gen2 接收的程式

abfsGen2.java

注意

將 HDInsight 叢集 bootStrapServers 上的 Apache Kafka 取代為 Kafka 3.2 的代理程式

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

封裝 jar,並提交至 Apache Flink。

  1. 將 jar 上傳至 ABFS。

    顯示 Flink 應用程式模式畫面的螢幕快照。

  2. 在叢集建立中 AppMode 傳遞作業 jar 資訊。

    顯示建立應用程式模式的螢幕快照。

    注意

    請務必將 classloader.resolve-order 新增為 'parent-first' 和 hadoop.classpath.enable 作為 true

  3. 選取 [作業記錄匯總] 以將作業記錄推送至記憶體帳戶。

    顯示如何啟用作業記錄的螢幕快照。

  4. 您可以看到作業正在執行。

    顯示 Flink UI 的螢幕快照。

在 ADLS Gen2 上驗證串流數據

我們看到串 click_events 流至 ADLS Gen2。

顯示ADLS Gen2輸出的螢幕快照。顯示 Flink click 事件輸出的螢幕快照。

您可以指定滾動原則,以在下列三個條件中輪替進行中的元件檔案:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

參考