Conector de Azure Data Explorer (Kusto) para Apache Spark
El conector de Azure Data Explorer (Kusto) para Apache Spark está diseñado para transferir datos de forma eficaz entre clústeres de Kusto y Spark. Este conector está disponible en Python, Java y .NET.
Autenticación
Al usar cuadernos de Azure Synapse o definiciones de trabajos de Apache Spark, la autenticación entre sistemas se realiza sin problemas con el servicio vinculado. El servicio de token conecta con Microsoft Entra ID para obtener los tokens de seguridad que se van a usar al acceder al clúster de Kusto.
En las canalizaciones de Azure Synapse, la autenticación usa el nombre de entidad de seguridad de servicio. Actualmente, no se admiten identidades administradas con el conector de Azure Data Explorer.
Requisitos previos
- Conectarse a Azure Data Explorer: debe configurar un servicio vinculado para conectarse a un clúster de Kusto existente.
Limitaciones
- El servicio vinculado Azure Data Explorer solo se puede configurar con el nombre de entidad de seguridad de servicio.
- En los cuadernos de Azure Synapse o las definiciones de trabajos de Apache Spark, el conector de Azure Data Explorer usa el tránsito de Microsoft Entra para conectarse al clúster de Kusto.
Uso del conector de Azure Data Explorer (Kusto)
En la sección siguiente se proporciona un ejemplo sencillo de cómo escribir datos en una tabla de Kusto y leer datos de ella. Consulte el proyecto del conector de Azure Data Explorer (Kusto) para obtener documentación detallada.
Lectura de datos
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.load()
display(kustoDf)
También puede leer por lotes con el modo de distribución forzada y otras opciones avanzadas. Para más información, puede consultar la referencia sobre las opciones de origen de Kusto.
crp = sc._jvm.com.microsoft.azure.kusto.data.ClientRequestProperties()
crp.setOption("norequesttimeout",True)
crp.toString()
kustoDf = spark.read \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoQuery", "<KQL Query>") \
.option("clientRequestPropertiesJson", crp.toString()) \
.option("readMode", 'ForceDistributedMode') \
.load()
display(kustoDf)
Escritura de datos
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.mode("Append") \
.save()
Además, también puede escribir datos por lotes proporcionando propiedades de ingesta adicionales. Para más información sobre las propiedades de ingesta admitidas, puede visitar el material de referencia sobre las propiedades de ingesta de Kusto.
extentsCreationTime = sc._jvm.org.joda.time.DateTime.now().plusDays(1)
csvMap = "[{\"Name\":\"ColA\",\"Ordinal\":0},{\"Name\":\"ColB\",\"Ordinal\":1}]"
# Alternatively use an existing csv mapping configured on the table and pass it as the last parameter of SparkIngestionProperties or use none
sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
False, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], extentsCreationTime, csvMap, None)
df.write \
.format("com.microsoft.kusto.spark.synapse.datasource") \
.option("spark.synapse.linkedService", "<link service name>") \
.option("kustoDatabase", "<Database name>") \
.option("kustoTable", "<Table name>") \
.option("sparkIngestionPropertiesJson", sp.toString()) \
.option("tableCreateOptions","CreateIfNotExist") \
.mode("Append") \
.save()