Colección distribuida de datos agrupados en columnas con nombre.
Un DataFrame es equivalente a una tabla relacional en Spark SQL y se puede crear mediante varias funciones en SparkSession.
Importante
Una trama de datos no debe crearse directamente mediante el constructor .
Admite Spark Connect
Propiedades
| Propiedad |
Descripción |
sparkSession |
Devuelve SparkSession que creó este DataFrame. |
rdd |
Devuelve el contenido como un RDD de fila (solo en modo clásico). |
na |
Devuelve un objeto DataFrameNaFunctions para controlar los valores que faltan. |
stat |
Devuelve una clase DataFrameStatFunctions para las funciones estadísticas. |
write |
Interfaz para guardar el contenido del dataframe que no es de streaming en el almacenamiento externo. |
writeStream |
Interfaz para guardar el contenido del dataframe de streaming en el almacenamiento externo. |
schema |
Devuelve el esquema de este DataFrame como StructType. |
dtypes |
Devuelve todos los nombres de columna y sus tipos de datos como una lista. |
columns |
Recupera los nombres de todas las columnas de DataFrame como una lista. |
storageLevel |
Obtenga el nivel de almacenamiento actual de DataFrame. |
isStreaming |
Devuelve True si este DataFrame contiene uno o varios orígenes que devuelven datos continuamente a medida que llegan. |
executionInfo |
Devuelve un objeto ExecutionInfo después de ejecutar la consulta. |
plot |
Devuelve un pySparkPlotAccessor para las funciones de trazado. |
Methods
Visualización e inspección de datos
| Método |
Descripción |
toJSON(use_unicode) |
Convierte un DataFrame en un RDD de cadena o DataFrame. |
printSchema(level) |
Imprime el esquema en el formato de árbol. |
explain(extended, mode) |
Imprime los planes (lógicos y físicos) en la consola con fines de depuración. |
show(n, truncate, vertical) |
Imprime las primeras n filas del DataFrame en la consola. |
collect() |
Devuelve todos los registros del dataframe como una lista de filas. |
toLocalIterator(prefetchPartitions) |
Devuelve un iterador que contiene todas las filas de este DataFrame. |
take(num) |
Devuelve las primeras filas numéricos como una lista de filas. |
tail(num) |
Devuelve las últimas filas numéricos como una lista de filas. |
head(n) |
Devuelve las primeras n filas. |
first() |
Devuelve la primera fila como fila. |
count() |
Devuelve el número de filas de este DataFrame. |
isEmpty() |
Comprueba si el DataFrame está vacío y devuelve un valor booleano. |
describe(*cols) |
Calcula estadísticas básicas para columnas numéricas y de cadena. |
summary(*statistics) |
Calcula las estadísticas especificadas para las columnas numéricas y de cadena. |
Vistas temporales
Selección y proyección
| Método |
Descripción |
select(*cols) |
Proyecta un conjunto de expresiones y devuelve un nuevo dataframe. |
selectExpr(*expr) |
Proyecta un conjunto de expresiones SQL y devuelve un nuevo dataframe. |
filter(condition) |
Filtra las filas mediante la condición especificada. |
where(condition) |
Alias para filtro. |
drop(*cols) |
Devuelve un nuevo DataFrame sin columnas especificadas. |
toDF(*cols) |
Devuelve un nuevo DataFrame con nuevos nombres de columna especificados. |
withColumn(colName, col) |
Devuelve un nuevo DataFrame agregando una columna o reemplazando la columna existente que tiene el mismo nombre. |
withColumns(*colsMap) |
Devuelve un nuevo dataframe agregando varias columnas o reemplazando las columnas existentes que tienen los mismos nombres. |
withColumnRenamed(existing, new) |
Devuelve un nuevo dataframe cambiando el nombre de una columna existente. |
withColumnsRenamed(colsMap) |
Devuelve un nuevo dataframe cambiando el nombre de varias columnas. |
withMetadata(columnName, metadata) |
Devuelve un nuevo dataframe actualizando una columna existente con metadatos. |
metadataColumn(colName) |
Selecciona una columna de metadatos basada en su nombre de columna lógica y la devuelve como columna. |
colRegex(colName) |
Selecciona la columna en función del nombre de columna especificado como una expresión regular y la devuelve como Columna. |
Ordenación y ordenación
Agregación y agrupación
| Método |
Descripción |
groupBy(*cols) |
Agrupa la trama de datos por las columnas especificadas para que la agregación se pueda realizar en ellas. |
rollup(*cols) |
Cree un paquete acumulativo multidimensional para el dataframe actual mediante las columnas especificadas. |
cube(*cols) |
Cree un cubo multidimensional para el dataframe actual mediante las columnas especificadas. |
groupingSets(groupingSets, *cols) |
Cree una agregación multidimensional para el dataframe actual mediante los conjuntos de agrupación especificados. |
agg(*exprs) |
Agregado en toda la trama de datos sin grupos (abreviada para df.groupBy().agg()). |
observe(observation, *exprs) |
Defina (con nombre) las métricas que se van a observar en dataframe. |
Se une
Operaciones de conjunto
| Método |
Descripción |
union(other) |
Devuelve un nuevo DataFrame que contiene la unión de filas en este y otro DataFrame. |
unionByName(other, allowMissingColumns) |
Devuelve un nuevo DataFrame que contiene la unión de filas en este y otro DataFrame. |
intersect(other) |
Devuelve un nuevo dataframe que contiene filas solo en este dataframe y en otro dataframe. |
intersectAll(other) |
Devuelve un nuevo DataFrame que contiene filas tanto en esta trama de datos como en otra trama de datos, a la vez que conserva los duplicados. |
subtract(other) |
Devuelve un nuevo DataFrame que contiene filas en este DataFrame, pero no en otro DataFrame. |
exceptAll(other) |
Devuelve un nuevo dataframe que contiene filas en este dataframe, pero no en otro dataframe mientras conserva los duplicados. |
Eliminación de duplicados
| Método |
Descripción |
distinct() |
Devuelve un nuevo dataframe que contiene las filas distintas de este dataframe. |
dropDuplicates(subset) |
Devuelve un nuevo DataFrame con filas duplicadas quitadas, opcionalmente solo teniendo en cuenta ciertas columnas. |
dropDuplicatesWithinWatermark(subset) |
Devuelve un nuevo DataFrame con filas duplicadas eliminadas, opcionalmente solo teniendo en cuenta ciertas columnas, dentro de la marca de agua. |
Muestreo y división
Partición
| Método |
Descripción |
coalesce(numPartitions) |
Devuelve un nuevo DataFrame que tiene exactamente particiones numPartitions. |
repartition(numPartitions, *cols) |
Devuelve una nueva trama de datos particionada por las expresiones de partición especificadas. |
repartitionByRange(numPartitions, *cols) |
Devuelve una nueva trama de datos particionada por las expresiones de partición especificadas. |
repartitionById(numPartitions, partitionIdCol) |
Devuelve una nueva trama de datos particionada por la expresión de identificador de partición especificada. |
Reestructuración
Falta el control de datos
Funciones estadísticas
Operaciones de esquema
| Método |
Descripción |
to(schema) |
Devuelve un nuevo dataframe en el que cada fila se reconcilia para que coincida con el esquema especificado. |
alias(alias) |
Devuelve un nuevo dataframe con un conjunto de alias. |
Iteración
| Método |
Descripción |
foreach(f) |
Aplica la función f a todas las filas de este dataframe. |
foreachPartition(f) |
Aplica la función f a cada partición de este DataFrame. |
Almacenamiento en caché y persistencia
| Método |
Descripción |
cache() |
Conserva el dataframe con el nivel de almacenamiento predeterminado (MEMORY_AND_DISK_DESER). |
persist(storageLevel) |
Establece el nivel de almacenamiento para conservar el contenido del DataFrame en todas las operaciones. |
unpersist(blocking) |
Marca el DataFrame como no persistente y quita todos los bloques de memoria y disco. |
Creación de punto de comprobación
Operaciones de streaming
Sugerencias de optimización
Límites y desplazamientos
| Método |
Descripción |
limit(num) |
Limita el recuento de resultados al número especificado. |
offset(num) |
Devuelve un nuevo dataframe omitiendo las primeras n filas. |
Métodos de conversión
Escritura de datos
| Método |
Descripción |
writeTo(table) |
Cree un generador de configuración de escritura para orígenes v2. |
mergeInto(table, condition) |
Combina un conjunto de actualizaciones, inserciones y eliminaciones basadas en una tabla de origen en una tabla de destino. |
Comparación de tramas de datos
| Método |
Descripción |
sameSemantics(other) |
Devuelve True cuando los planes de consulta lógicos dentro de ambos DataFrames son iguales. |
semanticHash() |
Devuelve un código hash del plan de consulta lógica en este DataFrame. |
| Método |
Descripción |
inputFiles() |
Devuelve una instantánea de mejor esfuerzo de los archivos que componen este DataFrame. |
Características avanzadas de SQL
| Método |
Descripción |
isLocal() |
Devuelve True si los métodos collect y take se pueden ejecutar localmente. |
asTable() |
Convierte el objeto DataFrame en un objeto TableArg, que se puede usar como argumento de tabla en un TVF. |
scalar() |
Devuelve un objeto Column para una subconsulta SCALAR que contiene exactamente una fila y una columna. |
exists() |
Devuelve un objeto Column para una subconsulta EXISTS. |
Ejemplos
Operaciones básicas de DataFrame
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
Agregación y agrupación
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
Se une
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()