import os from typing import Annotated , Sequence , TypedDict , Optional , List , Any from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage , AIMessage from langgraph.graph.message import add_messages from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_groq import ChatGroq import pandas as pd from langfuse import get_client import json from langchain_google_genai import ChatGoogleGenerativeAI import re import base64 import cv2 from pathlib import Path from tools import excel_code_interpreter from paddleocr import PaddleOCR from function import ( preparer_image_zoom_hd, extraire_donnees_ocr, nettoyage_sortie_ocr, formater_donnees_section, sauvegarder_fichier_unique, to_points , build_lines, cluster_lines , merge_close_lines ) from clean_DBSCAN import ( transform_to_clean_markdown ) load_dotenv() tools = [excel_code_interpreter] class AgentState(TypedDict) : messages : Annotated[Sequence[BaseMessage],add_messages] pdf_path : str pages : List[int] page : List[int] entreprise_name : str section_name : str lignes : str markdown : str use_vision : bool image_path: str model_gemini = ChatGoogleGenerativeAI(model="gemini-2.5-flash") model_llama = ChatGroq(model="llama-3.3-70b-versatile") model_openai = ChatOpenAI(model="gpt-4o" , temperature=0.2) model_ai = model_llama.bind_tools(tools) ocr = PaddleOCR(use_angle_cls=True, lang='fr', det_limit_side_len=10000, show_log=False) def has_rc_codes(points): has_r = False has_c = False for p in points: text = str(p[2]).strip() if text.startswith("R"): has_r = True if text.startswith("C"): has_c = True return has_r and has_c def agent_ocr(state: AgentState): pdf_path = state.get("pdf_path") page_val = state.get("page") if page_val is None: raise ValueError("page est None dans state") try: page_index = int(page_val) - 1 except TypeError: page_index = int(page_val[0]) - 1 all_points = [] texte_accumule = [] try: img_finale = preparer_image_zoom_hd(pdf_path, page_index) raw_data = extraire_donnees_ocr(img_finale, ocr) data_propre = nettoyage_sortie_ocr(raw_data) points = to_points(data_propre) all_points.extend(points) # 🎯 CAS ROUGE : Pas de codes R/C détectés -> Mode Vision if not has_rc_codes(points): print(f" Pas de codes R/C détectés — activation du mode LLM vision") # 🛠️ SAUVEGARDE ET COMPRESSION TEMPORAIRE # On utilise le format .jpg avec une qualité de 85% pour réduire drastiquement la taille chemin_image_temp = f"temp_page_{page_index + 1}.jpg" import cv2 cv2.imwrite(chemin_image_temp, img_finale, [int(cv2.IMWRITE_JPEG_QUALITY), 85]) return { "use_vision": True, "image_path": chemin_image_temp, # 🟢 On ne passe qu'un bête texte de 20 caractères ! "messages": [HumanMessage(content="Mode LLM Vision activé.")], } # Texte pour le LLM (Cas normal) lignes_page = formater_donnees_section(data_propre, page_index) texte_accumule.extend(lignes_page) print(f" Page {page_index + 1} traitée par l'OCR.") except Exception as e: print(f"Erreur lors de l'OCR Page {page_index + 1}: {e}") raise e # Traitement classique si des codes R/C sont présents clusters = cluster_lines(all_points, eps=0.5) lignes = build_lines(clusters) lignes = merge_close_lines(lignes) markdown = transform_to_clean_markdown(lignes) contenu_final = "\n".join(texte_accumule) return { "points": points, "messages": [HumanMessage(content=f"Voici les données OCR brutes :\n{contenu_final}")], "clusters": clusters, "lignes": lignes, "markdown": markdown, "use_vision": False # <-- Cas normal, on continue vers agent_extracteur } def agent_extracteur(state: AgentState): prompt = """ Tu es un extracteur de tableaux QRT (Solvabilité II). Objectif : convertir le tableau markdown en JSON sans AUCUNE modification. Règles STRICTES : - La position de chaque valeur dans le tableau est ABSOLUE et NE DOIT PAS être modifiée. - Si une colonne contient 0, garde 0. Ne déplace jamais une valeur vers une autre colonne. - Utilise Rxxxx (lignes) et Cxxxx (colonnes) comme clés directement. - Copie STRICTEMENT les valeurs dans leur colonne exacte. - Supprime uniquement les espaces dans les nombres : "3 297 388" → 3297388. - Une colonne à 0 reste à 0, même si une valeur non-nulle existe dans une colonne adjacente. - Structure attendue : {"Cxxxx": {"Rxxxx": valeur, ...}, ...} ⚠️ CONTRAINTE DE SORTIE : - Réponds UNIQUEMENT avec l'objet JSON. - PAS de markdown (```json), PAS de texte, PAS d'explications. - NE PAS réorganiser, NE PAS interpréter, NE PAS corriger les données. Tableau à convertir : {markdown} """ markdown_content = state.get("markdown", "") lignes = state.get("lignes", []) # Garde-fou : markdown vide ou trop pauvre (moins de 2 lignes de données) data_rows = [l for l in str(markdown_content).splitlines() if l.strip().startswith("|") and "R0" in l] if not markdown_content or not str(markdown_content).strip() or len(data_rows) == 0: backup_content = json.dumps(lignes, ensure_ascii=False) input_content = f"Note : Le markdown était vide ou invalide. Voici les lignes brutes OCR :\n{backup_content}" else: input_content = markdown_content msg = [ SystemMessage(content=prompt), HumanMessage(content=input_content) # ← plus de json.dumps() sur du markdown déjà str ] response = model_openai.invoke(msg) return {"messages": [response]} def encoder_image_en_base64(chemin_image: str) -> str: """Convertit une image locale en chaîne base64 pour l'API OpenAI.""" with open(chemin_image, "rb") as image_file: return base64.b64encode(image_file.read()).decode("utf-8") def agent_llm_vision(state: AgentState): print("[LLM Vision] Début de l'analyse visuelle du tableau QRT...") # 1. On récupère le CHEMIN du fichier image image_path = state.get("image_path") section_name = state.get("section_name", "Non spécifiée") if not image_path or not os.path.exists(image_path): raise ValueError(f"Le fichier image est introuvable : {image_path}") section_name_raw = state.get("section_name") # Ex: "S.23_page_64" ou "S.02.01_table_1" if not section_name_raw: raise ValueError("L'état de l'agent doit contenir un 'section_name' valide.") # Extraction de la racine de la section (ex: "S.23" ou "S.02.01") # Cette regex capture tout ce qui commence par S. suivi de chiffres et de points match = re.match(r"^(S\.\d+(?:\.\d+)*)", section_name_raw) if not match: raise ValueError(f"Impossible de déterminer la racine réglementaire depuis : {section_name_raw}") section_racine = match.group(1) mapping_path = Path(__file__).resolve().parent / "mapping.json" """ print("Mapping path réel =", mapping_path) print("Existe ?", mapping_path.exists())""" try: with open(mapping_path, "r", encoding="utf-8") as f: full_mapping = json.load(f) # On cherche d'abord la racine exacte, sinon on tente une correspondance partielle section_mapping = full_mapping.get(section_racine) if not section_mapping: # Fallback au cas où le JSON contient "S.23.01" mais votre racine est "S.23" alternative_key = next((k for k in full_mapping.keys() if k.startswith(section_racine)), None) if alternative_key: section_mapping = full_mapping[alternative_key] else: raise KeyError(f"Aucun mapping trouvé pour '{section_racine}' (déduit de '{section_name_raw}') dans mapping.json.") mapping_json_reduit = json.dumps(section_mapping, ensure_ascii=False, indent=2) except Exception as e: raise RuntimeError(f"Erreur mapping pour {section_name_raw} : {str(e)}") # 2. Prompt (Identique) PROMPT_VISION_SANS_CODES = f""" Tu es un expert Solvabilité II. Ce tableau est un QRT SFCR sans codes R/C visibles. Ta tâche : 1. Identifie les lignes (descriptions) et colonnes (headers) du tableau. 2. Associe chaque description de ligne au bon code Rxxxx selon la nomenclature Solvabilité II. 3. Associe chaque header de colonne au bon code Cxxxx. Utilise le mapping suivant : {mapping_json_reduit} Règles STRICTES : - Utilise directement les codes Rxxxx (lignes) et Cxxxx (colonnes) comme clés. - Supprime uniquement les espaces dans les nombres (ex: "3 297 388" → 3297388). - OPTIMISATION DE TOKENS : Exclura COMPLÈTEMENT du JSON de sortie les lignes/colonnes dont la valeur est égale à 0, vide, "-", "–" ou "N/A". Ne les écris pas. - Structure attendue : {{ "Cxxxx": {{ "Rxxxx": valeur, ... }}, ... }} - Section détectée : {section_name} ⚠️ CONTRAINTE DE SORTIE : - Réponds UNIQUEMENT avec l'objet JSON. - PAS de markdown. - PAS de balises ```json. - PAS de texte explicatif. - PAS de commentaires. - Le JSON doit être valide et directement parsable par json.loads(). """ try: # 3. Encodage à la volée du fichier disque en Base64 with open(image_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode("utf-8") # 4. Préparation du message multimodal msg_vision = HumanMessage( content=[ {"type": "text", "text": PROMPT_VISION_SANS_CODES}, { "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}, }, ] ) # 5. Appel OpenAI avec format JSON model_json = model_openai.bind(response_format={"type": "json_object"}) response = model_json.invoke([msg_vision]) # Nettoyage des balises si présentes contenu_propre = response.content.strip() if contenu_propre.startswith("```json"): contenu_propre = contenu_propre.replace("```json", "").replace("```", "").strip() print(" [LLM Vision] Extraction réussie.") # 6. NETTOYAGE DU DISQUE (Optionnel mais propre) # Supprime le fichier temporaire pour ne pas encombrer votre dossier de travail if os.path.exists(image_path): os.remove(image_path) # 7. Retour de la réponse (UNIQUEMENT le texte JSON) # L'image géante n'est PAS stockée dans l'historique du graphe, elle disparaît ici ! return { "messages": [AIMessage(content=contenu_propre)], "image_path": None # On réinitialise la clé à None pour vider le State } except Exception as e: # En cas d'erreur, on essaie quand même de nettoyer le fichier if os.path.exists(image_path): os.remove(image_path) print(f" [LLM Vision] Erreur : {e}") raise e def agent_builder(state: AgentState): print(f" Construction du fichier Excel pour : {state['entreprise_name']}...") try: import json import pandas as pd import os import re # 1. Extraction du contenu JSON pur content = state["messages"][-1].content # Nettoyage au cas où le modèle aurait ajouté des balises markdown malgré les consignes content = re.sub(r'```json|```', '', content).strip() data_json = json.loads(content) # 2. Gestion du chemin de sauvegarde (identique) base_outputs = os.path.join("..", "04 - Outputs") match = re.search(r'(\d{4})', state['entreprise_name']) annee = match.group(1) if match else "2025" nom_entreprise = state['entreprise_name'].split('_')[0].replace(" ", "_") target_folder = os.path.join(base_outputs, annee, nom_entreprise) if not os.path.exists(target_folder): os.makedirs(target_folder) output_file = os.path.join(target_folder, f"Rapport_{state['section_name']}.xlsx") # 3. Construction du DataFrame à partir de 'data_json' uniquement # On récupère l'ensemble unique de tous les Rxxxx présents dans toutes les colonnes all_rows = sorted(list(set(r for col in data_json.values() for r in col.keys()))) all_cols = sorted(list(data_json.keys())) df = pd.DataFrame(index=all_rows, columns=all_cols) for col, row_values in data_json.items(): for row, val in row_values.items(): df.at[row, col] = val # 4. Exportation with pd.ExcelWriter(output_file, engine="xlsxwriter") as writer: df.to_excel(writer, sheet_name="QRT_Export", index=True) # Ajoute ici tes formats xlsxwriter si nécessaire success_msg = f" Sauvegardé dans : {output_file}" print(success_msg) return {"messages": [HumanMessage(content=success_msg)]} except Exception as e: error_msg = f" Erreur construction Excel : {str(e)}" print(error_msg) return {"messages": [HumanMessage(content=error_msg)]}