Compartir a través de


Optimización del procesamiento de datos para Apache Spark

En este artículo se describe cómo optimizar la configuración del clúster de Apache Spark para obtener el mejor rendimiento en Azure HDInsight.

Visión general

Si tiene trabajos lentos en un Join o una reorganización, es probable que la causa sea el sesgo de datos. El sesgo de datos es una asimetría en los datos de tu trabajo. Por ejemplo, un trabajo de mapa puede tardar 20 segundos. Pero la ejecución de un trabajo en el que los datos se unen o se ordenan aleatoriamente tardan horas. Para corregir la asimetría de datos, debe agregar sal a toda la clave, o usar una sal aislada para ciertos subconjuntos de claves. Si usa una sal aislada, debe filtrar aún más para aislar el subconjunto de claves con sal en combinaciones de mapa. Otra opción consiste en introducir primero una columna de cubo y agregar previamente en cubos.

Otro factor que causa combinaciones lentas podría ser el tipo de combinación. De forma predeterminada, Spark usa el tipo de combinación SortMerge. Este tipo de combinación es más adecuado para grandes conjuntos de datos. Pero, de lo contrario, es costoso computacionalmente porque primero debe ordenar los lados izquierdo y derecho de los datos antes de combinarlos.

Una Broadcast combinación es más adecuada para conjuntos de datos más pequeños o donde un lado de la combinación es mucho menor que el otro. Este tipo de combinación transmite una de las partes a todos los ejecutores, por lo que requiere más memoria para la transmisión en general.

Puede cambiar el tipo de combinación en la configuración estableciendo spark.sql.autoBroadcastJoinThresholdo puede establecer una sugerencia de combinación mediante las API dataframe (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

Si usa tablas en cubos, tendrá un tercer tipo de combinación, la Merge combinación. Un conjunto de datos que ya esté particionado y ordenado correctamente omitirá la fase de ordenación costosa de un SortMerge join.

El orden de las combinaciones es importante, especialmente en consultas más complejas. Comience con las combinaciones más selectivas. Además, mueva combinaciones que aumenten el número de filas después de las agregaciones siempre que sea posible.

Para administrar el paralelismo de las combinaciones cartesianas, puede agregar estructuras anidadas, ventanas y, quizás, omitir uno o más pasos en la tarea de Spark.

Optimización de la ejecución de trabajos

  • Almacene en caché según sea necesario, por ejemplo, si usa los datos dos veces y, a continuación, lo almacena en caché.
  • Difundir variables a todos los ejecutores. Las variables solo se serializan una vez, lo que da lugar a búsquedas más rápidas.
  • Utilice el grupo de subprocesos en el controlador, lo que resulta en una operación más rápida para muchas tareas.

Supervise periódicamente los trabajos en ejecución para detectar problemas de rendimiento. Si necesita más información sobre determinados problemas, considere una de las siguientes herramientas de generación de perfiles de rendimiento:

La clave para el rendimiento de las consultas de Spark 2.x es el motor Tungsten, que depende de la generación de código de etapa completa. En algunos casos, se puede deshabilitar la generación de código de fase completa. Por ejemplo, si usa un tipo no mutable (string) en la expresión de agregación, SortAggregate aparece en lugar de HashAggregate. Por ejemplo, para mejorar el rendimiento, pruebe lo siguiente y vuelva a habilitar la generación de código:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Pasos siguientes