Informazioni di riferimento sulle API Graph Builder (anteprima)

La classe sentinel_graph consente di interagire con il grafo Microsoft Sentinel, consentendo di definire lo schema del grafo, trasformare i dati dal data lake Microsoft Sentinel in nodi e archi, pubblicare un grafo, eseguire query graph ed eseguire algoritmi di grafo avanzati. Questa classe è progettata per funzionare con le sessioni Spark nei notebook di Jupyter in esecuzione in Microsoft Sentinel calcolo Spark.

GraphSpecBuilder

La classe GraphSpecBuilder fornisce un generatore fluent per la creazione di specifiche del grafo con pipeline di dati e integrazione dello schema.

Importante

L'alias GraphBuilder per questa classe è deprecato e verrà rimosso in una versione futura. Usare GraphSpecBuilder in tutto il nuovo codice.

# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder

# Recommended
from sentinel_graph import GraphSpecBuilder

Costruttore

GraphSpecBuilder(context: ExecutionContext)

Parametri:

  • context (ExecutionContext): contesto di esecuzione contenente la sessione e la configurazione di Spark

Genera:

  • ValueError: se il contesto è Nessuno o non è possibile determinare il nome del grafo

Metodi statici

start

GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder

Definire un nuovo generatore di grafi fluenti.

Parametri:

  • context (ExecutionContext, facoltativo): istanza di ExecutionContext. Se None, usa il contesto predefinito.

Restituisce:

  • GraphSpecBuilder: nuova istanza del generatore

Esempio:

builder = GraphSpecBuilder.start(context=context)

Metodi dell'istanza

add_node

def add_node(alias: str) -> NodeBuilderInitial

Iniziare a compilare una definizione di nodo.

Parametri:

  • alias (str): identificatore univoco per questo nodo all'interno del grafico

Restituisce:

  • NodeBuilderInitial: generatore di nodi nello stato iniziale

Esempio:

builder.add_node("user")

add_edge

def add_edge(alias: str) -> EdgeBuilderInitial

Iniziare a creare una definizione perimetrale.

Parametri:

  • alias (str): identificatore per questo bordo all'interno del grafo (può essere condiviso tra più archi)

Restituisce:

  • EdgeBuilderInitial: generatore edge nello stato iniziale

Esempio:

builder.add_edge("accessed")

done

def done() -> GraphSpec

Finalizzare la specifica del grafo e restituire l'istanza GraphSpec.

Restituisce:

  • GraphSpec: completare la specifica del grafo con la pipeline di dati e lo schema

Genera:

  • ValueError: se il grafo non ha nodi o archi o se la convalida ha esito negativo

Esempio:

graph_spec = builder.done()

GraphSpec

Specifica del grafo con funzionalità di pipeline di dati, schema e visualizzazione.

Costruttore

GraphSpec(
    name: str,
    context: ExecutionContext,
    graph_schema: GraphSchema,
    etl_pipeline: Optional[ETLPipeline] = None
)

Parametri:

  • name (str): nome del grafico
  • context (ExecutionContext): contesto di esecuzione
  • graph_schema (GraphSchema): definizione dello schema del grafico
  • etl_pipeline (ETLPipeline, facoltativo): pipeline di dati per la preparazione del grafo

Proprietà

nodes

def nodes() -> DataFrame

Ottenere i nodi DataFrame (lazy, memorizzati nella cache). Determina automaticamente l'origine dalla pipeline di dati o dalla tabella lake.

Restituisce:

  • DataFrame: DataFrame Spark contenente tutti i nodi

Genera:

  • ValueError: se il contesto è mancante o non è possibile caricare i dataframe

edges

def edges() -> DataFrame

Ottenere bordi DataFrame (lazy, memorizzato nella cache). Determina automaticamente l'origine dalla pipeline di dati o dalla tabella lake.

Restituisce:

  • DataFrame: DataFrame Spark contenente tutti i bordi

Genera:

  • ValueError: se il contesto è mancante o non è possibile caricare i dataframe

Metodi

build_graph_with_data

Nota

build_graph_with_data è deprecato e verrà rimosso in una versione futura. Usare Graph.build(spec) invece .

def build_graph_with_data() -> Dict[str, Any]

Eseguire la pipeline di dati e pubblicare il grafico. Graph.build(self)Chiama internamente , stashes l'oggetto restituito Graphe restituisce un dizionario compatibile con le versioni precedenti.

Restituisce:

  • Dict[str, Any]: dizionario contenente:
    • etl_result: Risultati della preparazione dei dati
    • api_result: pubblicare i risultati (in caso di esito positivo)
    • api_error: stringa di errore (se la pubblicazione non è riuscita)
    • instance_name: nome dell'istanza del grafico
    • status: "published" o "prepared"

Esempio:

graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")

get_schema

def get_schema() -> GraphSchema

Ottenere lo schema del grafico.

Restituisce:

  • GraphSchema: definizione dello schema del grafo

get_pipeline

Nota

Questo metodo è deprecato e verrà rimosso in una versione futura. La pipeline di dati è un dettaglio di implementazione interna e non deve essere accessibile direttamente.

def get_pipeline() -> Optional[ETLPipeline]

Ottenere la pipeline di dati (nessuno per i grafici esistenti).

Restituisce:

  • ETLPipeline o None: pipeline di dati, se disponibile

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Convertire l'intero grafo in GraphFrame per l'esecuzione di algoritmi a grafo. Funziona solo sui dati locali (dalla pipeline di dati o dalla tabella lake).

Parametri:

  • column_mapping (Dict[str, str], facoltativo): Mapping di colonne personalizzato con chiavi:
    • "id": nome colonna ID vertice
    • "source_id": nome colonna ID origine edge
    • "target_id": nome colonna ID destinazione edge

Restituisce:

  • GraphFrame: oggetto GraphFrame con tutti i vertici e i bordi

Genera:

  • ValueError: se ExecutionContext non è disponibile

Esempio:

gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)

show

def show(limit: int = 100, viz_format: str = "visual") -> None

Visualizzare i dati del grafo in vari formati.

Parametri:

  • limit (int, default=100): Numero massimo di nodi/archi da visualizzare
  • viz_format (str, default="visual"): Formato di output
    • "table": tabelle dataframe complete (tutte le colonne)
    • "visual": visualizzazione interattiva del grafico
    • "all": mostra tutti i formati

Genera:

  • ValueError: se il formato non è uno dei valori supportati

Esempio:

graph_spec.show(limit=50, viz_format="table")

show_schema

def show_schema() -> None

Visualizzare lo schema del grafo come visualizzazione interattiva del grafico.

Esempio:

spec.show_schema()

Graph

Istanza del grafico su cui è possibile eseguire query. Creato tramite Graph.get() (grafo esistente) o Graph.build() (da un GraphSpecoggetto ).

Costruttore

Graph(
    name: str,
    context: ExecutionContext,
    spec: Optional[GraphSpec] = None,
    build_status: Optional[BuildStatus] = None,
)

Parametri:

  • name (str): nome del grafico
  • context (ExecutionContext): contesto di esecuzione
  • spec (GraphSpec, facoltativo): specifica del grafo associata (impostata da Graph.build())
  • build_status (BuildStatus, facoltativo): metadati dei risultati della compilazione (impostati da Graph.build())

Genera:

  • ValueError: se ExecutionContext è None

Metodi statici

get

Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph

Ottenere un'istanza del grafo da un grafo esistente. L'oggetto restituito Graph ha spec=None e build_status=None.

Parametri:

  • name (str): nome dell'istanza del grafico
  • context (ExecutionContext, facoltativo): contesto di esecuzione (impostazione predefinita: ExecutionContext.default())

Restituisce:

  • Graph: istanza di Graph

Genera:

  • ValueError: se il nome del grafo è vuoto o l'istanza del grafo non esiste

Esempio:

graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")

prepare

Graph.prepare(spec: GraphSpec) -> Graph

Eseguire la fase di preparazione dei dati per un oggetto GraphSpecsenza pubblicazione. Usare publish() in seguito per registrare il grafico e renderlo queryable.

Parametri:

  • spec (GraphSpec): specifica del grafo da preparare

Restituisce:

  • Graph: istanza del grafico con spec collegato e build_status.status == "prepared"

Genera:

  • ValueError: se la specifica non dispone di una pipeline di dati o di un contesto di esecuzione
  • RuntimeError: se l'esecuzione della pipeline di dati ha esito negativo

Esempio:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")

build

Graph.build(spec: GraphSpec) -> Graph

Compilare un grafico da un GraphSpec oggetto preparando i dati e la pubblicazione. Graph.prepare(spec) Chiama internamente e quindi tenta graph.publish(). A differenza della chiamata separata di questi due metodi, vengono rilevati errori di pubblicazione, ovvero il grafico restituito ha build_status.status == "prepared" e build_status.api_error imposta invece di generare.

Parametri:

  • spec (GraphSpec): specifica del grafo da cui eseguire la compilazione

Restituisce:

  • Graph: istanza del grafico con spec collegato e build_status popolato

Genera:

  • ValueError: se la specifica non dispone di una pipeline di dati o di un contesto di esecuzione
  • RuntimeError: se l'esecuzione della pipeline di dati ha esito negativo

Esempio:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status)  # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")

Proprietà

nodes

def nodes() -> Optional[DataFrame]

Ottiene i nodi DataFrame. Delega a self.spec.nodes quando una specifica è associata; restituisce None in caso contrario.

edges

def edges() -> Optional[DataFrame]

Ottiene i bordi dataframe. Delega a self.spec.edges quando una specifica è associata; restituisce None in caso contrario.

schema

def schema() -> Optional[GraphSchema]

Ottenere lo schema del grafo. Delega a self.spec.get_schema() quando una specifica è associata; restituisce None in caso contrario.

Metodi

query

def query(query_string: str, query_language: str = "GQL") -> QueryResult

Eseguire una query sull'istanza del grafico usando GQL.

Parametri:

  • query_string (str): stringa di query graph (linguaggio GQL)
  • query_language (str, default="GQL"): Linguaggio di query

Restituisce:

  • QueryResult: oggetto contenente nodi, archi e metadati

Genera:

  • ValueError: se la sessione ExecutionContext o Spark è mancante
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Esempio:

result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()

reachability

def reachability(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

[! NOTA] reachability(query_input=ReachabilityQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.

Eseguire l'analisi della raggiungibilità tra i nodi di origine e di destinazione.

Parametri:

  • source_property_value (str): valore da trovare per la proprietà di origine (convalidata in fase di esecuzione. Deve essere specificato quando non si usa query_input)
  • target_property_value (str): valore da trovare per la proprietà di destinazione (convalidato in fase di esecuzione. Deve essere specificato quando non si usa query_input)
  • source_property (Facoltativo[str]): Nome della proprietà per filtrare i nodi di origine
  • participating_source_node_labels (Facoltativo[List[str]]): Etichette di nodo da considerare come origini
  • target_property (Facoltativo[str]): Nome della proprietà per filtrare i nodi di destinazione
  • participating_target_node_labels (Facoltativo[List[str]]): Etichette di nodo da considerare come destinazioni
  • participating_edge_labels (Facoltativo[List[str]]): Etichette perimetrali da attraversare
  • is_directional (bool): indica se i bordi sono direzionali (impostazione predefinita: True)
  • min_hop_count (int): hop minimi (impostazione predefinita: 1)
  • max_hop_count (int): hop massimi (impostazione predefinita: 4)
  • shortest_path (bool): restituisce solo i percorsi più brevi (impostazione predefinita: False)
  • max_results (int): risultati massimi (impostazione predefinita: 500)

Genera:

  • ValueError: se source_property_value o target_property_value è mancante, , min_hop_count < 1max_hop_count < min_hop_countomax_results < 1
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Restituisce:

  • QueryResult: contenente i percorsi di raggiungibilità

Esempio:

result = graph.reachability(
    source_property_value="user-001",
    target_property_value="device-003")
result.show()

k_hop

def k_hop(
    *,
    source_property: Optional[str] = None,
    source_property_value: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    target_property_value: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota

k_hop(query_input=K_HopQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.

Eseguire l'analisi k-hop da un nodo di origine specificato.

Parametri:

Convalida:

  • Almeno uno di source_property_value o target_property_value deve essere fornito

Genera:

  • ValueError: se non viene specificato né source_property_valuetarget_property_value o se vengono violati vincoli numerici (come reachability)
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Restituisce:

  • QueryResult: contenente i risultati dell'hop k

Esempio:

result = graph.k_hop(source_property_value="user-001")
result.show()

blast_radius

def blast_radius(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota

blast_radius(query_input=BlastRadiusQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.

Eseguire l'analisi del raggio di esplosione dal nodo di origine al nodo di destinazione.

Parametri:

  • source_property_value (str): valore che identifica il nodo di origine (convalidato in fase di esecuzione. Deve essere specificato quando non si usa query_input)
  • target_property_value (str): valore che identifica il nodo di destinazione (convalidato in fase di esecuzione. Deve essere specificato quando non si usa query_input)
  • Altri parametri: uguale a reachability

Genera:

  • ValueError: se source_property_value o target_property_value è mancante o se i vincoli numerici vengono violati (come reachability)
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Restituisce:

  • QueryResult: contenente i risultati del raggio di esplosione

Esempio:

result = graph.blast_radius(
    source_property_value="user-003",
    target_property_value="device-003",
    min_hop_count=1)
result.show()

centrality

def centrality(
    *,
    participating_source_node_labels: Optional[List[str]] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    threshold: int = 3,
    centrality_type: CentralityType = None,
    max_paths: int = 1000000,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota

centrality(query_input=CentralityQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.

Eseguire l'analisi della centralità nel grafico.

Parametri:

  • participating_source_node_labels (Facoltativo[List[str]]): Etichette del nodo di origine
  • participating_target_node_labels (Facoltativo[List[str]]): Etichette dei nodi di destinazione
  • participating_edge_labels (Facoltativo[List[str]]): Etichette perimetrali da attraversare
  • threshold (int): punteggio di centralità minimo (impostazione predefinita: 3); deve essere non negativo
  • centrality_type (CentralityType): CentralityType.Node o CentralityType.Edge (impostazione predefinita: None, restituisce CentralityType.Node)
  • max_paths (int): i percorsi massimi da considerare (impostazione predefinita: 1000000; 0 = tutti i percorsi); devono essere non negativi
  • is_directional (bool): indica se i bordi sono direzionali (impostazione predefinita: True)
  • min_hop_count (int): hop minimi (impostazione predefinita: 1); deve essere ≥ 1
  • max_hop_count (int): numero massimo di hop (impostazione predefinita: 4); deve essere ≥ min_hop_count
  • shortest_path (bool): restituisce solo i percorsi più brevi (impostazione predefinita: False)
  • max_results (int): I risultati massimi (impostazione predefinita: 500); devono essere ≥ 1

Genera:

  • ValueError: se threshold < 0, max_paths < 0, min_hop_count < 1, max_hop_count < min_hop_counto max_results < 1
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Restituisce:

  • QueryResult: contenente le metriche di centralità

Esempio:

result = graph.centrality(
    participating_source_node_labels=["user", "device"],
    participating_target_node_labels=["device", "user"],
    participating_edge_labels=["sign_in"],
    is_directional=False)
result.show()

ranked

def ranked(
    *,
    rank_property_name: str = None,
    threshold: int = 0,
    max_paths: int = 1000000,
    decay_factor: float = 1,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

Nota

ranked(query_input=RankedQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.

Eseguire un'analisi classificata nel grafico.

Parametri:

  • rank_property_name (str): nome della proprietà da usare per la classificazione (convalidata in fase di esecuzione. Deve essere specificato quando non si usa query_input)
  • threshold (int): solo i percorsi restituiti al di sopra di questo peso (impostazione predefinita: 0); devono essere non negativi
  • max_paths (int): i percorsi massimi da considerare (impostazione predefinita: 1000000; 0 = tutti i percorsi); devono essere non negativi
  • decay_factor (float): decadimento del rango per passaggio; 2 significa dimezzamento (impostazione predefinita: 1); deve essere non negativo
  • is_directional (bool): indica se i bordi sono direzionali (impostazione predefinita: True)
  • min_hop_count (int): hop minimi (impostazione predefinita: 1); deve essere ≥ 1
  • max_hop_count (int): numero massimo di hop (impostazione predefinita: 4); deve essere ≥ min_hop_count
  • shortest_path (bool): restituisce solo i percorsi più brevi (impostazione predefinita: False)
  • max_results (int): I risultati massimi (impostazione predefinita: 500); devono essere ≥ 1

Genera:

  • ValueError: se rank_property_name manca, , threshold < 0max_paths < 0, decay_factor < 0, min_hop_count < 1, max_hop_count < min_hop_countomax_results < 1
  • RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce

Restituisce:

  • QueryResult: contenente i nodi/archi classificati

Esempio:

result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2)
result.show()

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

Convertire l'intero grafico in GraphFrame. Usa i dati delle specifiche quando disponibili; legge da tabelle lake in caso contrario.

Parametri:

  • column_mapping (Dict[str, str], facoltativo): Mapping di colonne personalizzato

Restituisce:

  • GraphFrame: oggetto GraphFrame con tutti i vertici e i bordi

Esempio:

gf = graph.to_graphframe()

show

def show() -> None

Visualizzare le informazioni del grafico. Delega a spec.show() per la visualizzazione avanzata quando una specifica è collegata. In caso contrario, stampa informazioni minime.

show_schema

def show_schema() -> None

Visualizzare lo schema del grafico. Delega a spec.show_schema() quando è associata una specifica. Stampa un messaggio che indica che in caso contrario non è disponibile alcun schema.

publish (novità della versione 0.3.3)

def publish() -> Graph

Registrare il grafico con l'API, rendendolo eseguibile su query. Chiamare questo metodo dopo Graph.prepare() (o su qualsiasi Graph specifica associata) per pubblicare l'istanza del grafico.

Restituisce:

  • Graph: Self per il concatenamento dei metodi

Genera:

  • ValueError: se non è associata alcuna specifica o se manca il contesto
  • RuntimeError: se la pubblicazione non riesce

Esempio:

graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")

Buildstatus

Classe di dati che trasporta metadati da un'operazione Graph.build() .

Fields

Campo Tipo Descrizione
etl_result Any Risultato della fase (esecuzione della prepare pipeline di dati)
api_result Optional[Dict] Risultato della fase di pubblicazione (None se la pubblicazione non è riuscita)
api_error Optional[str] Messaggio di errore se la pubblicazione non è riuscita (None se la pubblicazione è riuscita)
instance_name str Nome dell'istanza del grafico
status Optional[BuildStatusKind] None, "published", o "prepared"

Percorsi di costruzione

GraphSpecBuilder.start(...).done()  →  GraphSpec             (spec only, no graph yet)
Graph.get(name, context)            →  Graph (spec=None, build_status=None)
Graph.prepare(spec)                 →  Graph (spec=spec, build_status.status="prepared")
graph.publish()                     →  Graph (build_status.status="published")
Graph.build(spec)                   →  Graph (prepare + publish in one step)

Esempio:

graph = Graph.build(spec)
if graph.build_status.status == "published":
    print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
    print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
    print("Neither prepare nor publish has run")

Generatori di nodi

NodeBuilderInitial

Stato iniziale per il generatore di nodi: sono disponibili solo i metodi dell'origine dati.

Costruttore

NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: In genere creato tramite GraphSpecBuilder.add_node(), non è stata creata un'istanza diretta.

Metodi

Nota

L'uso dei caratteri di sottolineatura _ quando si denominano nodi, archi o proprietà in un grafico personalizzato non è supportato. Quando vengono usati caratteri di sottolineatura, viene restituito un errore di richiesta non valido.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet

Impostare la tabella come origine dati con risoluzione intelligente del database.

Parametri:

  • table_name (str): nome della tabella (obbligatorio)
  • database (str, facoltativo): nome del database esplicito (ha la precedenza sull'impostazione predefinita del contesto)

Restituisce:

  • NodeBuilderSourceSet: Generatore per un'ulteriore configurazione

Genera:

  • ValueError: se la tabella non è stata trovata o sono stati trovati più tabelle in conflitto

Ordine di risoluzione del database:

  1. Parametro esplicito database (precedenza massima)
  2. ExecutionContext.default_database
  3. Eseguire ricerche in tutti i database (con rilevamento dei conflitti)

Esempio:

builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet

Impostare Spark DataFrame come origine dati.

Parametri:

  • dataframe (DataFrame): DataFrame Spark

Restituisce:

  • NodeBuilderSourceSet: Generatore per un'ulteriore configurazione

Esempio:

df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)

NodeBuilderSourceSet

Generatore di nodi dopo l'impostazione dell'origine dati: metodi di configurazione disponibili.

Costruttore

NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Creato internamente dai metodi di origine NodeBuilderInitial.

Metodi

with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet

Applicare il filtro dell'intervallo di tempo all'origine dati del nodo.

Parametri:

  • time_column (str): nome colonna contenente i dati timestamp (obbligatorio)
  • start_time (str o datetime, facoltativo): data di inizio ('20/20/25', '2025-10-20' o oggetto datetime)
  • end_time (str o datetime, facoltativo): data di fine (stessi formati di start_time)
  • lookback_hours (float, facoltativo): ore per guardare indietro da ora

Restituisce:

  • NodeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se la colonna ora non è stata trovata nello schema di origine

Logica intervallo di tempo:

  1. Se start_time e end_time forniti: usarli direttamente
  2. Se lookback_hours specificato solo: end=now, start=now-lookback_hours
  3. Se non viene specificato nulla: nessun filtro temporale
  4. Se start/end AND lookback_hours: start/end ha la precedenza

Esempio:

# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")

# Lookback window
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet

Impostare l'etichetta del nodo (il valore predefinito è alias se non viene chiamato).

Parametri:

  • label (str): etichetta del nodo

Restituisce:

  • NodeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se l'etichetta è già impostata

Esempio:

builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> NodeBuilderSourceSet

Configurare le colonne con la chiave necessaria e la designazione della visualizzazione.

Parametri:

  • *columns (str): nomi di colonna da includere (almeno uno obbligatorio)
  • key (str): Nome colonna da contrassegnare come chiave (obbligatorio, deve essere in colonne)
  • display (str): il nome della colonna da contrassegnare come valore visualizzato (obbligatorio, deve essere in colonne, può essere uguale a key)

Restituisce:

  • NodeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se la convalida non riesce (colonne duplicate, chiave/visualizzazione mancante e così via)

Note:

  • Le proprietà vengono compilate automaticamente dai tipi di colonna

  • Se specificato, la colonna filtro ora viene aggiunta automaticamente

  • I tipi di proprietà vengono dedotti automaticamente dallo schema di origine

  • Vedere Restrizioni

Esempio:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Completare questo nodo e iniziare a compilare un altro nodo.

Parametri:

  • alias (str): alias per il nuovo nodo

Restituisce:

  • NodeBuilderInitial: Nuovo generatore di nodi

Esempio:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Completare questo nodo e iniziare a creare un bordo.

Parametri:

  • alias (str): alias per il bordo

Restituisce:

  • EdgeBuilderInitial: nuovo generatore di dispositivi perimetrali

Esempio:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("accessed")
done
def done() -> GraphSpec

Finalizzare questo nodo e completare la specifica del grafo.

Restituisce:

  • GraphSpec: specifica completa del grafo

Esempio:

graph_spec = builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .done()

Edge Builders

EdgeBuilderInitial

Stato iniziale per edge builder: sono disponibili solo i metodi dell'origine dati.

Costruttore

EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

Nota: In genere creato tramite GraphSpecBuilder.add_edge(), non è stata creata un'istanza diretta.

Metodi

Nota

L'uso dei caratteri di sottolineatura _ quando si denominano nodi, archi o proprietà in un grafico personalizzato non è supportato. Quando vengono usati caratteri di sottolineatura, viene restituito un errore di richiesta non valido.

from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet

Impostare la tabella come origine dati con risoluzione intelligente del database.

Parametri:

  • table_name (str): nome della tabella (obbligatorio)
  • database (str, facoltativo): nome del database esplicito

Restituisce:

  • EdgeBuilderSourceSet: Generatore per un'ulteriore configurazione

Genera:

  • ValueError: se la tabella non è stata trovata o sono stati trovati più tabelle in conflitto

Esempio:

builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet

Impostare Spark DataFrame come origine dati.

Parametri:

  • dataframe (DataFrame): DataFrame Spark

Restituisce:

  • EdgeBuilderSourceSet: Generatore per un'ulteriore configurazione

Esempio:

df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)

EdgeBuilderSourceSet

Edge builder dopo l'impostazione dell'origine dati: metodi di configurazione disponibili.

Costruttore

EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

Nota: Creato internamente dai metodi di origine EdgeBuilderInitial.

Metodi

with_label
def with_label(label: str) -> EdgeBuilderSourceSet

Impostare il tipo o l'etichetta della relazione perimetrale (per impostazione predefinita, se non viene chiamato l'alias).

Parametri:

  • label (str): etichetta perimetrale

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se l'etichetta è già impostata

Esempio:

builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label

Nota

Usare with_label() invece . Questo metodo verrà rimosso in una versione futura.

def edge_label(label: str) -> EdgeBuilderSourceSet

Impostare il tipo o l'etichetta della relazione perimetrale (per impostazione predefinita, se non viene chiamato l'alias).

Parametri:

  • label (str): etichetta perimetrale

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se l'etichetta è già impostata

Esempio:

builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Impostare il nodo di origine con la colonna ID e l'etichetta.

Parametri:

  • id_column (str): nome colonna contenente l'ID nodo di origine
  • node_type (str): etichetta del nodo di origine

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se l'origine è già impostata

Esempio:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet

Impostare il nodo di destinazione con la colonna ID e l'etichetta.

Parametri:

  • id_column (str): nome della colonna contenente l'ID nodo di destinazione
  • node_type (str): etichetta del nodo di destinazione

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se la destinazione è già impostata

Esempio:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet

Applicare il filtro dell'intervallo di tempo all'origine dati del bordo.

Parametri:

  • time_column (str): nome colonna contenente i dati timestamp (obbligatorio)
  • start_time (str o datetime, facoltativo): Data di inizio
  • end_time (str o datetime, facoltativo): Data di fine
  • lookback_hours (float, facoltativo): ore per guardare indietro da ora

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se la colonna ora non è stata trovata nello schema di origine

Esempio:

builder.add_edge("accessed").from_table("AccessLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> EdgeBuilderSourceSet

Configurare le colonne con la chiave necessaria e la designazione della visualizzazione.

Parametri:

  • *columns (str): nomi di colonna da includere (almeno uno obbligatorio)
  • key (str): Nome colonna da contrassegnare come chiave (obbligatorio, deve essere in colonne)
  • display (str): nome colonna da contrassegnare come valore visualizzato (obbligatorio, deve essere in colonne)

Restituisce:

  • EdgeBuilderSourceSet: Self per il concatenamento dei metodi

Genera:

  • ValueError: se la convalida non riesce

Esempio:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial

Completare questo bordo e iniziare a creare un nodo.

Parametri:

  • alias (str): alias per il nuovo nodo

Restituisce:

  • NodeBuilderInitial: Nuovo generatore di nodi
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

Completare questo bordo e iniziare a creare un altro bordo.

Parametri:

  • alias (str): alias per il nuovo bordo

Restituisce:

  • EdgeBuilderInitial: nuovo generatore di dispositivi perimetrali

Esempio:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", key="id", display="location") \
    .add_edge("connected_to")
done
def done() -> GraphSpec

Finalizzare questo bordo e completare la specifica del grafo.

Restituisce:

  • GraphSpec: specifica completa del grafo

Classi di schema

GraphDefinitionReference

Riferimento a una definizione del grafo con nome e versione.

Costruttore

GraphDefinitionReference(
    fully_qualified_name: str,
    version: str
)

Parametri:

  • fully_qualified_name (str): nome completo del grafo a cui si fa riferimento
  • version (str): versione del grafico a cui si fa riferimento

Genera:

  • ValueError: se fully_qualified_name o la versione è vuota

Metodi

to_dict
def to_dict() -> Dict[str, Any]

Serializzare nel dizionario.

Restituisce:

  • Dict[str, Any]: riferimento serializzato

Proprietà

Definizione della proprietà con interfaccia indipendente dai tipi.

Costruttore

Property(
    name: str,
    property_type: PropertyType,
    is_non_null: bool = False,
    description: str = "",
    is_key: bool = False,
    is_display_value: bool = False,
    is_internal: bool = False
)

Parametri:

  • name (str): Nome proprietà
  • property_type (PropertyType): tipo di dati della proprietà
  • is_non_null (bool, default=False): Indica se la proprietà è obbligatoria
  • description (str, default=""): Descrizione della proprietà
  • is_key (bool, default=False): Indica se la proprietà è una chiave
  • is_display_value (bool, default=False): Indica se la proprietà è un valore visualizzato
  • is_internal (bool, default=False): Indica se la proprietà è interna

Genera:

  • ValueError: se il nome è vuoto o la convalida ha esito negativo

Metodi di classe

key
@classmethod
Property.key(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Creare una proprietà chiave con impostazioni comuni (is_key=True, is_display_value=True).

display
@classmethod
Property.display(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

Creare una proprietà del valore di visualizzazione (is_display_value=True).

Metodi

describe
def describe(text: str) -> Property

Aggiungere la descrizione in modo fluente.

Parametri:

  • text (str): testo descrizione

Restituisce:

  • Property: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]

Serializzare la proprietà nel dizionario con chiavi di annotazione con prefisso @.

Restituisce:

  • Dict[str, Any]: proprietà serializzata
to_gql
def to_gql() -> str

Generare la definizione della proprietà GQL.

Restituisce:

  • str: Rappresentazione di stringa GQL

EdgeNode

Riferimento al nodo usato nelle definizioni perimetrali.

Costruttore

EdgeNode(
    alias: Optional[str] = None,
    labels: List[str] = []
)

Parametri:

  • alias (str, facoltativo): alias nodo (impostato automaticamente sulla prima etichetta se None o vuoto)
  • labels (List[str]): etichette di nodo (almeno una richiesta)

Genera:

  • ValueError: se l'elenco di etichette è vuoto
  • TypeError: se le etichette non sono stringhe

Mutazione automatica:

  • Se l'alias è Nessuno o vuoto, viene impostato sulla prima etichetta

Metodi

to_dict
def to_dict() -> Dict[str, Any]

Serializzare nel dizionario.

Restituisce:

  • Dict[str, Any]: riferimento al nodo perimetrale serializzato

Nodo

Definizione del nodo con interfaccia indipendente dai tipi.

Costruttore

Node(
    alias: str = "",
    labels: List[str] = [],
    implies_labels: List[str] = [],
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_labels: bool = False,
    abstract_edge_aliases: bool = False
)

Parametri:

  • alias (str, default=""): Alias nodo (impostato automaticamente sulla prima etichetta se vuoto)
  • labels (List[str]): etichette di nodo (almeno una richiesta)
  • implies_labels (List[str], default=[]): Etichette implicite
  • properties (List[Property], default=[]): Node properties
  • description (str, default=""): Descrizione del nodo
  • entity_group (str, default=""): Nome gruppo di entità
  • dynamic_labels (bool, default=False): indica se il nodo dispone di etichette dinamiche
  • abstract_edge_aliases (bool, default=False): Indica se il nodo usa alias perimetrali astratti

Genera:

  • ValueError: se la convalida non riesce (nessuna etichetta, nessuna proprietà chiave, nessuna proprietà di visualizzazione e così via)

Mutazione automatica:

  • Se l'alias è vuoto, viene impostato sulla prima etichetta
  • Se entity_group è vuoto, viene impostato sull'etichetta primaria

Metodi

get_primary_label
def get_primary_label() -> Optional[str]

Ottiene l'etichetta primaria (prima).

Restituisce:

  • str o None: etichetta primaria o Nessuna se nessuna etichetta
get_entity_group_name
def get_entity_group_name() -> str

Ottenere il nome del gruppo di entità o il fallback all'etichetta primaria.

Restituisce:

  • str: nome del gruppo di entità
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]

Ottiene il nome della proprietà della chiave primaria.

Restituisce:

  • str o None: nome della proprietà della chiave primaria
get_properties
def get_properties() -> Dict[str, Property]

Ottenere le proprietà come dizionario per un facile accesso.

Restituisce:

  • Dict[str, Property]: proprietà con chiave in base al nome
get_property
def get_property(name: str) -> Optional[Property]

Ottenere una proprietà specifica in base al nome.

Parametri:

  • name (str): Nome proprietà

Restituisce:

  • Property o None: proprietà se trovata
add_property
def add_property(prop: Property) -> None

Aggiungere una proprietà a questo nodo.

Parametri:

  • prop (Proprietà): proprietà da aggiungere

Genera:

  • ValueError: se il nome della proprietà è duplicato
is_dynamically_labeled
def is_dynamically_labeled() -> bool

Controllare se il nodo ha etichette dinamiche.

Restituisce:

  • bool: True se le etichette dinamiche sono abilitate
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool

Controllare se il nodo usa alias di nodi perimetrali astratti.

Restituisce:

  • bool: True se gli alias di bordi astratti sono abilitati
describe
def describe(text: str) -> Node

Aggiungere la descrizione in modo fluente.

Parametri:

  • text (str): testo descrizione

Restituisce:

  • Node: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]

Serializzare il nodo nel dizionario.

Restituisce:

  • Dict[str, Any]: nodo serializzato
to_gql
def to_gql() -> str

Generare la definizione del nodo GQL.

Restituisce:

  • str: Rappresentazione di stringa GQL

Genera:

  • ValueError: se il nodo non dispone dei campi necessari per GQL

Metodi di classe

create
@classmethod
Node.create(
    alias: str,
    labels: List[str],
    properties: List[Property],
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Node

Creare un nodo con tutti i campi obbligatori.

Parametri:

  • alias (str): alias del nodo
  • labels (List[str]): Etichette dei nodi
  • properties (List[Property]): Proprietà node
  • description (str, default=""): Descrizione del nodo
  • entity_group (str, default=""): Nome gruppo di entità

Restituisce:

  • Node: nuova istanza del nodo

Edge

Definizione edge con interfaccia indipendente dai tipi.

Costruttore

Edge(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_type: bool = False
)

Parametri:

  • relationship_type (str): tipo di relazione edge (ad esempio, "FOLLOWS", "OWNS")
  • source_node_label (str): etichetta del nodo di origine
  • target_node_label (str): etichetta del nodo di destinazione
  • direction (EdgeDirection, default=DIRECTED_RIGHT): Direzione del bordo
  • properties (List[Property], default=[]): Proprietà edge
  • description (str, default=""): Descrizione del bordo
  • entity_group (str, default=""): Nome gruppo di entità
  • dynamic_type (bool, default=False): indica se edge ha un tipo dinamico

Genera:

  • ValueError: se la convalida non riesce

Mutazione automatica:

  • labels l'elenco viene popolato automaticamente con [relationship_type]
  • Se entity_group è vuoto, viene impostato su relationship_type

Proprietà

edge_type
def edge_type() -> str

Alias di compatibilità con le versioni precedenti per relationship_type.

Restituisce:

  • str: tipo di relazione

Metodi

get_entity_group_name
def get_entity_group_name() -> str

Ottenere il nome del gruppo di entità o il fallback al tipo di relazione.

Restituisce:

  • str: nome del gruppo di entità
is_dynamic_type
def is_dynamic_type() -> bool

Controllare se il bordo ha un tipo dinamico.

Restituisce:

  • bool: True se il tipo dinamico
add_property
def add_property(edge_property: Property) -> None

Aggiungere una proprietà a questo bordo.

Parametri:

  • edge_property (Proprietà): proprietà da aggiungere
describe
def describe(text: str) -> Edge

Aggiungere la descrizione in modo fluente.

Parametri:

  • text (str): testo descrizione

Restituisce:

  • Edge: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]

Serializzare edge in dictionary.

Restituisce:

  • Dict[str, Any]: bordo serializzato
to_gql
def to_gql() -> str

Generare la definizione perimetrale GQL.

Restituisce:

  • str: Rappresentazione di stringa GQL

Metodi di classe

create
Edge.create(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    properties: List[Property] = None,
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Edge

Creare un bordo con tutti i campi obbligatori.

Parametri:

  • relationship_type (str): tipo di relazione edge
  • source_node_label (str): etichetta del nodo di origine
  • target_node_label (str): etichetta del nodo di destinazione
  • properties (List[Property], optional): Proprietà edge
  • description (str, default=""): Descrizione del bordo
  • entity_group (str, default=""): Nome gruppo di entità

Restituisce:

  • Edge: nuova istanza di edge

Graphschema

Definizione dello schema del grafico con interfaccia indipendente dai tipi.

Costruttore

GraphSchema(
    name: str,
    nodes: List[Node] = [],
    edges: List[Edge] = [],
    base_graphs: List[GraphSchema] = [],
    description: str = "",
    version: str = "1.0",
    fully_qualified_name: str = "",
    namespace: str = ""
)

Parametri:

  • name (str): nome dello schema del grafico
  • nodes (List[Node], default=[]): Definizioni dei nodi
  • edges (List[Edge], default=[]): Definizioni edge
  • base_graphs (List[GraphSchema], default=[]): Schemi del grafo di base
  • description (str, default=""): Descrizione dello schema
  • version (str, default="1.0"): Versione dello schema
  • fully_qualified_name (str, default=""): Nome completo
  • namespace (str, default=""): Spazio dei nomi

Genera:

  • ValueError: se la convalida non riesce (alias duplicati, archi fanno riferimento a nodi inesistenti e così via)

Metodi

get_fully_qualified_name
def get_fully_qualified_name() -> str

Ottenere il nome completo.

Restituisce:

  • str: nome completo
get_namespace
def get_namespace() -> str

Ottenere lo spazio dei nomi dal nome completo o restituire l'impostazione predefinita.

Restituisce:

  • str:Namespace
get_version
def get_version() -> str

Ottenere la versione.

Restituisce:

  • str: stringa di versione
get_node
def get_node(label_or_alias: str) -> Optional[Node]

Ottenere il nodo in base all'etichetta o all'alias.

Parametri:

  • label_or_alias (str): etichetta o alias del nodo

Restituisce:

  • Node o None: Nodo se trovato
get_edge
def get_edge(name: str) -> Optional[Edge]

Ottenere edge per nome/tipo.

Parametri:

  • name (str): tipo di relazione edge

Restituisce:

  • Edge o None: Edge se trovato
add_node
def add_node(node: Node) -> None

Aggiungere un nodo a questo grafico.

Parametri:

  • node (Nodo): nodo da aggiungere

Genera:

  • ValueError: se l'alias del nodo è duplicato
add_edge
def add_edge(edge: Edge) -> None

Aggiungere un bordo a questo grafico.

Parametri:

  • edge (Edge): Edge da aggiungere

Genera:

  • ValueError: se il tipo di bordo è duplicato
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema

Aggiungere un'inclusione di grafi (API fluent).

Parametri:

  • fully_qualified_name (str): nome completo del grafo da includere
  • version (str): versione del grafo da includere

Restituisce:

  • GraphSchema: Self per il concatenamento dei metodi
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]

Ottenere l'elenco dei riferimenti ai grafici inclusi.

Restituisce:

  • List[GraphDefinitionReference]: elenco dei riferimenti alle definizioni del grafo
describe
def describe(text: str) -> GraphSchema

Aggiungere la descrizione in modo fluente.

Parametri:

  • text (str): testo descrizione

Restituisce:

  • GraphSchema: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]

Serializzare lo schema nel dizionario.

Restituisce:

  • Dict[str, Any]: schema serializzato
to_json
def to_json(indent: int = 2) -> str

Generare la rappresentazione JSON.

Parametri:

  • indent (int, default=2): livello di rientro JSON

Restituisce:

  • str: stringa JSON
to_gql
def to_gql() -> str

Generare la definizione dello schema GQL.

Restituisce:

  • str: Rappresentazione di stringa GQL

Metodi di classe

create
@classmethod
GraphSchema.create(
    name: str,
    nodes: List[Node] = None,
    edges: List[Edge] = None,
    description: str = "",
    version: str = "1.0",
    **kwargs
) -> GraphSchema

Creare uno schema a grafo con tutti i campi obbligatori.

Parametri:

  • name (str): nome dello schema del grafico
  • nodes (List[Node], facoltativo): Definizioni di nodo
  • edges (List[Edge], facoltativo): Definizioni di Edge
  • description (str, default=""): Descrizione dello schema
  • version (str, default="1.0"): Versione dello schema

Restituisce:

  • GraphSchema: nuova istanza dello schema del grafo

Classi di input di query

Classi di dati che rappresentano i parametri di input per le query a grafo predefinite.

Nota

Il passaggio QueryInput di oggetti direttamente ai Graph metodi di query è deprecato e verrà rimosso nelle versioni future. Usare invece gli argomenti della parola chiave. I Graph metodi (reachability, k_hop, blast_radius, centrality, ) rankedaccettano tutti i parametri come argomenti di parola chiave e costruiscono internamente gli oggetti di input. Queste classi rimangono nella codebase per il momento, ma non devono essere usate nel nuovo codice.

QueryInputBase

Classe di base per tutti i parametri di input della query.

Metodi

to_json_payload
def to_json_payload() -> Dict[str, Any]

Convertire i parametri di input in un dizionario per l'invio dell'API.

Restituisce:

  • Dict[str, Any]: Rappresentazione del dizionario dei parametri di input
validate
def validate() -> None

Convalidare i parametri di input.

Genera:

  • ValueError: se i parametri di input non sono validi

ReachabilityQueryInput

Parametri di input per una query di raggiungibilità tra i nodi di origine e di destinazione. Eredita da ReachabilityQueryInputBase cui eredita da QueryInputBase.

Fields

Campo Tipo Impostazione predefinita Descrizione
source_property_value str (obbligatorio) Valore da trovare per la proprietà di origine
target_property_value str (obbligatorio) Valore da trovare per la proprietà di destinazione
source_property Optional[str] None Nome della proprietà per filtrare i nodi di origine
participating_source_node_labels Optional[List[str]] None Etichette dei nodi da considerare come nodi di origine
target_property Optional[str] None Nome della proprietà per filtrare i nodi di destinazione
participating_target_node_labels Optional[List[str]] None Etichette dei nodi da considerare come nodi di destinazione
participating_edge_labels Optional[List[str]] None Etichette perimetrali da attraversare nel percorso
is_directional Optional[bool] True Indica se i bordi sono direzionali
min_hop_count Optional[int] 1 Numero minimo di hop nel percorso
max_hop_count Optional[int] 4 Numero massimo di hop nel percorso
shortest_path Optional[bool] False Indica se trovare solo il percorso più breve
max_results Optional[int] 500 Numero massimo di risultati da restituire

Convalida:

  • source_property_value è obbligatorio
  • target_property_value è obbligatorio

Esempio:

# Preferred: keyword arguments (no import needed)
result = graph.reachability(
    source_property="UserId",
    source_property_value="user123",
    target_property="DeviceId",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"],
    shortest_path=True
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

K_HopQueryInput

Parametri di input per una query k-hop da un nodo di origine specificato. Eredita da ReachabilityQueryInputBase.

Eredita tutti i campi da ReachabilityQueryInput.

Convalida:

  • Almeno uno di source_property_value o target_property_value deve essere fornito

Esempio:

# Preferred: keyword arguments
result = graph.k_hop(
    source_property_value="user123",
    max_hop_count=3,
    participating_edge_labels=["accessed"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))

BlastRadiusQueryInput

Parametri di input per una query con raggio di esplosione da nodi di origine a nodi di destinazione. Eredita da ReachabilityQueryInputBase.

Eredita tutti i campi da ReachabilityQueryInput, con i campi obbligatori seguenti:

Campo Tipo Obbligatorio Descrizione
source_property_value str Valore per identificare il nodo di origine
target_property_value str Valore per identificare il nodo di destinazione

Convalida:

  • source_property_value è obbligatorio
  • target_property_value è obbligatorio

Esempio:

# Preferred: keyword arguments
result = graph.blast_radius(
    source_property_value="user123",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

CentralityQueryInput

Parametri di input per una query di analisi della centralità. Eredita da QueryInputBase.

Enumerazione CentralityType

Valore Descrizione
CentralityType.Node Centralità del nodo di calcolo
CentralityType.Edge Centralità dei bordi di calcolo

Fields

Campo Tipo Impostazione predefinita Descrizione
threshold Optional[int] 3 Punteggio di centralità minimo da considerare
centrality_type CentralityType CentralityType.Node Tipo di centralità da calcolare
max_paths Optional[int] 1000000 Percorsi massimi da considerare (0 = all)
participating_source_node_labels Optional[List[str]] None Etichette dei nodi di origine
participating_target_node_labels Optional[List[str]] None Etichette dei nodi di destinazione
participating_edge_labels Optional[List[str]] None Etichette perimetrali da attraversare
is_directional Optional[bool] True Indica se i bordi sono direzionali
min_hop_count Optional[int] 1 Hop minimi
max_hop_count Optional[int] 4 Numero massimo di hop
shortest_path Optional[bool] False Solo percorsi più brevi
max_results Optional[int] 500 Risultati massimi

Esempio:

# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
    centrality_type=CentralityType.Edge,  # or CentralityType.Node (default)
    participating_edge_labels=["accessed", "connected_to"],
    threshold=5,
    max_results=100
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
    centrality_type=CentralityType.Edge,
    participating_edge_labels=["accessed"]
))

RankedQueryInput

Parametri di input per una query di analisi classificata. Eredita da QueryInputBase.

Fields

Campo Tipo Impostazione predefinita Descrizione
rank_property_name str (obbligatorio) Nome della proprietà da usare per i percorsi di classificazione
threshold Optional[int] 0 Solo i percorsi restituiti con pesi superiori a questo valore
max_paths Optional[int] 1000000 Percorsi massimi da considerare (0 = all)
decay_factor Optional[float] 1 Quanto ogni passaggio del grafico riduce il rango (2 = metà per ogni passaggio)
is_directional Optional[bool] True Indica se i bordi sono direzionali
min_hop_count Optional[int] 1 Hop minimi
max_hop_count Optional[int] 4 Numero massimo di hop
shortest_path Optional[bool] False Solo percorsi più brevi
max_results Optional[int] 500 Risultati massimi

Esempio:

# Preferred: keyword arguments
result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2,
    max_results=50
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
    rank_property_name="risk_score",
    threshold=5
))

Risultati query

QueryResult

Risultato di una query a grafo con accesso lazy DataFrame.

Costruttore

QueryResult(raw_response: Dict[str, Any], graph: Graph)

Parametri:

  • raw_response (Dict[str, Any]): Dizionario di risposta API non elaborato
  • graph (Grafo): riferimento al grafico padre

Nota: In genere creato da Graph.query(), non viene creata direttamente un'istanza di .

Metodi

to_dataframe
def to_dataframe() -> DataFrame

Converte il risultato della query in un dataframe Spark.

Restituisce:

  • DataFrame: risultato della query come dataframe Spark

Genera:

  • ValueError: se la conversione non riesce

Esempio:

result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]

Ottenere la sezione RawData dalla risposta.

Restituisce:

  • Dict[str, Any]: dizionario con metadati non elaborati o dict vuoto se non presente

Esempio:

result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None

Visualizzare il risultato della query in vari formati.

Parametri:

  • format (str, default="visual"): Formato di output
    • "table": tabelle dataframe complete (tutte le colonne)
    • "visual": visualizzazione interattiva del grafico con il plug-in VSC
    • "all": mostra tutti i formati

Genera:

  • ValueError: se il formato non è uno dei valori supportati

Esempio:

result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show()  # Visual by default
result.show(format="table")  # Table format

# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph

# 1. Define graph specification
spec = (
    GraphSpecBuilder.start()
    
    .add_node("User")
        .from_dataframe(user_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "UserId", "UserDisplayName", "UserPrincipalName",
                "DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
                "TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
                key="UserId", display="UserDisplayName"
            )
    
    .add_node("IPAddress")
        .from_dataframe(ip_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "IPAddress", "UniqueUsers", "UniqueLocations",
                "SignInCount", "RiskySignInCount", "SharedIPFlag",
                key="IPAddress", display="IPAddress"
            )
    
    .add_edge("UsedIP")
        .from_dataframe(edge_used_ip)  # native Spark DF → no .df
            .source(id_column="UserId", node_type="User")
            .target(id_column="IPAddress", node_type="IPAddress")
            .with_columns(
                "SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
                key="EdgeKey", display="EdgeKey"
            )
   
    .done()
)

# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()

# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")

# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()

# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()

# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()

# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()

Note sui modelli di progettazione

Fluent API

Tutti i generatori supportano il concatenamento dei metodi per definizioni di grafi leggibili e dichiarative:

builder.add_node("user") \
    .from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("follows")

Union Schemas

Più archi con lo stesso alias vengono automaticamente uniti con le proprietà unite:

# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
    .from_table("AzureSignins") \
    .source(id_column="UserId", node_type="AZuser") \
    .target(id_column="DeviceId", node_type="device")

builder.add_edge("sign_in") \
    .from_table("EntraSignins") \
    .source(id_column="UserId", node_type="EntraUser") \
    .target(id_column="DeviceId", node_type="device")

Configurazione automatica

Molti campi hanno valori predefiniti ragionevoli:

  • Per impostazione predefinita, le etichette node/edge hanno gli alias
  • Le proprietà vengono dedotte automaticamente dagli schemi di origine
  • Per impostazione predefinita, i gruppi di entità sono etichette/tipi di relazione primari

Valutazione differita

I dataframe e le risorse vengono caricati in modo pigramente e memorizzati nella cache:

  • graph_spec.nodes e graph_spec.edges vengono caricati al primo accesso
  • I risultati delle query creano dataframe solo quando richiesto