| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- import os
- import pandas as pd
- from langchain_core.tools import tool
- from langchain_community.tools import DuckDuckGoSearchRun
- import matplotlib.pyplot as plt
- import seaborn as sns
- from langchain_core.messages import HumanMessage, AIMessage
- import sys
- import io
- import matplotlib
- matplotlib.use('Agg') # Force Matplotlib à ne pas ouvrir de fenêtre graphique
- @tool
- def convert_csv_to_excel(csv_path: str):
- """Convertit un fichier CSV en Excel (.xlsx)."""
- if not os.path.exists(csv_path):
- return f"Désolé, je ne trouve pas le fichier '{csv_path}'. Vérifiez qu'il est bien présent dans le dossier du projet."
-
- try:
- df = pd.read_csv(csv_path)
- new_path = csv_path.replace(".csv", ".xlsx")
- df.to_excel(new_path, index=False)
- return f"Succès : Le fichier a été converti en {new_path}"
-
- except Exception as e:
- return f"Erreur lors de la lecture du CSV : {str(e)}"
-
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
- @tool
- def inspect_data(file_name: str):
- """
- Explore le fichier situé dans le dossier 'data/' pour retourner les colonnes et un aperçu.
- Passer uniquement le nom du fichier (ex: 'data.csv').
- """
- # 1. On nettoie le nom du fichier au cas où l'IA ajoute des guillemets
- clean_name = file_name.strip().replace("'", "").replace('"', "")
-
- # 2. On construit le chemin ABSOLU vers le dossier data
- # On suppose que ton script est à la racine du projet
- data_path = os.path.join(os.getcwd(), "data", clean_name)
- try:
- # 3. Vérification de l'existence du fichier
- if not os.path.exists(data_path):
- return f"Erreur : Le fichier '{clean_name}' est introuvable dans le dossier data/."
- # 4. Lecture selon l'extension (Note : correction de 'endiwith' en 'endswith')
- if data_path.lower().endswith(".csv"):
- df = pd.read_csv(data_path) # Utilisation de la variable data_path, PAS de la chaîne "file_path"
- elif data_path.lower().endswith((".xlsx", ".xls")):
- df = pd.read_excel(data_path)
- else:
- return "Format de fichier non supporté. Utilisez .csv ou .xlsx"
- # 5. Extraction des infos
- columns_names = df.columns.tolist()
- preview = df.head(3).to_string()
- return f"Colonnes trouvées : {columns_names}\n\nAperçu des données :\n{preview}"
-
- except Exception as e:
- return f"Erreur lors de l'inspection : {str(e)}"
- @tool
- def excel_code_interpreter(file_path: str, code: str):
- """Exécute du code Python sur le fichier (CSV ou Excel) chargé dans 'df'."""
-
- import warnings
- warnings.filterwarnings("ignore")
-
- # 1. Nettoyage et construction du chemin
- file_name = os.path.basename(file_path.strip().replace("'", "").replace('"', ""))
- # Assure-toi que BASE_DIR est défini globalement ou remplace-le par os.getcwd()
- data_folder_path = os.path.join(BASE_DIR, "data", file_name)
- root_path = os.path.join(BASE_DIR, file_name)
-
- if os.path.exists(data_folder_path):
- full_path = data_folder_path
- elif os.path.exists(root_path):
- full_path = root_path
- else:
- return f"ERREUR : Fichier '{file_name}' introuvable."
- # 2. CAPTURE DE LA CONSOLE (stdout)
- # On crée un tampon pour intercepter les print()
- buffer = io.StringIO()
- old_stdout = sys.stdout # On sauvegarde la sortie d'origine (ton terminal)
- sys.stdout = buffer # On redirige vers notre variable
- try:
- # Lecture du fichier
- df = pd.read_csv(full_path) if file_name.endswith('.csv') else pd.read_excel(full_path)
- # 3. Préparation du contexte d'exécution
- context = {
- "df": df,
- "pd": pd,
- "plt": plt,
- "os": os,
- "result": None,
- "__builtins__": __builtins__
- }
- # 4. Exécution du code généré par l'IA
- exec(code, context)
- # 5. Récupération des données capturées
- output_console = buffer.getvalue()
- final_result_variable = context.get("result", "")
- # On remet le système à la normale
- sys.stdout = old_stdout
- # 6. ON FUSIONNE TOUT POUR LE REPORTER
- # C'est ce bloc de texte que le Reporter va recevoir dans Langfuse
- full_report = f"--- RÉSULTATS DE LA CONSOLE ---\n{output_console}\n"
- full_report += f"--- RÉSUMÉ FINAL ---\n{final_result_variable}"
-
- return full_report
- except Exception as e:
- # En cas d'erreur, on n'oublie pas de remettre le stdout à la normale
- sys.stdout = old_stdout
- return f"ERREUR PYTHON : {str(e)}"
-
- ddg = DuckDuckGoSearchRun()
- @tool
- def search_tool(query : str) :
- """Recherche sur le web. Limité pour économiser les tokens."""
- try:
- results = ddg.run(query)
-
- if not results:
- return "Aucun résultat trouvé."
-
- # 1. On nettoie les espaces superflus pour gagner des tokens
- clean_results = " ".join(results.split())
-
- # 2. On limite intelligemment (ex: 1200 chars pour plus de contexte)
- # Mais on s'assure de ne pas couper un mot au milieu
- limit = 1200
- if len(clean_results) <= limit:
- return clean_results
-
- return clean_results[:limit] + "... [Résultat tronqué pour économie]"
- except Exception as e:
- return f"Erreur lors de la recherche : {str(e)}"
- def call_tool_and_format_for_ollama(tool_output):
- """
- Cette fonction transforme la sortie brute de l'outil
- en un format que ton Llama 3.1 8B local peut comprendre.
- """
- # Au lieu de renvoyer un ToolMessage (qui fait planter Ollama)
- # On crée un message "Observation"
- observation_message = HumanMessage(
- content=f"OBSERVATION DE L'OUTIL :\n{tool_output}\n\nUtilise ces données pour continuer ton analyse."
- )
- return observation_message
- import re
- import json
- def robust_json_parse(text):
- try:
- cleaned = text.strip()
- # Extraction JSON
- match = re.search(r"\{.*\}", cleaned, re.DOTALL)
- if not match:
- raise ValueError("Aucun JSON détecté")
- json_str = match.group()
- # Parsing direct
- try:
- return json.loads(json_str)
- except json.JSONDecodeError:
- pass
- # Auto-fix
- json_str = json_str.replace("\n", " ")
- json_str = re.sub(r",\s*}", "}", json_str)
- json_str = re.sub(r",\s*]", "]", json_str)
- return json.loads(json_str)
- except Exception as e:
- raise ValueError(f"Parsing JSON échoué : {e}\nRAW:\n{text}")
|