import os import logging import discord import json import urllib3 import base64 import pytz from dotenv import load_dotenv from openai import AsyncOpenAI, OpenAIError from PIL import Image from io import BytesIO from datetime import datetime, timezone, timedelta # Charger les variables d'environnement depuis le fichier .env load_dotenv() DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID') PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt') CONVERSATION_HISTORY_FILE = os.getenv('CONVERSATION_HISTORY_FILE', 'conversation_history.json') CONVERSATION_HISTORY_SIZE = int(os.getenv('CONVERSATION_HISTORY_SIZE', '100')) BOT_NAME = os.getenv('BOT_NAME', 'ChatBot') MODEL = os.getenv('MODEL', 'llama:3.2') URL_OPENAI_API = os.getenv('URL_OPENAI_API', 'http://localai.localai.svc.cluster.local:8080/v1') TEMPERATURE = float(os.getenv('TEMPERATURE', "1.0")) # Initialiser le client OpenAI asynchrone ici openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=URL_OPENAI_API) BOT_VERSION = "2.8.0-beta" # Vérifier que les tokens et le prompt de personnalité sont récupérés if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None: raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.") if not os.path.isfile(PERSONALITY_PROMPT_FILE): raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.") # Lire le prompt de personnalité depuis le fichier with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f: PERSONALITY_PROMPT = f.read().strip() # Log configuration log_format = '%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s' logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level="INFO") console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(logging.Formatter(log_format)) logger = logging.getLogger(BOT_NAME) logger.setLevel("DEBUG") logging.getLogger('').addHandler(console) httpx_logger = logging.getLogger('httpx') httpx_logger.setLevel(logging.DEBUG) urllib3.disable_warnings() # Initialiser les intents intents = discord.Intents.default() intents.message_content = True # Activer l'intent pour les contenus de message # Liste pour stocker l'historique des conversations conversation_history = [] def filter_message(message): """Filtre le contenu d'un retour de modèle de language, comme pour enlever les pensées dans le cas de DeepSeek""" if len(message.split('')) > 1: result = message.split('')[1] elif len(message.split('')) > 1: result = message.split('')[1] else: result = message result.rstrip("") return result def transform_emote(message: str, output: bool) -> str: """Remplace les smileys par les codes Discord correspondant""" list_emote = [ (":hap:", "<:hap:355854929073537026>"), (":angryvault:", "<:angryvault:585550568806940672>"), (":minou:", "<:minou:358054423462936576>"), (":cetaitsur:", ""), (":eh:", "<:eh:395979132896280576>"), (":desu:", "<:desu:388007643077410837>"), (":bave2:", "<:bave2:412252920558387221>"), (":haptriste:", "<:haptriste:358054014262181889>"), (":perplexe:", "<:perplexe:358054891274371082>"), (":sueur:", "<:sueur:358051940631838721>"), (":chien:", "<:chien:507606737646518293>"), (":kemar:", "<:kemar:419607012796792842>"), (":ouch2:", "<:ouch2:777984650710745138>"), (":coeur:", "<:coeur:355853389399195649>"), (":what:", "<:what:587019571207077928>") ] for smiley, discord_code in list_emote: if output: message = message.replace(smiley, discord_code) else: message = message.replace(discord_code, smiley) return message def split_message(message, max_length=2000): """Divise un message en plusieurs segments de longueur maximale spécifiée.""" if len(message) <= max_length: return [message] parts = [] current_part = "" for line in message.split('\n'): if len(current_part) + len(line) + 1 > max_length: parts.append(current_part) current_part = line + '\n' else: current_part += line + '\n' if current_part: parts.append(current_part) return parts def load_conversation_history(): global conversation_history if os.path.isfile(CONVERSATION_HISTORY_FILE): try: with open(CONVERSATION_HISTORY_FILE, 'r', encoding='utf-8') as f: loaded_history = json.load(f) # Exclure uniquement le PERSONALITY_PROMPT conversation_history = [ msg for msg in loaded_history if not (msg.get("role") == "system" and msg.get("content") == PERSONALITY_PROMPT) ] logger.info(f"Historique chargé depuis {CONVERSATION_HISTORY_FILE}") except Exception as e: logger.error(f"Erreur lors du chargement de l'historique : {e}") conversation_history = [] else: logger.info(f"Aucun fichier d'historique trouvé. Un nouveau fichier sera créé à {CONVERSATION_HISTORY_FILE}") def has_text(text): """ Détermine si le texte fourni est non vide après suppression des espaces. """ return bool(text.strip()) def resize_image(image_bytes, mode='high', attachment_filename=None): """Redimensionne l'image selon le mode spécifié.""" try: with Image.open(BytesIO(image_bytes)) as img: original_format = img.format # Stocker le format original if mode == 'high': img.thumbnail((2000, 2000)) if min(img.size) < 768: scale = 768 / min(img.size) new_size = tuple(int(x * scale) for x in img.size) img = img.resize(new_size, Image.Resampling.LANCZOS) elif mode == 'low': img = img.resize((512, 512)) buffer = BytesIO() img_format = img.format or _infer_image_format(attachment_filename) img.save(buffer, format=img_format) return buffer.getvalue() except Exception as e: logger.error(f"Erreur lors du redimensionnement de l'image : {e}") raise async def encode_image_from_attachment(attachment, mode='high'): """Encode une image depuis une pièce jointe en base64 après redimensionnement.""" image_data = await attachment.read() resized_image = resize_image(image_data, mode=mode, attachment_filename=attachment.filename) return base64.b64encode(resized_image).decode('utf-8') def _infer_image_format(filename): """Déduit le format de l'image basé sur l'extension du fichier.""" if filename: _, ext = os.path.splitext(filename) ext = ext.lower() format_mapping = { '.jpg': 'JPEG', '.jpeg': 'JPEG', '.png': 'PNG', '.gif': 'GIF', '.bmp': 'BMP', '.tiff': 'TIFF' } return format_mapping.get(ext, 'PNG') return 'PNG' # Fonction de sauvegarde de l'historique def save_conversation_history(): try: with open(CONVERSATION_HISTORY_FILE, 'w', encoding='utf-8') as f: json.dump(conversation_history, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f"Erreur lors de la sauvegarde de l'historique : {e}") # Convertir l'ID du channel en entier try: discord_channel_id = int(DISCORD_CHANNEL_ID) except ValueError: raise ValueError("L'ID du channel Discord est invalide. Assurez-vous qu'il s'agit d'un entier.") class MyDiscordClient(discord.Client): async def close(self): global openai_client if openai_client is not None: await openai_client.close() openai_client = None await super().close() # Initialiser le client Discord avec les intents modifiés client_discord = MyDiscordClient(intents=intents) # Appeler la fonction pour charger l'historique au démarrage load_conversation_history() async def call_for_image_analysis(image_data, user_name, user_text=None, detail='high'): """Appelle l'API pour analyser une image.""" prompt = PERSONALITY_PROMPT if user_text: user_content = f"{user_name} a envoyé une image avec le messsage : \"{user_text}\"." else: user_content = f"{user_name} a envoyé une image:" prompt += user_content message_to_send = { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}", "detail": detail } } ] } user_message = {"role": "user", "content": user_content} messages = [user_message] + [message_to_send] + conversation_history analysis = await openai_client.chat.completions.create( model=MODEL, messages=messages, temperature=TEMPERATURE ) if analysis: logger.info(f"Analyse de l'image par l'API : {analysis.choices[0].message.content}") return analysis.choices[0].message.content async def call_openai_api(user_text, user_name, detail='high'): # Préparer le contenu pour l'appel API message_to_send = { "role": "user", "content": [ {"type": "text", "text": f"{user_name} dit : {transform_emote(user_text, False)}"} ] } # Assembler les messages avec le prompt de personnalité en premier messages = [ {"role": "system", "content": PERSONALITY_PROMPT} ] + conversation_history + [message_to_send] try: response = await openai_client.chat.completions.create( model=MODEL, messages=messages, temperature=TEMPERATURE ) if response: reply = response.choices[0].message.content await add_to_conversation_history(message_to_send) # Ajouter la réponse de l'IA directement à l'historique await add_to_conversation_history({ "role": "assistant", "content": reply }) return response except Exception as e: logger.error(f"Erreur durant l'appel de l'API : {e}") return None @client_discord.event async def on_ready(): logger.info(f'{BOT_NAME} connecté en tant que {client_discord.user}') logger.info(f'Utilisation du modèle {MODEL}') if not conversation_history: logger.info("Aucun historique trouvé. L'historique commence vide.") # Envoyer un message de version dans le canal Discord channel = client_discord.get_channel(discord_channel_id) if channel: try: embed = discord.Embed( title=f"Bot Démarré", description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}\nUtilisation du modèle: **{MODEL}**", color=0x2222aa # Bleu ) await channel.send(embed=embed) logger.info(f"Message de connexion envoyé dans le canal ID {discord_channel_id}") except discord.Forbidden: logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {discord_channel_id}.") except discord.HTTPException as e: logger.error(f"Erreur lors de l'envoi du message de connexion : {e}") else: logger.error(f"Canal avec ID {discord_channel_id} non trouvé.") @client_discord.event async def on_message(message): global conversation_history # Vérifier si le message provient du canal autorisé if message.channel.id != discord_channel_id: return # Ignorer les messages du bot lui-même if message.author == client_discord.user: return user_text = message.content.strip() file_content = None attachment_filename = None # Vérifier si le message est la commande de réinitialisation if user_text.lower() == "!reset_history": # Vérifier si l'utilisateur a les permissions administratives if not message.author.guild_permissions.administrator: await message.channel.send("❌ Vous n'avez pas la permission d'utiliser cette commande.") return conversation_history = [] save_conversation_history() await message.channel.send("✅ L'historique des conversations a été réinitialisé.") logger.info(f"Historique des conversations réinitialisé par {message.author}.") return # Arrêter le traitement du message après la réinitialisation # Extensions de fichiers autorisées allowed_extensions = ['.txt', '.py', '.html', '.css', '.js'] # Variable pour stocker si le message contient un fichier has_file = False image_data = None # Vérifier s'il y a une pièce jointe if message.attachments: for attachment in message.attachments: # Vérifier si c'est un fichier avec une extension autorisée if any(attachment.filename.endswith(ext) for ext in allowed_extensions): file_content = await read_text_file(attachment) attachment_filename = attachment.filename break elif attachment.content_type and attachment.content_type.startswith('image/'): image_data = await encode_image_from_attachment(attachment, mode='high') break # Traitement des images if image_data: logger.debug(image_data) has_user_text = has_text(user_text) user_text_to_use = user_text if has_user_text else None temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*") try: # Analyser l'image avec l'API analysis = await call_for_image_analysis(image_data, message.author.name, user_text=user_text_to_use) if analysis: await temp_msg.delete() await message.channel.send(analysis) if has_user_text: user_message_text = f"{message.author.name} a envoyé une image avec le messsage : \"{user_text}\"." else: user_message_text = f"{message.author.name} a envoyé une image." user_message = { "role": "user", "content": f"{user_message_text}" } assistant_message = { "role": "assistant", "content": analysis } await add_to_conversation_history(user_message) await add_to_conversation_history(assistant_message) else: await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu analyser l'image.") except Exception as e: await temp_msg.delete() await message.channel.send("Une erreur est survenue lors du traitement de l'image.") logger.error(f"Erreur lors du traitement de l'image: {e}") return # Ne pas continuer le traitement après une image # Ajouter le contenu du fichier à la requête si présent if file_content: user_text += f"\nContenu du fichier {attachment.filename}:\n{file_content}" # Vérifier si le texte n'est pas vide après ajout du contenu du fichier if not has_text(user_text): return # Ne pas appeler l'API si le texte est vide async with message.channel.typing(): try: # Appeler l'API result = await call_openai_api(user_text, message.author.name) if result: reply = result.choices[0].message.content reply = filter_message(reply) reply = transform_emote(reply, True) message_parts = split_message(reply) for part in message_parts: await message.channel.send(part) # Afficher dans la console logging.info(f"Réponse envoyée. ({len(message_parts)} message(s))") except Exception as e: await message.channel.send("Franchement, je sais pas quoi te répondre. <:haptriste:358054014262181889>") logger.error(f"Erreur lors du traitement du texte: {e}") async def add_to_conversation_history(new_message): global conversation_history # Ne pas ajouter le PERSONALITY_PROMPT à l'historique if new_message.get("role") == "system" and new_message.get("content") == PERSONALITY_PROMPT: logger.debug("PERSONALITY_PROMPT système non ajouté à l'historique.") return conversation_history.append(new_message) save_conversation_history() logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}") if len(conversation_history) > CONVERSATION_HISTORY_SIZE: logger.info(f"Limite de {CONVERSATION_HISTORY_SIZE} messages atteinte.") excess_messages = len(conversation_history) - CONVERSATION_HISTORY_SIZE if excess_messages > 0: # Supprimer les messages les plus anciens del conversation_history[:excess_messages] save_conversation_history() logger.info(f"{excess_messages} messages les plus anciens ont été supprimés pour maintenir l'historique à {CONVERSATION_HISTORY_SIZE} messages.") # Démarrer le bot Discord client_discord.run(DISCORD_TOKEN)