diff options
| author | Martial Simon <msimon_fr@hotmail.com> | 2025-09-15 01:07:58 +0200 |
|---|---|---|
| committer | Martial Simon <msimon_fr@hotmail.com> | 2025-09-15 01:07:58 +0200 |
| commit | 967be9e750221ab2ab783f95df79bb26d290a45e (patch) | |
| tree | 6802900a5e975f9f68b169f0f503f040056d6952 /ero1/src/deneigeuses/directed_cpp.py | |
Diffstat (limited to 'ero1/src/deneigeuses/directed_cpp.py')
| -rw-r--r-- | ero1/src/deneigeuses/directed_cpp.py | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/ero1/src/deneigeuses/directed_cpp.py b/ero1/src/deneigeuses/directed_cpp.py new file mode 100644 index 0000000..552c4d9 --- /dev/null +++ b/ero1/src/deneigeuses/directed_cpp.py @@ -0,0 +1,294 @@ +from src.helper.debug_printer import debug_print +import networkx as nx +from copy import deepcopy +import heapq + +def split_into_strongly_connected(G): + """ + Renvoie la liste des composantes fortement connexes de G + + Params : + - G : Le graph à split + + Returns : + - composantes (list) : la liste des sous-graphes (composantes) fortement connexes de G + """ + return [G.subgraph(c) for c in nx.strongly_connected_components(G)] + +def chinese_postman_greedy(G, debug_mode=False): + if not nx.is_strongly_connected(G): + raise ValueError("Le graphe doit être fortement connexe") + + G = nx.DiGraph(G) + shortest_lengths = dict(nx.all_pairs_dijkstra_path_length(G, weight='length')) + shortest_paths = dict(nx.all_pairs_dijkstra_path(G, weight='length')) + + delta = {v: G.out_degree(v) - G.in_degree(v) for v in G.nodes()} + surplus = [v for v in G.nodes() if delta[v] > 0] + deficit = [v for v in G.nodes() if delta[v] < 0] + + debug_print(f"Déficitaires: {len(deficit)}, Excédentaires: {len(surplus)}", debug_mode) + + for u in deficit: + while delta[u] < 0: + heap = [] + for v in surplus: + if delta[v] <= 0: + continue + if v in shortest_lengths[u]: + heapq.heappush(heap, (shortest_lengths[u][v], v)) + if not heap: + raise RuntimeError("Aucun noeud excédentaire atteignable depuis", u) + _, v = heapq.heappop(heap) + + flow = min(-delta[u], delta[v]) + delta[u] += flow + delta[v] -= flow + + path = shortest_paths[u][v] + for i in range(len(path) - 1): + a, b = path[i], path[i + 1] + if not G.has_edge(a, b): + raise ValueError(f"Chemin invalide : arête manquante ({a} → {b})") + G[a][b]['duplicate'] = G[a][b].get('duplicate', 0) + flow + debug_print(f"Arête ({a} → {b}) dupliquée {flow} fois", debug_mode) + + MG = nx.MultiDiGraph() + for u, v, data in G.edges(data=True): + weight = data.get('length', 1) + MG.add_edge(u, v, length=weight) + for _ in range(data.get('duplicate', 0)): + MG.add_edge(u, v, length=weight) + + return list(nx.eulerian_circuit(MG)) + +def link_components(global_graph, path1, path2): + """ + Fusionne les 2 chemins en les reliant par leur plus court chemin + + Params : + - global_graph : Graphe de toute la ville pour avoir les données des arcs + - path1 : chemin de départ + - path2 : chemin d'arrivée + + Returns : + - fused_path : fusion des 2 chemins + """ + + fused_path = [] + if path1[0][0] != None: + fused_path = path1 + start = path1[-1][1] + end = path2[0][0] + invert = False + # print(start, end, nx.has_path(global_graph,start,end)) + if not nx.has_path(global_graph, start, end): + start,end = end, start + fused_path = path2 + invert = True + + shortest_path = [start,end] + if nx.has_path(global_graph, start, end): + shortest_path = nx.shortest_path(global_graph, source=start, target=end, weight='length') + for i in range(len(shortest_path) - 1): + fused_path.append((shortest_path[i], shortest_path[i + 1])) + if path2[0][1] != None: + if invert: + if path1[0][0] != None: + fused_path.extend(path1) + else: + fused_path.extend(path2) + return fused_path + +def oriented_cpt(local_graph, global_graph, debug_mode=False): + """ + Détermine le parcours de la déneigeuse sur local_graph en résolvant le problème du postier chinois orienté. + + Params : + - local_graph : le graph à parcourir + - global_graph : le graph global de référence + - debug_mode : active/désactive les logs de debug + + Returns : + - parcours : le parcours de la déneigeuse + """ + + components = split_into_strongly_connected(local_graph) + parcours = [] + if len(components[0].edges) > 0: + parcours = chinese_postman_greedy(components[0], debug_mode=debug_mode) + else: + parcours = [(None, list(components[0])[0])] + for component in components[1:]: + if len(component.edges) > 0: + cpp = chinese_postman_greedy(component, debug_mode=debug_mode) + parcours = link_components(global_graph, parcours, cpp) + else: + cpp = [(list(component)[0], None)] + parcours = link_components(global_graph, parcours, cpp) + return parcours + +# def oriented_cpt(G, debug_mode=False): +# """ +# Version orientée du Chinese Postman Problem. +# Le graph ne doit pas contenir de cycles de poids négatifs +# et le graph doit être fortement connexe pour que l'algo fonctionne +# Parameters: +# G: Graph à utiliser pour ce problème +# """ +# nb_nodes = len(G.nodes) +# class CPP: +# def __init__(self, graph): +# self.G = nx.DiGraph(graph) +# self.delta_tab = { elem: self.G.out_degree(elem) - self.G.in_degree(elem) for elem in self.G.nodes() } +# self.neg_degree = [] +# self.pos_degree = [] +# self.f = dict() +# self.c = dict() +# self.path = dict() +# # init the c and path dictionnary so it does not have to be in +# self.setup_fields() + +# def addEdge(self,l, u, v): +# if ((u, v) not in self.c or self.c[(u,v)] > l): +# self.c[(u,v)] = l +# self.path[(u,v)] = v +# if (not self.G.has_edge(u, v)): +# self.G.add_edge(u,v, length = l) +# else: +# nx.set_edge_attributes(self.G, {(u, v): {"length": l}}) + +# def setup_fields(self): +# for i, j in self.G.edges(): +# self.c[(i, j)] = self.G.get_edge_data(i,j)["length"] +# self.path[(i, j)] = j + +# def least_cost_paths(self): +# for edge in self.G.edges(): +# i = edge[0] +# k = edge[1] +# for j in self.G.nodes: +# if ((k, j) in self.c +# and ((i,j) not in self.c +# or self.c[(i, j)] > self.c[(i,k)] + self.c[(k,j)])): +# self.path[(i, j)] = self.path[(i, k)] +# self.c[(i, j)] = self.c[(i,k)] + self.c[(k,j)] +# if i == j and self.c[(i,j)] < 0: return # cycle négatif tu connais + +# def findUnbalanced(self): +# for (key, val) in self.delta_tab.items(): +# if (val < 0): +# self.neg_degree.append(key) +# elif (val > 0): +# self.pos_degree.append(key) + +# def findFeasible(self): +# for i in self.neg_degree: +# for j in self.pos_degree: +# self.f[(i, j)] = -self.delta_tab[i] if -self.delta_tab[i] < self.delta_tab[j] else self.delta_tab[j] +# self.delta_tab[i] += self.f[(i, j)] +# self.delta_tab[j] -= self.f[(i, j)] + +# # Improvements needed +# def improvements(self): +# residual = CPP(self.G) +# for i,j in zip(self.neg_degree, self.pos_degree): +# length = 0 +# if ((i,j) in self.c): +# length = self.c[(i, j)] +# residual.addEdge(length, i, j) +# if (i,j) not in self.f: +# self.f[(i,j)] = 0 +# elif (self.f[(i,j)] != 0): +# residual.addEdge(-length, j,i) +# residual.least_cost_paths() + +# for node in self.G.nodes: +# if (node,node) in residual.c and residual.c[(node,node)] < 0: +# k = 0 +# kunset = True +# u = node +# while True: +# v = residual.path[(u,node)] +# if residual.c[(u,v)] < 0 and (kunset or k > self.f[(v,u)]): +# k = self.f[(v,u)] +# kunset = False +# u = v +# if u == node: +# break +# u = node +# while True: +# v = residual.path[(u,node)] +# if ((u, v) not in residual.c): +# residual.c[(u,v)] = 0 +# if ((u, v) not in self.f): +# self.f[(u,v)] = 0 +# if ((v, u) not in self.f): +# self.f[(v,u)] = 0 +# if residual.c[(u,v)] < 0: +# self.f[(v,u)] -= k +# else: +# self.f[(u,v)] += k +# u = v +# if u == node: +# break +# return True +# return False + +# def solve(self): +# print(debug_mode) +# self.least_cost_paths() +# self.findUnbalanced() +# self.findFeasible() +# while self.improvements(): +# pass + +# def solution(self,start = None, debug_mode=False): +# if (start == None): +# start = list(self.G.nodes)[0] +# sol = [] +# arcs = dict() + +# def findPath(fromNode): +# for node in self.G.nodes: +# if ((fromNode,node) in self.f and self.f[(fromNode,node)] > 0): +# return node +# return None + +# v = start +# while True: +# u = v +# v = findPath(u) +# if v != None: +# self.f[(u,v)] -= 1 +# while u != v: +# if ((u, v) not in self.path): +# debug_print("We found a path with a 0 in it. This is probably a bad thig idk", debug_mode) +# return [] +# p = self.path[(u,v)] +# debug_print("Aller du noeud " + str(u) + " au noeud " + str(p), debug_mode) +# sol.append((u,p)) +# u = p +# else: +# if ((u, start) not in self.path): +# self.path[(u,start)] = 0 +# bridgeNode = self.path[(u,start)] +# if (u, bridgeNode) not in arcs: +# arcs[(u, bridgeNode)] = list(self.G.adj[u].keys()).count(bridgeNode) +# if arcs[(u,bridgeNode)] == 0: +# break +# v = bridgeNode +# for node in self.G.nodes: +# if (u, node) not in arcs: +# arcs[(u, node)] = list(self.G.adj[u].keys()).count(node) +# if node != bridgeNode and arcs[(u,node)] > 0: +# v = node +# break +# arcs[(u,v)] -= 1 +# debug_print("Aller du noeud " + str(u) + " au noeud " + str(v), debug_mode) +# sol.append((u,v)) +# return sol + +# res = CPP(G) +# res.solve() +# return res.solution(debug_mode=debug_mode) |
