Guidance on using a service principal with a certificate to authorize Event Hubs stream reads

BEPV 0 Reputation points
2024-07-01T21:27:20.97+00:00

I found this documentation online, https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/use-aad-authentication-to-connect-eventhubs.md, which describes how to use a service principal with a certificate for Spark streaming reads from Event Hubs. I want to do this in PySpark, running in a Synapse notebook on a Spark pool. I would like some guidance on how to do this properly in PySpark.

Azure Event Hubs
An Azure real-time data ingestion service.
Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.
Microsoft Entra ID
A Microsoft Entra identity service that provides identity management and access control capabilities. Replaces Azure Active Directory.

1 answer

  1. Sina Salam 12,166 Reputation points
    2024-07-02T20:47:59.34+00:00

    Hello @BEPV

    Thank you for your time and the question.

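    The snippet below is a starting point rather than a finished solution. It is written in Java against MSAL4J and the Event Hubs Spark connector, not in PySpark, so it will need adapting. Could you please try it?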

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import com.microsoft.aad.msal4j.*;
    import org.apache.commons.io.FileUtils;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.eventhubs.ConnectionStringBuilder;
    import org.apache.spark.eventhubs.EventHubsConf;
    import org.apache.spark.eventhubs.EventPosition;
    import org.apache.spark.sql.streaming.Trigger;

    public class EventHubsStreaming {
        public static void main(String[] args) throws Exception {
            // Define your EventHubs and AAD configurations
            String eventHubsNamespace = "YOUR_EVENT_HUBS_NAMESPACE";
            String eventHubName = "YOUR_EVENT_HUB_NAME";
            String tenantId = "YOUR_TENANT_ID";
            String clientId = "YOUR_CLIENT_ID";
            String certificatePath = "PATH_TO_YOUR_CERTIFICATE.pfx";
            String certificatePassword = "YOUR_CERTIFICATE_PASSWORD";

            // Initialize Spark session
            SparkSession spark = SparkSession.builder()
                    .appName("EventHubsSample")
                    .getOrCreate();

            // Load the PKCS#12 (.pfx) certificate into memory
            byte[] certificateBytes = FileUtils.readFileToByteArray(new File(certificatePath));

            // Create confidential client application.
            // createFromCertificate takes the password as a String, not a char[].
            ConfidentialClientApplication app = ConfidentialClientApplication.builder(
                    clientId,
                    ClientCredentialFactory.createFromCertificate(
                            new ByteArrayInputStream(certificateBytes),
                            certificatePassword
                    ))
                    .authority("https://login.microsoftonline.com/" + tenantId)
                    .build();

            // Get the access token for the Event Hubs resource
            ClientCredentialParameters parameters = ClientCredentialParameters.builder(
                    Collections.singleton("https://eventhubs.azure.net/.default"))
                    .build();
            CompletableFuture<IAuthenticationResult> future = app.acquireToken(parameters);
            IAuthenticationResult result = future.get();
            String token = result.accessToken();

            // Create EventHubs connection string.
            // The connector is written in Scala, so from Java its companion-object
            // factories are called through apply() (assumption: verify against
            // your connector version).
            // CAUTION: a Microsoft Entra access token is not a SAS key, so passing it
            // to setSasKey is unlikely to authenticate. The linked document instead
            // registers a token callback on EventHubsConf (setAadAuthCallback); treat
            // this builder as a placeholder until it is replaced with that mechanism.
            ConnectionStringBuilder connectionStringBuilder = ConnectionStringBuilder.apply()
                    .setNamespaceName(eventHubsNamespace)
                    .setEventHubName(eventHubName)
                    .setSasKeyName("RootManageSharedAccessKey")
                    .setSasKey(token);

            // Create EventHubs configuration, starting from the end of the stream
            EventHubsConf eventHubsConf = EventHubsConf.apply(connectionStringBuilder.toString())
                    .setConsumerGroup("$Default")
                    .setStartingPosition(EventPosition.fromEndOfStream());

            // Read from EventHubs and print each micro-batch to the console
            spark.readStream()
                    .format("eventhubs")
                    .options(eventHubsConf.toMap())
                    .load()
                    .writeStream()
                    .format("console")
                    .trigger(Trigger.ProcessingTime("5 seconds"))
                    .start()
                    .awaitTermination();
        }
    }
    

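    The code covers four steps:

    • Initialization: creating the Spark session.
    • Authentication: loading the certificate and requesting a token with MSAL4J.
    • EventHubs configuration: building the connection string and the EventHubsConf.
    • Stream reading: reading from EventHubs and writing each batch to the console.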
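
    Since the question asks for PySpark, the EventHubs configuration step could look roughly like the sketch below on the Python side. It only builds the option dictionary handed to the connector. Several things here are assumptions to check against the linked document and your connector version: the `eventhubs.useAadAuth`, `eventhubs.aadAuthCallback` and `eventhubs.AadAuthCallbackParams` option keys, the endpoint-only connection string form, the parameter names inside `callback_params`, and the class name `com.example.CertAuthCallback`, which is a hypothetical JVM callback class that would have to be on the Spark pool's classpath. The helper name `build_aad_eventhubs_options` is also made up for illustration.

```python
import json


def build_aad_eventhubs_options(namespace, event_hub, tenant_id, client_id,
                                cert_path, callback_class,
                                consumer_group="$Default",
                                encrypt=lambda s: s):
    """Build the option dict for spark.readStream.format("eventhubs")."""
    # Endpoint-only connection string (assumed form): no SAS key name or key,
    # because a token callback is expected to supply the credentials instead.
    connection_string = (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;EntityPath={event_hub}"
    )
    # Hypothetical parameter names: they must match whatever the JVM
    # callback class reads when the connector instantiates it.
    callback_params = {
        "authority": tenant_id,
        "clientId": client_id,
        "certPath": cert_path,
    }
    # "@latest" asks the connector to start from the end of the stream.
    starting_position = {
        "offset": "@latest",
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True,
    }
    return {
        "eventhubs.connectionString": encrypt(connection_string),
        "eventhubs.consumerGroup": consumer_group,
        "eventhubs.startingPosition": json.dumps(starting_position),
        # Assumed option keys; verify them against the linked document.
        "eventhubs.useAadAuth": "true",
        "eventhubs.aadAuthCallback": callback_class,
        "eventhubs.AadAuthCallbackParams": json.dumps(callback_params),
    }


options = build_aad_eventhubs_options(
    namespace="YOUR_EVENT_HUBS_NAMESPACE",
    event_hub="YOUR_EVENT_HUB_NAME",
    tenant_id="YOUR_TENANT_ID",
    client_id="YOUR_CLIENT_ID",
    cert_path="PATH_TO_YOUR_CERTIFICATE.pfx",
    callback_class="com.example.CertAuthCallback",  # hypothetical class name
)
print(sorted(options))
```

    In a Synapse notebook you would probably pass `encrypt=sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt`, since recent connector versions expect an encrypted connection string, and then call `spark.readStream.format("eventhubs").options(**options).load()`. The certificate password is left out of the sketch on purpose; it is better read from a secret store than written into the notebook.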

    Please let me know whether this solves the issue. If it does not, I have another approach in mind.

    Best Regards,

    Sina
