import os import base64 import logging import re from io import BytesIO import discord from dotenv import load_dotenv from PIL import Image import emoji import tiktoken from openai import AsyncOpenAI, OpenAIError import json # Charger les variables d'environnement depuis le fichier .env load_dotenv() DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID') PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt') CONVERSATION_HISTORY_FILE = os.getenv('CONVERSATION_HISTORY_FILE', 'conversation_history.json') BOT_NAME = os.getenv('BOT_NAME', 'ChatBot') # Initialiser le client OpenAI asynchrone ici openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY) BOT_VERSION = "2.4.0" # Vérifier que les tokens et le prompt de personnalité sont récupérés if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None: raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.") if not os.path.isfile(PERSONALITY_PROMPT_FILE): raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.") # Lire le prompt de personnalité depuis le fichier with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f: PERSONALITY_PROMPT = f.read().strip() # Log configuration log_format = '%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s' logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level="INFO") console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(logging.Formatter(log_format)) logger = logging.getLogger(BOT_NAME) logger.setLevel("INFO") logging.getLogger('').addHandler(console) httpx_logger = logging.getLogger('httpx') httpx_logger.setLevel(logging.WARNING) # Initialiser les intents intents = discord.Intents.default() intents.message_content = True # Activer l'intent pour les contenus de message # Liste pour stocker l'historique des conversations conversation_history = [] def load_conversation_history(): global conversation_history if os.path.isfile(CONVERSATION_HISTORY_FILE): try: with open(CONVERSATION_HISTORY_FILE, 'r', encoding='utf-8') as f: loaded_history = json.load(f) # Exclure uniquement le PERSONALITY_PROMPT conversation_history = [ msg for msg in loaded_history if not (msg.get("role") == "system" and msg.get("content") == PERSONALITY_PROMPT) ] logger.info(f"Historique chargé depuis {CONVERSATION_HISTORY_FILE}") except Exception as e: logger.error(f"Erreur lors du chargement de l'historique : {e}") conversation_history = [] else: logger.info(f"Aucun fichier d'historique trouvé. Un nouveau fichier sera créé à {CONVERSATION_HISTORY_FILE}") def has_text(text): """ Détermine si le texte fourni est non vide après suppression des espaces. """ return bool(text.strip()) # Fonction de sauvegarde de l'historique def save_conversation_history(): try: with open(CONVERSATION_HISTORY_FILE, 'w', encoding='utf-8') as f: json.dump(conversation_history, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f"Erreur lors de la sauvegarde de l'historique : {e}") # Charger l'encodeur pour le modèle GPT-4o mini encoding = tiktoken.get_encoding("o200k_base") # Convertir l'ID du channel en entier try: chatgpt_channel_id = int(DISCORD_CHANNEL_ID) except ValueError: raise ValueError("L'ID du channel Discord est invalide. Assurez-vous qu'il s'agit d'un entier.") class MyDiscordClient(discord.Client): async def close(self): global openai_client if openai_client is not None: await openai_client.close() openai_client = None await super().close() # Initialiser le client Discord avec les intents modifiés client_discord = MyDiscordClient(intents=intents) # Appeler la fonction pour charger l'historique au démarrage load_conversation_history() def resize_image(image_bytes, mode='high', attachment_filename=None): try: with Image.open(BytesIO(image_bytes)) as img: original_format = img.format # Store the original format if mode == 'high': # Redimensionner pour le mode haute fidélité img.thumbnail((2000, 2000)) if min(img.size) < 768: scale = 768 / min(img.size) new_size = tuple(int(x * scale) for x in img.size) img = img.resize(new_size, Image.Resampling.LANCZOS) elif mode == 'low': # Redimensionner pour le mode basse fidélité img = img.resize((512, 512)) buffer = BytesIO() img_format = img.format if not img_format: if attachment_filename: _, ext = os.path.splitext(attachment_filename) ext = ext.lower() format_mapping = { '.jpg': 'JPEG', '.jpeg': 'JPEG', '.png': 'PNG', '.gif': 'GIF', '.bmp': 'BMP', '.tiff': 'TIFF' } img_format = format_mapping.get(ext, 'PNG') else: img_format = 'PNG' img.save(buffer, format=img_format) return buffer.getvalue() except Exception as e: logger.error(f"Error resizing image: {e}") raise def extract_text_from_message(message): content = message.get("content", "") if isinstance(content, list): # Extraire le texte de chaque élément de la liste texts = [] for part in content: if isinstance(part, dict): text = part.get("text", "") if text: texts.append(text) return ' '.join(texts) elif isinstance(content, str): return content else: return "" def calculate_cost(usage, model='gpt-4o-mini'): input_tokens = usage.get('prompt_tokens', 0) output_tokens = usage.get('completion_tokens', 0) # Définir les tarifs par modèle model_costs = { 'gpt-4o': { 'input_rate': 5.00 / 1_000_000, # 5$ pour 1M tokens d'entrée 'output_rate': 15.00 / 1_000_000 # 15$ pour 1M tokens de sortie }, 'gpt-4o-mini': { 'input_rate': 0.150 / 1_000_000, # 0.150$ pour 1M tokens d'entrée 'output_rate': 0.600 / 1_000_000 # 0.600$ pour 1M tokens de sortie } } # Obtenir les tarifs du modèle spécifié if model not in model_costs: logger.warning(f"Modèle inconnu '{model}'. Utilisation des tarifs par défaut pour 'gpt-4o-mini'.") model = 'gpt-4o-mini' input_rate = model_costs[model]['input_rate'] output_rate = model_costs[model]['output_rate'] # Calculer les coûts input_cost = input_tokens * input_rate output_cost = output_tokens * output_rate total_cost = input_cost + output_cost return input_tokens, output_tokens, total_cost def is_relevant_message(message): content = message["content"] if isinstance(content, list): content = ''.join(part.get('text', '') for part in content if 'text' in part) if len(content.strip()) < 5: return False discord_emoji_pattern = r'' def is_discord_emoji(part): return bool(re.fullmatch(discord_emoji_pattern, part)) tokens = re.split(discord_emoji_pattern, content) emojis_only = True standard_emojis = [char for char in content if emoji.is_emoji(char)] discord_emojis = re.findall(discord_emoji_pattern, content) text_without_emojis = re.sub(discord_emoji_pattern, '', content) for char in text_without_emojis: if not char.isspace() and not emoji.is_emoji(char): emojis_only = False break if len(standard_emojis) + len(discord_emojis) == 0: emojis_only = False if emojis_only and len(content.strip()) > 0: return False return True async def summarize_conversation(messages_to_summarize): try: # Préparer le prompt pour la synthèse prompt = "Synthétise les messages suivants en un résumé concis de maximum 1000 tokens :\n" for msg in messages_to_summarize: role = msg.get("role", "user") content = msg.get("content", "") prompt += f"{role.capitalize()}: {content}\n" # Appel à l'API OpenAI pour générer la synthèse response = await openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "Tu es un assistant utile qui résume les conversations."}, {"role": "user", "content": prompt} ], max_tokens=1000, temperature=0.5 ) if response and response.choices: summary = response.choices[0].message.content.strip() logger.info("Synthèse générée avec succès.") # Calcul et log du coût de la synthèse if hasattr(response, 'usage') and response.usage: usage = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens } input_tokens, output_tokens, total_cost = calculate_cost(usage, model='gpt-4o-mini') logger.info( f"Coût : ${total_cost:.6f}, Input Tokens: {input_tokens}, Output Tokens: {output_tokens}." ) else: logger.warning("Informations d'utilisation non disponibles pour le calcul du coût de la synthèse.") return summary else: logger.error("Aucune réponse reçue lors de la synthèse.") return None except OpenAIError as e: logger.error(f"Erreur lors de la synthèse : {e}") return None async def call_gpt4o_for_image_analysis(image_data, user_text=None, detail='high'): try: # Préparer la requête pour GPT-4o if user_text: prompt = ( f"Analyse cette image de manière extrêmement précise en tenant compte de la description suivante : \"{user_text}\"." "Si des personnages sont présents, décris avec détails leurs vêtements, accessoires et physique." "Décris leurs courbes, leur taille, leur poids, leurs mensurations." "Décris comment ils intéragissent avec leur environnement et avec les autres personnages." ) else: prompt = ( "Analyse cette image de manière extrêmement précise s'il te plaît." "Si des personnages sont présents, décris avec détails leurs vêtements, accessoires et physique." "Décris leurs courbes, leur taille, leur poids, leurs mensurations." "Décris comment ils intéragissent avec leur environnement et avec les autres personnages." ) message_to_send = { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": detail } } ] } # Appel à GPT-4o response = await openai_client.chat.completions.create( model="gpt-4o", messages=[message_to_send], max_tokens=4096 ) if response: analysis = response.choices[0].message.content logging.info(f"Analyse de l'image par GPT-4o : {analysis}") # Calcul et affichage du coût if hasattr(response, 'usage') and response.usage: usage = { 'prompt_tokens': response.usage.prompt_tokens, 'completion_tokens': response.usage.completion_tokens } input_tokens, output_tokens, total_cost = calculate_cost(usage, model='gpt-4o') logging.info(f"Coût de l'analyse de l'image : ${total_cost:.4f} / Input: {input_tokens} / Output: {output_tokens}") else: logging.warning("Informations d'utilisation non disponibles pour le calcul du coût.") return analysis else: return None except OpenAIError as e: logger.error(f"Erreur lors de l'analyse de l'image avec GPT-4o: {e}") return None async def call_gpt4o_mini_with_analysis(analysis_text, user_name, user_question, has_text): try: # Préparer le message avec le prompt de personnalité et l'analyse messages = [ {"role": "system", "content": PERSONALITY_PROMPT}, { "role": "system", "content": f"L'analyse de l'image fournie est la suivante :\n{analysis_text}\n\n" } ] if has_text: # Préparer le message utilisateur avec le texte user_message = { "role": "user", "content": ( f"{user_name} a écrit : '{user_question}'.\n" "Réponds en te basant sur l'analyse, avec ta personnalité. " "Ne mentionne pas explicitement que l'analyse est pré-existante, fais comme si tu l'avais faite toi-même." ) } else: # Préparer une instruction pour commenter l'image sans texte user_message = { "role": "user", "content": ( f"{user_name} a partagé une image sans texte additionnel.\n" "Commente l'image en te basant sur l'analyse, avec ta personnalité. " "Ne mentionne pas que l'analyse a été fournie à l'avance, réagis comme si tu l'avais toi-même effectuée." ) } # Inclure l'historique de conversation avant d'ajouter le message utilisateur messages += conversation_history messages.append(user_message) # Appel à GPT-4o Mini pour répondre response = await openai_client.chat.completions.create( model="gpt-4o-mini", messages=messages, max_tokens=450 ) if response: reply = response.choices[0].message.content logging.info(f"Réponse de GPT-4o Mini : {reply}") return reply else: return None except OpenAIError as e: logger.error(f"Erreur lors de la génération de réponse avec GPT-4o Mini: {e}") return None async def read_text_file(attachment): file_bytes = await attachment.read() return file_bytes.decode('utf-8') async def encode_image_from_attachment(attachment, mode='high'): image_data = await attachment.read() resized_image = resize_image(image_data, mode=mode, attachment_filename=attachment.filename) return base64.b64encode(resized_image).decode('utf-8') async def call_openai_api(user_text, user_name, image_data=None, detail='high'): # Préparer le contenu pour l'appel API message_to_send = { "role": "user", "content": [ {"type": "text", "text": f"{user_name} dit : {user_text}"} ] } # Inclure l'image dans l'appel API courant if image_data: message_to_send["content"].append({ "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": detail } }) # Assembler les messages avec le prompt de personnalité en premier messages = [ {"role": "system", "content": PERSONALITY_PROMPT} ] + conversation_history + [message_to_send] try: response = await openai_client.chat.completions.create( model="gpt-4o-mini", messages=messages, max_tokens=400, temperature=1.0 ) if response: reply = response.choices[0].message.content # Ajouter le message de l'utilisateur à l'historique global, mais uniquement s'il ne s'agit pas d'une image if image_data is None: await add_to_conversation_history(message_to_send) # Ajouter la réponse de l'IA directement à l'historique await add_to_conversation_history({ "role": "assistant", "content": reply }) if hasattr(response, 'usage') and response.usage: usage = response.usage input_tokens, output_tokens, total_cost = calculate_cost({ 'prompt_tokens': usage.prompt_tokens, 'completion_tokens': usage.completion_tokens }) # Afficher dans la console logging.info(f"Coût de la réponse : ${total_cost:.4f} / Input: {input_tokens} / Output: {output_tokens} / Total: {input_tokens + output_tokens}") return response except OpenAIError as e: logger.error(f"Error calling OpenAI API: {e}") except Exception as e: logger.error(f"Error calling OpenAI API: {e}") return None @client_discord.event async def on_ready(): logger.info(f'{BOT_NAME} connecté en tant que {client_discord.user}') if not conversation_history: logger.info("Aucun historique trouvé. L'historique commence vide.") # Envoyer un message de version dans le canal Discord channel = client_discord.get_channel(chatgpt_channel_id) if channel: try: embed = discord.Embed( title=f"Bot Démarré", description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}", color=0x00ff00 # Vert ) await channel.send(embed=embed) logger.info(f"Message de connexion envoyé dans le canal ID {chatgpt_channel_id}") except discord.Forbidden: logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {chatgpt_channel_id}.") except discord.HTTPException as e: logger.error(f"Erreur lors de l'envoi du message de connexion : {e}") else: logger.error(f"Canal avec ID {chatgpt_channel_id} non trouvé.") @client_discord.event async def on_message(message): global conversation_history # Vérifier si le message provient du canal autorisé if message.channel.id != chatgpt_channel_id: return # Ignorer les messages du bot lui-même if message.author == client_discord.user: return user_text = message.content.strip() image_data = None file_content = None attachment_filename = None # Vérifier si le message est la commande de réinitialisation if user_text.lower() == "!reset_history": # Vérifier si l'utilisateur a les permissions administratives if not message.author.guild_permissions.administrator: await message.channel.send("❌ Vous n'avez pas la permission d'utiliser cette commande.") return conversation_history = [] save_conversation_history() await message.channel.send("✅ L'historique des conversations a été réinitialisé.") logger.info(f"Historique des conversations réinitialisé par {message.author}.") return # Arrêter le traitement du message après la réinitialisation # Extensions de fichiers autorisées allowed_extensions = ['.txt', '.py', '.html', '.css', '.js'] # Variables pour stocker si le message contient une image et/ou un fichier has_image = False has_file = False # Vérifier s'il y a une pièce jointe if message.attachments: for attachment in message.attachments: # Vérifier si c'est un fichier avec une extension autorisée if any(attachment.filename.endswith(ext) for ext in allowed_extensions): file_content = await read_text_file(attachment) attachment_filename = attachment.filename break # Vérifier si c'est une image elif attachment.content_type in ['image/jpeg', 'image/png', 'image/gif', 'image/bmp', 'image/tiff']: image_data = await encode_image_from_attachment(attachment, mode='high') break # Si une image est présente, la traiter if image_data: has_user_text = has_text(user_text) user_text_to_use = user_text if has_user_text else None # **Étape 1 : Envoyer un message temporaire indiquant que l'image est en cours d'analyse** temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*") try: # Étape 2 : GPT-4o analyse l'image, potentiellement guidée par le texte de l'utilisateur analysis = await call_gpt4o_for_image_analysis(image_data, user_text=user_text_to_use) if analysis: # **Ajouter l'analyse à l'historique avant de réagir avec GPT-4o Mini** analysis_message = { "role": "system", "content": f"Analyse de l'image : {analysis}" } await add_to_conversation_history(analysis_message) # Étape 3 : GPT-4o Mini réagit à la question et à l'analyse reply = await call_gpt4o_mini_with_analysis(analysis, message.author.name, user_text, has_user_text) if reply: # **Étape 4 : Supprimer le message temporaire** await temp_msg.delete() # **Étape 5 : Envoyer la réponse finale** await message.channel.send(reply) # **Ajout des messages à l'historique** # Créer un message utilisateur modifié indiquant qu'une image a été postée if has_user_text: user_message_content = f"{user_text} (a posté une image.)" else: user_message_content = ( "Une image a été postée, mais elle n'est pas disponible pour analyse directe. " "Veuillez vous baser uniquement sur l'analyse fournie." ) user_message = { "role": "user", "content": user_message_content } # Ajouter le message utilisateur à l'historique await add_to_conversation_history(user_message) # Créer le message assistant avec la réponse de GPT-4o Mini assistant_message = { "role": "assistant", "content": reply } # Ajouter le message assistant à l'historique await add_to_conversation_history(assistant_message) else: # **Étape 4 : Supprimer le message temporaire en cas d'échec de génération de réponse** await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu générer une réponse.") else: # **Étape 4 : Supprimer le message temporaire en cas d'échec d'analyse** await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu analyser l'image.") except Exception as e: # **Étape 4 : Supprimer le message temporaire en cas d'erreur** await temp_msg.delete() await message.channel.send("Une erreur est survenue lors du traitement de l'image.") logger.error(f"Error during image processing: {e}") # Après traitement de l'image, ne pas continuer return # Ajouter le contenu du fichier à la requête si présent if file_content: user_text += f"\nContenu du fichier {attachment.filename}:\n{file_content}" # Vérifier si le texte n'est pas vide après ajout du contenu du fichier if not has_text(user_text): return # Ne pas appeler l'API si le texte est vide # Appeler l'API OpenAI result = await call_openai_api(user_text, message.author.name, image_data) if result: reply = result.choices[0].message.content await message.channel.send(reply) async def add_to_conversation_history(new_message): global conversation_history # Ne pas ajouter le PERSONALITY_PROMPT à l'historique if new_message.get("role") == "system" and new_message.get("content") == PERSONALITY_PROMPT: logger.debug("PERSONALITY_PROMPT système non ajouté à l'historique.") return # Filtrer les messages pertinents pour l'historique if is_relevant_message(new_message): # Ajouter le message à l'historique conversation_history.append(new_message) save_conversation_history() logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}") # Vérifier si la limite de 150 messages est atteinte if len(conversation_history) > 150: logger.info("Limite de 150 messages atteinte. Démarrage de la synthèse des messages les plus anciens.") # Extraire les 50 messages les plus anciens pour la synthèse messages_to_summarize = conversation_history[:50] # Générer la synthèse summary = await summarize_conversation(messages_to_summarize) if summary: # Créer un message de synthèse summary_message = { "role": "system", "content": f"Synthèse des précédents messages : {summary}" } # Remplacer les 50 premiers messages par la synthèse conversation_history = [summary_message] + conversation_history[50:] save_conversation_history() logger.info("Synthèse ajoutée à l'historique et les 50 anciens messages ont été supprimés.") else: logger.error("Échec de la génération de la synthèse. L'historique n'a pas été modifié.") # Démarrer le bot Discord client_discord.run(DISCORD_TOKEN)