blob: 862e8b00590d6e4a1127d2197c253cb9d561da01 [file] [log] [blame]
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Graph algorithms and utilities."""
import logging
from typing import Any
class Graph:
"""A simple directed graph implementation."""
def __init__(self) -> None:
self.vertices: set[Any] = set()
self.graph: dict[Any, list[Any]] = {}
def add_vertex(self, u: Any) -> None:
"""Adds a vertex to the graph."""
self.vertices.add(u)
if u not in self.graph:
self.graph[u] = []
def add_edge(self, u: Any, v: Any) -> None:
"""Adds a directed edge from u to v."""
self.add_vertex(u)
self.add_vertex(v)
self.graph[u].append(v)
def get_sccs(self) -> list[list[Any]]:
"""
Finds Strongly Connected Components using Tarjan's algorithm.
"""
index_counter = 0
stack = []
lowlink = {}
index = {}
result = []
def connect(node: Any) -> None:
nonlocal index_counter
index[node] = index_counter
lowlink[node] = index_counter
index_counter += 1
stack.append(node)
try:
successors = self.graph.get(node, [])
for successor in successors:
if successor not in index:
connect(successor)
lowlink[node] = min(lowlink[node], lowlink[successor])
elif successor in stack:
lowlink[node] = min(lowlink[node], index[successor])
except RecursionError:
# A RecursionError occurs when the DFS encounters a deeply
# nested, linear chain of function calls that exceeds Python's
# recursion limit (e.g. 1000 nodes deep in a straight line).
logging.warning(
"Recursion error encountered on node %s. "
"Graph is too deep.",
node,
)
# Fallback or handle deep recursion if needed
if lowlink[node] == index[node]:
# SCC found. The current node is the root of the SCC.
# Nodes belonging to this SCC are currently on the stack.
# The stack is guaranteed to not be empty because 'node'
# itself was added at the start of connect(node) and
# hasn't been popped yet.
connected_component = []
while True:
successor = stack.pop()
connected_component.append(successor)
if successor == node:
break
result.append(connected_component)
for node in self.vertices:
if node not in index:
connect(node)
return result
def topological_sort(self) -> list[Any]:
"""
Returns vertices in topological order (u before v if u->v).
Note: This is only valid for DAGs. For cyclic graphs, this returns
*some* order compatible with the condensation DAG if used on SCCs,
but here it's run directly on the SCC DAG.
So this function just needs to support DAGs.
"""
visited = set()
stack = []
def visit(node: Any) -> None:
visited.add(node)
for neighbor in self.graph.get(node, []):
if neighbor not in visited:
visit(neighbor)
stack.append(node)
for node in self.vertices:
if node not in visited:
visit(node)
# Stack has reverse topological order (children first).
# We want u before v (parents first).
return stack[::-1]