使用 HDInsight on AKS 上的 Apache Flink®,將 Apache Kafka® 訊息接收至 Azure Cosmos DB for Apache Cassandra
注意
AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。
在淘汰日期之前,只有基本支援可用。
重要
此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。
此範例會使用 Apache Flink,將 HDInsight for Apache Kafka 訊息接收至 Azure Cosmos DB for Apache Cassandra。
當工程師偏好即時彙總資料進行分析時,此範例很突出。 透過存取歷程彙總資料,您可以建置機器學習 (ML) 模型來建置見解或動作。 您也可以將 IoT 資料內嵌至 Apache Flink,以即時彙總資料並將其儲存在Apache Cassandra 中。
必要條件
- AKS 上的 HDInsight 上的 Apache Flink 1.17.0
- HDInsight 上的 Apache Kafka 3.2
- Azure Cosmos DB for Apache Cassandra
- 適用於 maven 專案開發環境的 Ubuntu VM,與 HDInsight on AKS 叢集位於相同的 VNet 中。
Azure Cosmos DB for Apache Cassandra
Azure Cosmos DB for Apache Cassandra 可供針對 Apache Cassandra 撰寫的應用程式用作資料存放區。 這種相容性表示,藉由使用符合 CQLv4 的現有 Apache 驅動程式,您現有的 Cassandra 應用程式現在可以與 API for Cassandra 通訊。
如需詳細資訊,請參閱下列連結。
取得認證並在 Stream 原始程式碼上使用:
實作
在 Ubuntu VM 上,讓我們準備開發環境
複製 Azure 範例的存放庫
請參閱 GitHub 讀我檔案以下載 Maven,然後使用來自 Azure 範例的 Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git
複製此存放庫。
更新 Cassandra 的 Maven 專案
移至 Maven 專案資料夾 azure-cosmos-db-cassandra-java-getting-started-main,並更新此範例所需的變更。
maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.azure.cosmosdb.cassandra</groupId>
<artifactId>cosmosdb-cassandra-examples</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<finalName>cosmosdb-cassandra-examples</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Cosmos DB for Apache Cassandra 的連線設定
您必須更新主機名稱和使用者名稱,以及下列程式碼片段中的金鑰。
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>
來源結構
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr 9 2020 ../
-rw-r--r-- 1 root root 1105 Apr 9 2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr 9 2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/
util 資料夾
CassandraUtils.java
注意
變更 ssl_keystore_file_path 取決於 Java 憑證位置。 HDInsight on AKS 上的 Apache Flink 叢集,路徑為 /usr/lib/jvm/msopenjdk-11-jre/lib/security
package com.azure.cosmosdb.cassandra.util;
import com.datastax.driver.core.*;
import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;
/**
* Cassandra utility class to handle the Cassandra Sessions
*/
public class CassandraUtils {
private Cluster cluster;
private Configurations config = new Configurations();
private String cassandraHost = "<cassandra-host-ip>";
private int cassandraPort = 10350;
private String cassandraUsername = "localhost";
private String cassandraPassword = "<cassandra-password>";
private File sslKeyStoreFile = null;
private String sslKeyStorePassword = "<keystore-password>";
/**
* This method creates a Cassandra Session based on the end-point details given in config.properties.
* This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
* If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
* @return Session Cassandra Session
*/
public Session getSession() {
try {
//Load cassandra endpoint details from config.properties
loadCassandraConnectionDetails();
final KeyStore keyStore = KeyStore.getInstance("JKS");
try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
keyStore.load(is, sslKeyStorePassword.toCharArray());
}
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
.getDefaultAlgorithm());
kmf.init(keyStore, sslKeyStorePassword.toCharArray());
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
.getDefaultAlgorithm());
tmf.init(keyStore);
// Creates a socket factory for HttpsURLConnection using JKS contents.
final SSLContext sc = SSLContext.getInstance("TLSv1.2");
sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());
JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext(sc)
.build();
cluster = Cluster.builder()
.addContactPoint(cassandraHost)
.withPort(cassandraPort)
.withCredentials(cassandraUsername, cassandraPassword)
.withSSL(sslOptions)
.build();
return cluster.connect();
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
public Cluster getCluster() {
return cluster;
}
/**
* Closes the cluster and Cassandra session
*/
public void close() {
cluster.close();
}
/**
* Loads Cassandra end-point details from config.properties.
* @throws Exception
*/
private void loadCassandraConnectionDetails() throws Exception {
cassandraHost = config.getProperty("cassandra_host");
cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
cassandraUsername = config.getProperty("cassandra_username");
cassandraPassword = config.getProperty("cassandra_password");
String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
String ssl_keystore_password = config.getProperty("ssl_keystore_password");
// If ssl_keystore_file_path, build the path using JAVA_HOME directory.
if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
String javaHomeDirectory = System.getenv("JAVA_HOME");
if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
throw new Exception("JAVA_HOME not set");
}
ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
}
sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
ssl_keystore_password : sslKeyStorePassword;
sslKeyStoreFile = new File(ssl_keystore_file_path);
if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
}
}
}
Configurations.java
package com.azure.cosmosdb.cassandra.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Configuration utility to read the configurations from properties file
*/
public class Configurations {
private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
private static String PROPERTY_FILE = "config.properties";
private static Properties prop = null;
private void loadProperties() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
if (input == null) {
LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
return;
}
prop = new Properties();
prop.load(input);
}
public String getProperty(String propertyName) throws IOException {
if (prop == null) {
loadProperties();
}
return prop.getProperty(propertyName);
}
}
範例資料夾
CassandraSink.java
package com.azure.cosmosdb.cassandra.examples;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;
public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {
@Override
public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
CassandraUtils utils = new CassandraUtils();
Session cassandraSession = utils.getSession();
try {
UserRepository repository = new UserRepository(cassandraSession);
//Insert rows into user table
PreparedStatement preparedStatement = repository.prepareInsertStatement();
repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);
} finally {
if (null != utils) utils.close();
if (null != cassandraSession) cassandraSession.close();
}
}
}
主要類別:CassandraDemo.java
注意
- 將 Kafka 訊息代理程式 IP 取代為 Kafka 叢集訊息代理程式 IP
- 準備主題
- 使用者
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
- 使用者
package com.azure.cosmosdb.cassandra.examples;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CassandraDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 1. read kafka message as stream input, update the broker IPs from your Kafka setup
String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("user")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafka.print();
DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
String[] fields = line.split(",");
int v1 = Integer.parseInt(fields[0]);
Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
return tuple3;
}).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));
dataStream.addSink(new CassandraSink());
// 4. run stream
env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
}
}
建置專案
從 azure-cosmos-db-cassandra-java-getting-started-main 資料夾執行 mvn clean install,以建置專案。 此命令會在目標資料夾下產生 cosmosdb-cassandra-examples.jar。
root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root 4096 May 15 03:54 ./
drwxr-xr-x 7 root root 4096 May 15 03:54 ../
drwxr-xr-x 2 root root 4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root 4096 May 15 03:54 classes/
-rw-r--r-- 1 root root 15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root 4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root 4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root 4096 May 15 03:54 maven-status/
上傳 jar 進行 Apache Flink 作業提交
將 jar 上傳至 Azure 記憶體,並將 wget 上傳至 webssh
msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar
準備 Cosmos DB 金鑰儲存區和資料表
在 /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples 中執行 UserProfile 類別,以建立 Azure Cosmos DB 的金鑰儲存區和資料表。
bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar
將 Kafka 主題接收至 Cosmos DB for Apache Cassandra
執行 CassandraDemo 類別以將 Kafka 主題接收至 Cosmos DB for Apache Cassandra。
bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar
驗證 Apache Flink 作業提交
在 AKS 上的 HDInsight 叢集上檢查 Flink Web UI 上的作業。
在 Kafka 中產生訊息
將訊息產生至 Kafka 主題。
sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random
user_set = [
'John',
'Mike',
'Lucy',
'Tom',
'Machael',
'Lily',
'Zark',
'Tim',
'Andrew',
'Pick',
'Sean',
'Luke',
'Chunck'
]
city_set = [
'Atmore',
'Auburn',
'Bessemer',
'Birmingham',
'Chickasaw',
'Clanton',
'Decatur',
'Florence',
'Greenville',
'Jasper',
'Huntsville',
'Homer',
'Homer'
]
def main():
while True:
unique_id = str(int(time.time()))
if random.randrange(10) < 4:
city = random.choice(city_set[:3])
else:
city = random.choice(city_set)
user = random.choice(user_set)
print(unique_id + "," + user + "," + city )
time.sleep(1)
if __name__ == "__main__":
main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516
在 Azure 入口網站上檢查 Cosmos DB for Apache Cassandra 上的資料表
喜好設定
- Azure Cosmos DB for Apache Cassandra。
- 在 Azure Cosmos DB 中建立 Cassandra 帳戶的 API
- Azure 範例
- Apache、Apache Kafka、Kafka、Apache Flink、Flink、Apache Cassandra、Cassandra,以及相關聯的開放原始碼專案名稱均為 Apache Software Foundation (ASF) 的商標。