1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import parameters as params
import itertools
from src.helper.debug_printer import debug_print
import networkx as nx
import pandas as pd
def _shortest_path_lengths(graph, pairs, edge_weight_name):
    """Return the weighted shortest-path length for every node pair.

    Parameters:
        graph: NetworkX graph to search.
        pairs: Iterable of (source, target) node tuples.
        edge_weight_name: Name of the edge attribute used as Dijkstra weight.
    Returns:
        Dict keyed on the (source, target) tuples, valued with the length.
    """
    # NOTE(review): this runs one Dijkstra search per pair. Running a single
    # search per source node would avoid repeated work on large graphs.
    return {
        (source, target): nx.dijkstra_path_length(
            graph, source, target, weight=edge_weight_name
        )
        for source, target in pairs
    }


def _build_complete_graph(pair_weights, flip_weights=True):
    """Build a graph with one edge per node pair in `pair_weights`.

    Parameters:
        pair_weights: Dict mapping (u, v) tuples to shortest-path lengths.
        flip_weights: When True, the 'weight' attribute is the negated length
            so that a *maximum* weight matching minimises total length.
    Returns:
        nx.Graph whose edges carry 'distance' (the length) and 'weight'.
    """
    complete = nx.Graph()
    for (u, v), distance in pair_weights.items():
        weight = -distance if flip_weights else distance
        complete.add_edge(u, v, distance=distance, weight=weight)
    return complete


def _augment_graph(graph, matched_pairs, complete_graph):
    """Copy `graph` into a MultiGraph and add one edge per matched pair.

    Parameters:
        graph: Undirected graph to augment (left untouched).
        matched_pairs: Iterable of (u, v) node tuples from the matching.
        complete_graph: Graph returned by `_build_complete_graph`; its
            'distance' edge attribute supplies the weighted length.
    Returns:
        nx.MultiGraph containing the original edges plus the added edges,
        each tagged with name='augmented'.
    """
    # A MultiGraph is required so an added edge can run parallel to an
    # existing one between the same two nodes.
    graph_aug = nx.MultiGraph(graph.copy())
    for u, v in matched_pairs:
        # Reuse the 'length'-weighted distance computed earlier. Calling
        # dijkstra_path_length without `weight` would fall back to the
        # default 'weight' attribute and not to 'length'.
        graph_aug.add_edge(
            u, v, length=complete_graph[u][v]['distance'], name='augmented'
        )
    return graph_aug


def _expand_eulerian_circuit(graph_augmented, graph_original, debug_mode=False):
    """Walk an Eulerian circuit using only edges of the original graph.

    Parameters:
        graph_augmented: MultiGraph returned by `_augment_graph`.
        graph_original: Graph the augmented one was built from.
        debug_mode: Forwarded to `debug_print` for the progress message.
    Returns:
        List of (u, v, data) tuples where data is `graph_original[u][v]`.
    """
    debug_print("Creating eulerian circuit", debug_mode)
    trace = False  # per-edge tracing stays disabled
    circuit = []
    # keys=True identifies the exact parallel edge being traversed, so an
    # original edge is never mistaken for the augmented one beside it.
    for u, v, key in nx.eulerian_circuit(graph_augmented, keys=True):
        if graph_augmented[u][v][key].get('name') != 'augmented':
            # Original edge: take its attributes straight from the graph.
            circuit.append((u, v, graph_original[u][v]))
            continue

        # Augmented edge: replace it with the real shortest path between its
        # end nodes and record every link of that path.
        aug_path = nx.shortest_path(graph_original, u, v, weight='length')
        aug_path_pairs = list(zip(aug_path[:-1], aug_path[1:]))
        debug_print('Filling in edges for augmented edge: {}'.format((u, v)), trace)
        debug_print(
            'Augmenting path: {}'.format(' => '.join(str(node) for node in aug_path)),
            trace,
        )
        debug_print('Augmenting path pairs: {}\n'.format(aug_path_pairs), trace)
        for start, end in aug_path_pairs:
            circuit.append((start, end, graph_original[start][end]))
    return circuit


def postier_chinois(G, debug_mode=False):
    """
    Traverse the graph by solving the Chinese postman problem.

    Source: https://brooksandrew.github.io/simpleblog/articles/intro-to-graph-optimization-solving-cpp/#solving-the-chinese-postman-problem

    Steps: pair up the odd-degree nodes with a minimum-length matching, add
    one edge per matched pair, build an Eulerian circuit on the result, then
    keep the traversed edges whose snow value is in range.

    Parameters:
        G: The road graph. Edges are read for their 'length' and 'snow'
            attributes. Edge data is indexed by key 0, so this assumes G is
            a multigraph - TODO confirm against callers.
        debug_mode: Enables debug logging through `debug_print`.
    Returns:
        List of (u, v, edge_data) tuples from the circuit whose key-0 'snow'
        value lies between params.SNOW_THRESHOLD and params.SNOW_MAX
        inclusive (previously documented as 2.5 to 15 cm of snow). An edge
        traversed several times appears several times.
    """
    gu = nx.to_undirected(G)

    nodes_odd_degree = [node for node, degree in gu.degree() if degree % 2 == 1]
    debug_print('Number of nodes of odd degree: {}'.format(len(nodes_odd_degree)), debug_mode)
    debug_print('Number of total nodes: {}'.format(len(gu.nodes())), debug_mode)

    odd_node_pairs = list(itertools.combinations(nodes_odd_degree, 2))
    debug_print('Number of odd degree node pairs: {}'.format(len(odd_node_pairs)), debug_mode)

    odd_node_pairs_shortest_paths = _shortest_path_lengths(gu, odd_node_pairs, 'length')

    # Weights are negated so the max-weight matching below yields the
    # minimum total length.
    g_odd_complete = _build_complete_graph(odd_node_pairs_shortest_paths, flip_weights=True)
    debug_print('Number of nodes in complete graph of odd nodes: {}'.format(len(g_odd_complete.nodes())), debug_mode)
    debug_print('Number of edges in complete graph of odd nodes: {}'.format(len(g_odd_complete.edges())), debug_mode)

    odd_matching_dupes = nx.algorithms.max_weight_matching(g_odd_complete, maxcardinality=True)
    # Normalise each pair to sorted order and drop duplicates while keeping
    # first-seen order.
    odd_matching = list(dict.fromkeys(tuple(sorted([k, v])) for k, v in odd_matching_dupes))
    debug_print('Number of edges in max weight matching: {}'.format(len(odd_matching)), debug_mode)

    g_aug = _augment_graph(gu, odd_matching, g_odd_complete)
    debug_print('Number of edges in original graph: {}'.format(len(gu.edges())), debug_mode)
    debug_print('Number of edges in augmented graph: {}'.format(len(g_aug.edges())), debug_mode)

    euler_circuit = _expand_eulerian_circuit(g_aug, gu, debug_mode)
    debug_print('Length of Eulerian circuit: {}'.format(len(euler_circuit)), debug_mode)

    debug_print("Extraction des la météo des neiges depuis le parcours eulérien...", debug_mode)
    return [
        edge
        for edge in euler_circuit
        if params.SNOW_THRESHOLD <= edge[2][0]['snow'] <= params.SNOW_MAX
    ]
|