Compartir a través de


Conector de Azure Data Explorer (Kusto) para Apache Spark

El conector de Azure Data Explorer (Kusto) para Apache Spark está diseñado para transferir datos de forma eficaz entre clústeres de Kusto y Spark. Este conector está disponible en Python, Java y .NET.

Autenticación

Al usar cuadernos de Azure Synapse o definiciones de trabajos de Apache Spark, la autenticación entre sistemas se realiza sin problemas con el servicio vinculado. El servicio de token conecta con Microsoft Entra ID para obtener los tokens de seguridad que se van a usar al acceder al clúster de Kusto.

En las canalizaciones de Azure Synapse, la autenticación usa el nombre de entidad de seguridad de servicio. Actualmente, no se admiten identidades administradas con el conector de Azure Data Explorer.

Requisitos previos

Limitaciones

  • El servicio vinculado Azure Data Explorer solo se puede configurar con el nombre de entidad de seguridad de servicio.
  • En los cuadernos de Azure Synapse o las definiciones de trabajos de Apache Spark, el conector de Azure Data Explorer usa el tránsito de Microsoft Entra para conectarse al clúster de Kusto.

Uso del conector de Azure Data Explorer (Kusto)

En la sección siguiente se proporciona un ejemplo sencillo de cómo escribir datos en una tabla de Kusto y leer datos de ella. Consulte el proyecto del conector de Azure Data Explorer (Kusto) para obtener documentación detallada.

Lectura de datos

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .load()

display(kustoDf)

También puede leer por lotes con el modo de distribución forzada y otras opciones avanzadas. Para más información, puede consultar la referencia sobre las opciones de origen de Kusto.

crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()

kustoDf  = spark.read \
            .format("com.microsoft.kusto.spark.synapse.datasource") \
            .option("spark.synapse.linkedService", "<link service name>") \
            .option("kustoDatabase", "<Database name>") \
            .option("kustoQuery", "<KQL Query>") \
            .option("clientRequestPropertiesJson", crp.toString()) \
            .option("readMode", 'ForceDistributedMode') \
            .load()

display(kustoDf) 

Escritura de datos

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .mode("Append") \
    .save()

Además, también puede escribir datos por lotes proporcionando propiedades de ingesta adicionales. Para más información sobre las propiedades de ingesta admitidas, puede visitar el material de referencia sobre las propiedades de ingesta de Kusto.

extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none

sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)

df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "<link service name>") \
    .option("kustoDatabase", "<Database name>") \
    .option("kustoTable", "<Table name>") \
    .option("sparkIngestionPropertiesJson", sp.toString()) \
    .option("tableCreateOptions","CreateIfNotExist") \
    .mode("Append") \
    .save()

Pasos siguientes