Referência da API do Graph Builder (pré-visualização)

A classe sentinel_graph fornece uma forma de interagir com o gráfico de Microsoft Sentinel, permitindo-lhe definir o seu esquema de grafo, transformar dados do Microsoft Sentinel data lake em nós e arestas, publicar um gráfico, um gráfico de consulta e executar algoritmos de gráfico avançados. Esta classe foi concebida para funcionar com as sessões do Spark em blocos de notas do Jupyter em execução no Microsoft Sentinel computação spark.

GraphSpecBuilder

A classe GraphSpecBuilder fornece um construtor fluente para criar especificações de grafos com pipelines de dados e integração de esquemas.

Importante

O GraphBuilder alias desta classe foi preterido e será removido numa versão futura. Utilize GraphSpecBuilder em todo o novo código.

# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder

# Recommended
from sentinel_graph import GraphSpecBuilder

Construtor

GraphSpecBuilder(context: ExecutionContext)

Parâmetros:

  • context (ExecutionContext): Contexto de execução que contém a sessão e a configuração do Spark

Gera:

  • ValueError: se o contexto for Nenhum ou não for possível determinar o nome do gráfico

Métodos Estáticos

start

GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder

Definir um novo construtor de gráficos fluente.

Parâmetros:

  • context (ExecutionContext, opcional): instância ExecutionContext. Se Não, utiliza o contexto predefinido.

Devolve:

  • GraphSpecBuilder: Nova instância do construtor

Exemplo:

builder = GraphSpecBuilder.start(context=context)

Métodos de Instância

add_node

def add_node(alias: str) -> NodeBuilderInitial

Comece a criar uma definição de nó.

Parâmetros:

  • alias (str): identificador exclusivo para este nó no gráfico

Devolve:

  • NodeBuilderInitial: Construtor de nós no estado inicial

Exemplo:

builder.add_node("user")

add_edge

def add_edge(alias: str) -> EdgeBuilderInitial

Comece a criar uma definição de limite.

Parâmetros:

  • alias (str): identificador para esta aresta dentro do gráfico (pode ser partilhado em várias arestas)

Devolve:

  • EdgeBuilderInitial: Construtor do Edge no estado inicial

Exemplo:

builder.add_edge("accessed")

done

def done() -> GraphSpec

Finalize a especificação do gráfico e devolva a instância do GraphSpec.

Devolve:

  • GraphSpec: especificação de grafos completa com o pipeline de dados e o esquema

Gera:

  • ValueError: se o gráfico não tiver nós ou arestas ou se a validação falhar

Exemplo:

graph_spec = builder.done()

GraphSpec

Especificação de gráfico com pipeline de dados, esquema e capacidades de apresentação.

Construtor

GraphSpec(
    name: str,
    context: ExecutionContext,
    graph_schema: GraphSchema,
    etl_pipeline: Optional[ETLPipeline] = None
)

Parâmetros:

  • name (str): Nome do gráfico
  • context (ExecutionContext): Contexto de execução
  • graph_schema (GraphSchema): definição de esquema de gráfico
  • etl_pipeline (ETLPipeline, opcional): Pipeline de dados para preparação de gráficos

Propriedades

nodes

def nodes() -> DataFrame

Obter o DataFrame dos nós (em diferido, em cache). Determina automaticamente a origem do pipeline de dados ou da tabela lake.

Devolve:

  • DataFrame: DataFrame do Spark que contém todos os nós

Gera:

  • ValueError: se o contexto estiver em falta ou se os DataFrames não puderem ser carregados

edges

def edges() -> DataFrame

Obter limites dataFrame (em diferido, em cache). Determina automaticamente a origem do pipeline de dados ou da tabela lake.

Devolve:

  • DataFrame: DataFrame do Spark que contém todas as arestas

Gera:

  • ValueError: se o contexto estiver em falta ou se os DataFrames não puderem ser carregados

Métodos

build_graph_with_data

Observação

build_graph_with_data foi preterido e será removido numa versão futura. Em vez disso, utilize Graph.build(spec) .

def build_graph_with_data() -> Dict[str, Any]

Execute o pipeline de dados e publique o gráfico. Chama internamente Graph.build(self), guarda o devolvido Graphe devolve um dicionário retrocompatível.

Devolve:

  • Dict[str, Any]: Dicionário que contém:
    • etl_result: Resultados da preparação de dados
    • api_result: Publicar resultados (se for bem-sucedido)
    • api_error: cadeia de erro (se a publicação tiver falhado)
    • instance_name: nome da instância do gráfico
    • status: "published" ou "prepared"

Exemplo:

graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")

get_schema

def get_schema() -> GraphSchema

Obtenha o esquema do gráfico.

Devolve:

  • GraphSchema: definição de esquema de gráfico

get_pipeline

Observação

Este método foi preterido e será removido numa versão futura. O pipeline de dados é um detalhe de implementação interna e não deve ser acedido diretamente.

def get_pipeline() -> Optional[ETLPipeline]

Obter o pipeline de dados (Nenhum para gráficos existentes).

Devolve:

  • ETLPipeline ou None: Pipeline de dados, se disponível

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Converta todo o gráfico em GraphFrame para executar algoritmos de gráficos. Opera apenas em dados locais (a partir do pipeline de dados ou da tabela lake).

Parâmetros:

  • column_mapping (Dict[str, str], opcional): Mapeamento de colunas personalizado com chaves:
    • "id": Nome da coluna do ID do Vértice
    • "source_id": Nome da coluna do ID de origem do Edge
    • "target_id": Nome da coluna do ID de destino do Edge

Devolve:

  • GraphFrame: objeto GraphFrame com todos os vértices e arestas

Gera:

  • ValueError: se ExecutionContext não estiver disponível

Exemplo:

gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)

show

def show(limit: int = 100, viz_format: str = "visual") -> None

Apresentar dados de grafos em vários formatos.

Parâmetros:

  • limit (int, default=100): Máximo de nós/arestas a apresentar
  • viz_format (str, default="visual"): Formato de saída
    • "table": tabelas DataFrame completas (todas as colunas)
    • "visual": visualização interativa de grafos
    • "all": Mostrar todos os formatos

Gera:

  • ValueError: se o formato não for um dos valores suportados

Exemplo:

graph_spec.show(limit=50, viz_format="table")

show_schema

def show_schema() -> None

Apresentar o esquema do gráfico como uma visualização interativa de grafos.

Exemplo:

spec.show_schema()

Graph

Instância de gráfico queryable. Criado através de Graph.get() (gráfico existente) ou Graph.build() (a partir de um GraphSpec).

Construtor

Graph(
    name: str,
    context: ExecutionContext,
    spec: Optional[GraphSpec] = None,
    build_status: Optional[BuildStatus] = None,
)

Parâmetros:

  • name (str): Nome do gráfico
  • context (ExecutionContext): Contexto de execução
  • spec (GraphSpec, opcional): Especificação de gráfico anexado (definida por Graph.build())
  • build_status (BuildStatus, opcional): compilar metadados de resultados (definidos por Graph.build())

Gera:

  • ValueError: Se ExecutionContext for Nenhum

Métodos Estáticos

get

Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph

Obter uma instância de gráfico a partir de um gráfico existente. O devolvido Graph tem spec=None e build_status=None.

Parâmetros:

  • name (str): nome da instância do gráfico
  • context (ExecutionContext, opcional): Contexto de execução (predefinição para ExecutionContext.default())

Devolve:

  • Graph: instância de gráfico

Gera:

  • ValueError: se o nome do gráfico estiver vazio ou a instância de gráfico não existir

Exemplo:

graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")

prepare

Graph.prepare(spec: GraphSpec) -> Graph

Execute a fase de preparação de dados para um GraphSpecsem publicação. Utilize publish() posteriormente para registar o gráfico e torná-lo consultado.

Parâmetros:

  • spec (GraphSpec): Especificação de gráfico para preparar

Devolve:

  • Graph: instância de gráfico com spec anexado e build_status.status == "prepared"

Gera:

  • ValueError: se a especificação não tiver nenhum pipeline de dados ou nenhum contexto de execução
  • RuntimeError: se a execução do pipeline de dados falhar

Exemplo:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")

build

Graph.build(spec: GraphSpec) -> Graph

Crie um gráfico a partir de um GraphSpec ao preparar dados e publicação. Chama internamente Graph.prepare(spec) e, em graph.publish()seguida, tenta . Ao contrário de chamar estes dois métodos separadamente, as falhas de publicação são detetados: o gráfico devolvido tem build_status.status == "prepared" e build_status.api_error definiu em vez de aumentar.

Parâmetros:

  • spec (GraphSpec): Especificação do gráfico a partir da compilação

Devolve:

  • Graph: instância de gráfico com spec anexado e build_status preenchido

Gera:

  • ValueError: se a especificação não tiver nenhum pipeline de dados ou nenhum contexto de execução
  • RuntimeError: se a execução do pipeline de dados falhar

Exemplo:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status)  # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")

Propriedades

nodes

def nodes() -> Optional[DataFrame]

Obter DataFrame de nós. Delega a self.spec.nodes quando uma especificação é anexada; devolve None o contrário.

edges

def edges() -> Optional[DataFrame]

Obter limites dataFrame. Delega a self.spec.edges quando uma especificação é anexada; devolve None o contrário.

schema

def schema() -> Optional[GraphSchema]

Obter esquema de gráfico. Delega a self.spec.get_schema() quando uma especificação é anexada; devolve None o contrário.

Métodos

query

def query(query_string: str, query_language: str = "GQL") -> QueryResult

Execute uma consulta na instância do gráfico com o GQL.

Parâmetros:

  • query_string (str): cadeia de consulta de grafos (Linguagem GQL)
  • query_language (str, default="GQL"): Linguagem de consulta

Devolve:

  • QueryResult: objeto que contém nós, arestas e metadados

Gera:

  • ValueError: se executionContext ou a sessão do Spark estiverem em falta
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Exemplo:

result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()

reachability

def reachability(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

[! NOTA] reachability(query_input=ReachabilityQueryInput(...)) ainda é aceite, mas emite DeprecationWarning e será removido numa versão futura.

Efetue uma análise de alcance entre os nós de origem e de destino.

Parâmetros:

  • source_property_value (str): valor a corresponder para a propriedade de origem (validada no runtime. Tem de ser fornecido quando não está a utilizar query_input)
  • target_property_value (str): valor a corresponder para a propriedade de destino (validada no runtime. Tem de ser fornecido quando não está a utilizar query_input)
  • source_property (Opcional[str]): Nome da propriedade para filtrar nós de origem
  • participating_source_node_labels (Opcional[Lista[str]]): Etiquetas de nó a considerar como origens
  • target_property (Opcional[str]): Nome da propriedade para filtrar nós de destino
  • participating_target_node_labels (Opcional[Lista[str]]): Etiquetas de nó a considerar como destinos
  • participating_edge_labels (Opcional[Lista[str]]): Etiquetas do Edge a percorrer
  • is_directional (bool): se as arestas são direcionais (predefinição: True)
  • min_hop_count (int): Saltos mínimos (predefinição: 1)
  • max_hop_count (int): Máximo de saltos (predefinição: 4)
  • shortest_path (bool): devolver apenas os caminhos mais curtos (predefinição: False)
  • max_results (int): Máximo de resultados (predefinição: 500)

Gera:

  • ValueError: se source_property_value ou target_property_value estiver em falta, min_hop_count < 1, max_hop_count < min_hop_countou max_results < 1
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Devolve:

  • QueryResult: que contém os caminhos de acessibilidade

Exemplo:

result = graph.reachability(
    source_property_value="user-001",
    target_property_value="device-003")
result.show()

k_hop

def k_hop(
    *,
    source_property: Optional[str] = None,
    source_property_value: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    target_property_value: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Observação

k_hop(query_input=K_HopQueryInput(...)) ainda é aceite, mas emite DeprecationWarning e será removido numa versão futura.

Efetue a análise de k-hop a partir de um determinado nó de origem.

Parâmetros:

Validação:

  • Pelo menos um ou tem de source_property_valuetarget_property_value ser fornecido

Gera:

  • ValueError: se não for fornecido source_property_value nem target_property_value for fornecido, ou se as restrições numéricas forem violadas (igual reachabilitya )
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Devolve:

  • QueryResult: contendo os resultados de k-hop

Exemplo:

result = graph.k_hop(source_property_value="user-001")
result.show()

blast_radius

def blast_radius(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Observação

blast_radius(query_input=BlastRadiusQueryInput(...)) ainda é aceite, mas emite DeprecationWarning e será removido numa versão futura.

Efetue a análise do raio de explosão do nó de origem para o nó de destino.

Parâmetros:

  • source_property_value (str): valor que identifica o nó de origem (validado no runtime. Tem de ser fornecido quando não está a utilizar query_input)
  • target_property_value (str): valor que identifica o nó de destino (validado no runtime. Tem de ser fornecido quando não está a utilizar query_input)
  • Outros parâmetros: o mesmo que reachability

Gera:

  • ValueError: se source_property_value ou target_property_value estiver em falta, ou se as restrições numéricas forem violadas (igual reachabilitya )
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Devolve:

  • QueryResult: contendo os resultados do raio de explosão

Exemplo:

result = graph.blast_radius(
    source_property_value="user-003",
    target_property_value="device-003",
    min_hop_count=1)
result.show()

centrality

def centrality(
    *,
    participating_source_node_labels: Optional[List[str]] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    threshold: int = 3,
    centrality_type: CentralityType = None,
    max_paths: int = 1000000,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Observação

centrality(query_input=CentralityQueryInput(...)) ainda é aceite, mas emite DeprecationWarning e será removido numa versão futura.

Efetue uma análise de centralidade no gráfico.

Parâmetros:

  • participating_source_node_labels (Opcional[Lista[str]]): Etiquetas de nó de origem
  • participating_target_node_labels (Opcional[Lista[str]]): Etiquetas de nó de destino
  • participating_edge_labels (Opcional[Lista[str]]): Etiquetas do Edge a percorrer
  • threshold (int): pontuação de centralidade mínima (predefinição: 3); tem de ser não negativo
  • centrality_type (CentralityType): CentralityType.Node ou CentralityType.Edge (predefinição: None, volta a CentralityType.Node)
  • max_paths (int): os caminhos máximos a considerar (predefinição: 1000000; 0 = todos os caminhos); têm de ser não negativos
  • is_directional (bool): se as arestas são direcionais (predefinição: True)
  • min_hop_count (int): Saltos mínimos (predefinição: 1); tem de ser ≥ 1
  • max_hop_count (int): Máximo de saltos (predefinição: 4); tem de ser ≥ min_hop_count
  • shortest_path (bool): devolver apenas os caminhos mais curtos (predefinição: False)
  • max_results (int): Máximo de resultados (predefinição: 500); tem de ser ≥ 1

Gera:

  • ValueError: Se threshold < 0, max_paths < 0, min_hop_count < 1, max_hop_count < min_hop_countou max_results < 1
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Devolve:

  • QueryResult: contendo as métricas de centralidade

Exemplo:

result = graph.centrality(
    participating_source_node_labels=["user", "device"],
    participating_target_node_labels=["device", "user"],
    participating_edge_labels=["sign_in"],
    is_directional=False)
result.show()

ranked

def ranked(
    *,
    rank_property_name: str = None,
    threshold: int = 0,
    max_paths: int = 1000000,
    decay_factor: float = 1,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Observação

ranked(query_input=RankedQueryInput(...)) ainda é aceite, mas emite DeprecationWarning e será removido numa versão futura.

Efetue uma análise classificada no gráfico.

Parâmetros:

  • rank_property_name (str): nome da propriedade a utilizar para classificação (validado no runtime. Tem de ser fornecido quando não está a utilizar query_input)
  • threshold (int): apenas os caminhos devolvidos acima deste peso (predefinição: 0); têm de ser não negativos
  • max_paths (int): os caminhos máximos a considerar (predefinição: 1000000; 0 = todos os caminhos); têm de ser não negativos
  • decay_factor (float): Classificação da decadência por passo; 2 significa reduzir para metade (predefinição: 1); tem de ser não negativo
  • is_directional (bool): se as arestas são direcionais (predefinição: True)
  • min_hop_count (int): Saltos mínimos (predefinição: 1); tem de ser ≥ 1
  • max_hop_count (int): Máximo de saltos (predefinição: 4); tem de ser ≥ min_hop_count
  • shortest_path (bool): devolver apenas os caminhos mais curtos (predefinição: False)
  • max_results (int): Máximo de resultados (predefinição: 500); tem de ser ≥ 1

Gera:

  • ValueError: se rank_property_name estiver em falta, threshold < 0, max_paths < 0, decay_factor < 0, min_hop_count < 1, max_hop_count < min_hop_countou max_results < 1
  • RuntimeError: se a inicialização do cliente ou a execução da consulta falhar

Devolve:

  • QueryResult: contendo os nós/arestas classificados

Exemplo:

result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2)
result.show()

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Converta todo o gráfico em GraphFrame. Utiliza dados de especificações quando disponíveis; lê a partir de tabelas lake, caso contrário.

Parâmetros:

  • column_mapping (Dict[str, str], opcional): Mapeamento de colunas personalizado

Devolve:

  • GraphFrame: objeto GraphFrame com todos os vértices e arestas

Exemplo:

gf = graph.to_graphframe()

show

def show() -> None

Apresentar informações do gráfico. Delega a para spec.show() apresentação avançada quando uma especificação é anexada; imprime informações mínimas caso contrário.

show_schema

def show_schema() -> None

Apresentar esquema de gráfico. Delega quando spec.show_schema() uma especificação é anexada; imprime uma mensagem a indicar que não existe nenhum esquema disponível.

publish (novo na v0.3.3)

def publish() -> Graph

Registe o gráfico com a API, tornando-o que pode ser consultado. Chame esta opção depois Graph.prepare() (ou em qualquer Graph uma que tenha uma especificação anexada) para publicar a instância do gráfico.

Devolve:

  • Graph: Auto para encadeamento de métodos

Gera:

  • ValueError: se nenhuma especificação estiver anexada ou o contexto estiver em falta
  • RuntimeError: se a publicação falhar

Exemplo:

graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")

BuildStatus

Classe de dados que transporta metadados de uma Graph.build() operação.

Campos

Campo Tipo Descrição
etl_result Any Resultado da prepare fase (execução do pipeline de dados)
api_result Optional[Dict] Resultado da fase de publicação (None se a publicação tiver falhado)
api_error Optional[str] Mensagem de erro se a publicação tiver falhado (None se a publicação tiver sido bem-sucedida)
instance_name str Nome da instância do gráfico
status Optional[BuildStatusKind] None, "published" ou "prepared"

Caminhos de Construção

GraphSpecBuilder.start(...).done()  →  GraphSpec             (spec only, no graph yet)
Graph.get(name, context)            →  Graph (spec=None, build_status=None)
Graph.prepare(spec)                 →  Graph (spec=spec, build_status.status="prepared")
graph.publish()                     →  Graph (build_status.status="published")
Graph.build(spec)                   →  Graph (prepare + publish in one step)

Exemplo:

graph = Graph.build(spec)
if graph.build_status.status == "published":
    print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
    print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
    print("Neither prepare nor publish has run")

Construtores de Nós

NodeBuilderInitial

Estado inicial do construtor de nós: apenas métodos de origem de dados disponíveis.

Construtor

NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: Normalmente criado através de GraphSpecBuilder.add_node(), não foi instanciado diretamente.

Métodos

Observação

A utilização de carateres _ de sublinhado ao atribuir nomes a nós, arestas ou propriedades num gráfico personalizado não é suportada. É devolvido um erro de pedido inválido quando são utilizados carateres de sublinhado.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet

Defina a tabela como origem de dados com resolução inteligente da base de dados.

Parâmetros:

  • table_name (str): nome da tabela (obrigatório)
  • database (str, opcional): nome de base de dados explícito (tem precedência sobre a predefinição de contexto)

Devolve:

  • NodeBuilderSourceSet: Builder para configuração adicional

Gera:

  • ValueError: se a tabela não for encontrada ou forem encontradas várias tabelas em conflito

Ordem de Resolução da Base de Dados:

  1. Parâmetro explícito database (precedência mais alta)
  2. ExecutionContext.default_database
  3. Procurar em todas as bases de dados (com deteção de conflitos)

Exemplo:

builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet

Defina o DataFrame do Spark como origem de dados.

Parâmetros:

  • dataframe (DataFrame): DataFrame do Spark

Devolve:

  • NodeBuilderSourceSet: Builder para configuração adicional

Exemplo:

df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)

NodeBuilderSourceSet

Construtor de nós após a origem de dados ser definida: métodos de configuração disponíveis.

Construtor

NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Criado internamente pelos métodos de origem NodeBuilderInitial.

Métodos

with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet

Aplique a filtragem do intervalo de tempo à origem de dados do nó.

Parâmetros:

  • time_column (str): Nome da coluna que contém dados de carimbo de data/hora (obrigatório)
  • start_time (str ou datetime, opcional): Data de início ("20/10/25", "2025-10-20" ou objeto datetime)
  • end_time (str ou datetime, opcional): Data de fim (os mesmos formatos que start_time)
  • lookback_hours (flutuante, opcional): Horas para olhar para trás a partir de agora

Devolve:

  • NodeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a coluna de hora não for encontrada no esquema de origem

Lógica de Intervalo de Tempo:

  1. Se start_time e end_time fornecidos: utilize-os diretamente
  2. Se apenas lookback_hours fornecido: end=now, start=now-lookback_hours
  3. Se nada for fornecido: sem filtragem de tempo
  4. Se início/fim E lookback_hours: início/fim têm precedência

Exemplo:

# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")

# Lookback window
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet

Defina a etiqueta do nó (predefinição para alias, se não for chamada).

Parâmetros:

  • label (str): etiqueta de nó

Devolve:

  • NodeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a etiqueta já estiver definida

Exemplo:

builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> NodeBuilderSourceSet

Configure colunas com a chave necessária e designação de apresentação.

Parâmetros:

  • *columns (str): nomes de coluna a incluir (pelo menos um necessário)
  • key (str): Nome da coluna para marcar como chave (obrigatório, tem de estar em colunas)
  • display (str): Nome da coluna para marcar como valor a apresentar (obrigatório, tem de estar em colunas, pode ser igual a chave)

Devolve:

  • NodeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a validação falhar (colunas duplicadas, tecla/apresentação em falta, etc.)

Observações:

  • As propriedades são criadas automaticamente a partir de tipos de coluna

  • A coluna filtro de tempo é adicionada automaticamente se especificada

  • Os tipos de propriedade são inferidos automaticamente a partir do esquema de origem

  • Veja Restrições

Exemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Conclua este nó e comece a criar outro nó.

Parâmetros:

  • alias (str): Alias para o novo nó

Devolve:

  • NodeBuilderInitial: Novo construtor de nós

Exemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Conclua este nó e comece a criar uma aresta.

Parâmetros:

  • alias (str): Alias for the edge

Devolve:

  • EdgeBuilderInitial: Novo edge builder

Exemplo:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("accessed")
done
def done() -> GraphSpec

Finalize este nó e conclua a especificação do gráfico.

Devolve:

  • GraphSpec: Especificação completa do gráfico

Exemplo:

graph_spec = builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .done()

Edge Builders

EdgeBuilderInitial

Estado inicial do edge builder: apenas métodos de origem de dados disponíveis.

Construtor

EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: Normalmente criado através de GraphSpecBuilder.add_edge(), não foi instanciado diretamente.

Métodos

Observação

A utilização de carateres _ de sublinhado ao atribuir nomes a nós, arestas ou propriedades num gráfico personalizado não é suportada. É devolvido um erro de pedido inválido quando são utilizados carateres de sublinhado.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet

Defina a tabela como origem de dados com resolução inteligente da base de dados.

Parâmetros:

  • table_name (str): nome da tabela (obrigatório)
  • database (str, opcional): Nome explícito da base de dados

Devolve:

  • EdgeBuilderSourceSet: Builder para configuração adicional

Gera:

  • ValueError: se a tabela não for encontrada ou forem encontradas várias tabelas em conflito

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet

Defina o DataFrame do Spark como origem de dados.

Parâmetros:

  • dataframe (DataFrame): DataFrame do Spark

Devolve:

  • EdgeBuilderSourceSet: Builder para configuração adicional

Exemplo:

df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)

EdgeBuilderSourceSet

Construtor do Edge após a origem de dados ser definida: métodos de configuração disponíveis.

Construtor

EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Criado internamente pelos métodos de origem EdgeBuilderInitial.

Métodos

with_label
def with_label(label: str) -> EdgeBuilderSourceSet

Definir tipo/etiqueta de relação de limite (predefinição para alias se não for chamado).

Parâmetros:

  • label (str): etiqueta edge

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a etiqueta já estiver definida

Exemplo:

builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label

Observação

Em vez disso, utilize with_label() . Este método será removido numa versão futura.

def edge_label(label: str) -> EdgeBuilderSourceSet

Definir tipo/etiqueta de relação de limite (predefinição para alias se não for chamado).

Parâmetros:

  • label (str): etiqueta edge

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a etiqueta já estiver definida

Exemplo:

builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Defina o nó de origem com a coluna e a etiqueta do ID.

Parâmetros:

  • id_column (str): Nome da coluna que contém o ID do nó de origem
  • node_type (str): etiqueta do nó de origem

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a origem já estiver definida

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Defina o nó de destino com a coluna e a etiqueta do ID.

Parâmetros:

  • id_column (str): Nome da coluna que contém o ID do nó de destino
  • node_type (str): etiqueta do nó de destino

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se o destino já estiver definido

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet

Aplique a filtragem do intervalo de tempo à origem de dados do edge.

Parâmetros:

  • time_column (str): Nome da coluna que contém dados de carimbo de data/hora (obrigatório)
  • start_time (str ou datetime, opcional): Data de início
  • end_time (str ou datetime, opcional): Data de fim
  • lookback_hours (flutuante, opcional): Horas para olhar para trás a partir de agora

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a coluna de hora não for encontrada no esquema de origem

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> EdgeBuilderSourceSet

Configure colunas com a chave necessária e designação de apresentação.

Parâmetros:

  • *columns (str): nomes de coluna a incluir (pelo menos um necessário)
  • key (str): Nome da coluna para marcar como chave (obrigatório, tem de estar em colunas)
  • display (str): Nome da coluna para marcar como valor de apresentação (obrigatório, tem de estar em colunas)

Devolve:

  • EdgeBuilderSourceSet: Auto para encadeamento de métodos

Gera:

  • ValueError: se a validação falhar

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Conclua este limite e comece a criar um nó.

Parâmetros:

  • alias (str): Alias para o novo nó

Devolve:

  • NodeBuilderInitial: Novo construtor de nós
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Conclua esta aresta e comece a criar outra aresta.

Parâmetros:

  • alias (str): Alias para a nova aresta

Devolve:

  • EdgeBuilderInitial: Novo edge builder

Exemplo:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", key="id", display="location") \
    .add_edge("connected_to")
done
def done() -> GraphSpec

Finalize este limite e conclua a especificação do gráfico.

Devolve:

  • GraphSpec: Especificação completa do gráfico

Classes de Esquema

GraphDefinitionReference

Referência a uma definição de gráfico com nome e versão.

Construtor

GraphDefinitionReference(
    fully_qualified_name: str,
    version: str
)

Parâmetros:

  • fully_qualified_name (str): nome completamente qualificado do gráfico referenciado
  • version (str): Versão do gráfico referenciado

Gera:

  • ValueError: se fully_qualified_name ou versão estiver vazia

Métodos

to_dict
def to_dict() -> Dict[str, Any]

Serialize para dicionário.

Devolve:

  • Dict[str, Any]: Referência serializada

Propriedade

Definição de propriedade com interface segura para tipos.

Construtor

Property(
    name: str,
    property_type: PropertyType,
    is_non_null: bool = False,
    description: str = "",
    is_key: bool = False,
    is_display_value: bool = False,
    is_internal: bool = False
)

Parâmetros:

  • name (str): Nome da propriedade
  • property_type (PropertyType): tipo de dados de propriedade
  • is_non_null (bool, default=False): Se a propriedade é necessária
  • description (str, default=""): Descrição da propriedade
  • is_key (bool, default=False): Se a propriedade é uma chave
  • is_display_value (bool, default=False): Se a propriedade é o valor de apresentação
  • is_internal (bool, default=False): Se a propriedade é interna

Gera:

  • ValueError: se o nome estiver vazio ou a validação falhar

Métodos de Classe

key
@classmethod
Property.key(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Crie uma propriedade de chave com definições comuns (is_key=Verdadeiro, is_display_value=Verdadeiro).

display
@classmethod
Property.display(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Crie uma propriedade de valor de apresentação (is_display_value=Verdadeiro).

Métodos

describe
def describe(text: str) -> Property

Adicione a descrição fluentemente.

Parâmetros:

  • text (str): Texto de descrição

Devolve:

  • Property: Auto para encadeamento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serialize a propriedade para o dicionário com chaves de anotação com @prefixo.

Devolve:

  • Dict[str, Any]: Propriedade serializada
to_gql
def to_gql() -> str

Gerar definição de propriedade GQL.

Devolve:

  • str: representação da cadeia GQL

EdgeNode

Referência de nó utilizada nas definições de limite.

Construtor

EdgeNode(
    alias: Optional[str] = None,
    labels: List[str] = []
)

Parâmetros:

  • alias (str, opcional): alias de nó (definido automaticamente para a primeira etiqueta se Nenhum ou vazio)
  • labels (Lista[str]): etiquetas de nó (pelo menos uma necessária)

Gera:

  • ValueError: se a lista de etiquetas estiver vazia
  • TypeError: se as etiquetas não forem cadeias

Mutação automática:

  • Se o alias for Nenhum ou estiver vazio, será definido como a primeira etiqueta

Métodos

to_dict
def to_dict() -> Dict[str, Any]

Serialize para dicionário.

Devolve:

  • Dict[str, Any]: Referência de nó periférico serializado

Definição de nó com interface segura para tipos.

Construtor

Node(
    alias: str = "",
    labels: List[str] = [],
    implies_labels: List[str] = [],
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_labels: bool = False,
    abstract_edge_aliases: bool = False
)

Parâmetros:

  • alias (str, default=""): alias de nó (definido automaticamente para a primeira etiqueta se estiver vazio)
  • labels (Lista[str]): etiquetas de nó (pelo menos uma necessária)
  • implies_labels (Lista[str], default=[]): Etiquetas implícitas
  • properties (Lista[Propriedade], predefinição=[]): Propriedades do nó
  • description (str, default=""): Descrição do nó
  • entity_group (str, default=""): Nome do grupo de entidades
  • dynamic_labels (bool, default=False): se o nó tem etiquetas dinâmicas
  • abstract_edge_aliases (bool, default=False): se o nó utiliza aliases abstratos

Gera:

  • ValueError: se a validação falhar (sem etiquetas, sem propriedade de chave, sem propriedade de apresentação, etc.)

Mutação automática:

  • Se o alias estiver vazio, será definido como a primeira etiqueta
  • Se entity_group estiver vazio, está definido como a etiqueta primária

Métodos

get_primary_label
def get_primary_label() -> Optional[str]

Obtenha a etiqueta primária (primeira).

Devolve:

  • str ou None: Etiqueta primária ou Nenhuma se não existirem etiquetas
get_entity_group_name
def get_entity_group_name() -> str

Obter o nome ou contingência do grupo de entidades para a etiqueta primária.

Devolve:

  • str: Nome do grupo de entidades
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]

Obtenha o nome da propriedade da chave primária.

Devolve:

  • str ou None: nome da propriedade da chave primária
get_properties
def get_properties() -> Dict[str, Property]

Obtenha propriedades como dicionário para facilitar o acesso.

Devolve:

  • Dict[str, Property]: Propriedades com chave por nome
get_property
def get_property(name: str) -> Optional[Property]

Obtenha uma propriedade específica por nome.

Parâmetros:

  • name (str): Nome da propriedade

Devolve:

  • Property ou None: Propriedade se for encontrada
add_property
def add_property(prop: Property) -> None

Adicione uma propriedade a este nó.

Parâmetros:

  • prop (Propriedade): Propriedade a adicionar

Gera:

  • ValueError: se o nome da propriedade for duplicado
is_dynamically_labeled
def is_dynamically_labeled() -> bool

Verifique se o nó tem etiquetas dinâmicas.

Devolve:

  • bool: Verdadeiro se as etiquetas dinâmicas estiverem ativadas
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool

Verifique se o nó utiliza aliases de nós de extremidade abstratos.

Devolve:

  • bool: Verdadeiro se os aliases abstratos do edge estiverem ativados
describe
def describe(text: str) -> Node

Adicione a descrição fluentemente.

Parâmetros:

  • text (str): Texto de descrição

Devolve:

  • Node: Auto para encadeamento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar o nó para o dicionário.

Devolve:

  • Dict[str, Any]: nó serializado
to_gql
def to_gql() -> str

Gerar definição de nó GQL.

Devolve:

  • str: representação da cadeia GQL

Gera:

  • ValueError: se faltar campos necessários para o GQL no nó

Métodos de Classe

create
@classmethod
Node.create(
    alias: str,
    labels: List[str],
    properties: List[Property],
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Node

Crie um nó com todos os campos necessários.

Parâmetros:

  • alias (str): alias de nó
  • labels (List[str]): Node labels (Etiqueta
  • properties (Lista[Propriedade]): Propriedades do nó
  • description (str, default=""): Descrição do nó
  • entity_group (str, default=""): Nome do grupo de entidades

Devolve:

  • Node: Nova instância de nó

Borda

Definição de limite com interface segura para tipos.

Construtor

Edge(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_type: bool = False
)

Parâmetros:

  • relationship_type (str): tipo de relação edge (por exemplo, "FOLLOWS", "OWNS")
  • source_node_label (str): etiqueta do nó de origem
  • target_node_label (str): etiqueta do nó de destino
  • direction (EdgeDirection, default=DIRECTED_RIGHT): Direção do edge
  • properties (Lista[Propriedade], predefinição=[]): propriedades do Edge
  • description (str, default=""): Descrição do Edge
  • entity_group (str, default=""): Nome do grupo de entidades
  • dynamic_type (bool, default=False): se o edge tem um tipo dinâmico

Gera:

  • ValueError: se a validação falhar

Mutação automática:

  • labels a lista é preenchida automaticamente com [relationship_type]
  • Se entity_group estiver vazio, está definido como relationship_type

Propriedades

edge_type
def edge_type() -> str

Alias de retrocompatibilidade para relationship_type.

Devolve:

  • str: Tipo de relação

Métodos

get_entity_group_name
def get_entity_group_name() -> str

Obter o nome ou contingência do grupo de entidades para o tipo de relação.

Devolve:

  • str: Nome do grupo de entidades
is_dynamic_type
def is_dynamic_type() -> bool

Verifique se o edge tem um tipo dinâmico.

Devolve:

  • bool: Verdadeiro se tipo dinâmico
add_property
def add_property(edge_property: Property) -> None

Adicione uma propriedade a este limite.

Parâmetros:

  • edge_property (Propriedade): Propriedade a adicionar
describe
def describe(text: str) -> Edge

Adicione a descrição fluentemente.

Parâmetros:

  • text (str): Texto de descrição

Devolve:

  • Edge: Auto para encadeamento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serializar o limite para o dicionário.

Devolve:

  • Dict[str, Any]: Edge serializado
to_gql
def to_gql() -> str

Gerar a definição de aresta GQL.

Devolve:

  • str: representação da cadeia GQL

Métodos de Classe

create
Edge.create(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    properties: List[Property] = None,
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Edge

Crie uma aresta com todos os campos necessários.

Parâmetros:

  • relationship_type (str): tipo de relação edge
  • source_node_label (str): etiqueta do nó de origem
  • target_node_label (str): etiqueta do nó de destino
  • properties (Lista[Propriedade], opcional): propriedades do Edge
  • description (str, default=""): Descrição do Edge
  • entity_group (str, default=""): Nome do grupo de entidades

Devolve:

  • Edge: Nova instância edge

GraphSchema

Definição de esquema de gráfico com interface segura para tipos.

Construtor

GraphSchema(
    name: str,
    nodes: List[Node] = [],
    edges: List[Edge] = [],
    base_graphs: List[GraphSchema] = [],
    description: str = "",
    version: str = "1.0",
    fully_qualified_name: str = "",
    namespace: str = ""
)

Parâmetros:

  • name (str): Graph schema name (Nome do esquema do gráfico)
  • nodes (List[Node], default=[]): Node definitions
  • edges (Lista[Edge], default=[]): Definições do Edge
  • base_graphs (List[GraphSchema], default=[]): Base graph schemas
  • description (str, default=""): Descrição do esquema
  • version (str, default="1.0"): versão do esquema
  • fully_qualified_name (str, default=""): Nome completamente qualificado
  • namespace (str, default=""): Espaço de Nomes

Gera:

  • ValueError: se a validação falhar (aliases duplicados, os limites referenciam nós inexistentes, etc.)

Métodos

get_fully_qualified_name
def get_fully_qualified_name() -> str

Obtenha um nome completamente qualificado.

Devolve:

  • str: nome completamente qualificado
get_namespace
def get_namespace() -> str

Obtenha o espaço de nomes a partir de um nome completamente qualificado ou devolva a predefinição.

Devolve:

  • str: Espaço de nomes
get_version
def get_version() -> str

Obter versão.

Devolve:

  • str: Cadeia de versão
get_node
def get_node(label_or_alias: str) -> Optional[Node]

Obter nó por etiqueta ou alias.

Parâmetros:

  • label_or_alias (str): etiqueta ou alias de nó

Devolve:

  • Node ou None: Nó se for encontrado
get_edge
def get_edge(name: str) -> Optional[Edge]

Obter limite por nome/tipo.

Parâmetros:

  • name (str): tipo de relação edge

Devolve:

  • Edge ou None: Edge, se encontrado
add_node
def add_node(node: Node) -> None

Adicione um nó a este gráfico.

Parâmetros:

  • node (Nó): Nó a adicionar

Gera:

  • ValueError: se o alias do nó for duplicado
add_edge
def add_edge(edge: Edge) -> None

Adicione uma margem a este gráfico.

Parâmetros:

  • edge (Edge): Edge a adicionar

Gera:

  • ValueError: se o tipo de limite for duplicado
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema

Adicione um grafo include (Fluent API).

Parâmetros:

  • fully_qualified_name (str): nome completamente qualificado do gráfico a incluir
  • version (str): Versão do gráfico a incluir

Devolve:

  • GraphSchema: Auto para encadeamento de métodos
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]

Obter lista de referências de grafos incluídas.

Devolve:

  • List[GraphDefinitionReference]: Lista de referências de definições de gráficos
describe
def describe(text: str) -> GraphSchema

Adicione a descrição fluentemente.

Parâmetros:

  • text (str): Texto de descrição

Devolve:

  • GraphSchema: Auto para encadeamento de métodos
to_dict
def to_dict() -> Dict[str, Any]

Serialize o esquema para o dicionário.

Devolve:

  • Dict[str, Any]: Esquema serializado
to_json
def to_json(indent: int = 2) -> str

Gerar representação JSON.

Parâmetros:

  • indent (int, default=2): nível de avanço JSON

Devolve:

  • str: cadeia JSON
to_gql
def to_gql() -> str

Gerar definição de esquema GQL.

Devolve:

  • str: representação da cadeia GQL

Métodos de Classe

create
@classmethod
GraphSchema.create(
    name: str,
    nodes: List[Node] = None,
    edges: List[Edge] = None,
    description: str = "",
    version: str = "1.0",
    **kwargs
) -> GraphSchema

Crie um esquema de gráfico com todos os campos necessários.

Parâmetros:

  • name (str): Graph schema name (Nome do esquema do gráfico)
  • nodes (Lista[Nó], opcional): Definições de nós
  • edges (Lista[Edge], opcional): Definições do Edge
  • description (str, default=""): Descrição do esquema
  • version (str, default="1.0"): versão do esquema

Devolve:

  • GraphSchema: Nova instância de esquema de gráfico

Classes de Entrada de Consulta

Classes de dados que representam parâmetros de entrada para consultas de grafos predefinidas.

Observação

A transmissão QueryInput de objetos diretamente para Graph métodos de consulta foi preterida e será removida em versões futuras. Em alternativa, utilize palavra-chave argumentos. Os Graph métodos (reachability, , blast_radiusk_hop, centrality, ranked) aceitam todos os parâmetros como argumentos palavra-chave e constroem os objetos de entrada internamente. Estas classes permanecem na base de código por enquanto, mas não devem ser utilizadas no novo código.

QueryInputBase

Classe base para todos os parâmetros de entrada de consulta.

Métodos

to_json_payload
def to_json_payload() -> Dict[str, Any]

Converta os parâmetros de entrada num dicionário para submissão de API.

Devolve:

  • Dict[str, Any]: representação do dicionário dos parâmetros de entrada
validate
def validate() -> None

Valide os parâmetros de entrada.

Gera:

  • ValueError: se os parâmetros de entrada forem inválidos

ReachabilityQueryInput

Parâmetros de entrada para uma consulta de acessibilidade entre os nós de origem e de destino. Herda do ReachabilityQueryInputBase qual herda de QueryInputBase.

Campos

Campo Tipo Padrão Descrição
source_property_value str (obrigatório) Valor a corresponder para a propriedade de origem
target_property_value str (obrigatório) Valor a corresponder para a propriedade de destino
source_property Optional[str] None Nome da propriedade para filtrar nós de origem
participating_source_node_labels Optional[List[str]] None Etiquetas de nós a considerar como nós de origem
target_property Optional[str] None Nome da propriedade para filtrar nós de destino
participating_target_node_labels Optional[List[str]] None Etiquetas de nós a considerar como nós de destino
participating_edge_labels Optional[List[str]] None Etiquetas edge a percorrer no caminho
is_directional Optional[bool] True Se as arestas são direcionais
min_hop_count Optional[int] 1 Número mínimo de saltos no caminho
max_hop_count Optional[int] 4 Número máximo de saltos no caminho
shortest_path Optional[bool] False Se pretende encontrar apenas o caminho mais curto
max_results Optional[int] 500 Número máximo de resultados a devolver

Validação:

  • source_property_value é necessário
  • target_property_value é necessário

Exemplo:

# Preferred: keyword arguments (no import needed)
result = graph.reachability(
    source_property="UserId",
    source_property_value="user123",
    target_property="DeviceId",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"],
    shortest_path=True
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

K_HopQueryInput

Parâmetros de entrada para uma consulta k-hop a partir de um determinado nó de origem. Herda de ReachabilityQueryInputBase.

Herda todos os campos de ReachabilityQueryInput.

Validação:

  • Pelo menos um ou tem de source_property_valuetarget_property_value ser fornecido

Exemplo:

# Preferred: keyword arguments
result = graph.k_hop(
    source_property_value="user123",
    max_hop_count=3,
    participating_edge_labels=["accessed"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))

BlastRadiusQueryInput

Parâmetros de entrada para uma consulta de raio de explosão de origem para nós de destino. Herda de ReachabilityQueryInputBase.

Herda todos os campos de ReachabilityQueryInput, com os seguintes campos necessários:

Campo Tipo Obrigatório Descrição
source_property_value str Sim Valor para identificar o nó de origem
target_property_value str Sim Valor para identificar o nó de destino

Validação:

  • source_property_value é necessário
  • target_property_value é necessário

Exemplo:

# Preferred: keyword arguments
result = graph.blast_radius(
    source_property_value="user123",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

CentralityQueryInput

Parâmetros de entrada para uma consulta de análise de centralidade. Herda de QueryInputBase.

Enumeração CentralityType

Valor Descrição
CentralityType.Node Centralidade do nó de computação
CentralityType.Edge Centralidade do edge de computação

Campos

Campo Tipo Padrão Descrição
threshold Optional[int] 3 Classificação de centralidade mínima a considerar
centrality_type CentralityType CentralityType.Node Tipo de centralidade para computação
max_paths Optional[int] 1000000 Caminhos máximos a considerar (0 = todos)
participating_source_node_labels Optional[List[str]] None Etiquetas de nó de origem
participating_target_node_labels Optional[List[str]] None Etiquetas de nó de destino
participating_edge_labels Optional[List[str]] None Etiquetas edge a percorrer
is_directional Optional[bool] True Se as arestas são direcionais
min_hop_count Optional[int] 1 Saltos mínimos
max_hop_count Optional[int] 4 Máximo de saltos
shortest_path Optional[bool] False Apenas os caminhos mais curtos
max_results Optional[int] 500 Máximo de resultados

Exemplo:

# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
    centrality_type=CentralityType.Edge,  # or CentralityType.Node (default)
    participating_edge_labels=["accessed", "connected_to"],
    threshold=5,
    max_results=100
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
    centrality_type=CentralityType.Edge,
    participating_edge_labels=["accessed"]
))

RankedQueryInput

Parâmetros de entrada para uma consulta de análise classificada. Herda de QueryInputBase.

Campos

Campo Tipo Padrão Descrição
rank_property_name str (obrigatório) Nome da propriedade a utilizar para caminhos de classificação
threshold Optional[int] 0 Devolver apenas caminhos com pesos acima deste valor
max_paths Optional[int] 1000000 Caminhos máximos a considerar (0 = todos)
decay_factor Optional[float] 1 Quanto cada passo de gráfico reduz a classificação (2 = metades de cada passo)
is_directional Optional[bool] True Se as arestas são direcionais
min_hop_count Optional[int] 1 Saltos mínimos
max_hop_count Optional[int] 4 Máximo de saltos
shortest_path Optional[bool] False Apenas os caminhos mais curtos
max_results Optional[int] 500 Máximo de resultados

Exemplo:

# Preferred: keyword arguments
result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2,
    max_results=50
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
    rank_property_name="risk_score",
    threshold=5
))

Resultados da Consulta

ConsultaResult

Resultado de uma consulta de grafo com acesso de DataFrame em diferido.

Construtor

QueryResult(raw_response: Dict[str, Any], graph: Graph)

Parâmetros:

  • raw_response (Dict[str, Any]): Dicionário de resposta da API não processada
  • graph (Gráfico): Referência ao Graph principal

Nota: Normalmente criado por Graph.query(), não é instanciado diretamente.

Métodos

to_dataframe
def to_dataframe() -> DataFrame

Converte o resultado da consulta num DataFrame do Spark.

Devolve:

  • DataFrame: Resultado da consulta como DataFrame do Apache Spark

Gera:

  • ValueError: se a conversão falhar

Exemplo:

result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]

Obtenha a secção RawData da resposta.

Devolve:

  • Dict[str, Any]: dicionário com metadados não processados ou dict vazio se não estiver presente

Exemplo:

result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None

Apresentar o resultado da consulta em vários formatos.

Parâmetros:

  • format (str, default="visual"): Formato de saída
    • "table": tabelas DataFrame completas (todas as colunas)
    • "visual": visualização interativa de grafos com plug-in do VSC
    • "all": Mostrar todos os formatos

Gera:

  • ValueError: se o formato não for um dos valores suportados

Exemplo:

result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show()  # Visual by default
result.show(format="table")  # Table format

# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph

# 1. Define graph specification
spec = (
    GraphSpecBuilder.start()
    
    .add_node("User")
        .from_dataframe(user_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "UserId", "UserDisplayName", "UserPrincipalName",
                "DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
                "TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
                key="UserId", display="UserDisplayName"
            )
    
    .add_node("IPAddress")
        .from_dataframe(ip_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "IPAddress", "UniqueUsers", "UniqueLocations",
                "SignInCount", "RiskySignInCount", "SharedIPFlag",
                key="IPAddress", display="IPAddress"
            )
    
    .add_edge("UsedIP")
        .from_dataframe(edge_used_ip)  # native Spark DF → no .df
            .source(id_column="UserId", node_type="User")
            .target(id_column="IPAddress", node_type="IPAddress")
            .with_columns(
                "SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
                key="EdgeKey", display="EdgeKey"
            )
   
    .done()
)

# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()

# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")

# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()

# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()

# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()

# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()

Notas sobre Padrões de Estrutura

Fluent API

Todos os construtores suportam o encadeamento de métodos para definições de grafos legíveis e declarativas:

builder.add_node("user") \
    .from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("follows")

Esquemas de União

Várias arestas com o mesmo alias são automaticamente unidas com propriedades unidas:

# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
    .from_table("AzureSignins") \
    .source(id_column="UserId", node_type="AZuser") \
    .target(id_column="DeviceId", node_type="device")

builder.add_edge("sign_in") \
    .from_table("EntraSignins") \
    .source(id_column="UserId", node_type="EntraUser") \
    .target(id_column="DeviceId", node_type="device")

Configuração automática

Muitos campos têm predefinições sensatas:

  • As etiquetas de nó/limite são predefinidas para os respetivos aliases
  • As propriedades são automaticamente inferidas a partir de esquemas de origem
  • Grupos de entidades predefinidos para etiquetas primárias/tipos de relação

Avaliação Em Diferido

Os DataFrames e os recursos são carregados de forma preguiçosa e em cache:

  • graph_spec.nodes e graph_spec.edges são carregados no primeiro acesso
  • Os resultados da consulta criam DataFrames apenas quando solicitado