Source code for easygraph.functions.structural_holes.evaluation

import math

from easygraph.utils import *

__all__ = ["effective_size", "efficiency", "constraint", "hierarchy"]

def mutual_weight(G, u, v, weight=None):
        a_uv = G[u][v].get(weight, 1)
    except KeyError:
        a_uv = 0
        a_vu = G[v][u].get(weight, 1)
    except KeyError:
        a_vu = 0
    return a_uv + a_vu

sum_nmw_rec = {}
max_nmw_rec = {}

def normalized_mutual_weight(G, u, v, norm=sum, weight=None):
    if norm == sum:
            return sum_nmw_rec[(u, v)]
        except KeyError:
            scale = norm(
                mutual_weight(G, u, w, weight) for w in set(G.all_neighbors(u))
            nmw = 0 if scale == 0 else mutual_weight(G, u, v, weight) / scale
            sum_nmw_rec[(u, v)] = nmw
            return nmw
    elif norm == max:
            return max_nmw_rec[(u, v)]
        except KeyError:
            scale = norm(
                mutual_weight(G, u, w, weight) for w in set(G.all_neighbors(u))
            nmw = 0 if scale == 0 else mutual_weight(G, u, v, weight) / scale
            max_nmw_rec[(u, v)] = nmw
            return nmw

def effective_size_parallel(nodes, G, weight):
    ret = []
    for node in nodes:
        neighbors_of_node = set(G.all_neighbors(node))
        if len(neighbors_of_node) == 0:
            ret.append([node, float("nan")])
            [node, sum(redundancy(G, node, u, weight) for u in neighbors_of_node)]
    return ret

def effective_size_borgatti_parallel(nodes, G, weight):
    ret = []
    for node in nodes:
        # Effective size is not defined for isolated nodes
        if len(G[node]) == 0:
            ret.append([node, float("nan")])
        E = G.ego_subgraph(node)
        ret.append([node, len(E) - (2 * E.size()) / len(E)])
    return ret

def redundancy(G, u, v, weight=None):
    nmw = normalized_mutual_weight
    r = sum(
        nmw(G, u, w, weight=weight) * nmw(G, v, w, norm=max, weight=weight)
        for w in set(G.all_neighbors(u))
    return 1 - r

[docs]@not_implemented_for("multigraph") @hybrid("cpp_effective_size") def effective_size(G, nodes=None, weight=None, n_workers=None): """Burt's metric - Effective Size. Parameters ---------- G : easygraph.Graph or easygraph.DiGraph nodes : list of nodes or None, optional (default : None) The nodes you want to calculate. If *None*, all nodes in `G` will be calculated. weight : string or None, optional (default : None) The key for edge weight. If *None*, `G` will be regarded as unweighted graph. Returns ------- effective_size : dict The Effective Size of node in `nodes`. Examples -------- >>> effective_size(G, ... nodes=[1,2,3], # Compute the Effective Size of some nodes. The default is None for all nodes in G. ... weight='weight' # The weight key of the graph. The default is None for unweighted graph. ... ) References ---------- .. [1] Burt R S. Structural holes: The social structure of competition[M]. Harvard university press, 2009. """ sum_nmw_rec.clear() max_nmw_rec.clear() effective_size = {} if nodes is None: nodes = G # Use Borgatti's simplified formula for unweighted and undirected graphs if not G.is_directed() and weight is None: if n_workers is not None: import random from functools import partial from multiprocessing import Pool local_function = partial( effective_size_borgatti_parallel, G=G, weight=weight ) nodes = list(nodes) random.shuffle(nodes) if len(nodes) > n_workers * 50000: nodes = split_len(nodes, step=50000) else: nodes = split(nodes, n_workers) with Pool(n_workers) as p: ret = p.imap(local_function, nodes) res = [x for i in ret for x in i] effective_size = dict(res) else: for v in nodes: # Effective size is not defined for isolated nodes if len(G[v]) == 0: effective_size[v] = float("nan") continue E = G.ego_subgraph(v) E.remove_node(v) effective_size[v] = len(E) - (2 * E.size()) / len(E) else: if n_workers is not None: import random from functools import partial from multiprocessing import Pool local_function = partial(effective_size_parallel, G=G, weight=weight) nodes = list(nodes) random.shuffle(nodes) if len(nodes) > n_workers * 30000: nodes = split_len(nodes, step=30000) else: nodes = split(nodes, n_workers) with Pool(n_workers) as p: ret = p.imap(local_function, nodes) res = [x for i in ret for x in i] effective_size = dict(res) else: for v in nodes: # Effective size is not defined for isolated nodes if len(G[v]) == 0: effective_size[v] = float("nan") continue effective_size[v] = sum( redundancy(G, v, u, weight) for u in set(G.all_neighbors(v)) ) return effective_size
[docs]@not_implemented_for("multigraph") def efficiency(G, nodes=None, weight=None): """Burt's metric - Efficiency. Parameters ---------- G : easygraph.Graph nodes : list of nodes or None, optional (default : None) The nodes you want to calculate. If *None*, all nodes in `G` will be calculated. weight : string or None, optional (default : None) The key for edge weight. If *None*, `G` will be regarded as unweighted graph. Returns ------- efficiency : dict The Efficiency of node in `nodes`. Examples -------- >>> efficiency(G, ... nodes=[1,2,3], # Compute the Efficiency of some nodes. The default is None for all nodes in G. ... weight='weight' # The weight key of the graph. The default is None for unweighted graph. ... ) References ---------- .. [1] Burt R S. Structural holes: The social structure of competition[M]. Harvard university press, 2009. """ e_size = effective_size(G, nodes=nodes, weight=weight) degree = efficiency = {n: v / degree[n] for n, v in e_size.items()} return efficiency
def compute_constraint_of_nodes(nodes, G, weight): ret = [] for node in nodes: neighbors_of_node = set(G.all_neighbors(node)) if len(neighbors_of_node) == 0: ret.append([node, float("nan")]) continue ret.append( [node, sum(local_constraint(G, node, u, weight) for u in neighbors_of_node)] ) return ret
[docs]@not_implemented_for("multigraph") @hybrid("cpp_constraint") def constraint(G, nodes=None, weight=None, n_workers=None): """Burt's metric - Constraint. Parameters ---------- G : easygraph.Graph nodes : list of nodes or None, optional (default : None) The nodes you want to calculate. If *None*, all nodes in `G` will be calculated. weight : string or None, optional (default : None) The key for edge weight. If *None*, `G` will be regarded as unweighted graph. workers : int or None, optional (default : None) The number of workers calculating (default: None). None if not using only one worker. Returns ------- constraint : dict The Constraint of node in `nodes`. Examples -------- >>> constraint(G, ... nodes=[1,2,3], # Compute the Constraint of some nodes. The default is None for all nodes in G. ... weight='weight', # The weight key of the graph. The default is None for unweighted graph. ... n_workers=4 # Parallel computing on four workers. The default is None for serial computing. ... ) References ---------- .. [1] Burt R S. Structural holes: The social structure of competition[M]. Harvard university press, 2009. """ sum_nmw_rec.clear() max_nmw_rec.clear() local_constraint_rec.clear() if nodes is None: nodes = G.nodes constraint = {} def compute_constraint_of_v(v): neighbors_of_v = set(G.all_neighbors(v)) if len(neighbors_of_v) == 0: constraint_of_v = float("nan") else: constraint_of_v = sum( local_constraint(G, v, n, weight) for n in neighbors_of_v ) return v, constraint_of_v if n_workers is not None: import random from functools import partial from multiprocessing import Pool local_function = partial(compute_constraint_of_nodes, G=G, weight=weight) nodes = list(nodes) random.shuffle(nodes) if len(nodes) > n_workers * 30000: nodes = split_len(nodes, step=30000) else: nodes = split(nodes, n_workers) with Pool(n_workers) as p: ret = p.imap(local_function, nodes) constraint_results = [x for i in ret for x in i] else: constraint_results = [] for v in nodes: constraint_results.append(compute_constraint_of_v(v)) constraint = dict(constraint_results) return constraint
local_constraint_rec = {} def local_constraint(G, u, v, weight=None): try: return local_constraint_rec[(u, v)] except KeyError: nmw = normalized_mutual_weight direct = nmw(G, u, v, weight=weight) indirect = sum( nmw(G, u, w, weight=weight) * nmw(G, w, v, weight=weight) for w in set(G.all_neighbors(u)) ) result = (direct + indirect) ** 2 local_constraint_rec[(u, v)] = result return result def hierarchy_parallel(nodes, G, weight): ret = [] for v in nodes: E = G.ego_subgraph(v) n = len(E) - 1 C = 0 c = {} neighbors_of_v = set(G.all_neighbors(v)) for w in neighbors_of_v: C += local_constraint(G, v, w, weight) c[w] = local_constraint(G, v, w, weight) if n > 1: ret.append( [ v, sum( c[w] / C * n * math.log(c[w] / C * n) / (n * math.log(n)) for w in neighbors_of_v ), ] ) else: ret.append([v, 0]) return ret
[docs]@not_implemented_for("multigraph") @hybrid("cpp_hierarchy") def hierarchy(G, nodes=None, weight=None, n_workers=None): """Returns the hierarchy of nodes in the graph Parameters ---------- G : graph nodes : dict, optional (default: None) weight : dict, optional (default: None) Returns ------- hierarchy : dict the hierarchy of nodes in the graph Examples -------- Returns the hierarchy of nodes in the graph G >>> hierarchy(G) Reference --------- """ sum_nmw_rec.clear() max_nmw_rec.clear() local_constraint_rec.clear() if nodes is None: nodes = G.nodes hierarchy = {} if n_workers is not None: import random from functools import partial from multiprocessing import Pool local_function = partial(hierarchy_parallel, G=G, weight=weight) nodes = list(nodes) random.shuffle(nodes) if len(nodes) > n_workers * 30000: nodes = split_len(nodes, step=30000) else: nodes = split(nodes, n_workers) with Pool(n_workers) as p: ret = p.imap(local_function, nodes) res = [x for i in ret for x in i] hierarchy = dict(res) else: for v in nodes: E = G.ego_subgraph(v) n = len(E) - 1 C = 0 c = {} neighbors_of_v = set(G.all_neighbors(v)) for w in neighbors_of_v: C += local_constraint(G, v, w, weight) c[w] = local_constraint(G, v, w, weight) if n > 1: hierarchy[v] = sum( c[w] / C * n * math.log(c[w] / C * n) / (n * math.log(n)) for w in neighbors_of_v ) if v not in hierarchy: hierarchy[v] = 0 return hierarchy