Partager via


Sécuriser Spark et Kafka – Scénario d’intégration de la diffusion en continu Spark

Dans ce document, vous apprenez à exécuter un travail Spark dans un cluster Spark sécurisé qui lit à partir d’une rubrique dans un cluster Kafka sécurisé, à condition que les réseaux virtuels soient identiques/appairés.

Conditions préalables

  • Créez un cluster Kafka sécurisé et un cluster Spark sécurisé avec le même domaine Microsoft Entra Domain Services et le même réseau virtuel. Si vous préférez ne pas créer les deux clusters dans le même réseau virtuel, vous pouvez les créer dans deux réseaux virtuels distincts et les associer également. Si vous préférez ne pas créer les deux clusters dans le même réseau virtuel.
  • Si vos clusters se trouvent dans des réseaux virtuels différents, consultez Connecter des réseaux virtuels à l’aide du peering de réseaux virtuels en utilisant le portail Azure
  • Créez des keytabs pour deux utilisateurs. Par exemple : alicetest et bobadmin.

Qu’est-ce qu’un keytab ?

Un keytab est un fichier contenant des paires de principes Kerberos et de clés chiffrées (qui sont dérivées du mot de passe Kerberos). Vous pouvez utiliser un fichier keytab pour vous authentifier auprès de différents systèmes distants à l’aide de Kerberos sans entrer de mot de passe.

Pour plus d’informations sur cette rubrique, consultez

  1. KTUTIL

  2. Création d’un principal Kerberos et d’un fichier keytab

ktutil
ktutil: addent -password -p user1@TEST.COM -k 1 -e RC4-HMAC
Password for user1@TEST.COM:
ktutil: wkt user1.keytab
ktutil: q
  1. Créez une application Java de diffusion en continu Spark qui lit à partir de rubriques Kafka. Ce document utilise un exemple DirectKafkaWorkCount basé sur des exemples de diffusion en continu Spark à partir de https://github.com/apache/spark/blob/branch-2.3/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java

Procédure pas à pas de haut niveau des scénarios

Configuration sur le cluster Kafka :

  1. Création de sujets alicetopic2, bobtopic2
  2. Produire des données vers les rubriques alicetopic2, bobtopic2
  3. Configurer la stratégie Ranger pour permettre à l’utilisateur alicetest de lire à partir de alicetopic*
  4. Configurer la stratégie Ranger pour permettre à l’utilisateur bobadmin de lire à partir de *

Scénarios à exécuter sur le cluster Spark

  1. Consommer des données de alicetopic2 en tant qu’utilisateur alicetest. Le travail Spark s’exécute correctement et le nombre de mots de la rubrique devrait être généré dans l’interface utilisateur YARN. Les enregistrements d’audit Ranger dans le cluster Kafka indiquent que l’accès est autorisé.
  2. Consommer des données de bobtopic2 en tant qu’utilisateur alicetest. Le travail Spark échoue avec org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [bobtopic2]. Les enregistrements d’audit Ranger dans le cluster Kafka indiquent que l’accès est refusé.
  3. Consommer des données de alicetopic2 en tant qu’utilisateur bobadmin. Le travail Spark s’exécute correctement et le nombre de mots de la rubrique devrait être généré dans l’interface utilisateur YARN. Les enregistrements d’audit Ranger dans le cluster Kafka indiquent que l’accès est autorisé.
  4. Consommer des données de bobtopic2 en tant qu’utilisateur bobadmin. Le travail Spark s’exécute correctement et le nombre de mots de la rubrique devrait être généré dans l’interface utilisateur YARN. Les enregistrements d’audit Ranger dans le cluster Kafka indiquent que l’accès est autorisé.

Étapes à effectuer sur le cluster Kafka

Dans le cluster Kafka, configurez des stratégies Ranger et produisez des données à partir du cluster Kafka, comme expliqué dans cette section

  1. Accédez à l’interface utilisateur Ranger sur le cluster Kafka et configurez deux stratégies Ranger

  2. Ajoutez une stratégie Ranger pour alicetest avec l’accès à la consommation aux rubriques avec un modèle générique alicetopic*

  3. Ajoutez une stratégie Ranger pour bobadmin avec tous les accès à toutes les rubriques avec un modèle générique *

  4. Exécuter les commandes ci-dessous en fonction de des valeurs de votre paramètre

    sshuser@hn0-umasec:~$ sudo apt -y install jq 
    sshuser@hn0-umasec:~$ export clusterName='YOUR_CLUSTER_NAME'
    sshuser@hn0-umasec:~$ export TOPICNAME='YOUR_TOPIC_NAME'
    sshuser@hn0-umasec:~$ export password='YOUR_SSH_PASSWORD'
    sshuser@hn0-umasec:~$ export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    sshuser@hn0-umasec:~$ echo $KAFKABROKERS
    wn0-umasec.securehadooprc.onmicrosoft.com:9092,
    wn1-umasec.securehadooprc.onmicrosoft.com:9092
    
  5. Créez un keytab pour l’utilisateur bobadmin à l’aide de l’outil ktutil.

  6. Appelons ce fichier bobadmin.keytab

    sshuser@hn0-umasec:~$ ktutil
    ktutil: addent -password -p bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -k 1 -e RC4-HMAC 
    Password for <username>@<DOMAIN.COM>
    ktutil: wkt bobadmin.keytab 
    ktutil: q
    Kinit the created keytab
    sudo kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  7. Créez une classe bobadmin_jaas.config

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  8. Créez des rubriques alicetopic2 et bobtopic2 en tant que bobadmin

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create alicetopic2 $KAFKABROKERS
    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar create bobtopic2 $KAFKABROKERS
    
  9. Produisez des données vers alicetopic2 en tant que bobadmin

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer alicetopic2 $KAFKABROKERS
    
  10. Produisez des données vers bobtopic2 en tant que bobadmin

    sshuser@hn0-umasec:~$ java -jar -Djava.security.auth.login.config=bobadmin_jaas.conf kafka-producer-consumer.jar producer bobadmin2 $KAFKABROKERS
    

Configurer les étapes à effectuer sur le cluster Spark

Dans le cluster Spark, ajoutez des entrées dans /etc/hosts dans les nœuds Worker Spark. Pour les nœuds Worker Kafka, créez des keytabs, des fichiers jaas_config et effectuez une soumission Spark pour envoyer un travail Spark à lire à partir de la rubrique Kafka :

  1. Accédez par SSH au cluster Spark avec les informations d’identification sshuser

  2. Créez des entrées pour les nœuds Worker Kafka dans le cluster Spark /etc/hosts.

    Notes

    Effectuez l’entrée de ces nœuds Worker Kafka dans chaque nœud Spark (nœud principal + nœud Worker). Vous pouvez obtenir ces détails à partir du cluster Kafka dans le fichier /etc/hosts du nœud principal de Kafka.

    10.3.16.118 wn0-umasec.securehadooprc.onmicrosoft.com wn0-umasec wn0-umasec.securehadooprc.onmicrosoft.com. wn0-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    
    10.3.16.145 wn1-umasec.securehadooprc.onmicrosoft.com wn1-umasec wn1-umasec.securehadooprc.onmicrosoft.com. wn1-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    
    10.3.16.176 wn2-umasec.securehadooprc.onmicrosoft.com wn2-umasec wn2-umasec.securehadooprc.onmicrosoft.com. wn2-umasec.cu1cgjaim53urggr4psrgczloa.cx.internal.cloudapp.net
    
  3. Créez un keytab pour l’utilisateur bobadmin à l’aide de l’outil ktutil. Appelons ce fichier bobadmin.keytab

  4. Créez un keytab pour l’utilisateur alicetest à l’aide de l’outil ktutil. Appelons ce fichier alicetest.keytab

  5. Créer un bobadmin_jaas.conf comme indiqué dans l’exemple suivant

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./bobadmin.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  6. Créer un alicetest_jaas.conf comme indiqué dans l’exemple suivant

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="./alicetest.keytab"
      useTicketCache=false
      serviceName="kafka"
      principal="alicetest@SECUREHADOOPRC.ONMICROSOFT.COM";
    };
    
  7. Préparez le fichier jar spark-streaming.

  8. Créez votre propre fichier jar qui lit à partir d’une rubrique Kafka en suivant l’exemple et les instructions ici pour DirectKafkaWorkCount

Notes

Pour plus de commodité, cet exemple de fichier jar utilisé dans cet exemple a été créé à partir de https://github.com/markgrover/spark-secure-kafka-app en suivant ces étapes.

sudo apt install maven
git clone https://github.com/markgrover/spark-secure-kafka-app.git
cd spark-secure-kafka-app
mvn clean package
cd target

Scénario 1

À partir d’un cluster Spark, lisez la rubrique Kafka alicetopic2, car l’utilisateur alicetest est autorisé

  1. Exécutez une commande kdestroy pour supprimer les tickets Kerberos dans le cache d’informations d’identification en exécutant la commande suivante

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. Exécutez la commande suivante kinit avec alicetest

    sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
    
  3. Exécutez une commande spark-submit pour lire la rubrique Kafka alicetopic2 en tant que alicetest

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    Par exemple,

    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 alicetopic2 false
    

    Si vous voyez l’erreur suivante, cela indique un problème de DNS (Domain Name System). Veillez à vérifier l’entrée des nœuds Worker Kafka dans le fichier /etc/hosts dans le cluster Spark.

    Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7))
            at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:770)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
            at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
            at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    
  4. À partir de l’interface utilisateur YARN, accédez à la sortie du travail YARN. Vous pouvez voir que l’utilisateur alicetest est en mesure de lire à partir de alicetopic2. Vous pouvez voir le nombre de mots dans la sortie.

  5. Vous trouverez les étapes détaillées suivantes pour vérifier la sortie de l’application à partir de l’interface utilisateur YARN.

    1. Accédez à l’interface utilisateur YARN et ouvrez votre application. Attendez que le travail passe à l’état RUNNING. Les détails suivants de l’application s’affichent.

    2. Cliquez sur Journaux. La liste suivante de journaux s’affiche.

    3. Cliquez sur « stdout ». Vous verrez la sortie suivante avec le nombre de mots de votre rubrique Kafka.

    4. Dans l’interface utilisateur Ranger du cluster Kafka, les journaux d’audit correspondants s’affichent.

Scénario 2

À partir d’un cluster Spark, lisez la rubrique Kafka bobtopic2, car l’utilisateur alicetest est refusé

  1. Exécutez la commande kdestroy pour supprimer les tickets Kerberos dans le cache d’informations d’identification en émettant la commande suivante

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. Exécutez la commande suivante kinit avec alicetest

    sshuser@hn0-umaspa:~$ kinit alicetest@SECUREHADOOPRC.ONMICROSOFT.COM -t alicetest.keytab
    
  3. Exécutez la commande spark-submit pour lire à partir de la rubrique Kafka bobtopic2 en tant que alicetest

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicetest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    Par exemple,

    sshuser@hn0-umaspa:~$ spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files alicetest_jaas.conf#alicetest_jaas.conf,alicetest.keytab#alicestest.keytab --driver-java-options "-Djava.security.auth.login.config=./alicetest_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./alicetest_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar 10.3.16.118:9092 bobtopic2 false
    
  4. À partir de l’interface utilisateur YARN, accédez à la sortie du travail YARN à partir de alicetest. Vous pouvez voir que l’utilisateur ne peut pas lire dans bobtopic2 et que le travail échoue.

  5. Dans l’interface utilisateur Ranger du cluster Kafka, les journaux d’audit correspondants s’affichent.

Scénario 3

À partir d’un cluster Spark, lisez la rubrique Kafka alicetopic2, car l’utilisateur bobadmin est autorisé

  1. Exécutez la commande kdestroy pour supprimer les tickets Kerberos dans le cache d’informations d’identification

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. Exécutez la commande kinit avec bobadmin

    sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  3. Exécutez la commande spark-submit pour lire à partir d’une rubrique Kafka alicetopic2 en tant que bobadmin

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    Par exemple,

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 alicetopic2 false
    
  4. À partir de l’interface utilisateur YARN, accédez à la sortie du travail YARN. Vous pouvez voir que l’utilisateur bobadmin est en mesure de lire dans alicetopic2, et le nombre de mots est affiché dans la sortie.

  5. Dans l’interface utilisateur Ranger du cluster Kafka, les journaux d’audit correspondants s’affichent.

Scénario 4

À partir du cluster Spark, lisez la rubrique Kafka bobtopic2, car l’utilisateur bobadmin est autorisé.

  1. Supprimez les tickets Kerberos dans le cache d’informations d’identification en exécutant la commande suivante

    sshuser@hn0-umaspa:~$ kdestroy
    
  2. Exécuter kinit avec bobadmin

    sshuser@hn0-umaspa:~$ kinit bobadmin@SECUREHADOOPRC.ONMICROSOFT.COM -t bobadmin.keytab
    
  3. Exécutez une commande spark-submit pour lire la rubrique Kafka bobtopic2 en tant que bobadmin

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages <list of packages the jar depends on> --repositories <repository for the dependency packages> --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class <classname to execute in jar> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" <path to jar> <kafkabrokerlist> <topicname> false
    

    Par exemple,

    spark-submit --num-executors 1 --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2.3.1.0.4-1 --repositories http://repo.hortonworks.com/content/repositories/releases/ --files bobadmin_jaas.conf#bobadmin_jaas.conf,bobadmin.keytab#bobadmin.keytab --driver-java-options "-Djava.security.auth.login.config=./bobadmin_jaas.conf" --class com.cloudera.spark.examples.DirectKafkaWordCount --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./bobadmin_jaas.conf" /home/sshuser/spark-secure-kafka-app/target/spark-secure-kafka-app-1.0-SNAPSHOT.jar wn0-umasec:9092, wn1-umasec:9092 bobtopic2 false
    
  4. À partir de l’interface utilisateur YARN, accédez à la sortie du travail YARN. Vous pouvez voir que l’utilisateur bobtest est en mesure de lire dans bobtopic2, et le nombre de mots est affiché dans la sortie.

  5. Dans l’interface utilisateur Ranger du cluster Kafka, les journaux d’audit correspondants s’affichent.

Étapes suivantes