Use Apache Flink® DataStream API on HDInsight on AKS for MongoDB as a source and sink


Apache Flink provides a MongoDB connector for reading and writing data from and to MongoDB collections with at-least-once guarantees.

This example demonstrates how to use Apache Flink 1.17.0 on HDInsight on AKS with your existing MongoDB as both a sink and a source, using the Flink DataStream API MongoDB connector.

MongoDB is a nonrelational document database that stores data as JSON-like documents, which makes it easy to store complex structures.

In this example, you learn how to use MongoDB as a source and a sink with the DataStream API.


Prerequisites

  • HDInsight on AKS - Flink 1.17.0 cluster
  • For this demonstration, a Windows VM in the same VNet as HDInsight on AKS, used as the Maven project development environment
  • The Apache Flink MongoDB connector
  • For this demonstration, an Ubuntu VM in the same VNet as HDInsight on AKS, with MongoDB installed on it

Installation of MongoDB on Ubuntu VM

Install MongoDB on Ubuntu.

MongoDB Shell commands.

Prepare MongoDB environment:

root@contosoubuntuvm:/var/lib/mongodb# vim /etc/mongod.conf

# network interfaces
net:
  port: 27017

-- Start MongoDB
root@contosoubuntuvm:/var/lib/mongodb# systemctl start mongod
root@contosoubuntuvm:/var/lib/mongodb# systemctl status mongod
● mongod.service - MongoDB Database Server
     Loaded: loaded (/lib/systemd/system/mongod.service; disabled; vendor preset: enabled)
     Active: active (running) since Fri 2023-06-16 00:07:39 UTC; 5s ago
   Main PID: 415775 (mongod)
     Memory: 165.4M
     CGroup: /system.slice/mongod.service
             └─415775 /usr/bin/mongod --config /etc/mongod.conf

Jun 16 00:07:39 contosoubuntuvm systemd[1]: Started MongoDB Database Server.
Jun 16 00:07:39 contosoubuntuvm mongod[415775]: {"t":{"$date":"2023-06-16T00:07:39.091Z"},"s":"I",  "c":"CONTROL",  "id":7484500, "ctx":"-","msg">

-- check connectivity
root@contosoubuntuvm:/var/lib/mongodb# telnet 27017
Connected to
Escape character is '^]'.
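If telnet isn't available on the machine you test from, the same TCP check can be sketched in plain Java. This is a minimal sketch, assuming a single-host connection string of the form mongodb://host:port; the class name MongoPortCheck and the address 10.0.0.4 are hypothetical. It only confirms that something is listening on the port, not that MongoDB queries or authentication succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper: checks whether a TCP connection to the MongoDB host and port succeeds.
public class MongoPortCheck {
    static final int DEFAULT_MONGODB_PORT = 27017;

    // Parses a single-host URI such as mongodb://10.0.0.4:27017 (multi-host URIs are not supported).
    static InetSocketAddress parse(String mongoUri) {
        URI uri = URI.create(mongoUri);
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Expected a single-host mongodb:// URI: " + mongoUri);
        }
        int port = uri.getPort() == -1 ? DEFAULT_MONGODB_PORT : uri.getPort();
        return InetSocketAddress.createUnresolved(uri.getHost(), port);
    }

    // Returns "host:port", filling in 27017 when the URI has no explicit port.
    static String hostPort(String mongoUri) {
        InetSocketAddress address = parse(mongoUri);
        return address.getHostString() + ":" + address.getPort();
    }

    // True if a TCP connection succeeds within the timeout, like telnet printing "Connected to".
    static boolean isReachable(String mongoUri, int timeoutMs) {
        InetSocketAddress target = parse(mongoUri);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String uri = args.length > 0 ? args[0] : "mongodb://10.0.0.4:27017"; // hypothetical address
        System.out.println(hostPort(uri) + " reachable: " + isReachable(uri, 3000));
    }
}
```

Pass your MongoDB VM's connection string as the first argument; a false result usually points to bindIp, a firewall, or an NSG rule rather than to Flink.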

-- Use mongosh to connect to mongodb
root@contosoubuntuvm:/var/lib/mongodb# mongosh "mongodb://"
Current Mongosh Log ID: 648bccc3b8a6b0885614b2dc
Connecting to:          mongodb://
Using MongoDB:          6.0.6
Using Mongosh:          1.10.0

   The server generated these startup warnings when booting
-- Check the click_events collection

test> db.click_events.count()


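To make MongoDB reachable from outside the VM, change bindIp in the net section of /etc/mongod.conf (by default, mongod binds only to 127.0.0.1) to an address that the HDInsight on AKS cluster can reach, and then restart mongod.

vim /etc/mongod.conf
# network interfaces
net:
  port: 27017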

Get started

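Create a Maven project in IntelliJ IDEA and prepare the pom.xml with the dependencies for the MongoDB connector:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="">
    <modelVersion>4.0.0</modelVersion>
    <groupId>contoso.example</groupId>
    <artifactId>flink-mongodb-demo</artifactId> <!-- hypothetical artifact name -->
    <version>1.0-SNAPSHOT</version>
    <properties>
        <flink.version>1.17.0</flink.version>
        <!-- assumption: use the flink-connector-mongodb release built for Flink 1.17 -->
        <mongodb.connector.version>1.0.1-1.17</mongodb.connector.version>
    </properties>
    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>${mongodb.connector.version}</version></dependency>
    </dependencies>
</project>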

Generate a stream source and sink it to the MongoDB collection click_events:

package contoso.example;

import com.mongodb.client.model.InsertOneModel;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;

public class MongoDBSinkDemo {
    public static void main(String[] args) throws Exception {
        // 1. get stream env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

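        // 2. event data source
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 3. sink to MongoDB admin.click_events; update the URI with your MongoDB IP address
        MongoSink<Event> sink = MongoSink.<Event>builder()
                .setUri("mongodb://")
                .setDatabase("admin")
                .setCollection("click_events")
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                // Event.toString() yields a JSON-like string that BsonDocument.parse() turns into a document
                .setSerializationSchema(
                        (input, context) -> new InsertOneModel<>(BsonDocument.parse(String.valueOf(input))))
                .build();

        stream.sinkTo(sink);

        env.execute("Sink click events to MongoDB");
    }
}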
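The MongoDB sink provides at-least-once delivery. The following plain-Java model (not connector code; the class name and numbers are illustrative assumptions) sketches why duplicates, but no losses, can appear: after a failure, Flink restores the last completed checkpoint, a replayable source re-emits everything after that point, and each replayed record becomes another insert. The random ClickSource in this example is not replayable, so after a restart it simply generates new events.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model only: a replayable source feeding a non-idempotent insert sink.
public class AtLeastOnceReplaySketch {
    // totalEvents: events the source emits (ids 0..totalEvents-1)
    // checkpointOffset: source position stored in the last completed checkpoint
    // failAfterWrites: inserts that reached the database before the simulated failure
    static List<Integer> simulate(int totalEvents, int checkpointOffset, int failAfterWrites) {
        if (checkpointOffset > failAfterWrites || failAfterWrites > totalEvents) {
            throw new IllegalArgumentException("expected checkpointOffset <= failAfterWrites <= totalEvents");
        }
        List<Integer> inserted = new ArrayList<>();
        // first attempt: events are inserted until the job fails
        for (int id = 0; id < failAfterWrites; id++) {
            inserted.add(id);
        }
        // recovery: restore the checkpoint and replay everything after it
        for (int id = checkpointOffset; id < totalEvents; id++) {
            inserted.add(id);
        }
        return inserted;
    }

    public static void main(String[] args) {
        List<Integer> inserted = simulate(10, 5, 8);
        long distinct = inserted.stream().distinct().count();
        System.out.println("inserts=" + inserted.size() + ", distinct events=" + distinct);
    }
}
```

Running main prints inserts=13, distinct events=10: events 5, 6, and 7 are written twice, and none are lost. Because InsertOneModel assigns a new _id to each insert, such duplicates would show up as separate documents.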

Stream click event source:

package contoso.example;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // flag checked by run(); volatile because Flink calls cancel() from a different thread
    private volatile boolean running = true;

    // emit random click events until the job is cancelled
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // generate random records
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10"};

        // loop generate
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            String ts = timestamp.toString();
            ctx.collect(new Event(user, url, ts));
            // throttle the generator; the one-second pause is an assumed value
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
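ClickSource relies on the SourceFunction contract: Flink calls run() on the task thread and, when the job is cancelled, calls cancel() from another thread, so the running flag must be visible across threads. The following plain-Java sketch (no Flink dependency; the class and method names are hypothetical) reproduces that handshake with two threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ClickSource that runs without Flink.
public class CancellableGeneratorSketch {
    // volatile so the write in cancel() is visible to the loop on the worker thread
    private volatile boolean running = true;
    private final AtomicInteger emitted = new AtomicInteger();
    private final CountDownLatch firstEvent = new CountDownLatch(1);

    // Plays the role of run(SourceContext): emit until cancelled.
    void run() throws InterruptedException {
        while (running) {
            emitted.incrementAndGet(); // stand-in for ctx.collect(new Event(...))
            firstEvent.countDown();
            Thread.sleep(1);
        }
    }

    // Plays the role of cancel(): invoked from a different thread.
    void cancel() {
        running = false;
    }

    // Starts the generator, cancels it after the first event, and returns how many events were emitted.
    static int runUntilCancelled() {
        CancellableGeneratorSketch source = new CancellableGeneratorSketch();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            source.firstEvent.await();
            source.cancel();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the generator", e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("generator did not stop after cancel()");
        }
        return source.emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("events emitted before cancel: " + runUntilCancelled());
    }
}
```

Without volatile, the Java memory model does not guarantee that the loop ever observes running = false, so the source could keep emitting after cancellation.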

Event class:

package contoso.example;

public class Event {

    public String user;
    public String url;
    public String ts;

    // public no-argument constructor, required for Flink to treat Event as a POJO
    public Event() {
    }

    public Event(String user, String url, String ts) {
        this.user = user;
        this.url = url;
        this.ts = ts;
    }

    // JSON-like representation that the MongoDB sink parses with BsonDocument.parse()
    @Override
    public String toString() {
        return "{" +
                "user: \"" + user + "\"" +
                ",url: \"" + url + "\"" +
                ",ts: " + ts +
                "}";
    }
}

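The sink passes String.valueOf(event) to BsonDocument.parse(), so the output of Event.toString() is effectively the document that MongoDB stores. The following standalone sketch reproduces that format for one sample event. The escape() helper is a hypothetical hardening that the Event class above does not have; without it, a value containing a double quote would break the string passed to the parser.

```java
// Hypothetical sketch mirroring Event.toString(): unquoted keys, quoted string values, numeric ts.
public class EventStringSketch {
    static String toJsonLike(String user, String url, String ts) {
        return "{" +
                "user: \"" + escape(user) + "\"" +
                ",url: \"" + escape(url) + "\"" +
                ",ts: " + ts +
                "}";
    }

    // Escape backslashes first, then double quotes, so added backslashes are not doubled.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        System.out.println(toJsonLike("Alice", "./prod?id=10", "1686882611148"));
    }
}
```

main prints {user: "Alice",url: "./prod?id=10",ts: 1686882611148}. Because the ts value is too large for a 32-bit integer, it is parsed as a 64-bit integer, which is consistent with mongosh displaying ts as Long(...) in the validation output.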
Use MongoDB as a source and sink to ADLS Gen2

Write a program that reads from MongoDB as a source and writes to ADLS Gen2 as a sink.

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.bson.BsonDocument;

import java.time.Duration;

public class MongoDBSourceDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

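        // MongoDB source: read the admin.click_events collection as JSON strings
        MongoSource<String> mongoSource = MongoSource.<String>builder()
                .setUri("mongodb://") // update with the correct IP address
                .setDatabase("admin")
                .setCollection("click_events")
                // example split settings; tune them for the size of your collection
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(new MongoDeserializationSchema<String>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();

        DataStream<String> stream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB-Source");

        // 3. sink to gen2, update with your container name and storage path
        String outputPath = "abfs://<update-container>@<storage-path>";
        FileSink<String> gen2 = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                // example rolling thresholds; in streaming mode, part files are finalized only on
                // successful checkpoints, so enable checkpointing (env.enableCheckpointing) if needed
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .withRolloverInterval(Duration.ofSeconds(10))
                                .withInactivityInterval(Duration.ofSeconds(10))
                                .build())
                .build();

        stream.sinkTo(gen2);

        env.execute("MongoDB as a Source Sink to Gen2");
    }
}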
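The FileSink writes records to in-progress part files and rolls them according to its rolling policy. DefaultRollingPolicy documents three triggers: the part file reaches a maximum size, it has been open longer than the rollover interval, or it has received no records for the inactivity interval. The following plain-Java model of those checks is a sketch for reasoning about thresholds, not Flink's implementation; the class name and example thresholds are assumptions, and the exact boundary comparisons are this sketch's choice.

```java
import java.time.Duration;

// Hypothetical model of the three DefaultRollingPolicy triggers; not Flink code.
public class RollingPolicySketch {
    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingPolicySketch(long maxPartSizeBytes, Duration rolloverInterval, Duration inactivityInterval) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverInterval.toMillis();
        this.inactivityIntervalMs = inactivityInterval.toMillis();
    }

    // Size trigger: evaluated as records are written to the in-progress part file.
    boolean shouldRollOnSize(long partSizeBytes) {
        return partSizeBytes > maxPartSizeBytes;
    }

    // Time triggers: the part file is too old, or it has been idle too long.
    boolean shouldRollOnTime(long nowMs, long creationTimeMs, long lastUpdateMs) {
        return nowMs - creationTimeMs >= rolloverIntervalMs
                || nowMs - lastUpdateMs >= inactivityIntervalMs;
    }

    public static void main(String[] args) {
        // example thresholds: 5 MiB, 10 s rollover, 10 s inactivity
        RollingPolicySketch policy =
                new RollingPolicySketch(5L * 1024 * 1024, Duration.ofSeconds(10), Duration.ofSeconds(10));
        System.out.println("6 MiB part file rolls: " + policy.shouldRollOnSize(6L * 1024 * 1024));
        System.out.println("4 s old, last write 1 s ago, rolls: " + policy.shouldRollOnTime(4_000, 0, 3_000));
    }
}
```

With equal rollover and inactivity intervals, the inactivity trigger can never fire before the rollover trigger, because a file is always at least as old as its last write; a shorter inactivity interval is what makes it useful for quiet streams.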

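Package the Maven project as a JAR, upload it to your storage account, and then either download it to the Flink CLI with wget or upload it directly in the Flink UI to run it. Because Flink connectors are not included in the Flink binary distribution, package the MongoDB connector into the job JAR (for example, as an uber JAR).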

Screenshot showing how to upload the package to storage.

Check Flink UI


Validate results

Sink click events to MongoDB's admin.click_events collection:

test> db.click_events.count()

test> db.click_events.find()
[
  {
    _id: ObjectId("648bc933a68ca7614e1f87a2"),
    user: 'Alice',
    url: './prod?id=10',
    ts: Long("1686882611148")
  },
  {
    _id: ObjectId("648bc935a68ca7614e1f87a3"),
    user: 'Bob',
    url: './prod?id=10',
    ts: Long("1686882613148")
  }
]
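To cross-check the validation output, note that a MongoDB ObjectId begins with a 4-byte timestamp in seconds since the Unix epoch, while ClickSource sets ts in milliseconds. The following plain-Java sketch (the class name is hypothetical; the two values are copied from the first document in the output above) decodes the ObjectId time and prints it next to ts.

```java
import java.time.Instant;

// Hypothetical helper for comparing a document's _id time with its ts field.
public class ClickEventTimestampCheck {
    // The first 8 hex characters (4 bytes) of an ObjectId encode its creation time in epoch seconds.
    static long objectIdEpochSeconds(String objectIdHex) {
        if (objectIdHex.length() != 24) {
            throw new IllegalArgumentException("ObjectId must be 24 hex characters: " + objectIdHex);
        }
        return Long.parseLong(objectIdHex.substring(0, 8), 16);
    }

    public static void main(String[] args) {
        String id = "648bc933a68ca7614e1f87a2"; // _id from the sample output
        long tsMillis = 1686882611148L;         // ts from the same document
        System.out.println("_id time: " + Instant.ofEpochSecond(objectIdEpochSeconds(id)));
        System.out.println("ts:       " + Instant.ofEpochMilli(tsMillis));
    }
}
```

It prints 2023-06-16T02:30:11Z for the _id and 2023-06-16T02:30:11.148Z for ts. Both fall in the same second, which suggests each event was written to MongoDB shortly after ClickSource generated it.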

Use MongoDB's admin.click_events collection as a source, and sink to ADLS Gen2


