import os import base64 import logging import re from io import BytesIO import discord from dotenv import load_dotenv from PIL import Image import emoji import tiktoken from openai import AsyncOpenAI, OpenAIError import json import urllib3 # Charger les variables d'environnement depuis le fichier .env load_dotenv() DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID') PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt') CONVERSATION_HISTORY_FILE = os.getenv('CONVERSATION_HISTORY_FILE', 'conversation_history.json') BOT_NAME = os.getenv('BOT_NAME', 'ChatBot') MODEL = os.getenv('MODEL', 'gpt-4o-mini') URL_OPENAI_API = os.getenv('URL_OPENAI_API', 'http://localai.localai.svc.cluster.local:8080/v1') TEMPERATURE = float(os.getenv('TEMPERATURE', "1.1")) # Initialiser le client OpenAI asynchrone ici openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=URL_OPENAI_API) BOT_VERSION = "2.5.0-penta" # Vérifier que les tokens et le prompt de personnalité sont récupérés if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None: raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.") if not os.path.isfile(PERSONALITY_PROMPT_FILE): raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.") # Lire le prompt de personnalité depuis le fichier with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f: PERSONALITY_PROMPT = f.read().strip() # Log configuration log_format = '%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s' logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level="INFO") console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(logging.Formatter(log_format)) logger = logging.getLogger(BOT_NAME) logger.setLevel("INFO") logging.getLogger('').addHandler(console) httpx_logger = logging.getLogger('httpx') httpx_logger.setLevel(logging.WARNING) urllib3.disable_warnings() # Initialiser les intents intents = discord.Intents.default() intents.message_content = True # Activer l'intent pour les contenus de message # Liste pour stocker l'historique des conversations conversation_history = [] # Variable globale pour suivre la position de la dernière analyse last_analysis_index = None messages_since_last_analysis = 0 def load_conversation_history(): global conversation_history if os.path.isfile(CONVERSATION_HISTORY_FILE): try: with open(CONVERSATION_HISTORY_FILE, 'r', encoding='utf-8') as f: loaded_history = json.load(f) # Exclure uniquement le PERSONALITY_PROMPT conversation_history = [ msg for msg in loaded_history if not (msg.get("role") == "system" and msg.get("content") == PERSONALITY_PROMPT) ] logger.info(f"Historique chargé depuis {CONVERSATION_HISTORY_FILE}") except Exception as e: logger.error(f"Erreur lors du chargement de l'historique : {e}") conversation_history = [] else: logger.info(f"Aucun fichier d'historique trouvé. Un nouveau fichier sera créé à {CONVERSATION_HISTORY_FILE}") def has_text(text): """ Détermine si le texte fourni est non vide après suppression des espaces. """ return bool(text.strip()) # Fonction de sauvegarde de l'historique def save_conversation_history(): try: with open(CONVERSATION_HISTORY_FILE, 'w', encoding='utf-8') as f: json.dump(conversation_history, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f"Erreur lors de la sauvegarde de l'historique : {e}") # Charger l'encodeur pour le modèle GPT-4o mini encoding = tiktoken.get_encoding("o200k_base") # Convertir l'ID du channel en entier try: chatgpt_channel_id = int(DISCORD_CHANNEL_ID) except ValueError: raise ValueError("L'ID du channel Discord est invalide. Assurez-vous qu'il s'agit d'un entier.") class MyDiscordClient(discord.Client): async def close(self): global openai_client if openai_client is not None: await openai_client.close() openai_client = None await super().close() # Initialiser le client Discord avec les intents modifiés client_discord = MyDiscordClient(intents=intents) # Appeler la fonction pour charger l'historique au démarrage load_conversation_history() def resize_image(image_bytes, mode='high', attachment_filename=None): try: with Image.open(BytesIO(image_bytes)) as img: original_format = img.format # Store the original format if mode == 'high': # Redimensionner pour le mode haute fidélité img.thumbnail((2000, 2000)) if min(img.size) < 768: scale = 768 / min(img.size) new_size = tuple(int(x * scale) for x in img.size) img = img.resize(new_size, Image.Resampling.LANCZOS) elif mode == 'low': # Redimensionner pour le mode basse fidélité img = img.resize((512, 512)) buffer = BytesIO() img_format = img.format if not img_format: if attachment_filename: _, ext = os.path.splitext(attachment_filename) ext = ext.lower() format_mapping = { '.jpg': 'JPEG', '.jpeg': 'JPEG', '.png': 'PNG', '.gif': 'GIF', '.bmp': 'BMP', '.tiff': 'TIFF' } img_format = format_mapping.get(ext, 'PNG') else: img_format = 'PNG' img.save(buffer, format=img_format) return buffer.getvalue() except Exception as e: logger.error(f"Error resizing image: {e}") raise def extract_text_from_message(message): content = message.get("content", "") if isinstance(content, list): # Extraire le texte de chaque élément de la liste texts = [] for part in content: if isinstance(part, dict): text = part.get("text", "") if text: texts.append(text) return ' '.join(texts) elif isinstance(content, str): return content else: return "" def calculate_cost(usage, model=MODEL): input_tokens = usage.get('prompt_tokens', 0) output_tokens = usage.get('completion_tokens', 0) # Définir les tarifs par modèle model_costs = { 'gpt-4o': { 'input_rate': 5.00 / 1_000_000, # 5$ pour 1M tokens d'entrée 'output_rate': 15.00 / 1_000_000 # 15$ pour 1M tokens de sortie }, 'gpt-4o-mini': { 'input_rate': 30.00 / 1_000_000, # 30$ pour 1M tokens d'entrée 'output_rate': 30.00 / 1_000_000 # 30$ pour 1M tokens de sortie }, 'gpt-4': { 'input_rate': 0.150 / 1_000_000, # 0.150$ pour 1M tokens d'entrée 'output_rate': 0.600 / 1_000_000 # 0.600$ pour 1M tokens de sortie } } # Obtenir les tarifs du modèle spécifié if model not in model_costs: model = 'gpt-4o' input_rate = model_costs[model]['input_rate'] output_rate = model_costs[model]['output_rate'] # Calculer les coûts input_cost = input_tokens * input_rate output_cost = output_tokens * output_rate total_cost = input_cost + output_cost return input_tokens, output_tokens, total_cost async def call_gpt4o_for_image_analysis(image_data, user_text=None, detail='high'): try: # Préparer la requête pour GPT-4o if user_text: prompt = ( f"Analyse cette image de manière extrêmement précise en tenant compte de la description suivante : \"{user_text}\"." "Si des personnages sont présents, décris avec détails leurs vêtements, accessoires et physique." "Décris leurs courbes, leur taille, leur poids, leurs mensurations." "Décris comment ils intéragissent avec leur environnement et avec les autres personnages." ) else: prompt = ( "Analyse cette image de manière extrêmement précise s'il te plaît." "Si des personnages sont présents, décris avec détails leurs vêtements, accessoires et physique." "Décris leurs courbes, leur taille, leur poids, leurs mensurations." "Décris comment ils intéragissent avec leur environnement et avec les autres personnages." ) message_to_send = { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": detail } } ] } # Appel à GPT-4o response = await openai_client.chat.completions.create( model=MODEL, messages=[message_to_send], max_tokens=4096 ) if response: analysis = response.choices[0].message.content logging.info(f"Analyse de l'image par {str.capitalize(MODEL)} : {analysis}") # Calcul et affichage du coût if hasattr(response, 'usage') and response.usage: usage = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens } input_tokens, output_tokens, total_cost = calculate_cost(usage, model=MODEL) logging.info(f"Coût de l'analyse de l'image : ${total_cost:.4f} / Input: {input_tokens} / Output: {output_tokens}") else: logging.warning("Informations d'utilisation non disponibles pour le calcul du coût.") return analysis else: return None except OpenAIError as e: logger.error(f"Erreur lors de l'analyse de l'image avec {str.capitalize(MODEL)} : {e}") return None async def remove_old_image_analyses(new_analysis=False): global conversation_history, last_analysis_index, messages_since_last_analysis if new_analysis: logger.debug("Nouvelle analyse détectée. Suppression des anciennes analyses.") # Lorsqu'une nouvelle analyse est ajoutée, supprimer toutes les anciennes conversation_history = [ msg for msg in conversation_history if not (msg.get("role") == "system" and msg.get("content", "").startswith("__IMAGE_ANALYSIS__:")) ] last_analysis_index = len(conversation_history) messages_since_last_analysis = 0 save_conversation_history() logger.info("Toutes les anciennes analyses d'image ont été supprimées.") else: if last_analysis_index is not None: messages_since_last_analysis += 1 if messages_since_last_analysis > 15: # Supprimer l'analyse if 0 <= last_analysis_index < len(conversation_history): removed_msg = conversation_history.pop(last_analysis_index) logger.info(f"Analyse d'image supprimée de l'historique : {removed_msg.get('content')[:50]}...") last_analysis_index = None messages_since_last_analysis = 0 save_conversation_history() async def call_gpt4o_mini_with_analysis(analysis_text, user_name, user_question, has_text): try: # Préparer le message avec le prompt de personnalité et l'analyse messages = [ {"role": "system", "content": PERSONALITY_PROMPT}, { "role": "system", "content": f"L'analyse de l'image fournie est la suivante :\n{analysis_text}\n\n" } ] if has_text: # Préparer le message utilisateur avec le texte user_message = { "role": "user", "content": ( f"Tu es {BOT_NAME}, une femme de 24 ans et membre à part entière du serveur Discord sur lequel tu te trouves." f"{user_name} a posté un message contenant une image et a écrit avec : '{user_question}'.\n" "Réponds à l'utilisateur en te basant sur l'analyse, avec ta personnalité" "Ne mentionne pas explicitement que l'analyse est pré-existante, fais comme si tu l'avais faite toi-même." ) } else: # Préparer une instruction pour commenter l'image sans texte user_message = { "role": "user", "content": ( f"Tu es {BOT_NAME}, une femme de 24 ans et membre à part entière du serveur Discord sur lequel tu te trouves." f"{user_name} a partagé une image sans texte additionnel.\n" "Commente l'image en te basant sur l'analyse, avec ta personnalité. " "Ne mentionne pas que l'analyse a été fournie à l'avance, réagis comme si tu l'avais toi-même effectuée." ) } # Inclure l'historique de conversation avant d'ajouter le message utilisateur messages += conversation_history messages.append(user_message) # Appel à GPT-4o Mini pour répondre response = await openai_client.chat.completions.create( model=MODEL, messages=messages, max_tokens=450 ) if response: reply = response.choices[0].message.content # Calculer et enregistrer le coût de la réponse de GPT-4o Mini if hasattr(response, 'usage') and response.usage: usage = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens } input_tokens, output_tokens, total_cost = calculate_cost(usage, model=MODEL) logging.info(f"Coût de la réponse de {str.capitalize(MODEL)} : ${total_cost:.4f} / Input: {input_tokens} / Output: {output_tokens}") else: logging.warning(f"Informations d'utilisation non disponibles pour le calcul du coût de {str.capitalize(MODEL)}") return reply else: return None except OpenAIError as e: logger.error(f"Erreur lors de la génération de réponse avec {str.capitalize(MODEL)} : {e}") return None async def read_text_file(attachment): file_bytes = await attachment.read() return file_bytes.decode('utf-8') async def encode_image_from_attachment(attachment, mode='high'): image_data = await attachment.read() resized_image = resize_image(image_data, mode=mode, attachment_filename=attachment.filename) return base64.b64encode(resized_image).decode('utf-8') async def call_openai_api(user_text, user_name, image_data=None, detail='high'): # Préparer le contenu pour l'appel API message_to_send = { "role": "user", "content": [ {"type": "text", "text": f"{user_name} dit : {user_text}"} ] } # Inclure l'image dans l'appel API courant if image_data: message_to_send["content"].append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": detail } }) # Assembler les messages avec le prompt de personnalité en premier messages = [ {"role": "system", "content": PERSONALITY_PROMPT} ] + conversation_history + [message_to_send] try: response = await openai_client.chat.completions.create( model=MODEL, messages=messages, max_tokens=400, temperature=TEMPERATURE ) if response: reply = response.choices[0].message.content # Ajouter le message de l'utilisateur à l'historique global, mais uniquement s'il ne s'agit pas d'une image if image_data is None: await add_to_conversation_history(message_to_send) # Ajouter la réponse de l'IA directement à l'historique await add_to_conversation_history({ "role": "assistant", "content": reply }) if hasattr(response, 'usage') and response.usage: usage = response.usage input_tokens, output_tokens, total_cost = calculate_cost({ 'prompt_tokens': usage.prompt_tokens, 'completion_tokens': usage.completion_tokens }) # Afficher dans la console logging.info("Réponse envoyée.") #logging.info(f"Coût de la réponse : ${total_cost:.4f} / Input: {input_tokens} / Output: {output_tokens} / Total: {input_tokens + output_tokens}") return response except OpenAIError as e: logger.error(f"Erreur durant l'appel de l'API : {e}") except Exception as e: logger.error(f"Erreur durant l'appel de l'API : {e}") return None @client_discord.event async def on_ready(): logger.info(f'{BOT_NAME} connecté en tant que {client_discord.user}') logger.info(f'Utilisation du modèle {MODEL}') if not conversation_history: logger.info("Aucun historique trouvé. L'historique commence vide.") # Envoyer un message de version dans le canal Discord channel = client_discord.get_channel(chatgpt_channel_id) if channel: try: embed = discord.Embed( title=f"Bot Démarré", description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}\nUtilisation du modèle: **{MODEL}**", color=0x2222aa # Bleu ) await channel.send(embed=embed) logger.info(f"Message de connexion envoyé dans le canal ID {chatgpt_channel_id}") except discord.Forbidden: logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {chatgpt_channel_id}.") except discord.HTTPException as e: logger.error(f"Erreur lors de l'envoi du message de connexion : {e}") else: logger.error(f"Canal avec ID {chatgpt_channel_id} non trouvé.") @client_discord.event async def on_message(message): global conversation_history, last_analysis_index, messages_since_last_analysis # Vérifier si le message provient du canal autorisé if message.channel.id != chatgpt_channel_id: return # Ignorer les messages du bot lui-même if message.author == client_discord.user: return user_text = message.content.strip() image_data = None file_content = None attachment_filename = None # Vérifier si le message est la commande de réinitialisation if user_text.lower() == "!reset_history": # Vérifier si l'utilisateur a les permissions administratives if not message.author.guild_permissions.administrator: await message.channel.send("❌ Vous n'avez pas la permission d'utiliser cette commande.") return conversation_history = [] save_conversation_history() await message.channel.send("✅ L'historique des conversations a été réinitialisé.") logger.info(f"Historique des conversations réinitialisé par {message.author}.") return # Arrêter le traitement du message après la réinitialisation # Extensions de fichiers autorisées allowed_extensions = ['.txt', '.py', '.html', '.css', '.js'] # Variables pour stocker si le message contient une image et/ou un fichier has_image = False has_file = False # Vérifier s'il y a une pièce jointe if message.attachments: for attachment in message.attachments: # Vérifier si c'est un fichier avec une extension autorisée if any(attachment.filename.endswith(ext) for ext in allowed_extensions): file_content = await read_text_file(attachment) attachment_filename = attachment.filename break # Vérifier si c'est une image elif attachment.content_type in ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/tiff']: image_data = await encode_image_from_attachment(attachment, mode='high') break # Si une image est présente, la traiter if image_data: has_user_text = has_text(user_text) user_text_to_use = user_text if has_user_text else None # **Étape 1 : Envoyer un message temporaire indiquant que l'image est en cours d'analyse** temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*") try: # Étape 2 : GPT-4o analyse l'image, potentiellement guidée par le texte de l'utilisateur analysis = await call_gpt4o_for_image_analysis(image_data, user_text=user_text_to_use) if analysis: # **Ajouter l'analyse à l'historique avant de réagir avec GPT-4o Mini** analysis_message = { "role": "system", "content": f"__IMAGE_ANALYSIS__:{analysis}" } await add_to_conversation_history(analysis_message) # Mettre à jour l'index de la dernière analyse last_analysis_index = len(conversation_history) - 1 messages_since_last_analysis = 0 # Étape 3 : GPT-4o Mini réagit à la question et à l'analyse reply = await call_gpt4o_mini_with_analysis(analysis, message.author.name, user_text, has_user_text) if reply: # **Étape 4 : Supprimer le message temporaire** await temp_msg.delete() # **Étape 5 : Envoyer la réponse finale** await message.channel.send(reply.rstrip("")) # **Ajout des messages à l'historique** # Créer un message utilisateur modifié indiquant qu'une image a été postée if has_user_text: user_message_content = f"{user_text} (a posté une image.)" else: user_message_content = ( "Une image a été postée, mais elle n'est pas disponible pour analyse directe. " "Veuillez vous baser uniquement sur l'analyse fournie." ) user_message = { "role": "user", "content": user_message_content } # Ajouter le message utilisateur à l'historique await add_to_conversation_history(user_message) # Créer le message assistant avec la réponse de GPT-4o Mini assistant_message = { "role": "assistant", "content": reply } # Ajouter le message assistant à l'historique await add_to_conversation_history(assistant_message) else: # **Étape 4 : Supprimer le message temporaire en cas d'échec de génération de réponse** await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu générer une réponse.") else: # **Étape 4 : Supprimer le message temporaire en cas d'échec d'analyse** await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu analyser l'image.") except Exception as e: # **Étape 4 : Supprimer le message temporaire en cas d'erreur** await temp_msg.delete() await message.channel.send("Une erreur est survenue lors du traitement de l'image.") logger.error(f"Error during image processing: {e}") # Après traitement de l'image, ne pas continuer return # Ajouter le contenu du fichier à la requête si présent if file_content: user_text += f"\nContenu du fichier {attachment.filename}:\n{file_content}" # Vérifier si le texte n'est pas vide après ajout du contenu du fichier if not has_text(user_text): return # Ne pas appeler l'API si le texte est vide # Appeler l'API OpenAI result = await call_openai_api(user_text, message.author.name, image_data) if result: reply = result.choices[0].message.content reply = reply.rstrip("") if len(reply.split('')) > 1: reply = reply.split('')[1] elif len(reply.split('')) > 1: reply = reply.split('')[1] await message.channel.send(reply) async def add_to_conversation_history(new_message): global conversation_history, last_analysis_index, messages_since_last_analysis # Ne pas ajouter le PERSONALITY_PROMPT à l'historique if new_message.get("role") == "system" and new_message.get("content") == PERSONALITY_PROMPT: logger.debug("PERSONALITY_PROMPT système non ajouté à l'historique.") return if new_message.get("role") == "system" and new_message.get("content", "").startswith("__IMAGE_ANALYSIS__:"): await remove_old_image_analyses(new_analysis=True) conversation_history.append(new_message) save_conversation_history() logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}") # Gérer la suppression des analyses d'images après 15 messages if new_message.get("role") == "system" and new_message.get("content", "").startswith("__IMAGE_ANALYSIS__:"): last_analysis_index = len(conversation_history) - 1 messages_since_last_analysis = 0 else: await remove_old_image_analyses(new_analysis=False) if len(conversation_history) > 50: logger.info("Limite de 50 messages atteinte.") excess_messages = len(conversation_history) - 50 if excess_messages > 0: # Supprimer les messages les plus anciens del conversation_history[:excess_messages] save_conversation_history() logger.info(f"{excess_messages} messages les plus anciens ont été supprimés pour maintenir l'historique à 50 messages.") # Démarrer le bot Discord client_discord.run(DISCORD_TOKEN)