Hello @BEPV
Thank you for your time and the question.
Regarding incomplete code if that will work for you, could you please try the below code:
import java.io.{ByteArrayInputStream, File};
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import com.microsoft.aad.msal4j.*;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.eventhubs.ConnectionStringBuilder;
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition};
import org.apache.spark.sql.streaming.Trigger;
public class EventHubsStreaming {
public static void main(String[] args) throws Exception {
// Define your EventHubs and AAD configurations
String eventHubsNamespace = "YOUR_EVENT_HUBS_NAMESPACE";
String eventHubName = "YOUR_EVENT_HUB_NAME";
String tenantId = "YOUR_TENANT_ID";
String clientId = "YOUR_CLIENT_ID";
String certificatePath = "PATH_TO_YOUR_CERTIFICATE.pfx";
String certificatePassword = "YOUR_CERTIFICATE_PASSWORD";
// Initialize Spark session
SparkSession spark = SparkSession.builder()
.appName("EventHubsSample")
.getOrCreate();
// Load certificate
byte[] certificateBytes = FileUtils.readFileToByteArray(new File(certificatePath));
// Create confidential client application
ConfidentialClientApplication app = ConfidentialClientApplication.builder(
clientId,
ClientCredentialFactory.createFromCertificate(
new ByteArrayInputStream(certificateBytes),
certificatePassword.toCharArray()
))
.authority("https://login.microsoftonline.com/" + tenantId)
.build();
// Get the access token
ClientCredentialParameters parameters = ClientCredentialParameters.builder(
Collections.singleton("https://eventhubs.azure.net/.default"))
.build();
CompletableFuture<IAuthenticationResult> future = app.acquireToken(parameters);
IAuthenticationResult result = future.get();
String token = result.accessToken();
// Create EventHubs connection string
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
.setNamespaceName(eventHubsNamespace)
.setEventHubName(eventHubName)
.setSasKeyName("RootManageSharedAccessKey")
.setSasKey(token);
// Create EventHubs configuration
EventHubsConf eventHubsConf = EventHubsConf(connectionStringBuilder.toString())
.setConsumerGroup("$Default")
.setStartingPosition(EventPosition.fromEndOfStream());
// Read from EventHubs
spark.readStream()
.format("eventhubs")
.options(eventHubsConf.toMap())
.load()
.writeStream()
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
.awaitTermination();
}
}
This will help you to:
- Initialization
- Authentication
- EventHubs Configuration and
- Stream Reading.
Hope to read if this solves the issue. Though, I have another perspective for the solution.
Best Regards,
Sina