Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
La classe sentinel_graph consente di interagire con il grafo Microsoft Sentinel, consentendo di definire lo schema del grafo, trasformare i dati dal data lake Microsoft Sentinel in nodi e archi, pubblicare un grafo, eseguire query graph ed eseguire algoritmi di grafo avanzati. Questa classe è progettata per funzionare con le sessioni Spark nei notebook di Jupyter in esecuzione in Microsoft Sentinel calcolo Spark.
GraphSpecBuilder
La classe GraphSpecBuilder fornisce un generatore fluent per la creazione di specifiche del grafo con pipeline di dati e integrazione dello schema.
Importante
L'alias GraphBuilder per questa classe è deprecato e verrà rimosso in una versione futura. Usare GraphSpecBuilder in tutto il nuovo codice.
# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder
# Recommended
from sentinel_graph import GraphSpecBuilder
Costruttore
GraphSpecBuilder(context: ExecutionContext)
Parametri:
-
context(ExecutionContext): contesto di esecuzione contenente la sessione e la configurazione di Spark
Genera:
-
ValueError: se il contesto è Nessuno o non è possibile determinare il nome del grafo
Metodi statici
start
GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder
Definire un nuovo generatore di grafi fluenti.
Parametri:
-
context(ExecutionContext, facoltativo): istanza di ExecutionContext. Se None, usa il contesto predefinito.
Restituisce:
-
GraphSpecBuilder: nuova istanza del generatore
Esempio:
builder = GraphSpecBuilder.start(context=context)
Metodi dell'istanza
add_node
def add_node(alias: str) -> NodeBuilderInitial
Iniziare a compilare una definizione di nodo.
Parametri:
-
alias(str): identificatore univoco per questo nodo all'interno del grafico
Restituisce:
-
NodeBuilderInitial: generatore di nodi nello stato iniziale
Esempio:
builder.add_node("user")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Iniziare a creare una definizione perimetrale.
Parametri:
-
alias(str): identificatore per questo bordo all'interno del grafo (può essere condiviso tra più archi)
Restituisce:
-
EdgeBuilderInitial: generatore edge nello stato iniziale
Esempio:
builder.add_edge("accessed")
done
def done() -> GraphSpec
Finalizzare la specifica del grafo e restituire l'istanza GraphSpec.
Restituisce:
-
GraphSpec: completare la specifica del grafo con la pipeline di dati e lo schema
Genera:
-
ValueError: se il grafo non ha nodi o archi o se la convalida ha esito negativo
Esempio:
graph_spec = builder.done()
GraphSpec
Specifica del grafo con funzionalità di pipeline di dati, schema e visualizzazione.
Costruttore
GraphSpec(
name: str,
context: ExecutionContext,
graph_schema: GraphSchema,
etl_pipeline: Optional[ETLPipeline] = None
)
Parametri:
-
name(str): nome del grafico -
context(ExecutionContext): contesto di esecuzione -
graph_schema(GraphSchema): definizione dello schema del grafico -
etl_pipeline(ETLPipeline, facoltativo): pipeline di dati per la preparazione del grafo
Proprietà
nodes
def nodes() -> DataFrame
Ottenere i nodi DataFrame (lazy, memorizzati nella cache). Determina automaticamente l'origine dalla pipeline di dati o dalla tabella lake.
Restituisce:
-
DataFrame: DataFrame Spark contenente tutti i nodi
Genera:
-
ValueError: se il contesto è mancante o non è possibile caricare i dataframe
edges
def edges() -> DataFrame
Ottenere bordi DataFrame (lazy, memorizzato nella cache). Determina automaticamente l'origine dalla pipeline di dati o dalla tabella lake.
Restituisce:
-
DataFrame: DataFrame Spark contenente tutti i bordi
Genera:
-
ValueError: se il contesto è mancante o non è possibile caricare i dataframe
Metodi
build_graph_with_data
Nota
build_graph_with_data è deprecato e verrà rimosso in una versione futura.
Usare Graph.build(spec) invece .
def build_graph_with_data() -> Dict[str, Any]
Eseguire la pipeline di dati e pubblicare il grafico.
Graph.build(self)Chiama internamente , stashes l'oggetto restituito Graphe restituisce un dizionario compatibile con le versioni precedenti.
Restituisce:
-
Dict[str, Any]: dizionario contenente:-
etl_result: Risultati della preparazione dei dati -
api_result: pubblicare i risultati (in caso di esito positivo) -
api_error: stringa di errore (se la pubblicazione non è riuscita) -
instance_name: nome dell'istanza del grafico -
status:"published"o"prepared"
-
Esempio:
graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")
get_schema
def get_schema() -> GraphSchema
Ottenere lo schema del grafico.
Restituisce:
-
GraphSchema: definizione dello schema del grafo
get_pipeline
Nota
Questo metodo è deprecato e verrà rimosso in una versione futura. La pipeline di dati è un dettaglio di implementazione interna e non deve essere accessibile direttamente.
def get_pipeline() -> Optional[ETLPipeline]
Ottenere la pipeline di dati (nessuno per i grafici esistenti).
Restituisce:
-
ETLPipelineoNone: pipeline di dati, se disponibile
to_graphframe
def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame
Convertire l'intero grafo in GraphFrame per l'esecuzione di algoritmi a grafo. Funziona solo sui dati locali (dalla pipeline di dati o dalla tabella lake).
Parametri:
-
column_mapping(Dict[str, str], facoltativo): Mapping di colonne personalizzato con chiavi:-
"id": nome colonna ID vertice -
"source_id": nome colonna ID origine edge -
"target_id": nome colonna ID destinazione edge
-
Restituisce:
-
GraphFrame: oggetto GraphFrame con tutti i vertici e i bordi
Genera:
-
ValueError: se ExecutionContext non è disponibile
Esempio:
gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)
show
def show(limit: int = 100, viz_format: str = "visual") -> None
Visualizzare i dati del grafo in vari formati.
Parametri:
-
limit(int, default=100): Numero massimo di nodi/archi da visualizzare -
viz_format(str, default="visual"): Formato di output-
"table": tabelle dataframe complete (tutte le colonne) -
"visual": visualizzazione interattiva del grafico -
"all": mostra tutti i formati
-
Genera:
-
ValueError: se il formato non è uno dei valori supportati
Esempio:
graph_spec.show(limit=50, viz_format="table")
show_schema
def show_schema() -> None
Visualizzare lo schema del grafo come visualizzazione interattiva del grafico.
Esempio:
spec.show_schema()
Graph
Istanza del grafico su cui è possibile eseguire query. Creato tramite Graph.get() (grafo esistente) o Graph.build() (da un GraphSpecoggetto ).
Costruttore
Graph(
name: str,
context: ExecutionContext,
spec: Optional[GraphSpec] = None,
build_status: Optional[BuildStatus] = None,
)
Parametri:
-
name(str): nome del grafico -
context(ExecutionContext): contesto di esecuzione -
spec(GraphSpec, facoltativo): specifica del grafo associata (impostata daGraph.build()) -
build_status(BuildStatus, facoltativo): metadati dei risultati della compilazione (impostati daGraph.build())
Genera:
-
ValueError: se ExecutionContext è None
Metodi statici
get
Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph
Ottenere un'istanza del grafo da un grafo esistente.
L'oggetto restituito Graph ha spec=None e build_status=None.
Parametri:
-
name(str): nome dell'istanza del grafico -
context(ExecutionContext, facoltativo): contesto di esecuzione (impostazione predefinita: ExecutionContext.default())
Restituisce:
-
Graph: istanza di Graph
Genera:
-
ValueError: se il nome del grafo è vuoto o l'istanza del grafo non esiste
Esempio:
graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")
prepare
Graph.prepare(spec: GraphSpec) -> Graph
Eseguire la fase di preparazione dei dati per un oggetto GraphSpecsenza pubblicazione. Usare publish() in seguito per registrare il grafico e renderlo queryable.
Parametri:
-
spec(GraphSpec): specifica del grafo da preparare
Restituisce:
-
Graph: istanza del grafico conspeccollegato ebuild_status.status == "prepared"
Genera:
-
ValueError: se la specifica non dispone di una pipeline di dati o di un contesto di esecuzione -
RuntimeError: se l'esecuzione della pipeline di dati ha esito negativo
Esempio:
spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")
build
Graph.build(spec: GraphSpec) -> Graph
Compilare un grafico da un GraphSpec oggetto preparando i dati e la pubblicazione.
Graph.prepare(spec) Chiama internamente e quindi tenta graph.publish(). A differenza della chiamata separata di questi due metodi, vengono rilevati errori di pubblicazione, ovvero il grafico restituito ha build_status.status == "prepared" e build_status.api_error imposta invece di generare.
Parametri:
-
spec(GraphSpec): specifica del grafo da cui eseguire la compilazione
Restituisce:
-
Graph: istanza del grafico conspeccollegato ebuild_statuspopolato
Genera:
-
ValueError: se la specifica non dispone di una pipeline di dati o di un contesto di esecuzione -
RuntimeError: se l'esecuzione della pipeline di dati ha esito negativo
Esempio:
spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status) # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")
Proprietà
nodes
def nodes() -> Optional[DataFrame]
Ottiene i nodi DataFrame. Delega a self.spec.nodes quando una specifica è associata; restituisce None in caso contrario.
edges
def edges() -> Optional[DataFrame]
Ottiene i bordi dataframe. Delega a self.spec.edges quando una specifica è associata; restituisce None in caso contrario.
schema
def schema() -> Optional[GraphSchema]
Ottenere lo schema del grafo. Delega a self.spec.get_schema() quando una specifica è associata; restituisce None in caso contrario.
Metodi
query
def query(query_string: str, query_language: str = "GQL") -> QueryResult
Eseguire una query sull'istanza del grafico usando GQL.
Parametri:
-
query_string(str): stringa di query graph (linguaggio GQL) -
query_language(str, default="GQL"): Linguaggio di query
Restituisce:
-
QueryResult: oggetto contenente nodi, archi e metadati
Genera:
-
ValueError: se la sessione ExecutionContext o Spark è mancante -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Esempio:
result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()
reachability
def reachability(
*,
source_property_value: str = None,
target_property_value: str = None,
source_property: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
[! NOTA]
reachability(query_input=ReachabilityQueryInput(...))è ancora accettato, ma emetteDeprecationWarninge verrà rimosso in una versione futura.
Eseguire l'analisi della raggiungibilità tra i nodi di origine e di destinazione.
Parametri:
-
source_property_value(str): valore da trovare per la proprietà di origine (convalidata in fase di esecuzione. Deve essere specificato quando non si usaquery_input) -
target_property_value(str): valore da trovare per la proprietà di destinazione (convalidato in fase di esecuzione. Deve essere specificato quando non si usaquery_input) -
source_property(Facoltativo[str]): Nome della proprietà per filtrare i nodi di origine -
participating_source_node_labels(Facoltativo[List[str]]): Etichette di nodo da considerare come origini -
target_property(Facoltativo[str]): Nome della proprietà per filtrare i nodi di destinazione -
participating_target_node_labels(Facoltativo[List[str]]): Etichette di nodo da considerare come destinazioni -
participating_edge_labels(Facoltativo[List[str]]): Etichette perimetrali da attraversare -
is_directional(bool): indica se i bordi sono direzionali (impostazione predefinita:True) -
min_hop_count(int): hop minimi (impostazione predefinita:1) -
max_hop_count(int): hop massimi (impostazione predefinita:4) -
shortest_path(bool): restituisce solo i percorsi più brevi (impostazione predefinita:False) -
max_results(int): risultati massimi (impostazione predefinita:500)
Genera:
-
ValueError: sesource_property_valueotarget_property_valueè mancante, ,min_hop_count < 1max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Restituisce:
-
QueryResult: contenente i percorsi di raggiungibilità
Esempio:
result = graph.reachability(
source_property_value="user-001",
target_property_value="device-003")
result.show()
k_hop
def k_hop(
*,
source_property: Optional[str] = None,
source_property_value: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
target_property_value: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota
k_hop(query_input=K_HopQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.
Eseguire l'analisi k-hop da un nodo di origine specificato.
Parametri:
- Uguale a
reachability
Convalida:
- Almeno uno di
source_property_valueotarget_property_valuedeve essere fornito
Genera:
-
ValueError: se non viene specificato nésource_property_valuetarget_property_valueo se vengono violati vincoli numerici (comereachability) -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Restituisce:
-
QueryResult: contenente i risultati dell'hop k
Esempio:
result = graph.k_hop(source_property_value="user-001")
result.show()
blast_radius
def blast_radius(
*,
source_property_value: str = None,
target_property_value: str = None,
source_property: Optional[str] = None,
participating_source_node_labels: Optional[List[str]] = None,
target_property: Optional[str] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota
blast_radius(query_input=BlastRadiusQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.
Eseguire l'analisi del raggio di esplosione dal nodo di origine al nodo di destinazione.
Parametri:
-
source_property_value(str): valore che identifica il nodo di origine (convalidato in fase di esecuzione. Deve essere specificato quando non si usaquery_input) -
target_property_value(str): valore che identifica il nodo di destinazione (convalidato in fase di esecuzione. Deve essere specificato quando non si usaquery_input) - Altri parametri: uguale a
reachability
Genera:
-
ValueError: sesource_property_valueotarget_property_valueè mancante o se i vincoli numerici vengono violati (comereachability) -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Restituisce:
-
QueryResult: contenente i risultati del raggio di esplosione
Esempio:
result = graph.blast_radius(
source_property_value="user-003",
target_property_value="device-003",
min_hop_count=1)
result.show()
centrality
def centrality(
*,
participating_source_node_labels: Optional[List[str]] = None,
participating_target_node_labels: Optional[List[str]] = None,
participating_edge_labels: Optional[List[str]] = None,
threshold: int = 3,
centrality_type: CentralityType = None,
max_paths: int = 1000000,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota
centrality(query_input=CentralityQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.
Eseguire l'analisi della centralità nel grafico.
Parametri:
-
participating_source_node_labels(Facoltativo[List[str]]): Etichette del nodo di origine -
participating_target_node_labels(Facoltativo[List[str]]): Etichette dei nodi di destinazione -
participating_edge_labels(Facoltativo[List[str]]): Etichette perimetrali da attraversare -
threshold(int): punteggio di centralità minimo (impostazione predefinita:3); deve essere non negativo -
centrality_type(CentralityType):CentralityType.NodeoCentralityType.Edge(impostazione predefinita:None, restituisceCentralityType.Node) -
max_paths(int): i percorsi massimi da considerare (impostazione predefinita:1000000;0= tutti i percorsi); devono essere non negativi -
is_directional(bool): indica se i bordi sono direzionali (impostazione predefinita:True) -
min_hop_count(int): hop minimi (impostazione predefinita:1); deve essere ≥ 1 -
max_hop_count(int): numero massimo di hop (impostazione predefinita:4); deve essere ≥min_hop_count -
shortest_path(bool): restituisce solo i percorsi più brevi (impostazione predefinita:False) -
max_results(int): I risultati massimi (impostazione predefinita:500); devono essere ≥ 1
Genera:
-
ValueError: sethreshold < 0,max_paths < 0,min_hop_count < 1,max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Restituisce:
-
QueryResult: contenente le metriche di centralità
Esempio:
result = graph.centrality(
participating_source_node_labels=["user", "device"],
participating_target_node_labels=["device", "user"],
participating_edge_labels=["sign_in"],
is_directional=False)
result.show()
ranked
def ranked(
*,
rank_property_name: str = None,
threshold: int = 0,
max_paths: int = 1000000,
decay_factor: float = 1,
is_directional: bool = True,
min_hop_count: int = 1,
max_hop_count: int = 4,
shortest_path: bool = False,
max_results: int = 500
) -> QueryResult
Nota
ranked(query_input=RankedQueryInput(...)) è ancora accettato, ma emette DeprecationWarning e verrà rimosso in una versione futura.
Eseguire un'analisi classificata nel grafico.
Parametri:
-
rank_property_name(str): nome della proprietà da usare per la classificazione (convalidata in fase di esecuzione. Deve essere specificato quando non si usaquery_input) -
threshold(int): solo i percorsi restituiti al di sopra di questo peso (impostazione predefinita:0); devono essere non negativi -
max_paths(int): i percorsi massimi da considerare (impostazione predefinita:1000000;0= tutti i percorsi); devono essere non negativi -
decay_factor(float): decadimento del rango per passaggio; 2 significa dimezzamento (impostazione predefinita:1); deve essere non negativo -
is_directional(bool): indica se i bordi sono direzionali (impostazione predefinita:True) -
min_hop_count(int): hop minimi (impostazione predefinita:1); deve essere ≥ 1 -
max_hop_count(int): numero massimo di hop (impostazione predefinita:4); deve essere ≥min_hop_count -
shortest_path(bool): restituisce solo i percorsi più brevi (impostazione predefinita:False) -
max_results(int): I risultati massimi (impostazione predefinita:500); devono essere ≥ 1
Genera:
-
ValueError: serank_property_namemanca, ,threshold < 0max_paths < 0,decay_factor < 0,min_hop_count < 1,max_hop_count < min_hop_countomax_results < 1 -
RuntimeError: se l'inizializzazione o l'esecuzione di query del client non riesce
Restituisce:
-
QueryResult: contenente i nodi/archi classificati
Esempio:
result = graph.ranked(
rank_property_name="risk_score",
threshold=5,
decay_factor=2)
result.show()
to_graphframe
def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame
Convertire l'intero grafico in GraphFrame. Usa i dati delle specifiche quando disponibili; legge da tabelle lake in caso contrario.
Parametri:
-
column_mapping(Dict[str, str], facoltativo): Mapping di colonne personalizzato
Restituisce:
-
GraphFrame: oggetto GraphFrame con tutti i vertici e i bordi
Esempio:
gf = graph.to_graphframe()
show
def show() -> None
Visualizzare le informazioni del grafico. Delega a spec.show() per la visualizzazione avanzata quando una specifica è collegata. In caso contrario, stampa informazioni minime.
show_schema
def show_schema() -> None
Visualizzare lo schema del grafico. Delega a spec.show_schema() quando è associata una specifica. Stampa un messaggio che indica che in caso contrario non è disponibile alcun schema.
publish
(novità della versione 0.3.3)
def publish() -> Graph
Registrare il grafico con l'API, rendendolo eseguibile su query. Chiamare questo metodo dopo Graph.prepare() (o su qualsiasi Graph specifica associata) per pubblicare l'istanza del grafico.
Restituisce:
-
Graph: Self per il concatenamento dei metodi
Genera:
-
ValueError: se non è associata alcuna specifica o se manca il contesto -
RuntimeError: se la pubblicazione non riesce
Esempio:
graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")
Buildstatus
Classe di dati che trasporta metadati da un'operazione Graph.build() .
Fields
| Campo | Tipo | Descrizione |
|---|---|---|
etl_result |
Any |
Risultato della fase (esecuzione della prepare pipeline di dati) |
api_result |
Optional[Dict] |
Risultato della fase di pubblicazione (None se la pubblicazione non è riuscita) |
api_error |
Optional[str] |
Messaggio di errore se la pubblicazione non è riuscita (None se la pubblicazione è riuscita) |
instance_name |
str |
Nome dell'istanza del grafico |
status |
Optional[BuildStatusKind] |
None, "published", o "prepared" |
Percorsi di costruzione
GraphSpecBuilder.start(...).done() → GraphSpec (spec only, no graph yet)
Graph.get(name, context) → Graph (spec=None, build_status=None)
Graph.prepare(spec) → Graph (spec=spec, build_status.status="prepared")
graph.publish() → Graph (build_status.status="published")
Graph.build(spec) → Graph (prepare + publish in one step)
Esempio:
graph = Graph.build(spec)
if graph.build_status.status == "published":
print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
print("Neither prepare nor publish has run")
Generatori di nodi
NodeBuilderInitial
Stato iniziale per il generatore di nodi: sono disponibili solo i metodi dell'origine dati.
Costruttore
NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)
Nota: In genere creato tramite GraphSpecBuilder.add_node(), non è stata creata un'istanza diretta.
Metodi
Nota
L'uso dei caratteri di sottolineatura _ quando si denominano nodi, archi o proprietà in un grafico personalizzato non è supportato. Quando vengono usati caratteri di sottolineatura, viene restituito un errore di richiesta non valido.
from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet
Impostare la tabella come origine dati con risoluzione intelligente del database.
Parametri:
-
table_name(str): nome della tabella (obbligatorio) -
database(str, facoltativo): nome del database esplicito (ha la precedenza sull'impostazione predefinita del contesto)
Restituisce:
-
NodeBuilderSourceSet: Generatore per un'ulteriore configurazione
Genera:
-
ValueError: se la tabella non è stata trovata o sono stati trovati più tabelle in conflitto
Ordine di risoluzione del database:
- Parametro esplicito
database(precedenza massima) - ExecutionContext.default_database
- Eseguire ricerche in tutti i database (con rilevamento dei conflitti)
Esempio:
builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet
Impostare Spark DataFrame come origine dati.
Parametri:
-
dataframe(DataFrame): DataFrame Spark
Restituisce:
-
NodeBuilderSourceSet: Generatore per un'ulteriore configurazione
Esempio:
df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)
NodeBuilderSourceSet
Generatore di nodi dopo l'impostazione dell'origine dati: metodi di configurazione disponibili.
Costruttore
NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)
Nota: Creato internamente dai metodi di origine NodeBuilderInitial.
Metodi
with_time_range
def with_time_range(
time_column: str,
start_time: Optional[Union[str, datetime]] = None,
end_time: Optional[Union[str, datetime]] = None,
lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet
Applicare il filtro dell'intervallo di tempo all'origine dati del nodo.
Parametri:
-
time_column(str): nome colonna contenente i dati timestamp (obbligatorio) -
start_time(str o datetime, facoltativo): data di inizio ('20/20/25', '2025-10-20' o oggetto datetime) -
end_time(str o datetime, facoltativo): data di fine (stessi formati di start_time) -
lookback_hours(float, facoltativo): ore per guardare indietro da ora
Restituisce:
-
NodeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se la colonna ora non è stata trovata nello schema di origine
Logica intervallo di tempo:
- Se start_time e end_time forniti: usarli direttamente
- Se lookback_hours specificato solo: end=now, start=now-lookback_hours
- Se non viene specificato nulla: nessun filtro temporale
- Se start/end AND lookback_hours: start/end ha la precedenza
Esempio:
# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
.with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")
# Lookback window
builder.add_node("user").from_table("SigninLogs") \
.with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet
Impostare l'etichetta del nodo (il valore predefinito è alias se non viene chiamato).
Parametri:
-
label(str): etichetta del nodo
Restituisce:
-
NodeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se l'etichetta è già impostata
Esempio:
builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
*columns: str,
key: str,
display: str
) -> NodeBuilderSourceSet
Configurare le colonne con la chiave necessaria e la designazione della visualizzazione.
Parametri:
-
*columns(str): nomi di colonna da includere (almeno uno obbligatorio) -
key(str): Nome colonna da contrassegnare come chiave (obbligatorio, deve essere in colonne) -
display(str): il nome della colonna da contrassegnare come valore visualizzato (obbligatorio, deve essere in colonne, può essere uguale a key)
Restituisce:
-
NodeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se la convalida non riesce (colonne duplicate, chiave/visualizzazione mancante e così via)
Note:
Le proprietà vengono compilate automaticamente dai tipi di colonna
Se specificato, la colonna filtro ora viene aggiunta automaticamente
I tipi di proprietà vengono dedotti automaticamente dallo schema di origine
Vedere Restrizioni
Esempio:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial
Completare questo nodo e iniziare a compilare un altro nodo.
Parametri:
-
alias(str): alias per il nuovo nodo
Restituisce:
-
NodeBuilderInitial: Nuovo generatore di nodi
Esempio:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Completare questo nodo e iniziare a creare un bordo.
Parametri:
-
alias(str): alias per il bordo
Restituisce:
-
EdgeBuilderInitial: nuovo generatore di dispositivi perimetrali
Esempio:
builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_edge("accessed")
done
def done() -> GraphSpec
Finalizzare questo nodo e completare la specifica del grafo.
Restituisce:
-
GraphSpec: specifica completa del grafo
Esempio:
graph_spec = builder.add_node("user").from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.done()
Edge Builders
EdgeBuilderInitial
Stato iniziale per edge builder: sono disponibili solo i metodi dell'origine dati.
Costruttore
EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)
Nota: In genere creato tramite GraphSpecBuilder.add_edge(), non è stata creata un'istanza diretta.
Metodi
Nota
L'uso dei caratteri di sottolineatura _ quando si denominano nodi, archi o proprietà in un grafico personalizzato non è supportato. Quando vengono usati caratteri di sottolineatura, viene restituito un errore di richiesta non valido.
from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet
Impostare la tabella come origine dati con risoluzione intelligente del database.
Parametri:
-
table_name(str): nome della tabella (obbligatorio) -
database(str, facoltativo): nome del database esplicito
Restituisce:
-
EdgeBuilderSourceSet: Generatore per un'ulteriore configurazione
Genera:
-
ValueError: se la tabella non è stata trovata o sono stati trovati più tabelle in conflitto
Esempio:
builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet
Impostare Spark DataFrame come origine dati.
Parametri:
-
dataframe(DataFrame): DataFrame Spark
Restituisce:
-
EdgeBuilderSourceSet: Generatore per un'ulteriore configurazione
Esempio:
df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)
EdgeBuilderSourceSet
Edge builder dopo l'impostazione dell'origine dati: metodi di configurazione disponibili.
Costruttore
EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)
Nota: Creato internamente dai metodi di origine EdgeBuilderInitial.
Metodi
with_label
def with_label(label: str) -> EdgeBuilderSourceSet
Impostare il tipo o l'etichetta della relazione perimetrale (per impostazione predefinita, se non viene chiamato l'alias).
Parametri:
-
label(str): etichetta perimetrale
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se l'etichetta è già impostata
Esempio:
builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label
Nota
Usare with_label() invece . Questo metodo verrà rimosso in una versione futura.
def edge_label(label: str) -> EdgeBuilderSourceSet
Impostare il tipo o l'etichetta della relazione perimetrale (per impostazione predefinita, se non viene chiamato l'alias).
Parametri:
-
label(str): etichetta perimetrale
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se l'etichetta è già impostata
Esempio:
builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet
Impostare il nodo di origine con la colonna ID e l'etichetta.
Parametri:
-
id_column(str): nome colonna contenente l'ID nodo di origine -
node_type(str): etichetta del nodo di origine
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se l'origine è già impostata
Esempio:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet
Impostare il nodo di destinazione con la colonna ID e l'etichetta.
Parametri:
-
id_column(str): nome della colonna contenente l'ID nodo di destinazione -
node_type(str): etichetta del nodo di destinazione
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se la destinazione è già impostata
Esempio:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
time_column: str,
start_time: Optional[Union[str, datetime]] = None,
end_time: Optional[Union[str, datetime]] = None,
lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet
Applicare il filtro dell'intervallo di tempo all'origine dati del bordo.
Parametri:
-
time_column(str): nome colonna contenente i dati timestamp (obbligatorio) -
start_time(str o datetime, facoltativo): Data di inizio -
end_time(str o datetime, facoltativo): Data di fine -
lookback_hours(float, facoltativo): ore per guardare indietro da ora
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se la colonna ora non è stata trovata nello schema di origine
Esempio:
builder.add_edge("accessed").from_table("AccessLogs") \
.with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
*columns: str,
key: str,
display: str
) -> EdgeBuilderSourceSet
Configurare le colonne con la chiave necessaria e la designazione della visualizzazione.
Parametri:
-
*columns(str): nomi di colonna da includere (almeno uno obbligatorio) -
key(str): Nome colonna da contrassegnare come chiave (obbligatorio, deve essere in colonne) -
display(str): nome colonna da contrassegnare come valore visualizzato (obbligatorio, deve essere in colonne)
Restituisce:
-
EdgeBuilderSourceSet: Self per il concatenamento dei metodi
Genera:
-
ValueError: se la convalida non riesce
Esempio:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device") \
.with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial
Completare questo bordo e iniziare a creare un nodo.
Parametri:
-
alias(str): alias per il nuovo nodo
Restituisce:
-
NodeBuilderInitial: Nuovo generatore di nodi
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial
Completare questo bordo e iniziare a creare un altro bordo.
Parametri:
-
alias(str): alias per il nuovo bordo
Restituisce:
-
EdgeBuilderInitial: nuovo generatore di dispositivi perimetrali
Esempio:
builder.add_edge("accessed").from_table("AccessLogs") \
.source(id_column="user_id", node_type="user") \
.target(id_column="device_id", node_type="device") \
.with_columns("id", "location", key="id", display="location") \
.add_edge("connected_to")
done
def done() -> GraphSpec
Finalizzare questo bordo e completare la specifica del grafo.
Restituisce:
-
GraphSpec: specifica completa del grafo
Classi di schema
GraphDefinitionReference
Riferimento a una definizione del grafo con nome e versione.
Costruttore
GraphDefinitionReference(
fully_qualified_name: str,
version: str
)
Parametri:
-
fully_qualified_name(str): nome completo del grafo a cui si fa riferimento -
version(str): versione del grafico a cui si fa riferimento
Genera:
-
ValueError: se fully_qualified_name o la versione è vuota
Metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare nel dizionario.
Restituisce:
-
Dict[str, Any]: riferimento serializzato
Proprietà
Definizione della proprietà con interfaccia indipendente dai tipi.
Costruttore
Property(
name: str,
property_type: PropertyType,
is_non_null: bool = False,
description: str = "",
is_key: bool = False,
is_display_value: bool = False,
is_internal: bool = False
)
Parametri:
-
name(str): Nome proprietà -
property_type(PropertyType): tipo di dati della proprietà -
is_non_null(bool, default=False): Indica se la proprietà è obbligatoria -
description(str, default=""): Descrizione della proprietà -
is_key(bool, default=False): Indica se la proprietà è una chiave -
is_display_value(bool, default=False): Indica se la proprietà è un valore visualizzato -
is_internal(bool, default=False): Indica se la proprietà è interna
Genera:
-
ValueError: se il nome è vuoto o la convalida ha esito negativo
Metodi di classe
key
@classmethod
Property.key(
name: str,
property_type: PropertyType,
description: str = "",
is_non_null: bool = False
) -> Property
Creare una proprietà chiave con impostazioni comuni (is_key=True, is_display_value=True).
display
@classmethod
Property.display(
name: str,
property_type: PropertyType,
description: str = "",
is_non_null: bool = False
) -> Property
Creare una proprietà del valore di visualizzazione (is_display_value=True).
Metodi
describe
def describe(text: str) -> Property
Aggiungere la descrizione in modo fluente.
Parametri:
-
text(str): testo descrizione
Restituisce:
-
Property: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare la proprietà nel dizionario con chiavi di annotazione con prefisso @.
Restituisce:
-
Dict[str, Any]: proprietà serializzata
to_gql
def to_gql() -> str
Generare la definizione della proprietà GQL.
Restituisce:
-
str: Rappresentazione di stringa GQL
EdgeNode
Riferimento al nodo usato nelle definizioni perimetrali.
Costruttore
EdgeNode(
alias: Optional[str] = None,
labels: List[str] = []
)
Parametri:
-
alias(str, facoltativo): alias nodo (impostato automaticamente sulla prima etichetta se None o vuoto) -
labels(List[str]): etichette di nodo (almeno una richiesta)
Genera:
-
ValueError: se l'elenco di etichette è vuoto -
TypeError: se le etichette non sono stringhe
Mutazione automatica:
- Se l'alias è Nessuno o vuoto, viene impostato sulla prima etichetta
Metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare nel dizionario.
Restituisce:
-
Dict[str, Any]: riferimento al nodo perimetrale serializzato
Nodo
Definizione del nodo con interfaccia indipendente dai tipi.
Costruttore
Node(
alias: str = "",
labels: List[str] = [],
implies_labels: List[str] = [],
properties: List[Property] = [],
description: str = "",
entity_group: str = "",
dynamic_labels: bool = False,
abstract_edge_aliases: bool = False
)
Parametri:
-
alias(str, default=""): Alias nodo (impostato automaticamente sulla prima etichetta se vuoto) -
labels(List[str]): etichette di nodo (almeno una richiesta) -
implies_labels(List[str], default=[]): Etichette implicite -
properties(List[Property], default=[]): Node properties -
description(str, default=""): Descrizione del nodo -
entity_group(str, default=""): Nome gruppo di entità -
dynamic_labels(bool, default=False): indica se il nodo dispone di etichette dinamiche -
abstract_edge_aliases(bool, default=False): Indica se il nodo usa alias perimetrali astratti
Genera:
-
ValueError: se la convalida non riesce (nessuna etichetta, nessuna proprietà chiave, nessuna proprietà di visualizzazione e così via)
Mutazione automatica:
- Se l'alias è vuoto, viene impostato sulla prima etichetta
- Se entity_group è vuoto, viene impostato sull'etichetta primaria
Metodi
get_primary_label
def get_primary_label() -> Optional[str]
Ottiene l'etichetta primaria (prima).
Restituisce:
-
stroNone: etichetta primaria o Nessuna se nessuna etichetta
get_entity_group_name
def get_entity_group_name() -> str
Ottenere il nome del gruppo di entità o il fallback all'etichetta primaria.
Restituisce:
-
str: nome del gruppo di entità
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]
Ottiene il nome della proprietà della chiave primaria.
Restituisce:
-
stroNone: nome della proprietà della chiave primaria
get_properties
def get_properties() -> Dict[str, Property]
Ottenere le proprietà come dizionario per un facile accesso.
Restituisce:
-
Dict[str, Property]: proprietà con chiave in base al nome
get_property
def get_property(name: str) -> Optional[Property]
Ottenere una proprietà specifica in base al nome.
Parametri:
-
name(str): Nome proprietà
Restituisce:
-
PropertyoNone: proprietà se trovata
add_property
def add_property(prop: Property) -> None
Aggiungere una proprietà a questo nodo.
Parametri:
-
prop(Proprietà): proprietà da aggiungere
Genera:
-
ValueError: se il nome della proprietà è duplicato
is_dynamically_labeled
def is_dynamically_labeled() -> bool
Controllare se il nodo ha etichette dinamiche.
Restituisce:
-
bool: True se le etichette dinamiche sono abilitate
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool
Controllare se il nodo usa alias di nodi perimetrali astratti.
Restituisce:
-
bool: True se gli alias di bordi astratti sono abilitati
describe
def describe(text: str) -> Node
Aggiungere la descrizione in modo fluente.
Parametri:
-
text(str): testo descrizione
Restituisce:
-
Node: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare il nodo nel dizionario.
Restituisce:
-
Dict[str, Any]: nodo serializzato
to_gql
def to_gql() -> str
Generare la definizione del nodo GQL.
Restituisce:
-
str: Rappresentazione di stringa GQL
Genera:
-
ValueError: se il nodo non dispone dei campi necessari per GQL
Metodi di classe
create
@classmethod
Node.create(
alias: str,
labels: List[str],
properties: List[Property],
description: str = "",
entity_group: str = "",
**kwargs
) -> Node
Creare un nodo con tutti i campi obbligatori.
Parametri:
-
alias(str): alias del nodo -
labels(List[str]): Etichette dei nodi -
properties(List[Property]): Proprietà node -
description(str, default=""): Descrizione del nodo -
entity_group(str, default=""): Nome gruppo di entità
Restituisce:
-
Node: nuova istanza del nodo
Edge
Definizione edge con interfaccia indipendente dai tipi.
Costruttore
Edge(
relationship_type: str,
source_node_label: str,
target_node_label: str,
direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
properties: List[Property] = [],
description: str = "",
entity_group: str = "",
dynamic_type: bool = False
)
Parametri:
-
relationship_type(str): tipo di relazione edge (ad esempio, "FOLLOWS", "OWNS") -
source_node_label(str): etichetta del nodo di origine -
target_node_label(str): etichetta del nodo di destinazione -
direction(EdgeDirection, default=DIRECTED_RIGHT): Direzione del bordo -
properties(List[Property], default=[]): Proprietà edge -
description(str, default=""): Descrizione del bordo -
entity_group(str, default=""): Nome gruppo di entità -
dynamic_type(bool, default=False): indica se edge ha un tipo dinamico
Genera:
-
ValueError: se la convalida non riesce
Mutazione automatica:
-
labelsl'elenco viene popolato automaticamente con[relationship_type] - Se entity_group è vuoto, viene impostato su relationship_type
Proprietà
edge_type
def edge_type() -> str
Alias di compatibilità con le versioni precedenti per relationship_type.
Restituisce:
-
str: tipo di relazione
Metodi
get_entity_group_name
def get_entity_group_name() -> str
Ottenere il nome del gruppo di entità o il fallback al tipo di relazione.
Restituisce:
-
str: nome del gruppo di entità
is_dynamic_type
def is_dynamic_type() -> bool
Controllare se il bordo ha un tipo dinamico.
Restituisce:
-
bool: True se il tipo dinamico
add_property
def add_property(edge_property: Property) -> None
Aggiungere una proprietà a questo bordo.
Parametri:
-
edge_property(Proprietà): proprietà da aggiungere
describe
def describe(text: str) -> Edge
Aggiungere la descrizione in modo fluente.
Parametri:
-
text(str): testo descrizione
Restituisce:
-
Edge: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare edge in dictionary.
Restituisce:
-
Dict[str, Any]: bordo serializzato
to_gql
def to_gql() -> str
Generare la definizione perimetrale GQL.
Restituisce:
-
str: Rappresentazione di stringa GQL
Metodi di classe
create
Edge.create(
relationship_type: str,
source_node_label: str,
target_node_label: str,
properties: List[Property] = None,
description: str = "",
entity_group: str = "",
**kwargs
) -> Edge
Creare un bordo con tutti i campi obbligatori.
Parametri:
-
relationship_type(str): tipo di relazione edge -
source_node_label(str): etichetta del nodo di origine -
target_node_label(str): etichetta del nodo di destinazione -
properties(List[Property], optional): Proprietà edge -
description(str, default=""): Descrizione del bordo -
entity_group(str, default=""): Nome gruppo di entità
Restituisce:
-
Edge: nuova istanza di edge
Graphschema
Definizione dello schema del grafico con interfaccia indipendente dai tipi.
Costruttore
GraphSchema(
name: str,
nodes: List[Node] = [],
edges: List[Edge] = [],
base_graphs: List[GraphSchema] = [],
description: str = "",
version: str = "1.0",
fully_qualified_name: str = "",
namespace: str = ""
)
Parametri:
-
name(str): nome dello schema del grafico -
nodes(List[Node], default=[]): Definizioni dei nodi -
edges(List[Edge], default=[]): Definizioni edge -
base_graphs(List[GraphSchema], default=[]): Schemi del grafo di base -
description(str, default=""): Descrizione dello schema -
version(str, default="1.0"): Versione dello schema -
fully_qualified_name(str, default=""): Nome completo -
namespace(str, default=""): Spazio dei nomi
Genera:
-
ValueError: se la convalida non riesce (alias duplicati, archi fanno riferimento a nodi inesistenti e così via)
Metodi
get_fully_qualified_name
def get_fully_qualified_name() -> str
Ottenere il nome completo.
Restituisce:
-
str: nome completo
get_namespace
def get_namespace() -> str
Ottenere lo spazio dei nomi dal nome completo o restituire l'impostazione predefinita.
Restituisce:
-
str:Namespace
get_version
def get_version() -> str
Ottenere la versione.
Restituisce:
-
str: stringa di versione
get_node
def get_node(label_or_alias: str) -> Optional[Node]
Ottenere il nodo in base all'etichetta o all'alias.
Parametri:
-
label_or_alias(str): etichetta o alias del nodo
Restituisce:
-
NodeoNone: Nodo se trovato
get_edge
def get_edge(name: str) -> Optional[Edge]
Ottenere edge per nome/tipo.
Parametri:
-
name(str): tipo di relazione edge
Restituisce:
-
EdgeoNone: Edge se trovato
add_node
def add_node(node: Node) -> None
Aggiungere un nodo a questo grafico.
Parametri:
-
node(Nodo): nodo da aggiungere
Genera:
-
ValueError: se l'alias del nodo è duplicato
add_edge
def add_edge(edge: Edge) -> None
Aggiungere un bordo a questo grafico.
Parametri:
-
edge(Edge): Edge da aggiungere
Genera:
-
ValueError: se il tipo di bordo è duplicato
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema
Aggiungere un'inclusione di grafi (API fluent).
Parametri:
-
fully_qualified_name(str): nome completo del grafo da includere -
version(str): versione del grafo da includere
Restituisce:
-
GraphSchema: Self per il concatenamento dei metodi
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]
Ottenere l'elenco dei riferimenti ai grafici inclusi.
Restituisce:
-
List[GraphDefinitionReference]: elenco dei riferimenti alle definizioni del grafo
describe
def describe(text: str) -> GraphSchema
Aggiungere la descrizione in modo fluente.
Parametri:
-
text(str): testo descrizione
Restituisce:
-
GraphSchema: Self per il concatenamento dei metodi
to_dict
def to_dict() -> Dict[str, Any]
Serializzare lo schema nel dizionario.
Restituisce:
-
Dict[str, Any]: schema serializzato
to_json
def to_json(indent: int = 2) -> str
Generare la rappresentazione JSON.
Parametri:
-
indent(int, default=2): livello di rientro JSON
Restituisce:
-
str: stringa JSON
to_gql
def to_gql() -> str
Generare la definizione dello schema GQL.
Restituisce:
-
str: Rappresentazione di stringa GQL
Metodi di classe
create
@classmethod
GraphSchema.create(
name: str,
nodes: List[Node] = None,
edges: List[Edge] = None,
description: str = "",
version: str = "1.0",
**kwargs
) -> GraphSchema
Creare uno schema a grafo con tutti i campi obbligatori.
Parametri:
-
name(str): nome dello schema del grafico -
nodes(List[Node], facoltativo): Definizioni di nodo -
edges(List[Edge], facoltativo): Definizioni di Edge -
description(str, default=""): Descrizione dello schema -
version(str, default="1.0"): Versione dello schema
Restituisce:
-
GraphSchema: nuova istanza dello schema del grafo
Classi di input di query
Classi di dati che rappresentano i parametri di input per le query a grafo predefinite.
Nota
Il passaggio QueryInput di oggetti direttamente ai Graph metodi di query è deprecato e verrà rimosso nelle versioni future.
Usare invece gli argomenti della parola chiave. I Graph metodi (reachability, k_hop, blast_radius, centrality, ) rankedaccettano tutti i parametri come argomenti di parola chiave e costruiscono internamente gli oggetti di input. Queste classi rimangono nella codebase per il momento, ma non devono essere usate nel nuovo codice.
QueryInputBase
Classe di base per tutti i parametri di input della query.
Metodi
to_json_payload
def to_json_payload() -> Dict[str, Any]
Convertire i parametri di input in un dizionario per l'invio dell'API.
Restituisce:
-
Dict[str, Any]: Rappresentazione del dizionario dei parametri di input
validate
def validate() -> None
Convalidare i parametri di input.
Genera:
-
ValueError: se i parametri di input non sono validi
ReachabilityQueryInput
Parametri di input per una query di raggiungibilità tra i nodi di origine e di destinazione.
Eredita da ReachabilityQueryInputBase cui eredita da QueryInputBase.
Fields
| Campo | Tipo | Impostazione predefinita | Descrizione |
|---|---|---|---|
source_property_value |
str |
(obbligatorio) | Valore da trovare per la proprietà di origine |
target_property_value |
str |
(obbligatorio) | Valore da trovare per la proprietà di destinazione |
source_property |
Optional[str] |
None |
Nome della proprietà per filtrare i nodi di origine |
participating_source_node_labels |
Optional[List[str]] |
None |
Etichette dei nodi da considerare come nodi di origine |
target_property |
Optional[str] |
None |
Nome della proprietà per filtrare i nodi di destinazione |
participating_target_node_labels |
Optional[List[str]] |
None |
Etichette dei nodi da considerare come nodi di destinazione |
participating_edge_labels |
Optional[List[str]] |
None |
Etichette perimetrali da attraversare nel percorso |
is_directional |
Optional[bool] |
True |
Indica se i bordi sono direzionali |
min_hop_count |
Optional[int] |
1 |
Numero minimo di hop nel percorso |
max_hop_count |
Optional[int] |
4 |
Numero massimo di hop nel percorso |
shortest_path |
Optional[bool] |
False |
Indica se trovare solo il percorso più breve |
max_results |
Optional[int] |
500 |
Numero massimo di risultati da restituire |
Convalida:
-
source_property_valueè obbligatorio -
target_property_valueè obbligatorio
Esempio:
# Preferred: keyword arguments (no import needed)
result = graph.reachability(
source_property="UserId",
source_property_value="user123",
target_property="DeviceId",
target_property_value="device456",
participating_edge_labels=["accessed", "connected_to"],
shortest_path=True
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
source_property_value="user123",
target_property_value="device456"
))
K_HopQueryInput
Parametri di input per una query k-hop da un nodo di origine specificato.
Eredita da ReachabilityQueryInputBase.
Eredita tutti i campi da ReachabilityQueryInput.
Convalida:
- Almeno uno di
source_property_valueotarget_property_valuedeve essere fornito
Esempio:
# Preferred: keyword arguments
result = graph.k_hop(
source_property_value="user123",
max_hop_count=3,
participating_edge_labels=["accessed"]
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))
BlastRadiusQueryInput
Parametri di input per una query con raggio di esplosione da nodi di origine a nodi di destinazione.
Eredita da ReachabilityQueryInputBase.
Eredita tutti i campi da ReachabilityQueryInput, con i campi obbligatori seguenti:
| Campo | Tipo | Obbligatorio | Descrizione |
|---|---|---|---|
source_property_value |
str |
Sì | Valore per identificare il nodo di origine |
target_property_value |
str |
Sì | Valore per identificare il nodo di destinazione |
Convalida:
-
source_property_valueè obbligatorio -
target_property_valueè obbligatorio
Esempio:
# Preferred: keyword arguments
result = graph.blast_radius(
source_property_value="user123",
target_property_value="device456",
participating_edge_labels=["accessed", "connected_to"]
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
source_property_value="user123",
target_property_value="device456"
))
CentralityQueryInput
Parametri di input per una query di analisi della centralità.
Eredita da QueryInputBase.
Enumerazione CentralityType
| Valore | Descrizione |
|---|---|
CentralityType.Node |
Centralità del nodo di calcolo |
CentralityType.Edge |
Centralità dei bordi di calcolo |
Fields
| Campo | Tipo | Impostazione predefinita | Descrizione |
|---|---|---|---|
threshold |
Optional[int] |
3 |
Punteggio di centralità minimo da considerare |
centrality_type |
CentralityType |
CentralityType.Node |
Tipo di centralità da calcolare |
max_paths |
Optional[int] |
1000000 |
Percorsi massimi da considerare (0 = all) |
participating_source_node_labels |
Optional[List[str]] |
None |
Etichette dei nodi di origine |
participating_target_node_labels |
Optional[List[str]] |
None |
Etichette dei nodi di destinazione |
participating_edge_labels |
Optional[List[str]] |
None |
Etichette perimetrali da attraversare |
is_directional |
Optional[bool] |
True |
Indica se i bordi sono direzionali |
min_hop_count |
Optional[int] |
1 |
Hop minimi |
max_hop_count |
Optional[int] |
4 |
Numero massimo di hop |
shortest_path |
Optional[bool] |
False |
Solo percorsi più brevi |
max_results |
Optional[int] |
500 |
Risultati massimi |
Esempio:
# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
centrality_type=CentralityType.Edge, # or CentralityType.Node (default)
participating_edge_labels=["accessed", "connected_to"],
threshold=5,
max_results=100
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
centrality_type=CentralityType.Edge,
participating_edge_labels=["accessed"]
))
RankedQueryInput
Parametri di input per una query di analisi classificata.
Eredita da QueryInputBase.
Fields
| Campo | Tipo | Impostazione predefinita | Descrizione |
|---|---|---|---|
rank_property_name |
str |
(obbligatorio) | Nome della proprietà da usare per i percorsi di classificazione |
threshold |
Optional[int] |
0 |
Solo i percorsi restituiti con pesi superiori a questo valore |
max_paths |
Optional[int] |
1000000 |
Percorsi massimi da considerare (0 = all) |
decay_factor |
Optional[float] |
1 |
Quanto ogni passaggio del grafico riduce il rango (2 = metà per ogni passaggio) |
is_directional |
Optional[bool] |
True |
Indica se i bordi sono direzionali |
min_hop_count |
Optional[int] |
1 |
Hop minimi |
max_hop_count |
Optional[int] |
4 |
Numero massimo di hop |
shortest_path |
Optional[bool] |
False |
Solo percorsi più brevi |
max_results |
Optional[int] |
500 |
Risultati massimi |
Esempio:
# Preferred: keyword arguments
result = graph.ranked(
rank_property_name="risk_score",
threshold=5,
decay_factor=2,
max_results=50
)
# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
rank_property_name="risk_score",
threshold=5
))
Risultati query
QueryResult
Risultato di una query a grafo con accesso lazy DataFrame.
Costruttore
QueryResult(raw_response: Dict[str, Any], graph: Graph)
Parametri:
-
raw_response(Dict[str, Any]): Dizionario di risposta API non elaborato -
graph(Grafo): riferimento al grafico padre
Nota: In genere creato da Graph.query(), non viene creata direttamente un'istanza di .
Metodi
to_dataframe
def to_dataframe() -> DataFrame
Converte il risultato della query in un dataframe Spark.
Restituisce:
-
DataFrame: risultato della query come dataframe Spark
Genera:
-
ValueError: se la conversione non riesce
Esempio:
result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]
Ottenere la sezione RawData dalla risposta.
Restituisce:
-
Dict[str, Any]: dizionario con metadati non elaborati o dict vuoto se non presente
Esempio:
result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None
Visualizzare il risultato della query in vari formati.
Parametri:
-
format(str, default="visual"): Formato di output-
"table": tabelle dataframe complete (tutte le colonne) -
"visual": visualizzazione interattiva del grafico con il plug-in VSC -
"all": mostra tutti i formati
-
Genera:
-
ValueError: se il formato non è uno dei valori supportati
Esempio:
result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show() # Visual by default
result.show(format="table") # Table format
Esempio completo (consigliato - v0.3+)
# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph
# 1. Define graph specification
spec = (
GraphSpecBuilder.start()
.add_node("User")
.from_dataframe(user_nodes) # native Spark DF from groupBy → no .df
.with_columns(
"UserId", "UserDisplayName", "UserPrincipalName",
"DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
"TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
key="UserId", display="UserDisplayName"
)
.add_node("IPAddress")
.from_dataframe(ip_nodes) # native Spark DF from groupBy → no .df
.with_columns(
"IPAddress", "UniqueUsers", "UniqueLocations",
"SignInCount", "RiskySignInCount", "SharedIPFlag",
key="IPAddress", display="IPAddress"
)
.add_edge("UsedIP")
.from_dataframe(edge_used_ip) # native Spark DF → no .df
.source(id_column="UserId", node_type="User")
.target(id_column="IPAddress", node_type="IPAddress")
.with_columns(
"SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
key="EdgeKey", display="EdgeKey"
)
.done()
)
# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()
# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")
# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()
# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()
# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()
# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()
Note sui modelli di progettazione
Fluent API
Tutti i generatori supportano il concatenamento dei metodi per definizioni di grafi leggibili e dichiarative:
builder.add_node("user") \
.from_table("Users") \
.with_columns("id", "name", key="id", display="name") \
.add_edge("follows")
Union Schemas
Più archi con lo stesso alias vengono automaticamente uniti con le proprietà unite:
# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
.from_table("AzureSignins") \
.source(id_column="UserId", node_type="AZuser") \
.target(id_column="DeviceId", node_type="device")
builder.add_edge("sign_in") \
.from_table("EntraSignins") \
.source(id_column="UserId", node_type="EntraUser") \
.target(id_column="DeviceId", node_type="device")
Configurazione automatica
Molti campi hanno valori predefiniti ragionevoli:
- Per impostazione predefinita, le etichette node/edge hanno gli alias
- Le proprietà vengono dedotte automaticamente dagli schemi di origine
- Per impostazione predefinita, i gruppi di entità sono etichette/tipi di relazione primari
Valutazione differita
I dataframe e le risorse vengono caricati in modo pigramente e memorizzati nella cache:
-
graph_spec.nodesegraph_spec.edgesvengono caricati al primo accesso - I risultati delle query creano dataframe solo quando richiesto