Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
La clase sentinel_graph proporciona una manera de interactuar con el grafo de Microsoft Sentinel, lo que le permite definir el esquema del grafo, transformar los datos de la Microsoft Sentinel lago de datos en nodos y bordes, publicar un gráfico, un gráfico de consulta y ejecutar algoritmos avanzados de grafos. Esta clase está diseñada para trabajar con las sesiones de Spark en cuadernos de Jupyter Notebook que se ejecutan en Microsoft Sentinel proceso de Spark.
GraphSpecBuilder
La clase GraphSpecBuilder proporciona un generador fluido para crear especificaciones de grafos con canalizaciones de datos e integración de esquemas.
Importante
El GraphBuilder alias de esta clase está en desuso y se quitará en una versión futura. Use GraphSpecBuilder en todo el código nuevo.
# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder
# Recommended
from sentinel_graph import GraphSpecBuilder
Constructor
GraphSpecBuilder(context: ExecutionContext)
Parámetros:
-
context(ExecutionContext): contexto de ejecución que contiene la sesión y la configuración de Spark
Genera:
-
ValueError: si el contexto es Ninguno o no se puede determinar el nombre del gráfico
Métodos estáticos
start
GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder
Defina un nuevo generador de grafos fluido.
Parámetros:
-
context(ExecutionContext, opcional): instancia de ExecutionContext. Si no es Ninguno, usa el contexto predeterminado.
Devuelve:
-
GraphSpecBuilder: nueva instancia del generador
Ejemplo:
builder = GraphSpecBuilder.start(context=context)
Métodos de instancia
add_node
def add_node(alias: str) -> NodeBuilderInitial
Empiece a crear una definición de nodo.
Parámetros:
-
alias(str): identificador único de este nodo dentro del gráfico
Devuelve:
-
NodeBuilderInitial: generador de nodos en estado inicial
Ejemplo:
builder.add_node("user")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Empiece a crear una definición perimetral.
Parámetros:
-
alias(str): identificador de este borde dentro del gráfico (se puede compartir entre varios bordes)
Devuelve:
-
EdgeBuilderInitial: generador perimetral en estado inicial
Ejemplo:
builder.add_edge("accessed")
done
def done() -> GraphSpec
Finalice la especificación del grafo y devuelva la instancia de GraphSpec.
Devuelve:
-
GraphSpec: especificación completa del grafo con la canalización de datos y el esquema
Genera:
-
ValueError: si el gráfico no tiene nodos o bordes, o si se produce un error en la validación
Ejemplo:
graph_spec = builder.done()
GraphSpec
Especificación de grafos con capacidades de canalización, esquema y visualización de datos.
Constructor
GraphSpec(
name: str,
context: ExecutionContext,
graph_schema: GraphSchema,
etl_pipeline: Optional[ETLPipeline] = None
)
Parámetros:
-
name(str): nombre del gráfico -
context(ExecutionContext): contexto de ejecución -
graph_schema(GraphSchema): definición de esquema de grafo -
etl_pipeline(ETLPipeline, opcional): canalización de datos para la preparación del grafo
Propiedades
nodes
def nodes() -> DataFrame
Obtener dataframe de nodos (diferido, almacenado en caché). Determina automáticamente el origen de la canalización de datos o la tabla de lago.
Devuelve:
-
DataFrame: DataFrame de Spark que contiene todos los nodos
Genera:
-
ValueError: si falta el contexto o no se pueden cargar tramas de datos
edges
def edges() -> DataFrame
Obtención de bordes DataFrame (diferido, almacenado en caché). Determina automáticamente el origen de la canalización de datos o la tabla de lago.
Devuelve:
-
DataFrame: DataFrame de Spark que contiene todos los bordes
Genera:
-
ValueError: si falta el contexto o no se pueden cargar tramas de datos
Métodos
build_graph_with_data
Nota:
build_graph_with_data está en desuso y se quitará en una versión futura.
Use Graph.build(spec) en su lugar.
def build_graph_with_data() -> Dict[str, Any]
Ejecute la canalización de datos y publique el gráfico.
Llama internamente a Graph.build(self), guarda el valor devuelto Graphy devuelve un diccionario compatible con versiones anteriores.
Devuelve:
-
Dict[str, Any]: diccionario que contiene:-
etl_result: resultados de preparación de datos -
api_result: publicar resultados (si se ejecuta correctamente) -
api_error: cadena de error (si se produjo un error en la publicación) -
instance_name: nombre de la instancia de Graph -
status:"published"o"prepared"
-
Ejemplo:
graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")
get_schema
def get_schema() -> GraphSchema
Obtenga el esquema del grafo.
Devuelve:
-
GraphSchema: definición de esquema de grafo
get_pipeline
Nota:
Este método está en desuso y se quitará en una versión futura. La canalización de datos es un detalle interno de implementación y no se debe tener acceso directamente.
def get_pipeline() -> Optional[ETLPipeline]
Obtenga la canalización de datos (ninguno para los gráficos existentes).
Devuelve:
-
ETLPipelineoNone: canalización de datos si está disponible
to_graphframe
def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame
Convierta todo el gráfico en GraphFrame para ejecutar algoritmos de grafos. Funciona solo en datos locales (desde una canalización de datos o una tabla de lago).
Parámetros:
-
column_mapping(Dict[str, str], opcional): asignación de columnas personalizada con claves:-
"id": nombre de columna del identificador de vértice -
"source_id": nombre de columna del identificador de origen de Edge -
"target_id": nombre de columna del identificador de destino de Edge
-
Devuelve:
-
GraphFrame: objeto GraphFrame con todos los vértices y bordes
Genera:
-
ValueError: si ExecutionContext no está disponible
Ejemplo:
gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)
show
def show(limit: int = 100, viz_format: str = "visual") -> None
Mostrar datos de gráficos en varios formatos.
Parámetros:
-
limit(int, default=100): número máximo de nodos/bordes que se van a mostrar -
viz_format(str, default="visual"): formato de salida-
"table": tablas dataframe completas (todas las columnas) -
"visual": visualización interactiva de gráficos -
"all": mostrar todos los formatos
-
Genera:
-
ValueError: si el formato no es uno de los valores admitidos
Ejemplo:
graph_spec.show(limit=50, viz_format="table")
show_schema
def show_schema() -> None
Mostrar el esquema del grafo como una visualización interactiva de grafos.
Ejemplo:
spec.show_schema()
Graph
Instancia de gráfico consultable. Creado a través de Graph.get() (gráfico existente) o Graph.build() (a partir de ).GraphSpec
Constructor
Graph(
name: str,
context: ExecutionContext,
spec: Optional[GraphSpec] = None,
build_status: Optional[BuildStatus] = None,
)
Parámetros:
-
name(str): nombre del gráfico -
context(ExecutionContext): contexto de ejecución -
spec(GraphSpec, opcional): especificación de gráfico adjunta (establecida porGraph.build()) -
build_status(BuildStatus, opcional): metadatos de resultado de compilación (establecidos porGraph.build())
Genera:
-
ValueError: si ExecutionContext es None
Métodos estáticos
get
Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph
Obtenga una instancia de grafo de un grafo existente.
El valor devuelto Graph tiene spec=None y build_status=None.
Parámetros:
-
name(str): nombre de instancia de Graph -
context(ExecutionContext, opcional): contexto de ejecución (el valor predeterminado es ExecutionContext.default())
Devuelve:
-
Graph: instancia de Graph
Genera:
-
ValueError: si el nombre del gráfico está vacío o la instancia del grafo no existe
Ejemplo:
graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")
prepare
Graph.prepare(spec: GraphSpec) -> Graph
Ejecute la fase de preparación de datos para sinGraphSpec publicar. Use publish() después para registrar el gráfico y hacer que sea consultable.
Parámetros:
-
spec(GraphSpec): especificación de grafo para preparar
Devuelve:
-
Graph: instancia de Graph conspecasociado ybuild_status.status == "prepared"
Genera:
-
ValueError: si la especificación no tiene ninguna canalización de datos o no tiene contexto de ejecución -
RuntimeError: si se produce un error en la ejecución de la canalización de datos
Ejemplo:
spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")
build
Graph.build(spec: GraphSpec) -> Graph
Cree un gráfico a partir de mediante la GraphSpec preparación de datos y la publicación. Internamente llama a Graph.prepare(spec) y, a continuación, intenta graph.publish(). A diferencia de llamar a esos dos métodos por separado, se detectan errores de publicación: el gráfico devuelto tiene build_status.status == "prepared" y build_status.api_error se establece en lugar de generar.
Parámetros:
-
spec(GraphSpec): especificación de grafo a partir de la que compilar
Devuelve:
-
Graph: instancia de Graph conspecadjunta ybuild_statusrellenada
Genera:
-
ValueError: si la especificación no tiene ninguna canalización de datos o no tiene contexto de ejecución -
RuntimeError: si se produce un error en la ejecución de la canalización de datos
Ejemplo:
spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status) # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")
Propiedades
nodes
def nodes() -> Optional[DataFrame]
Obtención de nodos DataFrame. Delega a self.spec.nodes cuando se adjunta una especificación; devuelve None en caso contrario.
edges
def edges() -> Optional[DataFrame]
Obtener bordes DataFrame. Delega a self.spec.edges cuando se adjunta una especificación; devuelve None en caso contrario.
schema
def schema() -> Optional[GraphSchema]
Obtener esquema de grafo. Delega a self.spec.get_schema() cuando se adjunta una especificación; devuelve None en caso contrario.
Métodos
query
def query(query_string: str, query_language: str = "GQL") -> QueryResult
Ejecute una consulta en la instancia de grafo mediante GQL.
Parámetros:
-
query_string(str): cadena de consulta de Graph (lenguaje GQL) -
query_language(str, default="GQL"): lenguaje de consulta
Devuelve:
-
QueryResult: objeto que contiene nodos, bordes y metadatos
Genera:
-
ValueError: si falta la sesión de ExecutionContext o Spark -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Ejemplo:
result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()
reachability
def reachability(
*,
source_property_value: str = None,
target_property_value: str = None,
source_property: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
[! NOTA]
reachability(query_input=ReachabilityQueryInput(...))todavía se acepta, pero emiteDeprecationWarningy se quitará en una versión futura.
Realice un análisis de accesibilidad entre los nodos de origen y de destino.
Parámetros:
-
source_property_value(str): valor que debe coincidir con la propiedad de origen (validada en tiempo de ejecución. Debe proporcionarse cuando no se usaquery_input) -
target_property_value(str): valor que debe coincidir con la propiedad de destino (validada en tiempo de ejecución. Debe proporcionarse cuando no se usaquery_input) -
source_property(Opcional[str]): nombre de propiedad para filtrar los nodos de origen -
participating_source_node_labels(Opcional[List[str]]): etiquetas de nodo que se deben considerar como orígenes -
target_property(Opcional[str]): nombre de propiedad para filtrar los nodos de destino -
participating_target_node_labels(Opcional[List[str]]): etiquetas de nodo que se deben considerar como destinos -
participating_edge_labels(Opcional[List[str]]): etiquetas perimetrales que se van a recorrer -
is_directional(bool): indica si los bordes son direccionales (valor predeterminado:True) -
min_hop_count(int): saltos mínimos (valor predeterminado:1) -
max_hop_count(int): saltos máximos (valor predeterminado:4) -
shortest_path(bool): devuelve solo las rutas de acceso más cortas (valor predeterminado:False) -
max_results(int): resultados máximos (valor predeterminado:500)
Genera:
-
ValueError: sisource_property_valuetarget_property_valueo falta,min_hop_count < 1,max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Devuelve:
-
QueryResult: contiene las rutas de acceso
Ejemplo:
result = graph.reachability(
source_property_value="user-001",
target_property_value="device-003")
result.show()
k_hop
def k_hop(
*,
source_property: Optional[str] = None,
source_property_value: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
target_property_value: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota:
k_hop(query_input=K_HopQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.
Realice el análisis de k-hop desde un nodo de origen determinado.
Parámetros:
- Igual que
reachability
Validación:
- Al menos uno de
source_property_valueotarget_property_valuedebe proporcionarse
Genera:
-
ValueError: si no se proporciona nisource_property_valuetarget_property_value, o si se infringen restricciones numéricas (igualreachabilityque ) -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Devuelve:
-
QueryResult: contiene los resultados de k-hop
Ejemplo:
result = graph.k_hop(source_property_value="user-001")
result.show()
blast_radius
def blast_radius(
*,
source_property_value: str = None,
target_property_value: str = None,
source_property: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota:
blast_radius(query_input=BlastRadiusQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.
Realice un análisis de radio de expansión desde el nodo de origen al nodo de destino.
Parámetros:
-
source_property_value(str): valor que identifica el nodo de origen (validado en tiempo de ejecución. Debe proporcionarse cuando no se usaquery_input) -
target_property_value(str): valor que identifica el nodo de destino (validado en tiempo de ejecución. Debe proporcionarse cuando no se usaquery_input) - Otros parámetros: igual que
reachability
Genera:
-
ValueError: sisource_property_valuefalta otarget_property_value, o si se infringen restricciones numéricas (igualreachabilityque ) -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Devuelve:
-
QueryResult: contiene los resultados del radio de expansión
Ejemplo:
result = graph.blast_radius(
source_property_value="user-003",
target_property_value="device-003",
min_hop_count=1)
result.show()
centrality
def centrality(
*,
participating_source_node_labels: Optional[List[str]] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
threshold: int = 3,
centrality_type: CentralityType = None,
max_paths: int = 1000000,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota:
centrality(query_input=CentralityQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.
Realice un análisis de centralidad en el gráfico.
Parámetros:
-
participating_source_node_labels(Opcional[List[str]]): etiquetas de nodo de origen -
participating_target_node_labels(Opcional[List[str]]): Etiquetas de nodo de destino -
participating_edge_labels(Opcional[List[str]]): etiquetas perimetrales que se van a recorrer -
threshold(int): puntuación de centralidad mínima (valor predeterminado:3); debe ser no negativa. -
centrality_type(CentralityType):CentralityType.NodeoCentralityType.Edge(valor predeterminado:None, vuelve aCentralityType.Node) -
max_paths(int): las rutas de acceso máximas que se deben tener en cuenta (valor predeterminado:1000000;0= todas las rutas de acceso); deben ser no negativas. -
is_directional(bool): indica si los bordes son direccionales (valor predeterminado:True) -
min_hop_count(int): los saltos mínimos (valor predeterminado:1); deben ser ≥ 1 -
max_hop_count(int): los saltos máximos (valor predeterminado:4); deben ser ≥min_hop_count -
shortest_path(bool): devuelve solo las rutas de acceso más cortas (valor predeterminado:False) -
max_results(int): los resultados máximos (valor predeterminado:500); deben ser ≥ 1
Genera:
-
ValueError: sithreshold < 0es ,max_paths < 0,min_hop_count < 1,max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Devuelve:
-
QueryResult: contiene las métricas de centralidad
Ejemplo:
result = graph.centrality(
participating_source_node_labels=["user", "device"],
participating_target_node_labels=["device", "user"],
participating_edge_labels=["sign_in"],
is_directional=False)
result.show()
ranked
def ranked(
*,
rank_property_name: str = None,
threshold: int = 0,
max_paths: int = 1000000,
decay_factor: float = 1,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota:
ranked(query_input=RankedQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.
Realice un análisis clasificado en el gráfico.
Parámetros:
-
rank_property_name(str): nombre de propiedad que se va a usar para la clasificación (validado en tiempo de ejecución. Debe proporcionarse cuando no se usaquery_input) -
threshold(int): solo las rutas de acceso devueltas por encima de este peso (valor predeterminado:0); deben ser no negativas. -
max_paths(int): las rutas de acceso máximas que se deben tener en cuenta (valor predeterminado:1000000;0= todas las rutas de acceso); deben ser no negativas. -
decay_factor(float): decaimiento de clasificación por paso; 2 significa que la reducción a la mitad (valor predeterminado:1); debe ser no negativo. -
is_directional(bool): indica si los bordes son direccionales (valor predeterminado:True) -
min_hop_count(int): los saltos mínimos (valor predeterminado:1); deben ser ≥ 1 -
max_hop_count(int): los saltos máximos (valor predeterminado:4); deben ser ≥min_hop_count -
shortest_path(bool): devuelve solo las rutas de acceso más cortas (valor predeterminado:False) -
max_results(int): los resultados máximos (valor predeterminado:500); deben ser ≥ 1
Genera:
-
ValueError: sirank_property_namefalta,threshold < 0,max_paths < 0,decay_factor < 0,min_hop_count < 1,max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas
Devuelve:
-
QueryResult: contiene los nodos o bordes clasificados
Ejemplo:
result = graph.ranked(
rank_property_name="risk_score",
threshold=5,
decay_factor=2)
result.show()
to_graphframe
def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame
Convierta todo el gráfico en GraphFrame. Usa datos de especificación cuando está disponible; lee de las tablas de lago en caso contrario.
Parámetros:
-
column_mapping(Dict[str, str], optional): Asignación de columnas personalizada
Devuelve:
-
GraphFrame: objeto GraphFrame con todos los vértices y bordes
Ejemplo:
gf = graph.to_graphframe()
show
def show() -> None
Mostrar información del gráfico. Delega en spec.show() para una visualización enriquecida cuando se adjunta una especificación; imprime información mínima en caso contrario.
show_schema
def show_schema() -> None
Mostrar esquema de gráfico. Delega a spec.show_schema() cuando se adjunta una especificación; imprime un mensaje que indica que no hay ningún esquema disponible en caso contrario.
publish
(novedades de la versión 0.3.3)
def publish() -> Graph
Registre el gráfico con la API, lo que hace que sea consultable. Llame a esto después Graph.prepare() de (o en cualquiera Graph que tenga una especificación adjunta) para publicar la instancia de grafo.
Devuelve:
-
Graph: auto para el encadenamiento de métodos
Genera:
-
ValueError: si no hay ninguna especificación adjunta o falta el contexto -
RuntimeError: si se produce un error en la publicación
Ejemplo:
graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")
BuildStatus
Clase de datos que transporta metadatos de una Graph.build() operación.
Fields
| Campo | Tipo | Description |
|---|---|---|
etl_result |
Any |
Resultado de la prepare fase (ejecución de canalización de datos) |
api_result |
Optional[Dict] |
Resultado de la fase de publicación (None si se produce un error en la publicación) |
api_error |
Optional[str] |
Mensaje de error si se produjo un error en la publicación (None si la publicación se realizó correctamente) |
instance_name |
str |
Nombre de la instancia del gráfico |
status |
Optional[BuildStatusKind] |
None, "published" o "prepared" |
Rutas de construcción
GraphSpecBuilder.start(...).done() → GraphSpec (spec only, no graph yet)
Graph.get(name, context) → Graph (spec=None, build_status=None)
Graph.prepare(spec) → Graph (spec=spec, build_status.status="prepared")
graph.publish() → Graph (build_status.status="published")
Graph.build(spec) → Graph (prepare + publish in one step)
Ejemplo:
graph = Graph.build(spec)
if graph.build_status.status == "published":
print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
print("Neither prepare nor publish has run")
Generadores de nodos
NodeBuilderInitial
Estado inicial del generador de nodos: solo están disponibles los métodos de origen de datos.
Constructor
NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)
Nota: Normalmente se crea a través de GraphSpecBuilder.add_node(), no se crean instancias directamente.
Métodos
Nota:
No se admite el uso de caracteres de subrayado _ al asignar nombres a nodos, bordes o propiedades en un gráfico personalizado. Se devuelve un error de solicitud no válido cuando se usan caracteres de subrayado.
from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet
Establezca la tabla como origen de datos con resolución de base de datos inteligente.
Parámetros:
-
table_name(str): nombre de la tabla (obligatorio) -
database(str, opcional): nombre explícito de la base de datos (tiene prioridad sobre el valor predeterminado del contexto)
Devuelve:
-
NodeBuilderSourceSet: Generador para una configuración adicional
Genera:
-
ValueError: si no se encontró la tabla o se encontraron varias tablas en conflicto
Orden de resolución de la base de datos:
- Parámetro explícito
database(prioridad más alta) - ExecutionContext.default_database
- Buscar en todas las bases de datos (con detección de conflictos)
Ejemplo:
builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet
Establezca DataFrame de Spark como origen de datos.
Parámetros:
-
dataframe(DataFrame): DataFrame de Spark
Devuelve:
-
NodeBuilderSourceSet: Generador para una configuración adicional
Ejemplo:
df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)
NodeBuilderSourceSet
Generador de nodos después de establecer el origen de datos: métodos de configuración disponibles.
Constructor
NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)
Nota: Creado internamente por métodos de origen NodeBuilderInitial.
Métodos
with_time_range
def with_time_range(
time_column: str,
start_time: Optional[Union[str, datetime]] = None,
end_time: Optional[Union[str, datetime]] = None,
lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet
Aplique el filtrado del intervalo de tiempo al origen de datos del nodo.
Parámetros:
-
time_column(str): nombre de columna que contiene datos de marca de tiempo (obligatorio) -
start_time(str o datetime, opcional): fecha de inicio ('10/20/25', '2025-10-20' o objeto datetime) -
end_time(str o datetime, opcional): fecha de finalización (los mismos formatos que start_time) -
lookback_hours(float, opcional): horas para mirar atrás desde ahora
Devuelve:
-
NodeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si la columna de hora no se encuentra en el esquema de origen
Lógica de intervalo de tiempo:
- Si start_time y end_time proporcionados: úselos directamente.
- Si solo se proporciona lookback_hours: end=now, start=now-lookback_hours
- Si no se proporciona nada: sin filtrado de tiempo
- Si start/end AND lookback_hours: start/end tienen prioridad
Ejemplo:
# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
.with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")
# Lookback window
builder.add_node("user").from_table("SigninLogs") \
.with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet
Establezca la etiqueta de nodo (el valor predeterminado es alias si no se llama a ).
Parámetros:
-
label(str): etiqueta de nodo
Devuelve:
-
NodeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si la etiqueta ya está establecida
Ejemplo:
builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
*columns: str,
key: str,
display: str
) -> NodeBuilderSourceSet
Configure las columnas con la clave necesaria y la designación para mostrar.
Parámetros:
-
*columns(str): nombres de columna que se van a incluir (al menos uno necesario) -
key(str): nombre de columna que se va a marcar como clave (obligatorio, debe estar en columnas) -
display(str): nombre de columna que se marca como valor para mostrar (obligatorio, debe estar en columnas, puede ser igual que la clave)
Devuelve:
-
NodeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si se produce un error en la validación (columnas duplicadas, clave o pantalla que falta, etc.)
Notas:
Las propiedades se compilan automáticamente a partir de tipos de columna
La columna de filtro de tiempo se agrega automáticamente si se especifica
Los tipos de propiedad se deducen automáticamente del esquema de origen
Consulte Restricciones
Ejemplo:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial
Finalice este nodo y empiece a compilar otro nodo.
Parámetros:
-
alias(str): alias para el nuevo nodo
Devuelve:
-
NodeBuilderInitial: nuevo generador de nodos
Ejemplo:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Finalice este nodo y empiece a crear un borde.
Parámetros:
-
alias(str): alias para el borde
Devuelve:
-
EdgeBuilderInitial: nuevo generador perimetral
Ejemplo:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_edge("accessed")
done
def done() -> GraphSpec
Finalice este nodo y complete la especificación del grafo.
Devuelve:
-
GraphSpec: especificación completa del grafo
Ejemplo:
graph_spec = builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.done()
Generadores perimetrales
EdgeBuilderInitial
Estado inicial para el generador perimetral: solo están disponibles los métodos de origen de datos.
Constructor
EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)
Nota: Normalmente se crea a través de GraphSpecBuilder.add_edge(), no se crean instancias directamente.
Métodos
Nota:
No se admite el uso de caracteres de subrayado _ al asignar nombres a nodos, bordes o propiedades en un gráfico personalizado. Se devuelve un error de solicitud no válido cuando se usan caracteres de subrayado.
from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet
Establezca la tabla como origen de datos con resolución de base de datos inteligente.
Parámetros:
-
table_name(str): nombre de la tabla (obligatorio) -
database(str, opcional): nombre explícito de la base de datos
Devuelve:
-
EdgeBuilderSourceSet: Generador para una configuración adicional
Genera:
-
ValueError: si no se encontró la tabla o se encontraron varias tablas en conflicto
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet
Establezca DataFrame de Spark como origen de datos.
Parámetros:
-
dataframe(DataFrame): DataFrame de Spark
Devuelve:
-
EdgeBuilderSourceSet: Generador para una configuración adicional
Ejemplo:
df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)
EdgeBuilderSourceSet
Generador perimetral después de establecer el origen de datos: métodos de configuración disponibles.
Constructor
EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)
Nota: Creado internamente por los métodos de origen EdgeBuilderInitial.
Métodos
with_label
def with_label(label: str) -> EdgeBuilderSourceSet
Establezca el tipo de relación perimetral o la etiqueta (el valor predeterminado es alias si no se llama a ).
Parámetros:
-
label(str): etiqueta perimetral
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si la etiqueta ya está establecida
Ejemplo:
builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label
Nota:
Use with_label() en su lugar. Este método se quitará en una versión futura.
def edge_label(label: str) -> EdgeBuilderSourceSet
Establezca el tipo de relación perimetral o la etiqueta (el valor predeterminado es alias si no se llama a ).
Parámetros:
-
label(str): etiqueta perimetral
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si la etiqueta ya está establecida
Ejemplo:
builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet
Establezca el nodo de origen con la columna de identificador y la etiqueta.
Parámetros:
-
id_column(str): nombre de columna que contiene el identificador de nodo de origen -
node_type(str): etiqueta de nodo de origen
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si el origen ya está establecido
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet
Establezca el nodo de destino con la columna de identificador y la etiqueta.
Parámetros:
-
id_column(str): nombre de columna que contiene el identificador de nodo de destino -
node_type(str): etiqueta de nodo de destino
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si el destino ya está establecido
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
time_column: str,
start_time: Optional[Union[str, datetime]] = None,
end_time: Optional[Union[str, datetime]] = None,
lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet
Aplique el filtrado del intervalo de tiempo al origen de datos del perímetro.
Parámetros:
-
time_column(str): nombre de columna que contiene datos de marca de tiempo (obligatorio) -
start_time(str o datetime, opcional): fecha de inicio -
end_time(str o datetime, opcional): fecha de finalización -
lookback_hours(float, opcional): horas para mirar atrás desde ahora
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si la columna de hora no se encuentra en el esquema de origen
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs") \
.with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
*columns: str,
key: str,
display: str
) -> EdgeBuilderSourceSet
Configure las columnas con la clave necesaria y la designación para mostrar.
Parámetros:
-
*columns(str): nombres de columna que se van a incluir (al menos uno necesario) -
key(str): nombre de columna que se va a marcar como clave (obligatorio, debe estar en columnas) -
display(str): nombre de columna que se marca como valor para mostrar (obligatorio, debe estar en columnas)
Devuelve:
-
EdgeBuilderSourceSet: auto para el encadenamiento de métodos
Genera:
-
ValueError: si se produce un error en la validación
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device") \
.with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial
Finalice este borde y empiece a compilar un nodo.
Parámetros:
-
alias(str): alias para el nuevo nodo
Devuelve:
-
NodeBuilderInitial: nuevo generador de nodos
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Termine este borde y empiece a crear otro borde.
Parámetros:
-
alias(str): alias para el nuevo borde
Devuelve:
-
EdgeBuilderInitial: nuevo generador perimetral
Ejemplo:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device") \
.with_columns("id", "location", key="id", display="location") \
.add_edge("connected_to")
done
def done() -> GraphSpec
Finalice este borde y complete la especificación del grafo.
Devuelve:
-
GraphSpec: especificación completa del grafo
Clases de esquema
GraphDefinitionReference
Referencia a una definición de gráfico con nombre y versión.
Constructor
GraphDefinitionReference(
fully_qualified_name: str,
version: str
)
Parámetros:
-
fully_qualified_name(str): nombre completo del gráfico al que se hace referencia -
version(str): versión del gráfico al que se hace referencia
Genera:
-
ValueError: si fully_qualified_name o versión está vacía
Métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar en el diccionario.
Devuelve:
-
Dict[str, Any]: referencia serializada
Propiedad
Definición de propiedad con interfaz segura de tipos.
Constructor
Property(
name: str,
property_type: PropertyType,
is_non_null: bool = False,
description: str = "",
is_key: bool = False,
is_display_value: bool = False,
is_internal: bool = False
)
Parámetros:
-
name(str): nombre de propiedad -
property_type(PropertyType): tipo de datos de propiedad -
is_non_null(bool, default=False): si se requiere la propiedad -
description(str, default=""): descripción de la propiedad -
is_key(bool, default=False): si la propiedad es una clave -
is_display_value(bool, default=False): si la propiedad es el valor para mostrar -
is_internal(bool, default=False): si la propiedad es interna
Genera:
-
ValueError: si el nombre está vacío o se produce un error en la validación
Métodos de clase
key
@classmethod
Property.key(
name: str,
property_type: PropertyType,
description: str = "",
is_non_null: bool = False
) -> Property
Cree una propiedad de clave con valores comunes (is_key=True, is_display_value=True).
display
@classmethod
Property.display(
name: str,
property_type: PropertyType,
description: str = "",
is_non_null: bool = False
) -> Property
Cree una propiedad de valor para mostrar (is_display_value=True).
Métodos
describe
def describe(text: str) -> Property
Agregue la descripción con fluidez.
Parámetros:
-
text(str): texto de descripción
Devuelve:
-
Property: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar la propiedad en el diccionario con claves de anotación con prefijo @.
Devuelve:
-
Dict[str, Any]: propiedad serializada
to_gql
def to_gql() -> str
Genere la definición de la propiedad GQL.
Devuelve:
-
str: representación de cadena de GQL
EdgeNode
Referencia de nodo usada en definiciones perimetrales.
Constructor
EdgeNode(
alias: Optional[str] = None,
labels: List[str] = []
)
Parámetros:
-
alias(str, opcional): alias de nodo (se establece automáticamente en la primera etiqueta si ninguno o está vacío) -
labels(List[str]): etiquetas de nodo (al menos una necesaria)
Genera:
-
ValueError: si la lista de etiquetas está vacía -
TypeError: si las etiquetas no son cadenas
Mutación automática:
- Si alias es Ninguno o está vacío, se establece en la primera etiqueta.
Métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar en el diccionario.
Devuelve:
-
Dict[str, Any]: referencia de nodo perimetral serializado
Nodo
Definición de nodo con interfaz segura de tipos.
Constructor
Node(
alias: str = "",
labels: List[str] = [],
implies_labels: List[str] = [],
properties: List[Property] = [],
description: str = "",
entity_group: str = "",
dynamic_labels: bool = False,
abstract_edge_aliases: bool = False
)
Parámetros:
-
alias(str, default=""): alias de nodo (se establece automáticamente en la primera etiqueta si está vacío) -
labels(List[str]): etiquetas de nodo (al menos una necesaria) -
implies_labels(List[str], default=[]): etiquetas implícitas -
properties(List[Property], default=[]): propiedades del nodo -
description(str, default=""): descripción del nodo -
entity_group(str, default=""): nombre del grupo de entidades -
dynamic_labels(bool, default=False): si el nodo tiene etiquetas dinámicas -
abstract_edge_aliases(bool, default=False): indica si el nodo usa alias perimetrales abstractos
Genera:
-
ValueError: si se produce un error en la validación (sin etiquetas, sin propiedad de clave, sin propiedad display, etc.)
Mutación automática:
- Si el alias está vacío, se establece en la primera etiqueta.
- Si entity_group está vacío, se establece en la etiqueta principal.
Métodos
get_primary_label
def get_primary_label() -> Optional[str]
Obtenga la etiqueta principal (primero).
Devuelve:
-
stroNone: etiqueta principal o Ninguno si no hay etiquetas
get_entity_group_name
def get_entity_group_name() -> str
Obtenga el nombre del grupo de entidades o la reserva en la etiqueta principal.
Devuelve:
-
str: nombre del grupo de entidades
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]
Obtenga el nombre de la propiedad de clave principal.
Devuelve:
-
stroNone: nombre de propiedad de clave principal
get_properties
def get_properties() -> Dict[str, Property]
Obtenga propiedades como diccionario para facilitar el acceso.
Devuelve:
-
Dict[str, Property]: propiedades con clave por nombre
get_property
def get_property(name: str) -> Optional[Property]
Obtenga una propiedad específica por nombre.
Parámetros:
-
name(str): nombre de propiedad
Devuelve:
-
PropertyoNone: propiedad si se encuentra
add_property
def add_property(prop: Property) -> None
Agregue una propiedad a este nodo.
Parámetros:
-
prop(Propiedad): propiedad que se va a agregar
Genera:
-
ValueError: si el nombre de la propiedad está duplicado
is_dynamically_labeled
def is_dynamically_labeled() -> bool
Compruebe si el nodo tiene etiquetas dinámicas.
Devuelve:
-
bool: True si las etiquetas dinámicas están habilitadas
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool
Compruebe si el nodo usa alias de nodo perimetral abstractos.
Devuelve:
-
bool: True si los alias de borde abstracto están habilitados
describe
def describe(text: str) -> Node
Agregue la descripción con fluidez.
Parámetros:
-
text(str): texto de descripción
Devuelve:
-
Node: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar el nodo en el diccionario.
Devuelve:
-
Dict[str, Any]: nodo serializado
to_gql
def to_gql() -> str
Generar definición de nodo de GQL.
Devuelve:
-
str: representación de cadena de GQL
Genera:
-
ValueError: si el nodo no tiene los campos necesarios para GQL
Métodos de clase
create
@classmethod
Node.create(
alias: str,
labels: List[str],
properties: List[Property],
description: str = "",
entity_group: str = "",
**kwargs
) -> Node
Cree un nodo con todos los campos necesarios.
Parámetros:
-
alias(str): alias de nodo -
labels(List[str]): etiquetas de nodo -
properties(List[Property]): propiedades del nodo -
description(str, default=""): descripción del nodo -
entity_group(str, default=""): nombre del grupo de entidades
Devuelve:
-
Node: nueva instancia de nodo
Microsoft Edge
Definición perimetral con interfaz segura de tipos.
Constructor
Edge(
relationship_type: str,
source_node_label: str,
target_node_label: str,
direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
properties: List[Property] = [],
description: str = "",
entity_group: str = "",
dynamic_type: bool = False
)
Parámetros:
-
relationship_type(str): tipo de relación perimetral (por ejemplo, "FOLLOWS", "OWNS") -
source_node_label(str): etiqueta de nodo de origen -
target_node_label(str): etiqueta de nodo de destino -
direction(EdgeDirection, default=DIRECTED_RIGHT): dirección perimetral -
properties(List[Property], default=[]): propiedades de Edge -
description(str, default=""): descripción de Edge -
entity_group(str, default=""): nombre del grupo de entidades -
dynamic_type(bool, default=False): si edge tiene tipo dinámico
Genera:
-
ValueError: si se produce un error en la validación
Mutación automática:
-
labelslist se rellena automáticamente con[relationship_type] - Si entity_group está vacío, se establece en relationship_type
Propiedades
edge_type
def edge_type() -> str
Alias de compatibilidad con versiones anteriores para relationship_type.
Devuelve:
-
str: tipo de relación
Métodos
get_entity_group_name
def get_entity_group_name() -> str
Obtenga el nombre del grupo de entidades o la reserva al tipo de relación.
Devuelve:
-
str: nombre del grupo de entidades
is_dynamic_type
def is_dynamic_type() -> bool
Compruebe si edge tiene un tipo dinámico.
Devuelve:
-
bool: True si el tipo dinámico
add_property
def add_property(edge_property: Property) -> None
Agregue una propiedad a este borde.
Parámetros:
-
edge_property(Propiedad): propiedad que se va a agregar
describe
def describe(text: str) -> Edge
Agregue la descripción con fluidez.
Parámetros:
-
text(str): texto de descripción
Devuelve:
-
Edge: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar el borde en el diccionario.
Devuelve:
-
Dict[str, Any]: borde serializado
to_gql
def to_gql() -> str
Generar definición perimetral de GQL.
Devuelve:
-
str: representación de cadena de GQL
Métodos de clase
create
Edge.create(
relationship_type: str,
source_node_label: str,
target_node_label: str,
properties: List[Property] = None,
description: str = "",
entity_group: str = "",
**kwargs
) -> Edge
Cree un borde con todos los campos necesarios.
Parámetros:
-
relationship_type(str): tipo de relación perimetral -
source_node_label(str): etiqueta de nodo de origen -
target_node_label(str): etiqueta de nodo de destino -
properties(List[Property], opcional): propiedades de Edge -
description(str, default=""): descripción de Edge -
entity_group(str, default=""): nombre del grupo de entidades
Devuelve:
-
Edge: nueva instancia perimetral
GraphSchema
Definición de esquema de grafo con interfaz segura de tipos.
Constructor
GraphSchema(
name: str,
nodes: List[Node] = [],
edges: List[Edge] = [],
base_graphs: List[GraphSchema] = [],
description: str = "",
version: str = "1.0",
fully_qualified_name: str = "",
namespace: str = ""
)
Parámetros:
-
name(str): nombre del esquema del gráfico -
nodes(List[Node], default=[]): definiciones de nodo -
edges(List[Edge], default=[]): definiciones de Edge -
base_graphs(List[GraphSchema], default=[]): esquemas de gráfico base -
description(str, default=""): descripción del esquema -
version(str, default="1.0"): versión del esquema -
fully_qualified_name(str, default=""): nombre completo -
namespace(str, default=""): Espacio de nombres
Genera:
-
ValueError: si se produce un error en la validación (alias duplicados, bordes hacen referencia a nodos inexistentes, etc.)
Métodos
get_fully_qualified_name
def get_fully_qualified_name() -> str
Obtenga el nombre completo.
Devuelve:
-
str: nombre completo
get_namespace
def get_namespace() -> str
Obtenga el espacio de nombres del nombre completo o devuelva el valor predeterminado.
Devuelve:
-
str: Espacio de nombres
get_version
def get_version() -> str
Obtener versión.
Devuelve:
-
str: cadena de versión
get_node
def get_node(label_or_alias: str) -> Optional[Node]
Obtenga el nodo por etiqueta o alias.
Parámetros:
-
label_or_alias(str): alias o etiqueta de nodo
Devuelve:
-
NodeoNone: nodo si se encuentra
get_edge
def get_edge(name: str) -> Optional[Edge]
Obtenga el borde por nombre o tipo.
Parámetros:
-
name(str): tipo de relación perimetral
Devuelve:
-
EdgeoNone: Edge si se encuentra
add_node
def add_node(node: Node) -> None
Agregue un nodo a este gráfico.
Parámetros:
-
node(Nodo): nodo que se va a agregar
Genera:
-
ValueError: si el alias del nodo está duplicado
add_edge
def add_edge(edge: Edge) -> None
Agregue un borde a este gráfico.
Parámetros:
-
edge(Edge): Edge que se va a agregar
Genera:
-
ValueError: si el tipo de borde está duplicado
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema
Agregar una inclusión de grafos (FLUENT API).
Parámetros:
-
fully_qualified_name(str): nombre completo del gráfico que se va a incluir -
version(str): versión del gráfico que se va a incluir
Devuelve:
-
GraphSchema: auto para el encadenamiento de métodos
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]
Obtenga una lista de las referencias de grafo incluidas.
Devuelve:
-
List[GraphDefinitionReference]: lista de referencias de definición de grafos
describe
def describe(text: str) -> GraphSchema
Agregue la descripción con fluidez.
Parámetros:
-
text(str): texto de descripción
Devuelve:
-
GraphSchema: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]
Serializar el esquema en el diccionario.
Devuelve:
-
Dict[str, Any]: esquema serializado
to_json
def to_json(indent: int = 2) -> str
Generar representación JSON.
Parámetros:
-
indent(int, default=2): nivel de sangría JSON
Devuelve:
-
str: cadena JSON
to_gql
def to_gql() -> str
Generar definición de esquema de GQL.
Devuelve:
-
str: representación de cadena de GQL
Métodos de clase
create
@classmethod
GraphSchema.create(
name: str,
nodes: List[Node] = None,
edges: List[Edge] = None,
description: str = "",
version: str = "1.0",
**kwargs
) -> GraphSchema
Cree un esquema de grafo con todos los campos necesarios.
Parámetros:
-
name(str): nombre del esquema del gráfico -
nodes(List[Node], optional): Definiciones de nodo -
edges(List[Edge], opcional): Definiciones de Edge -
description(str, default=""): descripción del esquema -
version(str, default="1.0"): versión del esquema
Devuelve:
-
GraphSchema: nueva instancia de esquema de grafo
Clases de entrada de consulta
Clases de datos que representan parámetros de entrada para consultas de grafo predefinidas.
Nota:
Pasar QueryInput objetos directamente a Graph métodos de consulta está en desuso y se quitará en versiones futuras.
En su lugar, use argumentos de palabra clave. Los Graph métodos (reachability, k_hop, blast_radius, centrality, ranked) aceptan todos los parámetros como argumentos de palabra clave y construyen los objetos de entrada internamente. Estas clases permanecen en el código base por ahora, pero no deben usarse en código nuevo.
QueryInputBase
Clase base para todos los parámetros de entrada de consulta.
Métodos
to_json_payload
def to_json_payload() -> Dict[str, Any]
Convierta los parámetros de entrada en un diccionario para el envío de API.
Devuelve:
-
Dict[str, Any]: representación del diccionario de los parámetros de entrada
validate
def validate() -> None
Valide los parámetros de entrada.
Genera:
-
ValueError: si los parámetros de entrada no son válidos
ReachabilityQueryInput
Parámetros de entrada para una consulta de accesibilidad entre los nodos de origen y de destino.
Hereda de ReachabilityQueryInputBase la que hereda de QueryInputBase.
Fields
| Campo | Tipo | Predeterminado | Descripción |
|---|---|---|---|
source_property_value |
str |
(obligatorio) | Valor que debe coincidir con la propiedad de origen |
target_property_value |
str |
(obligatorio) | Valor que debe coincidir con la propiedad de destino |
source_property |
Optional[str] |
None |
Nombre de propiedad para filtrar nodos de origen |
participating_source_node_labels |
Optional[List[str]] |
None |
Etiquetas de nodo que se deben tener en cuenta como nodos de origen |
target_property |
Optional[str] |
None |
Nombre de propiedad para filtrar los nodos de destino |
participating_target_node_labels |
Optional[List[str]] |
None |
Etiquetas de nodo que se deben tener en cuenta como nodos de destino |
participating_edge_labels |
Optional[List[str]] |
None |
Etiquetas perimetrales que se recorrerán en la ruta de acceso |
is_directional |
Optional[bool] |
True |
Si los bordes son direccionales |
min_hop_count |
Optional[int] |
1 |
Número mínimo de saltos en la ruta de acceso |
max_hop_count |
Optional[int] |
4 |
Número máximo de saltos en la ruta de acceso |
shortest_path |
Optional[bool] |
False |
Si buscar solo la ruta de acceso más corta |
max_results |
Optional[int] |
500 |
Número máximo de resultados que se van a devolver |
Validación:
-
source_property_valuees necesario -
target_property_valuees necesario
Ejemplo:
# Preferred: keyword arguments (no import needed)
result = graph.reachability(
source_property="UserId",
source_property_value="user123",
target_property="DeviceId",
target_property_value="device456",
participating_edge_labels=["accessed", "connected_to"],
shortest_path=True
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
source_property_value="user123",
target_property_value="device456"
))
K_HopQueryInput
Parámetros de entrada para una consulta de k-hop desde un nodo de origen determinado.
Hereda de ReachabilityQueryInputBase.
Hereda todos los campos de ReachabilityQueryInput.
Validación:
- Al menos uno de
source_property_valueotarget_property_valuedebe proporcionarse
Ejemplo:
# Preferred: keyword arguments
result = graph.k_hop(
source_property_value="user123",
max_hop_count=3,
participating_edge_labels=["accessed"]
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))
BlastRadiusQueryInput
Parámetros de entrada para una consulta de radio de expansión de los nodos de origen a destino.
Hereda de ReachabilityQueryInputBase.
Hereda todos los campos de ReachabilityQueryInput, con los siguientes campos obligatorios:
| Campo | Tipo | Obligatorio | Descripción |
|---|---|---|---|
source_property_value |
str |
Sí | Valor para identificar el nodo de origen |
target_property_value |
str |
Sí | Valor para identificar el nodo de destino |
Validación:
-
source_property_valuees necesario -
target_property_valuees necesario
Ejemplo:
# Preferred: keyword arguments
result = graph.blast_radius(
source_property_value="user123",
target_property_value="device456",
participating_edge_labels=["accessed", "connected_to"]
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
source_property_value="user123",
target_property_value="device456"
))
CentralityQueryInput
Parámetros de entrada para una consulta de análisis de centralidad.
Hereda de QueryInputBase.
Enumeración CentralityType
| Valor | Description |
|---|---|
CentralityType.Node |
Centralidad del nodo de proceso |
CentralityType.Edge |
Centralidad perimetral de proceso |
Fields
| Campo | Tipo | Predeterminado | Descripción |
|---|---|---|---|
threshold |
Optional[int] |
3 |
Puntuación de centralidad mínima que se debe tener en cuenta |
centrality_type |
CentralityType |
CentralityType.Node |
Tipo de centralidad que se va a calcular |
max_paths |
Optional[int] |
1000000 |
Rutas de acceso máximas que se deben tener en cuenta (0 = todos) |
participating_source_node_labels |
Optional[List[str]] |
None |
Etiquetas de nodo de origen |
participating_target_node_labels |
Optional[List[str]] |
None |
Etiquetas de nodo de destino |
participating_edge_labels |
Optional[List[str]] |
None |
Etiquetas perimetrales que se van a recorrer |
is_directional |
Optional[bool] |
True |
Si los bordes son direccionales |
min_hop_count |
Optional[int] |
1 |
Saltos mínimos |
max_hop_count |
Optional[int] |
4 |
Máximo de saltos |
shortest_path |
Optional[bool] |
False |
Solo las rutas de acceso más cortas |
max_results |
Optional[int] |
500 |
Resultados máximos |
Ejemplo:
# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
centrality_type=CentralityType.Edge, # or CentralityType.Node (default)
participating_edge_labels=["accessed", "connected_to"],
threshold=5,
max_results=100
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
centrality_type=CentralityType.Edge,
participating_edge_labels=["accessed"]
))
RankedQueryInput
Parámetros de entrada para una consulta de análisis clasificado.
Hereda de QueryInputBase.
Fields
| Campo | Tipo | Predeterminado | Descripción |
|---|---|---|---|
rank_property_name |
str |
(obligatorio) | Nombre de propiedad que se va a usar para las rutas de acceso de clasificación |
threshold |
Optional[int] |
0 |
Solo las rutas de acceso de retorno con pesos por encima de este valor |
max_paths |
Optional[int] |
1000000 |
Rutas de acceso máximas que se deben tener en cuenta (0 = todos) |
decay_factor |
Optional[float] |
1 |
Cuánto reduce cada paso de gráfico la clasificación (2 = mitad cada paso) |
is_directional |
Optional[bool] |
True |
Si los bordes son direccionales |
min_hop_count |
Optional[int] |
1 |
Saltos mínimos |
max_hop_count |
Optional[int] |
4 |
Máximo de saltos |
shortest_path |
Optional[bool] |
False |
Solo las rutas de acceso más cortas |
max_results |
Optional[int] |
500 |
Resultados máximos |
Ejemplo:
# Preferred: keyword arguments
result = graph.ranked(
rank_property_name="risk_score",
threshold=5,
decay_factor=2,
max_results=50
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
rank_property_name="risk_score",
threshold=5
))
Resultados de la consulta
QueryResult
Resultado de una consulta de grafo con acceso diferido a DataFrame.
Constructor
QueryResult(raw_response: Dict[str, Any], graph: Graph)
Parámetros:
-
raw_response(Dict[str, Any]): diccionario de respuesta de API sin procesar -
graph(Gráfico): Referencia al gráfico primario
Nota: Normalmente creado por Graph.query(), no se crean instancias directamente.
Métodos
to_dataframe
def to_dataframe() -> DataFrame
Convierte el resultado de la consulta en un dataframe de Spark.
Devuelve:
-
DataFrame: resultado de la consulta como dataframe de Spark
Genera:
-
ValueError: si se produce un error en la conversión
Ejemplo:
result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]
Obtener la sección RawData de la respuesta.
Devuelve:
-
Dict[str, Any]: diccionario con metadatos sin procesar o edición vacía si no está presente
Ejemplo:
result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None
Mostrar el resultado de la consulta en varios formatos.
Parámetros:
-
format(str, default="visual"): formato de salida-
"table": tablas dataframe completas (todas las columnas) -
"visual": visualización interactiva de gráficos con el complemento VSC -
"all": mostrar todos los formatos
-
Genera:
-
ValueError: si el formato no es uno de los valores admitidos
Ejemplo:
result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show() # Visual by default
result.show(format="table") # Table format
Ejemplo completo (recomendado: v0.3+)
# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph
# 1. Define graph specification
spec = (
GraphSpecBuilder.start()
.add_node("User")
.from_dataframe(user_nodes) # native Spark DF from groupBy → no .df
.with_columns(
"UserId", "UserDisplayName", "UserPrincipalName",
"DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
"TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
key="UserId", display="UserDisplayName"
)
.add_node("IPAddress")
.from_dataframe(ip_nodes) # native Spark DF from groupBy → no .df
.with_columns(
"IPAddress", "UniqueUsers", "UniqueLocations",
"SignInCount", "RiskySignInCount", "SharedIPFlag",
key="IPAddress", display="IPAddress"
)
.add_edge("UsedIP")
.from_dataframe(edge_used_ip) # native Spark DF → no .df
.source(id_column="UserId", node_type="User")
.target(id_column="IPAddress", node_type="IPAddress")
.with_columns(
"SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
key="EdgeKey", display="EdgeKey"
)
.done()
)
# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()
# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")
# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()
# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()
# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()
# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()
Notas sobre patrones de diseño
Fluent API
Todos los generadores admiten el encadenamiento de métodos para definiciones de grafos declarativas legibles:
builder.add_node("user") \
.from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_edge("follows")
Esquemas de unión
Varios bordes con el mismo alias se combinan automáticamente con propiedades combinadas:
# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
.from_table("AzureSignins") \
.source(id_column="UserId", node_type="AZuser") \
.target(id_column="DeviceId", node_type="device")
builder.add_edge("sign_in") \
.from_table("EntraSignins") \
.source(id_column="UserId", node_type="EntraUser") \
.target(id_column="DeviceId", node_type="device")
Configuración automática
Muchos campos tienen valores predeterminados razonables:
- Las etiquetas de nodo o perimetral tienen como valor predeterminado sus alias
- Las propiedades se deducen automáticamente de los esquemas de origen
- Los grupos de entidades se establecen de forma predeterminada en los tipos de etiquetas o relaciones principales.
Evaluación diferida
Los dataframes y los recursos se cargan de forma diferenciable y se almacenan en caché:
-
graph_spec.nodesygraph_spec.edgesse cargan en el primer acceso - Los resultados de la consulta crean tramas de datos solo cuando se solicitan