Поделиться через


Запись сообщений о событиях в Azure Data Lake Storage 2-го поколения с помощью API Apache Flink® DataStream

Важный

Azure HDInsight на AKS вышло из эксплуатации 31 января 2025 г. Узнайте больше о в этом объявлении.

Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.

Важный

Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Для получения информации о данной предварительной версии см. информацию о предварительной версии Azure HDInsight на AKS. Для вопросов или предложений по функциям отправьте запрос на AskHDInsight с подробными сведениями и подписывайтесь на нас для получения дополнительных обновлений в Azure HDInsight Community.

Apache Flink использует файловые системы для использования и постоянного хранения данных как для результатов приложений, так и для отказоустойчивости и восстановления. Из этой статьи вы узнаете, как записывать сообщения о событиях в Azure Data Lake Storage 2-го поколения с помощью API DataStream.

Необходимые условия

Этот соединитель файловой системы предоставляет одинаковые гарантии для пакетной обработки и потоковой передачи и предназначен для обеспечения однократной семантики выполнения потоковой передачи. Дополнительные сведения см. в разделе Flink DataStream Filesystem.

Соединитель Apache Kafka

Flink предоставляет соединитель Apache Kafka для чтения данных из и записи данных в темы Kafka с гарантией точности. Дополнительные сведения см. в разделе Коннектор Apache Kafka.

pom.xml IntelliJ IDEA

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

программа для приемника ADLS 2-го поколения

abfsGen2.java

Заметка

Замените Apache Kafka в кластере HDInsight bootStrapServers вашими собственными брокерами для Kafka 3.2

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

JAR-файл пакета и отправьте его в Apache Flink.

  1. Отправьте jar-файл в ABFS.

    снимок экрана: экран режима приложения Flink.

  2. Передайте информацию о jar-файле задания при создании кластера AppMode.

    снимок экрана, показывающий режим создания приложения.

    Заметка

    Обязательно добавьте classloader.resolve-order как parent-first и hadoop.classpath.enable true.

  3. Выберите функцию агрегирования журнала заданий, чтобы передать их в учетную запись хранения.

    снимок экрана, показывающий, как включить журнал заданий.

  4. Вы можете увидеть, как выполняется задача.

    снимок экрана с интерфейсом Flink.

Проверить валидность потоковых данных в ADLS Gen2

Мы видим, как поток данных click_events передаётся в ADLS Gen2.

снимок экрана с выходными данными ADLS 2-го поколения. снимок экрана, показывающий выходные данные события Flink Click.

Вы можете указать политику ротации, которая выполняет перекат части файла при выполнении любого из следующих трех условий.

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Ссылка