import heapq
from copy import deepcopy

import networkx as nx

from src.helper.debug_printer import debug_print


def split_into_strongly_connected(G):
    """
    Return the strongly connected components of G as subgraphs.

    Params :
        - G : the directed graph to split

    Returns :
        - composantes (list) : one subgraph of G (built with ``G.subgraph``)
          per strongly connected component
    """
    return [G.subgraph(c) for c in nx.strongly_connected_components(G)]


def chinese_postman_greedy(G, debug_mode=False):
    """
    Build a closed walk that uses every arc of a strongly connected digraph.

    The input is copied into a simple ``nx.DiGraph``. Each node's imbalance
    (``out_degree - in_degree``) is then cancelled by duplicating arcs along
    the shortest path (by ``'length'``) from a node with negative imbalance
    to the closest node that still has positive imbalance. The pairing is
    greedy, so the total duplicated length is not guaranteed to be minimal.
    The balanced multigraph is finally passed to ``nx.eulerian_circuit``.

    Params :
        - G : strongly connected directed graph; arcs may carry a 'length'
          attribute (1 is used when it is missing)
        - debug_mode : forwarded to ``debug_print`` for every log message

    Returns :
        - circuit (list) : list of (u, v) arcs in traversal order

    Raises :
        - ValueError : G is not strongly connected, or a computed path uses
          an arc that is not in the graph
        - RuntimeError : no node with positive imbalance is reachable from a
          node with negative imbalance
    """
    if not nx.is_strongly_connected(G):
        raise ValueError("Le graphe doit être fortement connexe")

    # Private copy: the caller's graph is never modified.
    # NOTE(review): if G is a multigraph, parallel arcs collapse into one
    # here and are therefore covered only once -- confirm this is intended.
    G = nx.DiGraph(G)

    delta = {v: G.out_degree(v) - G.in_degree(v) for v in G.nodes()}
    surplus = [v for v in G.nodes() if delta[v] > 0]
    deficit = [v for v in G.nodes() if delta[v] < 0]
    debug_print(f"Déficitaires: {len(deficit)}, Excédentaires: {len(surplus)}", debug_mode)

    # Extra traversals per arc, kept outside the graph so that it cannot
    # collide with an edge attribute already present on the input.
    duplicates = {}

    for u in deficit:
        # One Dijkstra per deficit node yields both distances and paths;
        # the other sources are never queried.
        lengths, paths = nx.single_source_dijkstra(G, u, weight='length')

        while delta[u] < 0:
            candidates = [v for v in surplus if delta[v] > 0 and v in lengths]
            if not candidates:
                raise RuntimeError(f"Aucun noeud excédentaire atteignable depuis {u}")

            # Closest remaining surplus node; keyed on distance only so that
            # ties never compare node objects with each other.
            v = min(candidates, key=lengths.__getitem__)

            flow = min(-delta[u], delta[v])
            delta[u] += flow
            delta[v] -= flow

            path = paths[v]
            for a, b in zip(path, path[1:]):
                if not G.has_edge(a, b):
                    raise ValueError(f"Chemin invalide : arête manquante ({a} → {b})")
                duplicates[(a, b)] = duplicates.get((a, b), 0) + flow
                debug_print(f"Arête ({a} → {b}) dupliquée {flow} fois", debug_mode)

    # Materialise the balanced multigraph: every arc once, plus its copies.
    MG = nx.MultiDiGraph()
    for u, v, data in G.edges(data=True):
        weight = data.get('length', 1)
        for _ in range(1 + duplicates.get((u, v), 0)):
            MG.add_edge(u, v, length=weight)

    return list(nx.eulerian_circuit(MG))


def link_components(global_graph, path1, path2):
    """
    Merge two paths by joining them with a shortest path of global_graph.

    A path is a list of (u, v) arcs. Two single-tuple placeholders are
    recognised: a path1 whose first tuple starts with None, and a path2 whose
    first tuple ends with None. A placeholder only contributes its node as an
    endpoint of the connector; its tuple is never copied into the result.

    The connector runs from the last node of path1 to the first node of
    path2. If global_graph has no such path, the order is inverted: path2
    comes first and the connector runs from the first node of path2 to the
    last node of path1. If neither direction exists, a single arc between the
    two endpoints is inserted even though it is not backed by a real path.

    NOTE(review): in the inverted case the connector ends on the *last* node
    of path1, which matches the first node of path1 only when path1 is a
    closed circuit -- confirm against callers.

    Params :
        - global_graph : graph of the whole city, used to find the connector
        - path1 : starting path
        - path2 : destination path

    Returns :
        - fused_path : new list with both paths and the connector; neither
          input list is modified
    """
    # Copies, so that the caller's lists are never mutated or aliased.
    head = list(path1) if path1[0][0] is not None else []
    tail = list(path2) if path2[0][1] is not None else []

    start = path1[-1][1]
    end = path2[0][0]

    try:
        connector = nx.shortest_path(global_graph, source=start, target=end, weight='length')
    except nx.NetworkXNoPath:
        # path2 is unreachable from path1: travel path2 first instead.
        head, tail = tail, head
        start, end = end, start
        try:
            connector = nx.shortest_path(global_graph, source=start, target=end, weight='length')
        except nx.NetworkXNoPath:
            # No route in either direction: fall back to a direct arc.
            connector = [start, end]

    fused_path = head
    fused_path.extend(zip(connector, connector[1:]))
    fused_path.extend(tail)
    return fused_path


def oriented_cpt(local_graph, global_graph, debug_mode=False):
    """
    Compute the snowplow route over local_graph by solving the directed
    Chinese Postman Problem on each strongly connected component and
    chaining the resulting circuits through global_graph.

    A component without arcs contributes a single-tuple placeholder holding
    its node: ``(None, node)`` for the first component and ``(node, None)``
    for the following ones (see ``link_components``). When local_graph has
    exactly one component and it has no arcs, that placeholder is returned
    as is.

    NOTE(review): arcs of local_graph that join two different components
    belong to no component subgraph, so they are only traversed if a
    connector happens to use them -- confirm this is intended.

    Params :
        - local_graph : the graph to cover
        - global_graph : the reference graph used to link the components
        - debug_mode : enables/disables debug logs

    Returns :
        - parcours : the route as a list of (u, v) arcs; empty when
          local_graph has no node
    """
    components = split_into_strongly_connected(local_graph)
    if not components:
        return []

    parcours = None
    for index, component in enumerate(components):
        if component.number_of_edges() > 0:
            route = chinese_postman_greedy(component, debug_mode=debug_mode)
        else:
            node = next(iter(component))
            route = [(None, node)] if index == 0 else [(node, None)]

        if index == 0:
            parcours = route
        else:
            parcours = link_components(global_graph, parcours, route)

    return parcours


# Commented-out alternative implementation of oriented_cpt (disabled).
# def oriented_cpt(G, debug_mode=False):
#     """
#     Directed version of the Chinese Postman Problem.
#     The graph must not contain negative-weight cycles
#     and must be strongly connected for the algorithm to work
#     Parameters:
#         G: graph to use for this problem
#     """
#     nb_nodes = len(G.nodes)
#     class CPP:
#         def __init__(self, graph):
#             self.G = nx.DiGraph(graph)
#             self.delta_tab = { elem: self.G.out_degree(elem) - self.G.in_degree(elem) for elem in self.G.nodes() }
#             self.neg_degree = []
#             self.pos_degree = []
#             self.f = dict()
#             self.c = dict()
#             self.path = dict()
#             # init the c and path dictionary so it does not have to be in
#             self.setup_fields()
#         def addEdge(self,l, u, v):
#             if ((u, v) not in self.c or self.c[(u,v)] > l):
#                 self.c[(u,v)] = l
#                 self.path[(u,v)] = v
#             if (not self.G.has_edge(u, v)):
#                 self.G.add_edge(u,v, length = l)
#             else:
#                 nx.set_edge_attributes(self.G, {(u, v): {"length": l}})
#         def setup_fields(self):
#             for i, j in self.G.edges():
#                 self.c[(i, j)] = self.G.get_edge_data(i,j)["length"]
#                 self.path[(i, j)] = j
#         def least_cost_paths(self):
#             for edge in self.G.edges():
#                 i = edge[0]
#                 k = edge[1]
#                 for j in self.G.nodes:
#                     if ((k, j) in self.c
#                         and ((i,j) not in self.c
#                             or self.c[(i, j)] > self.c[(i,k)] + self.c[(k,j)])):
#                         self.path[(i, j)] = self.path[(i, k)]
#                         self.c[(i, j)] = self.c[(i,k)] + self.c[(k,j)]
#                         if i == j and self.c[(i,j)] < 0: return # negative cycle
#         def findUnbalanced(self):
#             for (key, val) in self.delta_tab.items():
#                 if (val < 0):
#                     self.neg_degree.append(key)
#                 elif (val > 0):
#                     self.pos_degree.append(key)
#         def findFeasible(self):
#             for i in self.neg_degree:
#                 for j in self.pos_degree:
#                     self.f[(i, j)] = -self.delta_tab[i] if -self.delta_tab[i] < self.delta_tab[j] else self.delta_tab[j]
#                     self.delta_tab[i] += self.f[(i, j)]
#                     self.delta_tab[j] -= self.f[(i, j)]
#         # Improvements needed
#         def improvements(self):
#             residual = CPP(self.G)
#             for i,j in zip(self.neg_degree, self.pos_degree):
#                 length = 0
#                 if ((i,j) in self.c):
#                     length = self.c[(i, j)]
#                 residual.addEdge(length, i, j)
#                 if (i,j) not in self.f:
#                     self.f[(i,j)] = 0
#                 elif (self.f[(i,j)] != 0):
#                     residual.addEdge(-length, j,i)
#             residual.least_cost_paths()
#             for node in self.G.nodes:
#                 if (node,node) in residual.c and residual.c[(node,node)] < 0:
#                     k = 0
#                     kunset = True
#                     u = node
#                     while True:
#                         v = residual.path[(u,node)]
#                         if residual.c[(u,v)] < 0 and (kunset or k > self.f[(v,u)]):
#                             k = self.f[(v,u)]
#                             kunset = False
#                         u = v
#                         if u == node:
#                             break
#                     u = node
#                     while True:
#                         v = residual.path[(u,node)]
#                         if ((u, v) not in residual.c):
#                             residual.c[(u,v)] = 0
#                         if ((u, v) not in self.f):
#                             self.f[(u,v)] = 0
#                         if ((v, u) not in self.f):
#                             self.f[(v,u)] = 0
#                         if residual.c[(u,v)] < 0:
#                             self.f[(v,u)] -= k
#                         else:
#                             self.f[(u,v)] += k
#                         u = v
#                         if u == node:
#                             break
#                     return True
#             return False
#         def solve(self):
#             print(debug_mode)
#             self.least_cost_paths()
#             self.findUnbalanced()
#             self.findFeasible()
#             while self.improvements():
#                 pass
#         def solution(self,start = None, debug_mode=False):
#             if (start == None):
#                 start = list(self.G.nodes)[0]
#             sol = []
#             arcs = dict()
#             def findPath(fromNode):
#                 for node in self.G.nodes:
#                     if ((fromNode,node) in self.f and self.f[(fromNode,node)] > 0):
#                         return node
#                 return None
#             v = start
#             while True:
#                 u = v
#                 v = findPath(u)
#                 if v != None:
#                     self.f[(u,v)] -= 1
#                     while u != v:
#                         if ((u, v) not in self.path):
#                             debug_print("We found a path with a 0 in it. This is probably a bad thig idk", debug_mode)
#                             return []
#                         p = self.path[(u,v)]
#                         debug_print("Aller du noeud " + str(u) + " au noeud " + str(p), debug_mode)
#                         sol.append((u,p))
#                         u = p
#                 else:
#                     if ((u, start) not in self.path):
#                         self.path[(u,start)] = 0
#                     bridgeNode = self.path[(u,start)]
#                     if (u, bridgeNode) not in arcs:
#                         arcs[(u, bridgeNode)] = list(self.G.adj[u].keys()).count(bridgeNode)
#                     if arcs[(u,bridgeNode)] == 0:
#                         break
#                     v = bridgeNode
#                     for node in self.G.nodes:
#                         if (u, node) not in arcs:
#                             arcs[(u, node)] = list(self.G.adj[u].keys()).count(node)
#                         if node != bridgeNode and arcs[(u,node)] > 0:
#                             v = node
#                             break
#                     arcs[(u,v)] -= 1
#                     debug_print("Aller du noeud " + str(u) + " au noeud " + str(v), debug_mode)
#                     sol.append((u,v))
#             return sol
#     res = CPP(G)
#     res.solve()
#     return res.solution(debug_mode=debug_mode)