Source code for pynetim.diffusion_model.base_multiprocess_diffusion_model

import random
import multiprocessing as mp
from typing import Set, List, Tuple, TYPE_CHECKING
from abc import ABC, abstractmethod

if TYPE_CHECKING:
    from ..graph import IMGraph

_global_graph = None
_global_model_class = None


def _init_worker(graph_data, model_class):
    """初始化工作进程。

    Args:
        graph_data: 图数据字典,包含 edges、weights、directed。
        model_class: 模型类。
    """
    global _global_graph, _global_model_class
    from ..graph import IMGraph
    _global_graph = IMGraph(graph_data['edges'], graph_data['weights'], graph_data['directed'])
    _global_model_class = model_class


def _run_trial_worker(args):
    """执行单次试验的工作函数。

    Args:
        args: 包含 (seeds, random_seed) 的元组。

    Returns:
        单次试验的结果元组。
    """
    seeds, random_seed = args
    model = _global_model_class(_global_graph, seeds)
    return model.run_single_trial(list(seeds), random_seed)


[docs] class BaseMultiprocessDiffusionModel(ABC): """多进程版自定义传播模型基类。 纯 Python 实现,使用 multiprocessing 实现真正的并行计算。 不受 Python GIL 限制,多进程可真正并行加速。 适合大规模模拟和需要并行加速的自定义模型。 Attributes: graph: 图对象。 seeds: 种子节点集合。 num_nodes: 节点数量。 Example: >>> from pynetim.diffusion_model import BaseMultiprocessDiffusionModel >>> >>> class MyICModel(BaseMultiprocessDiffusionModel): ... def run_single_trial(self, seeds, random_seed): ... import random ... random.seed(random_seed) ... activated = set(seeds) ... current = list(seeds) ... count = len(seeds) ... frequency = [0] * self.graph.num_nodes ... ... while current: ... new_active = [] ... for node in current: ... for neighbor, weight in self.graph.out_neighbors_with_weights(node): ... if neighbor not in activated and random.random() < weight: ... activated.add(neighbor) ... new_active.append(neighbor) ... count += 1 ... frequency[neighbor] += 1 ... current = new_active ... ... return count, activated, frequency ... >>> model = MyICModel(graph, {0, 1}) >>> avg = model.run_monte_carlo_diffusion(mc_rounds=1000, num_processes=4, random_seed=42) """
[docs] def __init__(self, graph: 'IMGraph', seeds: Set[int]): """初始化传播模型。 Args: graph: 图对象。 seeds: 种子节点集合。 """ self.graph = graph self.seeds = set(seeds) self.num_nodes = graph.num_nodes
[docs] def set_seeds(self, seeds: Set[int]): """设置种子节点集合。 Args: seeds: 新的种子节点集合。 """ self.seeds = set(seeds)
[docs] @abstractmethod def run_single_trial(self, seeds: List[int], random_seed: int) -> Tuple[int, Set[int], List[int]]: """执行单次传播试验。 子类必须重写此方法实现自定义传播逻辑。 Args: seeds: 初始种子节点列表。 random_seed: 随机数种子,用于确保结果可重现。 Returns: Tuple[int, Set[int], List[int]]: 包含三个元素的元组: - 激活节点总数 - 激活的节点集合 - 每个节点的激活频数列表 """ raise NotImplementedError
def _get_graph_data(self) -> dict: """获取图数据用于进程间传递。 Returns: dict: 包含 edges、weights、directed 的字典。 """ sparse_matrix = self.graph.get_adj_matrix_sparse() edges = [(u, v) for u, v, _ in sparse_matrix] weights = [w for _, _, w in sparse_matrix] return { 'edges': edges, 'weights': weights, 'directed': self.graph.directed }
[docs] def run_monte_carlo_diffusion( self, mc_rounds: int, num_processes: int = None, show_progress: bool = False, random_seed: int = None, normalize: bool = False ) -> float: """运行蒙特卡洛模拟,计算平均影响力。 Args: mc_rounds: 蒙特卡洛模拟次数,建议 1000-10000 次。 num_processes: 进程数,默认使用 CPU 核心数。 show_progress: 是否显示进度条(暂未实现)。 random_seed: 随机数种子,默认为 None(每次结果不同)。 normalize: 是否将结果归一化(除以图节点数),默认为 False。 Returns: float: 平均激活节点数。若 normalize=True,返回归一化后的影响力比例。 Note: 当模拟次数较多时(>= 进程数 * 10),自动启用多进程并行。 """ if num_processes is None: num_processes = mp.cpu_count() if random_seed is None: base_seed = random.randint(0, 2**31 - 1) else: base_seed = random_seed rng_seeds = [base_seed + i for i in range(mc_rounds)] if num_processes > 1 and mc_rounds >= num_processes * 10: graph_data = self._get_graph_data() args_list = [(self.seeds, rng_seeds[i]) for i in range(mc_rounds)] ctx = mp.get_context('fork') with ctx.Pool( processes=num_processes, initializer=_init_worker, initargs=(graph_data, self.__class__) ) as pool: results = pool.map(_run_trial_worker, args_list) total_activated = sum(r[0] for r in results) else: total_activated = 0 for i in range(mc_rounds): count, _, _ = self.run_single_trial(list(self.seeds), rng_seeds[i]) total_activated += count avg = total_activated / mc_rounds return avg / self.num_nodes if normalize else avg
[docs] def run_monte_carlo_with_frequency( self, mc_rounds: int, num_processes: int = None, random_seed: int = None, normalize: bool = False ) -> Tuple[float, List[int]]: """运行蒙特卡洛模拟,返回平均影响力和激活频数。 Args: mc_rounds: 蒙特卡洛模拟次数,建议 1000-10000 次。 num_processes: 进程数,默认使用 CPU 核心数。 random_seed: 随机数种子,默认为 None(每次结果不同)。 normalize: 是否将结果归一化(除以图节点数),默认为 False。 Returns: Tuple[float, List[int]]: 包含两个元素: - 平均激活节点数。若 normalize=True,返回归一化后的影响力比例。 - 每个节点的激活频数列表 """ if num_processes is None: num_processes = mp.cpu_count() if random_seed is None: base_seed = random.randint(0, 2**31 - 1) else: base_seed = random_seed rng_seeds = [base_seed + i for i in range(mc_rounds)] if num_processes > 1 and mc_rounds >= num_processes * 10: graph_data = self._get_graph_data() args_list = [(self.seeds, rng_seeds[i]) for i in range(mc_rounds)] ctx = mp.get_context('fork') with ctx.Pool( processes=num_processes, initializer=_init_worker, initargs=(graph_data, self.__class__) ) as pool: results = pool.map(_run_trial_worker, args_list) total_activated = sum(r[0] for r in results) total_frequency = [0] * self.num_nodes for _, _, freq in results: for i, f in enumerate(freq): total_frequency[i] += f else: total_activated = 0 total_frequency = [0] * self.num_nodes for i in range(mc_rounds): count, _, freq = self.run_single_trial(list(self.seeds), rng_seeds[i]) total_activated += count for j, f in enumerate(freq): total_frequency[j] += f avg = total_activated / mc_rounds return (avg / self.num_nodes if normalize else avg), total_frequency