Referencia de la API de Graph Builder (versión preliminar)

La clase sentinel_graph proporciona una manera de interactuar con el grafo de Microsoft Sentinel, lo que le permite definir el esquema del grafo, transformar los datos de la Microsoft Sentinel lago de datos en nodos y bordes, publicar un gráfico, un gráfico de consulta y ejecutar algoritmos avanzados de grafos. Esta clase está diseñada para trabajar con las sesiones de Spark en cuadernos de Jupyter Notebook que se ejecutan en Microsoft Sentinel proceso de Spark.

GraphSpecBuilder

La clase GraphSpecBuilder proporciona un generador fluido para crear especificaciones de grafos con canalizaciones de datos e integración de esquemas.

Importante

El GraphBuilder alias de esta clase está en desuso y se quitará en una versión futura. Use GraphSpecBuilder en todo el código nuevo.

# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder

# Recommended
from sentinel_graph import GraphSpecBuilder

Constructor

GraphSpecBuilder(context: ExecutionContext)

Parámetros:

  • context (ExecutionContext): contexto de ejecución que contiene la sesión y la configuración de Spark

Genera:

  • ValueError: si el contexto es Ninguno o no se puede determinar el nombre del gráfico

Métodos estáticos

start

GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder

Defina un nuevo generador de grafos fluido.

Parámetros:

  • context (ExecutionContext, opcional): instancia de ExecutionContext. Si no es Ninguno, usa el contexto predeterminado.

Devuelve:

  • GraphSpecBuilder: nueva instancia del generador

Ejemplo:

builder = GraphSpecBuilder.start(context=context)

Métodos de instancia

add_node

def add_node(alias: str) -> NodeBuilderInitial

Empiece a crear una definición de nodo.

Parámetros:

  • alias (str): identificador único de este nodo dentro del gráfico

Devuelve:

  • NodeBuilderInitial: generador de nodos en estado inicial

Ejemplo:

builder.add_node("user")

add_edge

def add_edge(alias: str) -> EdgeBuilderInitial

Empiece a crear una definición perimetral.

Parámetros:

  • alias (str): identificador de este borde dentro del gráfico (se puede compartir entre varios bordes)

Devuelve:

  • EdgeBuilderInitial: generador perimetral en estado inicial

Ejemplo:

builder.add_edge("accessed")

done

def done() -> GraphSpec

Finalice la especificación del grafo y devuelva la instancia de GraphSpec.

Devuelve:

  • GraphSpec: especificación completa del grafo con la canalización de datos y el esquema

Genera:

  • ValueError: si el gráfico no tiene nodos o bordes, o si se produce un error en la validación

Ejemplo:

graph_spec = builder.done()

GraphSpec

Especificación de grafos con capacidades de canalización, esquema y visualización de datos.

Constructor

GraphSpec(
    name: str,
    context: ExecutionContext,
    graph_schema: GraphSchema,
    etl_pipeline: Optional[ETLPipeline] = None
)

Parámetros:

  • name (str): nombre del gráfico
  • context (ExecutionContext): contexto de ejecución
  • graph_schema (GraphSchema): definición de esquema de grafo
  • etl_pipeline (ETLPipeline, opcional): canalización de datos para la preparación del grafo

Propiedades

nodes

def nodes() -> DataFrame

Obtener dataframe de nodos (diferido, almacenado en caché). Determina automáticamente el origen de la canalización de datos o la tabla de lago.

Devuelve:

  • DataFrame: DataFrame de Spark que contiene todos los nodos

Genera:

  • ValueError: si falta el contexto o no se pueden cargar tramas de datos

edges

def edges() -> DataFrame

Obtención de bordes DataFrame (diferido, almacenado en caché). Determina automáticamente el origen de la canalización de datos o la tabla de lago.

Devuelve:

  • DataFrame: DataFrame de Spark que contiene todos los bordes

Genera:

  • ValueError: si falta el contexto o no se pueden cargar tramas de datos

Métodos

build_graph_with_data

Nota:

build_graph_with_data está en desuso y se quitará en una versión futura. Use Graph.build(spec) en su lugar.

def build_graph_with_data() -> Dict[str, Any]

Ejecute la canalización de datos y publique el gráfico. Llama internamente a Graph.build(self), guarda el valor devuelto Graphy devuelve un diccionario compatible con versiones anteriores.

Devuelve:

  • Dict[str, Any]: diccionario que contiene:
    • etl_result: resultados de preparación de datos
    • api_result: publicar resultados (si se ejecuta correctamente)
    • api_error: cadena de error (si se produjo un error en la publicación)
    • instance_name: nombre de la instancia de Graph
    • status: "published" o "prepared"

Ejemplo:

graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")

get_schema

def get_schema() -> GraphSchema

Obtenga el esquema del grafo.

Devuelve:

  • GraphSchema: definición de esquema de grafo

get_pipeline

Nota:

Este método está en desuso y se quitará en una versión futura. La canalización de datos es un detalle interno de implementación y no se debe tener acceso directamente.

def get_pipeline() -> Optional[ETLPipeline]

Obtenga la canalización de datos (ninguno para los gráficos existentes).

Devuelve:

  • ETLPipeline o None: canalización de datos si está disponible

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Convierta todo el gráfico en GraphFrame para ejecutar algoritmos de grafos. Funciona solo en datos locales (desde una canalización de datos o una tabla de lago).

Parámetros:

  • column_mapping (Dict[str, str], opcional): asignación de columnas personalizada con claves:
    • "id": nombre de columna del identificador de vértice
    • "source_id": nombre de columna del identificador de origen de Edge
    • "target_id": nombre de columna del identificador de destino de Edge

Devuelve:

  • GraphFrame: objeto GraphFrame con todos los vértices y bordes

Genera:

  • ValueError: si ExecutionContext no está disponible

Ejemplo:

gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)

show

def show(limit: int = 100, viz_format: str = "visual") -> None

Mostrar datos de gráficos en varios formatos.

Parámetros:

  • limit (int, default=100): número máximo de nodos/bordes que se van a mostrar
  • viz_format (str, default="visual"): formato de salida
    • "table": tablas dataframe completas (todas las columnas)
    • "visual": visualización interactiva de gráficos
    • "all": mostrar todos los formatos

Genera:

  • ValueError: si el formato no es uno de los valores admitidos

Ejemplo:

graph_spec.show(limit=50, viz_format="table")

show_schema

def show_schema() -> None

Mostrar el esquema del grafo como una visualización interactiva de grafos.

Ejemplo:

spec.show_schema()

Graph

Instancia de gráfico consultable. Creado a través de Graph.get() (gráfico existente) o Graph.build() (a partir de ).GraphSpec

Constructor

Graph(
    name: str,
    context: ExecutionContext,
    spec: Optional[GraphSpec] = None,
    build_status: Optional[BuildStatus] = None,
)

Parámetros:

  • name (str): nombre del gráfico
  • context (ExecutionContext): contexto de ejecución
  • spec (GraphSpec, opcional): especificación de gráfico adjunta (establecida por Graph.build())
  • build_status (BuildStatus, opcional): metadatos de resultado de compilación (establecidos por Graph.build())

Genera:

  • ValueError: si ExecutionContext es None

Métodos estáticos

get

Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph

Obtenga una instancia de grafo de un grafo existente. El valor devuelto Graph tiene spec=None y build_status=None.

Parámetros:

  • name (str): nombre de instancia de Graph
  • context (ExecutionContext, opcional): contexto de ejecución (el valor predeterminado es ExecutionContext.default())

Devuelve:

  • Graph: instancia de Graph

Genera:

  • ValueError: si el nombre del gráfico está vacío o la instancia del grafo no existe

Ejemplo:

graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")

prepare

Graph.prepare(spec: GraphSpec) -> Graph

Ejecute la fase de preparación de datos para sinGraphSpec publicar. Use publish() después para registrar el gráfico y hacer que sea consultable.

Parámetros:

  • spec (GraphSpec): especificación de grafo para preparar

Devuelve:

  • Graph: instancia de Graph con spec asociado y build_status.status == "prepared"

Genera:

  • ValueError: si la especificación no tiene ninguna canalización de datos o no tiene contexto de ejecución
  • RuntimeError: si se produce un error en la ejecución de la canalización de datos

Ejemplo:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")

build

Graph.build(spec: GraphSpec) -> Graph

Cree un gráfico a partir de mediante la GraphSpec preparación de datos y la publicación. Internamente llama a Graph.prepare(spec) y, a continuación, intenta graph.publish(). A diferencia de llamar a esos dos métodos por separado, se detectan errores de publicación: el gráfico devuelto tiene build_status.status == "prepared" y build_status.api_error se establece en lugar de generar.

Parámetros:

  • spec (GraphSpec): especificación de grafo a partir de la que compilar

Devuelve:

  • Graph: instancia de Graph con spec adjunta y build_status rellenada

Genera:

  • ValueError: si la especificación no tiene ninguna canalización de datos o no tiene contexto de ejecución
  • RuntimeError: si se produce un error en la ejecución de la canalización de datos

Ejemplo:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status)  # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")

Propiedades

nodes

def nodes() -> Optional[DataFrame]

Obtención de nodos DataFrame. Delega a self.spec.nodes cuando se adjunta una especificación; devuelve None en caso contrario.

edges

def edges() -> Optional[DataFrame]

Obtener bordes DataFrame. Delega a self.spec.edges cuando se adjunta una especificación; devuelve None en caso contrario.

schema

def schema() -> Optional[GraphSchema]

Obtener esquema de grafo. Delega a self.spec.get_schema() cuando se adjunta una especificación; devuelve None en caso contrario.

Métodos

query

def query(query_string: str, query_language: str = "GQL") -> QueryResult

Ejecute una consulta en la instancia de grafo mediante GQL.

Parámetros:

  • query_string (str): cadena de consulta de Graph (lenguaje GQL)
  • query_language (str, default="GQL"): lenguaje de consulta

Devuelve:

  • QueryResult: objeto que contiene nodos, bordes y metadatos

Genera:

  • ValueError: si falta la sesión de ExecutionContext o Spark
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Ejemplo:

result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()

reachability

def reachability(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

[! NOTA] reachability(query_input=ReachabilityQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.

Realice un análisis de accesibilidad entre los nodos de origen y de destino.

Parámetros:

  • source_property_value (str): valor que debe coincidir con la propiedad de origen (validada en tiempo de ejecución. Debe proporcionarse cuando no se usa query_input)
  • target_property_value (str): valor que debe coincidir con la propiedad de destino (validada en tiempo de ejecución. Debe proporcionarse cuando no se usa query_input)
  • source_property (Opcional[str]): nombre de propiedad para filtrar los nodos de origen
  • participating_source_node_labels (Opcional[List[str]]): etiquetas de nodo que se deben considerar como orígenes
  • target_property (Opcional[str]): nombre de propiedad para filtrar los nodos de destino
  • participating_target_node_labels (Opcional[List[str]]): etiquetas de nodo que se deben considerar como destinos
  • participating_edge_labels (Opcional[List[str]]): etiquetas perimetrales que se van a recorrer
  • is_directional (bool): indica si los bordes son direccionales (valor predeterminado: True)
  • min_hop_count (int): saltos mínimos (valor predeterminado: 1)
  • max_hop_count (int): saltos máximos (valor predeterminado: 4)
  • shortest_path (bool): devuelve solo las rutas de acceso más cortas (valor predeterminado: False)
  • max_results (int): resultados máximos (valor predeterminado: 500)

Genera:

  • ValueError: si source_property_valuetarget_property_value o falta, min_hop_count < 1, max_hop_count < min_hop_counto max_results < 1
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Devuelve:

  • QueryResult: contiene las rutas de acceso

Ejemplo:

result = graph.reachability(
    source_property_value="user-001",
    target_property_value="device-003")
result.show()

k_hop

def k_hop(
    *,
    source_property: Optional[str] = None,
    source_property_value: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    target_property_value: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota:

k_hop(query_input=K_HopQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.

Realice el análisis de k-hop desde un nodo de origen determinado.

Parámetros:

Validación:

  • Al menos uno de source_property_value o target_property_value debe proporcionarse

Genera:

  • ValueError: si no se proporciona ni source_property_valuetarget_property_value , o si se infringen restricciones numéricas (igual reachabilityque )
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Devuelve:

  • QueryResult: contiene los resultados de k-hop

Ejemplo:

result = graph.k_hop(source_property_value="user-001")
result.show()

blast_radius

def blast_radius(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota:

blast_radius(query_input=BlastRadiusQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.

Realice un análisis de radio de expansión desde el nodo de origen al nodo de destino.

Parámetros:

  • source_property_value (str): valor que identifica el nodo de origen (validado en tiempo de ejecución. Debe proporcionarse cuando no se usa query_input)
  • target_property_value (str): valor que identifica el nodo de destino (validado en tiempo de ejecución. Debe proporcionarse cuando no se usa query_input)
  • Otros parámetros: igual que reachability

Genera:

  • ValueError: si source_property_value falta o target_property_value , o si se infringen restricciones numéricas (igual reachabilityque )
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Devuelve:

  • QueryResult: contiene los resultados del radio de expansión

Ejemplo:

result = graph.blast_radius(
    source_property_value="user-003",
    target_property_value="device-003",
    min_hop_count=1)
result.show()

centrality

def centrality(
    *,
    participating_source_node_labels: Optional[List[str]] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    threshold: int = 3,
    centrality_type: CentralityType = None,
    max_paths: int = 1000000,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota:

centrality(query_input=CentralityQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.

Realice un análisis de centralidad en el gráfico.

Parámetros:

  • participating_source_node_labels (Opcional[List[str]]): etiquetas de nodo de origen
  • participating_target_node_labels (Opcional[List[str]]): Etiquetas de nodo de destino
  • participating_edge_labels (Opcional[List[str]]): etiquetas perimetrales que se van a recorrer
  • threshold (int): puntuación de centralidad mínima (valor predeterminado: 3); debe ser no negativa.
  • centrality_type (CentralityType): CentralityType.Node o CentralityType.Edge (valor predeterminado: None, vuelve a CentralityType.Node)
  • max_paths (int): las rutas de acceso máximas que se deben tener en cuenta (valor predeterminado: 1000000; 0 = todas las rutas de acceso); deben ser no negativas.
  • is_directional (bool): indica si los bordes son direccionales (valor predeterminado: True)
  • min_hop_count (int): los saltos mínimos (valor predeterminado: 1); deben ser ≥ 1
  • max_hop_count (int): los saltos máximos (valor predeterminado: 4); deben ser ≥ min_hop_count
  • shortest_path (bool): devuelve solo las rutas de acceso más cortas (valor predeterminado: False)
  • max_results (int): los resultados máximos (valor predeterminado: 500); deben ser ≥ 1

Genera:

  • ValueError: si threshold < 0es , max_paths < 0, min_hop_count < 1, max_hop_count < min_hop_counto max_results < 1
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Devuelve:

  • QueryResult: contiene las métricas de centralidad

Ejemplo:

result = graph.centrality(
    participating_source_node_labels=["user", "device"],
    participating_target_node_labels=["device", "user"],
    participating_edge_labels=["sign_in"],
    is_directional=False)
result.show()

ranked

def ranked(
    *,
    rank_property_name: str = None,
    threshold: int = 0,
    max_paths: int = 1000000,
    decay_factor: float = 1,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota:

ranked(query_input=RankedQueryInput(...)) todavía se acepta, pero emite DeprecationWarning y se quitará en una versión futura.

Realice un análisis clasificado en el gráfico.

Parámetros:

  • rank_property_name (str): nombre de propiedad que se va a usar para la clasificación (validado en tiempo de ejecución. Debe proporcionarse cuando no se usa query_input)
  • threshold (int): solo las rutas de acceso devueltas por encima de este peso (valor predeterminado: 0); deben ser no negativas.
  • max_paths (int): las rutas de acceso máximas que se deben tener en cuenta (valor predeterminado: 1000000; 0 = todas las rutas de acceso); deben ser no negativas.
  • decay_factor (float): decaimiento de clasificación por paso; 2 significa que la reducción a la mitad (valor predeterminado: 1); debe ser no negativo.
  • is_directional (bool): indica si los bordes son direccionales (valor predeterminado: True)
  • min_hop_count (int): los saltos mínimos (valor predeterminado: 1); deben ser ≥ 1
  • max_hop_count (int): los saltos máximos (valor predeterminado: 4); deben ser ≥ min_hop_count
  • shortest_path (bool): devuelve solo las rutas de acceso más cortas (valor predeterminado: False)
  • max_results (int): los resultados máximos (valor predeterminado: 500); deben ser ≥ 1

Genera:

  • ValueError: si rank_property_name falta, threshold < 0, max_paths < 0, decay_factor < 0, min_hop_count < 1, max_hop_count < min_hop_counto max_results < 1
  • RuntimeError: si se produce un error en la inicialización del cliente o en la ejecución de consultas

Devuelve:

  • QueryResult: contiene los nodos o bordes clasificados

Ejemplo:

result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2)
result.show()

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Convierta todo el gráfico en GraphFrame. Usa datos de especificación cuando está disponible; lee de las tablas de lago en caso contrario.

Parámetros:

  • column_mapping (Dict[str, str], optional): Asignación de columnas personalizada

Devuelve:

  • GraphFrame: objeto GraphFrame con todos los vértices y bordes

Ejemplo:

gf = graph.to_graphframe()

show

def show() -> None

Mostrar información del gráfico. Delega en spec.show() para una visualización enriquecida cuando se adjunta una especificación; imprime información mínima en caso contrario.

show_schema

def show_schema() -> None

Mostrar esquema de gráfico. Delega a spec.show_schema() cuando se adjunta una especificación; imprime un mensaje que indica que no hay ningún esquema disponible en caso contrario.

publish (novedades de la versión 0.3.3)

def publish() -> Graph

Registre el gráfico con la API, lo que hace que sea consultable. Llame a esto después Graph.prepare() de (o en cualquiera Graph que tenga una especificación adjunta) para publicar la instancia de grafo.

Devuelve:

  • Graph: auto para el encadenamiento de métodos

Genera:

  • ValueError: si no hay ninguna especificación adjunta o falta el contexto
  • RuntimeError: si se produce un error en la publicación

Ejemplo:

graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")

BuildStatus

Clase de datos que transporta metadatos de una Graph.build() operación.

Fields

Campo Tipo Description
etl_result Any Resultado de la prepare fase (ejecución de canalización de datos)
api_result Optional[Dict] Resultado de la fase de publicación (None si se produce un error en la publicación)
api_error Optional[str] Mensaje de error si se produjo un error en la publicación (None si la publicación se realizó correctamente)
instance_name str Nombre de la instancia del gráfico
status Optional[BuildStatusKind] None, "published" o "prepared"

Rutas de construcción

GraphSpecBuilder.start(...).done()  →  GraphSpec             (spec only, no graph yet)
Graph.get(name, context)            →  Graph (spec=None, build_status=None)
Graph.prepare(spec)                 →  Graph (spec=spec, build_status.status="prepared")
graph.publish()                     →  Graph (build_status.status="published")
Graph.build(spec)                   →  Graph (prepare + publish in one step)

Ejemplo:

graph = Graph.build(spec)
if graph.build_status.status == "published":
    print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
    print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
    print("Neither prepare nor publish has run")

Generadores de nodos

NodeBuilderInitial

Estado inicial del generador de nodos: solo están disponibles los métodos de origen de datos.

Constructor

NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: Normalmente se crea a través de GraphSpecBuilder.add_node(), no se crean instancias directamente.

Métodos

Nota:

No se admite el uso de caracteres de subrayado _ al asignar nombres a nodos, bordes o propiedades en un gráfico personalizado. Se devuelve un error de solicitud no válido cuando se usan caracteres de subrayado.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet

Establezca la tabla como origen de datos con resolución de base de datos inteligente.

Parámetros:

  • table_name (str): nombre de la tabla (obligatorio)
  • database (str, opcional): nombre explícito de la base de datos (tiene prioridad sobre el valor predeterminado del contexto)

Devuelve:

  • NodeBuilderSourceSet: Generador para una configuración adicional

Genera:

  • ValueError: si no se encontró la tabla o se encontraron varias tablas en conflicto

Orden de resolución de la base de datos:

  1. Parámetro explícito database (prioridad más alta)
  2. ExecutionContext.default_database
  3. Buscar en todas las bases de datos (con detección de conflictos)

Ejemplo:

builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet

Establezca DataFrame de Spark como origen de datos.

Parámetros:

  • dataframe (DataFrame): DataFrame de Spark

Devuelve:

  • NodeBuilderSourceSet: Generador para una configuración adicional

Ejemplo:

df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)

NodeBuilderSourceSet

Generador de nodos después de establecer el origen de datos: métodos de configuración disponibles.

Constructor

NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Creado internamente por métodos de origen NodeBuilderInitial.

Métodos

with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet

Aplique el filtrado del intervalo de tiempo al origen de datos del nodo.

Parámetros:

  • time_column (str): nombre de columna que contiene datos de marca de tiempo (obligatorio)
  • start_time (str o datetime, opcional): fecha de inicio ('10/20/25', '2025-10-20' o objeto datetime)
  • end_time (str o datetime, opcional): fecha de finalización (los mismos formatos que start_time)
  • lookback_hours (float, opcional): horas para mirar atrás desde ahora

Devuelve:

  • NodeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si la columna de hora no se encuentra en el esquema de origen

Lógica de intervalo de tiempo:

  1. Si start_time y end_time proporcionados: úselos directamente.
  2. Si solo se proporciona lookback_hours: end=now, start=now-lookback_hours
  3. Si no se proporciona nada: sin filtrado de tiempo
  4. Si start/end AND lookback_hours: start/end tienen prioridad

Ejemplo:

# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")

# Lookback window
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet

Establezca la etiqueta de nodo (el valor predeterminado es alias si no se llama a ).

Parámetros:

  • label (str): etiqueta de nodo

Devuelve:

  • NodeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si la etiqueta ya está establecida

Ejemplo:

builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> NodeBuilderSourceSet

Configure las columnas con la clave necesaria y la designación para mostrar.

Parámetros:

  • *columns (str): nombres de columna que se van a incluir (al menos uno necesario)
  • key (str): nombre de columna que se va a marcar como clave (obligatorio, debe estar en columnas)
  • display (str): nombre de columna que se marca como valor para mostrar (obligatorio, debe estar en columnas, puede ser igual que la clave)

Devuelve:

  • NodeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si se produce un error en la validación (columnas duplicadas, clave o pantalla que falta, etc.)

Notas:

  • Las propiedades se compilan automáticamente a partir de tipos de columna

  • La columna de filtro de tiempo se agrega automáticamente si se especifica

  • Los tipos de propiedad se deducen automáticamente del esquema de origen

  • Consulte Restricciones

Ejemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Finalice este nodo y empiece a compilar otro nodo.

Parámetros:

  • alias (str): alias para el nuevo nodo

Devuelve:

  • NodeBuilderInitial: nuevo generador de nodos

Ejemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Finalice este nodo y empiece a crear un borde.

Parámetros:

  • alias (str): alias para el borde

Devuelve:

  • EdgeBuilderInitial: nuevo generador perimetral

Ejemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("accessed")
done
def done() -> GraphSpec

Finalice este nodo y complete la especificación del grafo.

Devuelve:

  • GraphSpec: especificación completa del grafo

Ejemplo:

graph_spec = builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .done()

Generadores perimetrales

EdgeBuilderInitial

Estado inicial para el generador perimetral: solo están disponibles los métodos de origen de datos.

Constructor

EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: Normalmente se crea a través de GraphSpecBuilder.add_edge(), no se crean instancias directamente.

Métodos

Nota:

No se admite el uso de caracteres de subrayado _ al asignar nombres a nodos, bordes o propiedades en un gráfico personalizado. Se devuelve un error de solicitud no válido cuando se usan caracteres de subrayado.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet

Establezca la tabla como origen de datos con resolución de base de datos inteligente.

Parámetros:

  • table_name (str): nombre de la tabla (obligatorio)
  • database (str, opcional): nombre explícito de la base de datos

Devuelve:

  • EdgeBuilderSourceSet: Generador para una configuración adicional

Genera:

  • ValueError: si no se encontró la tabla o se encontraron varias tablas en conflicto

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet

Establezca DataFrame de Spark como origen de datos.

Parámetros:

  • dataframe (DataFrame): DataFrame de Spark

Devuelve:

  • EdgeBuilderSourceSet: Generador para una configuración adicional

Ejemplo:

df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)

EdgeBuilderSourceSet

Generador perimetral después de establecer el origen de datos: métodos de configuración disponibles.

Constructor

EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Creado internamente por los métodos de origen EdgeBuilderInitial.

Métodos

with_label
def with_label(label: str) -> EdgeBuilderSourceSet

Establezca el tipo de relación perimetral o la etiqueta (el valor predeterminado es alias si no se llama a ).

Parámetros:

  • label (str): etiqueta perimetral

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si la etiqueta ya está establecida

Ejemplo:

builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label

Nota:

Use with_label() en su lugar. Este método se quitará en una versión futura.

def edge_label(label: str) -> EdgeBuilderSourceSet

Establezca el tipo de relación perimetral o la etiqueta (el valor predeterminado es alias si no se llama a ).

Parámetros:

  • label (str): etiqueta perimetral

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si la etiqueta ya está establecida

Ejemplo:

builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Establezca el nodo de origen con la columna de identificador y la etiqueta.

Parámetros:

  • id_column (str): nombre de columna que contiene el identificador de nodo de origen
  • node_type (str): etiqueta de nodo de origen

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si el origen ya está establecido

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Establezca el nodo de destino con la columna de identificador y la etiqueta.

Parámetros:

  • id_column (str): nombre de columna que contiene el identificador de nodo de destino
  • node_type (str): etiqueta de nodo de destino

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si el destino ya está establecido

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet

Aplique el filtrado del intervalo de tiempo al origen de datos del perímetro.

Parámetros:

  • time_column (str): nombre de columna que contiene datos de marca de tiempo (obligatorio)
  • start_time (str o datetime, opcional): fecha de inicio
  • end_time (str o datetime, opcional): fecha de finalización
  • lookback_hours (float, opcional): horas para mirar atrás desde ahora

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si la columna de hora no se encuentra en el esquema de origen

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> EdgeBuilderSourceSet

Configure las columnas con la clave necesaria y la designación para mostrar.

Parámetros:

  • *columns (str): nombres de columna que se van a incluir (al menos uno necesario)
  • key (str): nombre de columna que se va a marcar como clave (obligatorio, debe estar en columnas)
  • display (str): nombre de columna que se marca como valor para mostrar (obligatorio, debe estar en columnas)

Devuelve:

  • EdgeBuilderSourceSet: auto para el encadenamiento de métodos

Genera:

  • ValueError: si se produce un error en la validación

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Finalice este borde y empiece a compilar un nodo.

Parámetros:

  • alias (str): alias para el nuevo nodo

Devuelve:

  • NodeBuilderInitial: nuevo generador de nodos
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Termine este borde y empiece a crear otro borde.

Parámetros:

  • alias (str): alias para el nuevo borde

Devuelve:

  • EdgeBuilderInitial: nuevo generador perimetral

Ejemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", key="id", display="location") \
    .add_edge("connected_to")
done
def done() -> GraphSpec

Finalice este borde y complete la especificación del grafo.

Devuelve:

  • GraphSpec: especificación completa del grafo

Clases de esquema

GraphDefinitionReference

Referencia a una definición de gráfico con nombre y versión.

Constructor

GraphDefinitionReference(
    fully_qualified_name: str,
    version: str
)

Parámetros:

  • fully_qualified_name (str): nombre completo del gráfico al que se hace referencia
  • version (str): versión del gráfico al que se hace referencia

Genera:

  • ValueError: si fully_qualified_name o versión está vacía

Métodos

to_dict
def to_dict() -> Dict[str, Any]

Serializar en el diccionario.

Devuelve:

  • Dict[str, Any]: referencia serializada

Propiedad

Definición de propiedad con interfaz segura de tipos.

Constructor

Property(
    name: str,
    property_type: PropertyType,
    is_non_null: bool = False,
    description: str = "",
    is_key: bool = False,
    is_display_value: bool = False,
    is_internal: bool = False
)

Parámetros:

  • name (str): nombre de propiedad
  • property_type (PropertyType): tipo de datos de propiedad
  • is_non_null (bool, default=False): si se requiere la propiedad
  • description (str, default=""): descripción de la propiedad
  • is_key (bool, default=False): si la propiedad es una clave
  • is_display_value (bool, default=False): si la propiedad es el valor para mostrar
  • is_internal (bool, default=False): si la propiedad es interna

Genera:

  • ValueError: si el nombre está vacío o se produce un error en la validación

Métodos de clase

key
@classmethod
Property.key(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Cree una propiedad de clave con valores comunes (is_key=True, is_display_value=True).

display
@classmethod
Property.display(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Cree una propiedad de valor para mostrar (is_display_value=True).

Métodos

describe
def describe(text: str) -> Property

Agregue la descripción con fluidez.

Parámetros:

  • text (str): texto de descripción

Devuelve:

  • Property: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar la propiedad en el diccionario con claves de anotación con prefijo @.

Devuelve:

  • Dict[str, Any]: propiedad serializada
to_gql
def to_gql() -> str

Genere la definición de la propiedad GQL.

Devuelve:

  • str: representación de cadena de GQL

EdgeNode

Referencia de nodo usada en definiciones perimetrales.

Constructor

EdgeNode(
    alias: Optional[str] = None,
    labels: List[str] = []
)

Parámetros:

  • alias (str, opcional): alias de nodo (se establece automáticamente en la primera etiqueta si ninguno o está vacío)
  • labels (List[str]): etiquetas de nodo (al menos una necesaria)

Genera:

  • ValueError: si la lista de etiquetas está vacía
  • TypeError: si las etiquetas no son cadenas

Mutación automática:

  • Si alias es Ninguno o está vacío, se establece en la primera etiqueta.

Métodos

to_dict
def to_dict() -> Dict[str, Any]

Serializar en el diccionario.

Devuelve:

  • Dict[str, Any]: referencia de nodo perimetral serializado

Nodo

Definición de nodo con interfaz segura de tipos.

Constructor

Node(
    alias: str = "",
    labels: List[str] = [],
    implies_labels: List[str] = [],
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_labels: bool = False,
    abstract_edge_aliases: bool = False
)

Parámetros:

  • alias (str, default=""): alias de nodo (se establece automáticamente en la primera etiqueta si está vacío)
  • labels (List[str]): etiquetas de nodo (al menos una necesaria)
  • implies_labels (List[str], default=[]): etiquetas implícitas
  • properties (List[Property], default=[]): propiedades del nodo
  • description (str, default=""): descripción del nodo
  • entity_group (str, default=""): nombre del grupo de entidades
  • dynamic_labels (bool, default=False): si el nodo tiene etiquetas dinámicas
  • abstract_edge_aliases (bool, default=False): indica si el nodo usa alias perimetrales abstractos

Genera:

  • ValueError: si se produce un error en la validación (sin etiquetas, sin propiedad de clave, sin propiedad display, etc.)

Mutación automática:

  • Si el alias está vacío, se establece en la primera etiqueta.
  • Si entity_group está vacío, se establece en la etiqueta principal.

Métodos

get_primary_label
def get_primary_label() -> Optional[str]

Obtenga la etiqueta principal (primero).

Devuelve:

  • str o None: etiqueta principal o Ninguno si no hay etiquetas
get_entity_group_name
def get_entity_group_name() -> str

Obtenga el nombre del grupo de entidades o la reserva en la etiqueta principal.

Devuelve:

  • str: nombre del grupo de entidades
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]

Obtenga el nombre de la propiedad de clave principal.

Devuelve:

  • str o None: nombre de propiedad de clave principal
get_properties
def get_properties() -> Dict[str, Property]

Obtenga propiedades como diccionario para facilitar el acceso.

Devuelve:

  • Dict[str, Property]: propiedades con clave por nombre
get_property
def get_property(name: str) -> Optional[Property]

Obtenga una propiedad específica por nombre.

Parámetros:

  • name (str): nombre de propiedad

Devuelve:

  • Property o None: propiedad si se encuentra
add_property
def add_property(prop: Property) -> None

Agregue una propiedad a este nodo.

Parámetros:

  • prop (Propiedad): propiedad que se va a agregar

Genera:

  • ValueError: si el nombre de la propiedad está duplicado
is_dynamically_labeled
def is_dynamically_labeled() -> bool

Compruebe si el nodo tiene etiquetas dinámicas.

Devuelve:

  • bool: True si las etiquetas dinámicas están habilitadas
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool

Compruebe si el nodo usa alias de nodo perimetral abstractos.

Devuelve:

  • bool: True si los alias de borde abstracto están habilitados
describe
def describe(text: str) -> Node

Agregue la descripción con fluidez.

Parámetros:

  • text (str): texto de descripción

Devuelve:

  • Node: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar el nodo en el diccionario.

Devuelve:

  • Dict[str, Any]: nodo serializado
to_gql
def to_gql() -> str

Generar definición de nodo de GQL.

Devuelve:

  • str: representación de cadena de GQL

Genera:

  • ValueError: si el nodo no tiene los campos necesarios para GQL

Métodos de clase

create
@classmethod
Node.create(
    alias: str,
    labels: List[str],
    properties: List[Property],
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Node

Cree un nodo con todos los campos necesarios.

Parámetros:

  • alias (str): alias de nodo
  • labels (List[str]): etiquetas de nodo
  • properties (List[Property]): propiedades del nodo
  • description (str, default=""): descripción del nodo
  • entity_group (str, default=""): nombre del grupo de entidades

Devuelve:

  • Node: nueva instancia de nodo

Microsoft Edge

Definición perimetral con interfaz segura de tipos.

Constructor

Edge(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_type: bool = False
)

Parámetros:

  • relationship_type (str): tipo de relación perimetral (por ejemplo, "FOLLOWS", "OWNS")
  • source_node_label (str): etiqueta de nodo de origen
  • target_node_label (str): etiqueta de nodo de destino
  • direction (EdgeDirection, default=DIRECTED_RIGHT): dirección perimetral
  • properties (List[Property], default=[]): propiedades de Edge
  • description (str, default=""): descripción de Edge
  • entity_group (str, default=""): nombre del grupo de entidades
  • dynamic_type (bool, default=False): si edge tiene tipo dinámico

Genera:

  • ValueError: si se produce un error en la validación

Mutación automática:

  • labels list se rellena automáticamente con [relationship_type]
  • Si entity_group está vacío, se establece en relationship_type

Propiedades

edge_type
def edge_type() -> str

Alias de compatibilidad con versiones anteriores para relationship_type.

Devuelve:

  • str: tipo de relación

Métodos

get_entity_group_name
def get_entity_group_name() -> str

Obtenga el nombre del grupo de entidades o la reserva al tipo de relación.

Devuelve:

  • str: nombre del grupo de entidades
is_dynamic_type
def is_dynamic_type() -> bool

Compruebe si edge tiene un tipo dinámico.

Devuelve:

  • bool: True si el tipo dinámico
add_property
def add_property(edge_property: Property) -> None

Agregue una propiedad a este borde.

Parámetros:

  • edge_property (Propiedad): propiedad que se va a agregar
describe
def describe(text: str) -> Edge

Agregue la descripción con fluidez.

Parámetros:

  • text (str): texto de descripción

Devuelve:

  • Edge: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar el borde en el diccionario.

Devuelve:

  • Dict[str, Any]: borde serializado
to_gql
def to_gql() -> str

Generar definición perimetral de GQL.

Devuelve:

  • str: representación de cadena de GQL

Métodos de clase

create
Edge.create(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    properties: List[Property] = None,
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Edge

Cree un borde con todos los campos necesarios.

Parámetros:

  • relationship_type (str): tipo de relación perimetral
  • source_node_label (str): etiqueta de nodo de origen
  • target_node_label (str): etiqueta de nodo de destino
  • properties (List[Property], opcional): propiedades de Edge
  • description (str, default=""): descripción de Edge
  • entity_group (str, default=""): nombre del grupo de entidades

Devuelve:

  • Edge: nueva instancia perimetral

GraphSchema

Definición de esquema de grafo con interfaz segura de tipos.

Constructor

GraphSchema(
    name: str,
    nodes: List[Node] = [],
    edges: List[Edge] = [],
    base_graphs: List[GraphSchema] = [],
    description: str = "",
    version: str = "1.0",
    fully_qualified_name: str = "",
    namespace: str = ""
)

Parámetros:

  • name (str): nombre del esquema del gráfico
  • nodes (List[Node], default=[]): definiciones de nodo
  • edges (List[Edge], default=[]): definiciones de Edge
  • base_graphs (List[GraphSchema], default=[]): esquemas de gráfico base
  • description (str, default=""): descripción del esquema
  • version (str, default="1.0"): versión del esquema
  • fully_qualified_name (str, default=""): nombre completo
  • namespace (str, default=""): Espacio de nombres

Genera:

  • ValueError: si se produce un error en la validación (alias duplicados, bordes hacen referencia a nodos inexistentes, etc.)

Métodos

get_fully_qualified_name
def get_fully_qualified_name() -> str

Obtenga el nombre completo.

Devuelve:

  • str: nombre completo
get_namespace
def get_namespace() -> str

Obtenga el espacio de nombres del nombre completo o devuelva el valor predeterminado.

Devuelve:

  • str: Espacio de nombres
get_version
def get_version() -> str

Obtener versión.

Devuelve:

  • str: cadena de versión
get_node
def get_node(label_or_alias: str) -> Optional[Node]

Obtenga el nodo por etiqueta o alias.

Parámetros:

  • label_or_alias (str): alias o etiqueta de nodo

Devuelve:

  • Node o None: nodo si se encuentra
get_edge
def get_edge(name: str) -> Optional[Edge]

Obtenga el borde por nombre o tipo.

Parámetros:

  • name (str): tipo de relación perimetral

Devuelve:

  • Edge o None: Edge si se encuentra
add_node
def add_node(node: Node) -> None

Agregue un nodo a este gráfico.

Parámetros:

  • node (Nodo): nodo que se va a agregar

Genera:

  • ValueError: si el alias del nodo está duplicado
add_edge
def add_edge(edge: Edge) -> None

Agregue un borde a este gráfico.

Parámetros:

  • edge (Edge): Edge que se va a agregar

Genera:

  • ValueError: si el tipo de borde está duplicado
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema

Agregar una inclusión de grafos (FLUENT API).

Parámetros:

  • fully_qualified_name (str): nombre completo del gráfico que se va a incluir
  • version (str): versión del gráfico que se va a incluir

Devuelve:

  • GraphSchema: auto para el encadenamiento de métodos
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]

Obtenga una lista de las referencias de grafo incluidas.

Devuelve:

  • List[GraphDefinitionReference]: lista de referencias de definición de grafos
describe
def describe(text: str) -> GraphSchema

Agregue la descripción con fluidez.

Parámetros:

  • text (str): texto de descripción

Devuelve:

  • GraphSchema: auto para el encadenamiento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar el esquema en el diccionario.

Devuelve:

  • Dict[str, Any]: esquema serializado
to_json
def to_json(indent: int = 2) -> str

Generar representación JSON.

Parámetros:

  • indent (int, default=2): nivel de sangría JSON

Devuelve:

  • str: cadena JSON
to_gql
def to_gql() -> str

Generar definición de esquema de GQL.

Devuelve:

  • str: representación de cadena de GQL

Métodos de clase

create
@classmethod
GraphSchema.create(
    name: str,
    nodes: List[Node] = None,
    edges: List[Edge] = None,
    description: str = "",
    version: str = "1.0",
    **kwargs
) -> GraphSchema

Cree un esquema de grafo con todos los campos necesarios.

Parámetros:

  • name (str): nombre del esquema del gráfico
  • nodes (List[Node], optional): Definiciones de nodo
  • edges (List[Edge], opcional): Definiciones de Edge
  • description (str, default=""): descripción del esquema
  • version (str, default="1.0"): versión del esquema

Devuelve:

  • GraphSchema: nueva instancia de esquema de grafo

Clases de entrada de consulta

Clases de datos que representan parámetros de entrada para consultas de grafo predefinidas.

Nota:

Pasar QueryInput objetos directamente a Graph métodos de consulta está en desuso y se quitará en versiones futuras. En su lugar, use argumentos de palabra clave. Los Graph métodos (reachability, k_hop, blast_radius, centrality, ranked) aceptan todos los parámetros como argumentos de palabra clave y construyen los objetos de entrada internamente. Estas clases permanecen en el código base por ahora, pero no deben usarse en código nuevo.

QueryInputBase

Clase base para todos los parámetros de entrada de consulta.

Métodos

to_json_payload
def to_json_payload() -> Dict[str, Any]

Convierta los parámetros de entrada en un diccionario para el envío de API.

Devuelve:

  • Dict[str, Any]: representación del diccionario de los parámetros de entrada
validate
def validate() -> None

Valide los parámetros de entrada.

Genera:

  • ValueError: si los parámetros de entrada no son válidos

ReachabilityQueryInput

Parámetros de entrada para una consulta de accesibilidad entre los nodos de origen y de destino. Hereda de ReachabilityQueryInputBase la que hereda de QueryInputBase.

Fields

Campo Tipo Predeterminado Descripción
source_property_value str (obligatorio) Valor que debe coincidir con la propiedad de origen
target_property_value str (obligatorio) Valor que debe coincidir con la propiedad de destino
source_property Optional[str] None Nombre de propiedad para filtrar nodos de origen
participating_source_node_labels Optional[List[str]] None Etiquetas de nodo que se deben tener en cuenta como nodos de origen
target_property Optional[str] None Nombre de propiedad para filtrar los nodos de destino
participating_target_node_labels Optional[List[str]] None Etiquetas de nodo que se deben tener en cuenta como nodos de destino
participating_edge_labels Optional[List[str]] None Etiquetas perimetrales que se recorrerán en la ruta de acceso
is_directional Optional[bool] True Si los bordes son direccionales
min_hop_count Optional[int] 1 Número mínimo de saltos en la ruta de acceso
max_hop_count Optional[int] 4 Número máximo de saltos en la ruta de acceso
shortest_path Optional[bool] False Si buscar solo la ruta de acceso más corta
max_results Optional[int] 500 Número máximo de resultados que se van a devolver

Validación:

  • source_property_value es necesario
  • target_property_value es necesario

Ejemplo:

# Preferred: keyword arguments (no import needed)
result = graph.reachability(
    source_property="UserId",
    source_property_value="user123",
    target_property="DeviceId",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"],
    shortest_path=True
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

K_HopQueryInput

Parámetros de entrada para una consulta de k-hop desde un nodo de origen determinado. Hereda de ReachabilityQueryInputBase.

Hereda todos los campos de ReachabilityQueryInput.

Validación:

  • Al menos uno de source_property_value o target_property_value debe proporcionarse

Ejemplo:

# Preferred: keyword arguments
result = graph.k_hop(
    source_property_value="user123",
    max_hop_count=3,
    participating_edge_labels=["accessed"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))

BlastRadiusQueryInput

Parámetros de entrada para una consulta de radio de expansión de los nodos de origen a destino. Hereda de ReachabilityQueryInputBase.

Hereda todos los campos de ReachabilityQueryInput, con los siguientes campos obligatorios:

Campo Tipo Obligatorio Descripción
source_property_value str Valor para identificar el nodo de origen
target_property_value str Valor para identificar el nodo de destino

Validación:

  • source_property_value es necesario
  • target_property_value es necesario

Ejemplo:

# Preferred: keyword arguments
result = graph.blast_radius(
    source_property_value="user123",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

CentralityQueryInput

Parámetros de entrada para una consulta de análisis de centralidad. Hereda de QueryInputBase.

Enumeración CentralityType

Valor Description
CentralityType.Node Centralidad del nodo de proceso
CentralityType.Edge Centralidad perimetral de proceso

Fields

Campo Tipo Predeterminado Descripción
threshold Optional[int] 3 Puntuación de centralidad mínima que se debe tener en cuenta
centrality_type CentralityType CentralityType.Node Tipo de centralidad que se va a calcular
max_paths Optional[int] 1000000 Rutas de acceso máximas que se deben tener en cuenta (0 = todos)
participating_source_node_labels Optional[List[str]] None Etiquetas de nodo de origen
participating_target_node_labels Optional[List[str]] None Etiquetas de nodo de destino
participating_edge_labels Optional[List[str]] None Etiquetas perimetrales que se van a recorrer
is_directional Optional[bool] True Si los bordes son direccionales
min_hop_count Optional[int] 1 Saltos mínimos
max_hop_count Optional[int] 4 Máximo de saltos
shortest_path Optional[bool] False Solo las rutas de acceso más cortas
max_results Optional[int] 500 Resultados máximos

Ejemplo:

# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
    centrality_type=CentralityType.Edge,  # or CentralityType.Node (default)
    participating_edge_labels=["accessed", "connected_to"],
    threshold=5,
    max_results=100
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
    centrality_type=CentralityType.Edge,
    participating_edge_labels=["accessed"]
))

RankedQueryInput

Parámetros de entrada para una consulta de análisis clasificado. Hereda de QueryInputBase.

Fields

Campo Tipo Predeterminado Descripción
rank_property_name str (obligatorio) Nombre de propiedad que se va a usar para las rutas de acceso de clasificación
threshold Optional[int] 0 Solo las rutas de acceso de retorno con pesos por encima de este valor
max_paths Optional[int] 1000000 Rutas de acceso máximas que se deben tener en cuenta (0 = todos)
decay_factor Optional[float] 1 Cuánto reduce cada paso de gráfico la clasificación (2 = mitad cada paso)
is_directional Optional[bool] True Si los bordes son direccionales
min_hop_count Optional[int] 1 Saltos mínimos
max_hop_count Optional[int] 4 Máximo de saltos
shortest_path Optional[bool] False Solo las rutas de acceso más cortas
max_results Optional[int] 500 Resultados máximos

Ejemplo:

# Preferred: keyword arguments
result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2,
    max_results=50
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
    rank_property_name="risk_score",
    threshold=5
))

Resultados de la consulta

QueryResult

Resultado de una consulta de grafo con acceso diferido a DataFrame.

Constructor

QueryResult(raw_response: Dict[str, Any], graph: Graph)

Parámetros:

  • raw_response (Dict[str, Any]): diccionario de respuesta de API sin procesar
  • graph (Gráfico): Referencia al gráfico primario

Nota: Normalmente creado por Graph.query(), no se crean instancias directamente.

Métodos

to_dataframe
def to_dataframe() -> DataFrame

Convierte el resultado de la consulta en un dataframe de Spark.

Devuelve:

  • DataFrame: resultado de la consulta como dataframe de Spark

Genera:

  • ValueError: si se produce un error en la conversión

Ejemplo:

result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]

Obtener la sección RawData de la respuesta.

Devuelve:

  • Dict[str, Any]: diccionario con metadatos sin procesar o edición vacía si no está presente

Ejemplo:

result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None

Mostrar el resultado de la consulta en varios formatos.

Parámetros:

  • format (str, default="visual"): formato de salida
    • "table": tablas dataframe completas (todas las columnas)
    • "visual": visualización interactiva de gráficos con el complemento VSC
    • "all": mostrar todos los formatos

Genera:

  • ValueError: si el formato no es uno de los valores admitidos

Ejemplo:

result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show()  # Visual by default
result.show(format="table")  # Table format

# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph

# 1. Define graph specification
spec = (
    GraphSpecBuilder.start()
    
    .add_node("User")
        .from_dataframe(user_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "UserId", "UserDisplayName", "UserPrincipalName",
                "DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
                "TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
                key="UserId", display="UserDisplayName"
            )
    
    .add_node("IPAddress")
        .from_dataframe(ip_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "IPAddress", "UniqueUsers", "UniqueLocations",
                "SignInCount", "RiskySignInCount", "SharedIPFlag",
                key="IPAddress", display="IPAddress"
            )
    
    .add_edge("UsedIP")
        .from_dataframe(edge_used_ip)  # native Spark DF → no .df
            .source(id_column="UserId", node_type="User")
            .target(id_column="IPAddress", node_type="IPAddress")
            .with_columns(
                "SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
                key="EdgeKey", display="EdgeKey"
            )
   
    .done()
)

# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()

# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")

# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()

# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()

# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()

# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()

Notas sobre patrones de diseño

Fluent API

Todos los generadores admiten el encadenamiento de métodos para definiciones de grafos declarativas legibles:

builder.add_node("user") \
    .from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("follows")

Esquemas de unión

Varios bordes con el mismo alias se combinan automáticamente con propiedades combinadas:

# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
    .from_table("AzureSignins") \
    .source(id_column="UserId", node_type="AZuser") \
    .target(id_column="DeviceId", node_type="device")

builder.add_edge("sign_in") \
    .from_table("EntraSignins") \
    .source(id_column="UserId", node_type="EntraUser") \
    .target(id_column="DeviceId", node_type="device")

Configuración automática

Muchos campos tienen valores predeterminados razonables:

  • Las etiquetas de nodo o perimetral tienen como valor predeterminado sus alias
  • Las propiedades se deducen automáticamente de los esquemas de origen
  • Los grupos de entidades se establecen de forma predeterminada en los tipos de etiquetas o relaciones principales.

Evaluación diferida

Los dataframes y los recursos se cargan de forma diferenciable y se almacenan en caché:

  • graph_spec.nodes y graph_spec.edges se cargan en el primer acceso
  • Los resultados de la consulta crean tramas de datos solo cuando se solicitan