Source code for pynetim.utils.graph_utils

import re
from pathlib import Path
from typing import Union, Optional, Tuple, TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from pynetim.graph import IMGraph

from pynetim.graph import generate_er_graph, generate_ba_graph, generate_ws_graph


[docs] def compute_sir_beta( graph: "IMGraph", gamma: float = 0.1, c: float = 1.3 ) -> Tuple[float, float]: """计算 SIR 模型的感染率 β。 基于配置模型近似 (Configuration Model Approximation) 计算感染率。 公式: β_c = γ · <k> / <k²> β = c · β_c 其中: - <k>: 平均度 - <k²>: 度的平方均值 - γ: 恢复率 - c: 调节系数 Parameters ---------- graph : IMGraph 图对象。 gamma : float, optional 恢复率,默认 0.1。 c : float, optional 调节系数,默认 1.3。 - c < 1: 不爆发 - c = 1: 临界状态 - c > 1: 爆发 Returns ------- tuple[float, float] (beta, beta_c): 感染率和临界感染率。 References ---------- Pastor-Satorras, R., Castellano, C., Van Mieghem, P., & Vespignani, A. (2015). Epidemic processes in complex networks. Reviews of Modern Physics, 87(3), 925. Examples -------- >>> from pynetim.utils import generate_er_graph, compute_sir_beta >>> g = generate_er_graph(n=100, p=0.1, random_seed=42) >>> beta, beta_c = compute_sir_beta(g, gamma=0.1, c=1.3) >>> print(f"感染率: {beta:.4f}, 临界感染率: {beta_c:.4f}") """ degrees = np.array(graph.batch_out_degree(list(range(graph.num_nodes)))) k_mean = degrees.mean() k2_mean = (degrees ** 2).mean() if k2_mean == 0: raise ValueError("图的度平方均值为 0,无法计算感染率") beta_c = gamma * k_mean / k2_mean beta = c * beta_c return beta, beta_c
[docs] def load_edgelist( filepath: Union[str, Path], directed: bool = True, renumber: bool = False, comment: Optional[str] = None, skip_lines: int = 0 ) -> "IMGraph": """ 从文件读取边列表构造 IMGraph。 文件格式:每行 u v [weight],分隔符支持空格/制表符/逗号。 第三列可选,默认权重为 1.0。 Parameters ---------- filepath : str or Path 边列表文件路径 directed : bool, optional 是否有向图,默认 True renumber : bool, optional 是否重编号节点为连续整数,默认 False comment : str, optional 注释行前缀,如 '#' 或 '%',默认 None skip_lines : int, optional 跳过文件开头的行数,默认 0 Returns ------- IMGraph 构造的图对象 Examples -------- 文件内容 (edges.txt): 0 1 0.5 1 2 0.8 2 3 >>> g = load_edgelist("edges.txt") >>> g.num_nodes 4 >>> g.num_edges 3 """ from pynetim.graph import IMGraph filepath = Path(filepath) if not filepath.exists(): raise FileNotFoundError(f"文件不存在: {filepath}") edges = [] weights = [] split_pattern = re.compile(r'[\s,\t]+') with open(filepath, 'r', encoding='utf-8') as f: for _ in range(skip_lines): next(f) for line_num, line in enumerate(f, start=skip_lines + 1): line = line.strip() if not line: continue if comment and line.startswith(comment): continue parts = split_pattern.split(line) if len(parts) < 2: raise ValueError(f"第 {line_num} 行格式错误: {line!r}") try: u = int(parts[0]) v = int(parts[1]) w = float(parts[2]) if len(parts) > 2 else 1.0 except ValueError as e: raise ValueError(f"第 {line_num} 行解析错误: {line!r}") from e edges.append((u, v)) weights.append(w) return IMGraph(edges=edges, weights=weights, directed=directed, renumber=renumber)
[docs] def save_edgelist( graph: "IMGraph", filepath: Union[str, Path], delimiter: str = "\t", include_weight: bool = True ) -> None: """ 将 IMGraph 保存为边列表文件。 Parameters ---------- graph : IMGraph 图对象 filepath : str or Path 输出文件路径 delimiter : str, optional 分隔符,默认制表符 include_weight : bool, optional 是否包含权重列,默认 True Examples -------- >>> g = IMGraph(edges=[(0, 1), (1, 2)], weights=[0.5, 0.8]) >>> save_edgelist(g, "output.txt") """ filepath = Path(filepath) filepath.parent.mkdir(parents=True, exist_ok=True) with open(filepath, 'w', encoding='utf-8') as f: for u in range(graph.num_nodes): for neighbor, weight in graph.out_neighbors_with_weights(u): if include_weight: f.write(f"{u}{delimiter}{neighbor}{delimiter}{weight}\n") else: f.write(f"{u}{delimiter}{neighbor}\n")
[docs] def to_networkx(graph: "IMGraph"): """ 将 IMGraph 转换为 networkx.DiGraph 或 networkx.Graph。 Parameters ---------- graph : IMGraph 图对象 Returns ------- networkx.DiGraph 或 networkx.Graph 取决于原图是否有向 Examples -------- >>> g = IMGraph(edges=[(0, 1), (1, 2)], directed=True) >>> nx_g = to_networkx(g) >>> nx_g.number_of_nodes() 3 """ import networkx as nx if graph.directed: G = nx.DiGraph() else: G = nx.Graph() G.add_nodes_from(range(graph.num_nodes)) all_neighbors = graph.batch_out_neighbors_with_weights(list(range(graph.num_nodes))) for u, neighbors in enumerate(all_neighbors): for neighbor, weight in neighbors: G.add_edge(u, neighbor, weight=weight) return G
[docs] def to_igraph(graph: "IMGraph"): """ 将 IMGraph 转换为 igraph.Graph。 Parameters ---------- graph : IMGraph 图对象 Returns ------- igraph.Graph igraph 图对象,边权重存储在 'weight' 属性中 Examples -------- >>> g = IMGraph(edges=[(0, 1), (1, 2)], directed=True) >>> ig_g = to_igraph(g) >>> ig_g.vcount() 3 """ import igraph as ig edges = [] weights = [] all_neighbors = graph.batch_out_neighbors_with_weights(list(range(graph.num_nodes))) for u, neighbors in enumerate(all_neighbors): for neighbor, weight in neighbors: edges.append((u, neighbor)) weights.append(weight) G = ig.Graph(n=graph.num_nodes, edges=edges, directed=graph.directed) G.es['weight'] = weights return G
[docs] def to_scipy_sparse(graph: "IMGraph", format: str = "csr"): """ 将 IMGraph 转换为 scipy 稀疏矩阵。 Parameters ---------- graph : IMGraph 图对象 format : str, optional 稀疏矩阵格式,可选 'csr', 'csc', 'coo', 'lil', 'dok',默认 'csr' Returns ------- scipy.sparse.spmatrix 稀疏邻接矩阵,matrix[i, j] 表示边 (i, j) 的权重 Examples -------- >>> g = IMGraph(edges=[(0, 1), (1, 2)], weights=[0.5, 0.8], directed=True) >>> mat = to_scipy_sparse(g) >>> mat.toarray() array([[0. , 0.5, 0. ], [0. , 0. , 0.8], [0. , 0. , 0. ]]) """ import scipy.sparse as sp sparse_data = graph.get_adj_matrix_sparse() if not sparse_data: return sp.coo_matrix((graph.num_nodes, graph.num_nodes)) rows = [e[0] for e in sparse_data] cols = [e[1] for e in sparse_data] data = [e[2] for e in sparse_data] mat = sp.coo_matrix((data, (rows, cols)), shape=(graph.num_nodes, graph.num_nodes)) format = format.lower() if format == "csr": return mat.tocsr() elif format == "csc": return mat.tocsc() elif format == "lil": return mat.tolil() elif format == "dok": return mat.todok() else: return mat
[docs] def to_pyg(graph: "IMGraph"): """ 将 IMGraph 转换为 PyTorch Geometric Data 对象。 Parameters ---------- graph : IMGraph 图对象 Returns ------- torch_geometric.data.Data PyG Data 对象,包含 edge_index 和 edge_attr (权重) Examples -------- >>> g = IMGraph(edges=[(0, 1), (1, 2)], weights=[0.5, 0.8], directed=True) >>> pyg_g = to_pyg(g) >>> pyg_g.num_nodes 3 """ import torch from torch_geometric.data import Data sparse_data = graph.get_adj_matrix_sparse() if not sparse_data: edge_index = torch.empty((2, 0), dtype=torch.long) edge_attr = torch.empty((0, 1), dtype=torch.float) else: edge_index = torch.tensor([[e[0], e[1]] for e in sparse_data], dtype=torch.long).t() edge_attr = torch.tensor([[e[2]] for e in sparse_data], dtype=torch.float) return Data(edge_index=edge_index, edge_attr=edge_attr, num_nodes=graph.num_nodes)