import os import logging import discord import json import urllib3 import base64 import re import asyncio import websockets import numpy as np import time from dotenv import load_dotenv from openai import AsyncOpenAI, OpenAIError from PIL import Image from io import BytesIO from datetime import datetime, timezone, timedelta from charset_normalizer import from_bytes from enum import Enum from discord.sinks import Sink from scipy.signal import resample_poly from discord.ext import tasks # Charger les variables d'environnement depuis le fichier .env load_dotenv() DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID') PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt') CONVERSATION_HISTORY_FILE = os.getenv('CONVERSATION_HISTORY_FILE', 'conversation_history.json') CONVERSATION_HISTORY_SIZE = int(os.getenv('CONVERSATION_HISTORY_SIZE', '100')) BOT_NAME = os.getenv('BOT_NAME', 'ChatBot') MODEL = os.getenv('MODEL', 'llama:3.2') URL_OPENAI_API = os.getenv('URL_OPENAI_API', 'http://openwebui:8080/v1') TEMPERATURE = float(os.getenv('TEMPERATURE', "1.0")) BOOT_MESSAGE = os.getenv('BOOT_MESSAGE', "true").lower() LOG_LEVEL = os.getenv('LOG_LEVEL', "INFO").upper() HISTORY_ANALYSIS_IMAGE = os.getenv('HISTORY_ANALYSIS_IMAGE', "false").lower() PROMPT_STATUS_CHANGE = str(os.getenv('PROMPT_STATUS_CHANGE', "Rédige un message court qui sera utilisé en tant que status sur Discord")) DELAY_TASK_UPDATE_STATUS = int(os.getenv('DELAY_TASK_UPDATE_STATUS', '30')) WHISPER_WS_URL = os.getenv("WHISPER_WS_URL", "ws://whisper-stt:8000/ws/transcribe") # Initialiser le client OpenAI asynchrone ici openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=URL_OPENAI_API) BOT_VERSION = "3.0.0-alpha1" # Vérifier que les tokens et le prompt de personnalité sont récupérés if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None: raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.") if not os.path.isfile(PERSONALITY_PROMPT_FILE): raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.") # Lire le prompt de personnalité depuis le fichier with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f: PERSONALITY_PROMPT = f.read().strip() # Log configuration log_format = '%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s' logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level=LOG_LEVEL) console = logging.StreamHandler() console.setLevel(logging.DEBUG) console.setFormatter(logging.Formatter(log_format)) logger = logging.getLogger(BOT_NAME) logger.setLevel("DEBUG") logging.getLogger('').addHandler(console) httpx_logger = logging.getLogger('httpx') httpx_logger.setLevel(logging.DEBUG) urllib3.disable_warnings() update_status_started = False # Initialiser les intents intents = discord.Intents.default() intents.message_content = True # Activer l'intent pour les contenus de message intents.voice_states = True class ReplyMode(Enum): VOICE = "voice" TEXT = "text" reply_mode = ReplyMode.VOICE reply_text_channel = None class STTSink(Sink): def __init__(self): super().__init__() self.user_ws = {} self.buffers = {} # audio accumulé par user self.last_send = {} # timestamp dernier envoi async def _get_ws(self, user_id): if user_id not in self.user_ws: ws = await websockets.connect(WHISPER_WS_URL) self.user_ws[user_id] = ws asyncio.create_task(self._listen_ws(user_id, ws)) return self.user_ws[user_id] async def _listen_ws(self, user_id, ws): try: async for msg in ws: data = json.loads(msg) if data.get("type") == "final": text = data["text"] if reply_mode == ReplyMode.TEXT and reply_text_channel: await reply_text_channel.send(f"🗣️ {text}") else: logger.info(f"[STT][{user_id}] {text}") except Exception as e: logger.warning(f"[STT][{user_id}] WS fermé : {e}") def write(self, pcm_bytes: bytes, user_id: int): if not pcm_bytes: return audio = discord_pcm_to_whisper_int16(pcm_bytes) if not audio: return if user_id not in self.buffers: self.buffers[user_id] = [] self.last_send[user_id] = time.time() self.buffers[user_id].append(audio) now = time.time() # Envoi toutes les ~600 ms if now - self.last_send[user_id] >= 0.6: chunk = b"".join(self.buffers[user_id]) self.buffers[user_id].clear() self.last_send[user_id] = now asyncio.run_coroutine_threadsafe(self._send_audio(user_id, chunk), MAIN_LOOP) logger.debug(f"[STT] audio envoyé user={user_id} bytes={len(chunk)}") async def _send_audio(self, user_id, pcm_bytes): ws = await self._get_ws(user_id) await ws.send(pcm_bytes) # Liste pour stocker l'historique des conversations conversation_history = [] def discord_pcm_to_whisper_int16(pcm_bytes: bytes) -> bytes: # PCM 48 kHz int16 -> numpy audio_48k = np.frombuffer(pcm_bytes, dtype=np.int16) if audio_48k.size == 0: return b"" # int16 -> float32 pour resample audio_float32 = audio_48k.astype(np.float32) / 32768.0 # resample 48kHz -> 16kHz audio_16k = resample_poly(audio_float32, up=1, down=3) # float32 -> int16 (CE QUE WHISPER ATTEND) audio_16k_int16 = np.clip(audio_16k * 32768.0, -32768, 32767).astype(np.int16) return audio_16k_int16.tobytes() def filter_message(message): """Filtre le contenu d'un retour de modèle de language, comme pour enlever les pensées dans le cas par exemple de DeepSeek""" THOUGHT_TAGS = [ "think", "analysis", "response", "reasoning" ] for tag in THOUGHT_TAGS: message = re.sub(rf"<{tag}>.*?", "", message, flags=re.DOTALL | re.IGNORECASE) message = re.sub(r"$", "", message) return message.strip() def transform_emote(message: str, output: bool) -> str: """Remplace les smileys par les codes Discord correspondant""" list_emote = [ (":hap:", "<:hap:355854929073537026>"), (":angryvault:", "<:angryvault:585550568806940672>"), (":minou:", "<:minou:358054423462936576>"), (":cetaitsur:", ""), (":eh:", "<:eh:395979132896280576>"), (":desu:", "<:desu:388007643077410837>"), (":bave2:", "<:bave2:412252920558387221>"), (":haptriste:", "<:haptriste:358054014262181889>"), (":perplexe:", "<:perplexe:358054891274371082>"), (":sueur:", "<:sueur:358051940631838721>"), (":chien:", "<:chien:507606737646518293>"), (":kemar:", "<:kemar:419607012796792842>"), (":ouch2:", "<:ouch2:777984650710745138>"), (":coeur:", "<:coeur:355853389399195649>"), (":what:", "<:what:587019571207077928>"), # Un peu un hack mais flemme de renommer la fonction ("@Rika", "<@1284696824804016138>") ] for smiley, discord_code in list_emote: if output: message = message.replace(smiley, discord_code) else: message = message.replace(discord_code, smiley) return message def split_message(message, max_length=2000): """Divise un message en plusieurs segments de longueur maximale spécifiée.""" if len(message) <= max_length: return [message] parts = [] current_part = "" for line in message.split('\n'): if len(current_part) + len(line) + 1 > max_length: parts.append(current_part) current_part = line + '\n' else: current_part += line + '\n' if current_part: parts.append(current_part) return parts def load_conversation_history(): global conversation_history if os.path.isfile(CONVERSATION_HISTORY_FILE): try: with open(CONVERSATION_HISTORY_FILE, 'r', encoding='utf-8') as f: loaded_history = json.load(f) # Exclure uniquement le PERSONALITY_PROMPT conversation_history = [ msg for msg in loaded_history if not (msg.get("role") == "system" and msg.get("content") == PERSONALITY_PROMPT) ] logger.info(f"Historique chargé depuis {CONVERSATION_HISTORY_FILE}") except Exception as e: logger.error(f"Erreur lors du chargement de l'historique : {e}") conversation_history = [] else: logger.info(f"Aucun fichier d'historique trouvé. Un nouveau fichier sera créé à {CONVERSATION_HISTORY_FILE}") def has_text(text): """Détermine si le texte fourni est non vide après suppression des espaces.""" return bool(text.strip()) def resize_image(image_bytes, attachment_filename=None): """Redimensionne l'image selon le mode spécifié.""" with Image.open(BytesIO(image_bytes)) as img: original_format = img.format # Stocker le format original img.thumbnail((2000, 2000)) buffer = BytesIO() img_format = img.format or _infer_image_format(attachment_filename) img.save(buffer, format=img_format) return buffer.getvalue() async def encode_image_from_attachment(attachment): """Encode une image depuis une pièce jointe en base64 après redimensionnement.""" image_data = await attachment.read() resized_image = resize_image(image_data, attachment_filename=attachment.filename) return base64.b64encode(resized_image).decode('utf-8') def _infer_image_format(filename): """Déduit le format de l'image basé sur l'extension du fichier.""" if filename: _, ext = os.path.splitext(filename) ext = ext.lower() format_mapping = { '.jpg': 'JPEG', '.jpeg': 'JPEG', '.png': 'PNG', '.gif': 'GIF', '.bmp': 'BMP', '.tiff': 'TIFF' } return format_mapping.get(ext, 'PNG') return 'PNG' # Fonction de sauvegarde de l'historique def save_conversation_history(): try: with open(CONVERSATION_HISTORY_FILE, 'w', encoding='utf-8') as f: json.dump(conversation_history, f, ensure_ascii=False, indent=4) except Exception as e: logger.error(f"Erreur lors de la sauvegarde de l'historique : {e}") # Convertir l'ID du channel en entier try: discord_channel_id = int(DISCORD_CHANNEL_ID) except ValueError: raise ValueError("L'ID du channel Discord est invalide. Assurez-vous qu'il s'agit d'un entier.") # Initialiser le client Discord avec les intents modifiés client_discord = discord.Bot(intents=intents) MAIN_LOOP = asyncio.get_event_loop() # Appeler la fonction pour charger l'historique au démarrage load_conversation_history() async def call_for_image_analysis(image_data, user_name, user_text=None): """Appelle l'API pour analyser une image.""" user_content = ( f"{user_name} a envoyé une image avec le messsage : {transform_emote(user_text, False)}." if user_text else f"{user_name} a envoyé une image." ) message_history = { "role": "user", "content": [{ "type": "text", "text": user_content }] } prompt = { "role": "system", "content": PERSONALITY_PROMPT } message_to_send = { "role": "user", "content": [ { "type": "text", "text": user_content }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{image_data}" } } ] } if HISTORY_ANALYSIS_IMAGE != "false": messages = [prompt] + conversation_history + [message_to_send] else: messages = [prompt] + [message_to_send] await add_to_conversation_history(message_history) analysis = await openai_client.chat.completions.create( model=MODEL, stream=False, messages=messages, temperature=TEMPERATURE ) if not analysis or not getattr(analysis, "choices", None): raise RuntimeError("Réponse API invalide ou vide") else: logger.info(f"Analyse de l'image par l'API : {analysis.choices[0].message.content}") await add_to_conversation_history({ "role": "assistant", "content": analysis.choices[0].message.content }) return analysis.choices[0].message.content async def read_text_file(attachment): """Lit le contenu d'un fichier texte attaché.""" file_bytes = await attachment.read() result = from_bytes(file_bytes).best() if result is None: raise ValueError("Impossible de détecter l'encodage du fichier") return str(result) async def call_openai_api(user_text, user_name): # Préparer le contenu pour l'appel API message_to_send = { "role": "user", "content": [ { "type": "text", "text": f"{user_name} dit : {transform_emote(user_text, False)}" } ] } # Assembler les messages avec le prompt de personnalité en premier messages = [ {"role": "system", "content": PERSONALITY_PROMPT} ] + conversation_history + [message_to_send] try: response = await openai_client.chat.completions.create( model=MODEL, stream=False, messages=messages, temperature=TEMPERATURE ) await add_to_conversation_history(message_to_send) # Ajouter la réponse de l'IA directement à l'historique await add_to_conversation_history({ "role": "assistant", "content": response.choices[0].message.content }) return response except Exception as e: logger.error(f"Erreur durant l'appel de l'API : {e}") return None async def call_openai_api_system(system_text): # Préparer le contenu pour l'appel API message = { "role": "system", "content": PERSONALITY_PROMPT + f"\n\nPour cette réponse uniquement : {system_text}" } response = await openai_client.chat.completions.create( model=MODEL, stream=False, messages=[message], temperature=1.2 ) return response @client_discord.event async def on_ready(): global update_status_started logger.info(f'{BOT_NAME} connecté en tant que {client_discord.user}') logger.info(f'Utilisation du modèle {MODEL}') if not update_status_started: update_status.start() update_status_started = True if not conversation_history: logger.info("Aucun historique trouvé. L'historique commence vide.") # Envoyer un message de version dans le canal Discord channel = client_discord.get_channel(discord_channel_id) if channel: if BOOT_MESSAGE != "false": try: embed = discord.Embed( title=f"Bot Démarré", description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}\nUtilisation du modèle: **{MODEL}**", color=0x2222aa # Bleu ) await channel.send(embed=embed) logger.info(f"Message de connexion envoyé dans le canal ID {discord_channel_id}") except discord.Forbidden: logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {discord_channel_id}.") except discord.HTTPException as e: logger.error(f"Erreur lors de l'envoi du message de connexion : {e}") else: logger.error(f"Canal avec ID {discord_channel_id} non trouvé.") @client_discord.event async def on_message(message): global conversation_history # Vérifier si le message provient du canal autorisé if message.channel.id != discord_channel_id: return # Ignorer les messages du bot lui-même if message.author == client_discord.user: return user_text = message.content.strip() file_content = None attachment_filename = None # Vérifier si le message est la commande de réinitialisation if user_text.lower() == "!reset_history": # Vérifier si l'utilisateur a les permissions administratives if not message.author.guild_permissions.administrator: await message.channel.send("❌ Vous n'avez pas la permission d'utiliser cette commande.") return conversation_history = [] save_conversation_history() await message.channel.send("✅ L'historique des conversations a été réinitialisé.") logger.info(f"Historique des conversations réinitialisé par {message.author}.") return # Arrêter le traitement du message après la réinitialisation # Variable pour stocker si le message contient un fichier has_file = False image_data = None # Vérifier s'il y a une pièce jointe if message.attachments: for attachment in message.attachments: # Vérifier si c'est un fichier avec une extension autorisée if attachment.content_type and attachment.content_type.startswith('image/'): try: image_data = await encode_image_from_attachment(attachment) break except Exception as e: await message.channel.send("Il semble qu'il y ai un souci avec ton image, je ne peux pas l'ouvrir.") logger.error(f"Erreur lors de la conversion de l'image : {e}") else: try: file_content = await read_text_file(attachment) attachment_filename = attachment.filename break except Exception as e: await message.channel.send("Désolé, je n'ai pas pu lire ton fichier.") logger.error(f"Erreur lors de la lecture d'une pièce jointe : {e}") # Traitement des images if image_data: logger.debug(image_data) has_user_text = has_text(user_text) user_text_to_use = user_text if has_user_text else None temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*") try: # Analyser l'image avec l'API analysis = await call_for_image_analysis(image_data, message.author.name, user_text=user_text_to_use) reply = filter_message(analysis) reply = transform_emote(analysis, True) if analysis: await temp_msg.delete() await message.channel.send(reply) if has_user_text: user_message_text = f"{message.author.name} a envoyé une image avec le messsage : \"{user_text}\"." else: user_message_text = f"{message.author.name} a envoyé une image." user_message = { "role": "user", "content": f"{user_message_text}" } assistant_message = { "role": "assistant", "content": analysis } await add_to_conversation_history(user_message) await add_to_conversation_history(assistant_message) else: await temp_msg.delete() await message.channel.send("Désolé, je n'ai pas pu analyser l'image.") except Exception as e: await temp_msg.delete() await message.channel.send("Une erreur est survenue lors du traitement de l'image.") logger.error(f"Erreur lors du traitement de l'image: {e}") return # Ne pas continuer le traitement après une image # Ajouter le contenu du fichier à la requête si présent if file_content: user_text += f"\nPièce jointe ajoutée au message, contenu du fichier '{attachment.filename}':\n{file_content}" # Vérifier si le texte n'est pas vide après ajout du contenu du fichier if not has_text(user_text): return # Ne pas appeler l'API si le texte est vide async with message.channel.typing(): try: # Appeler l'API result = await call_openai_api(user_text, message.author.name) if result: reply = result.choices[0].message.content reply = filter_message(reply) reply = transform_emote(reply, True) message_parts = split_message(reply) for part in message_parts: await message.channel.send(part) # Afficher dans la console logging.info(f"Réponse envoyée. ({len(message_parts)} message(s))") except Exception as e: await message.channel.send("Franchement, je sais pas quoi te répondre. <:haptriste:358054014262181889>") logger.error(f"Erreur lors du traitement du texte: {e}") async def add_to_conversation_history(new_message): global conversation_history conversation_history.append(new_message) save_conversation_history() logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}") if len(conversation_history) > CONVERSATION_HISTORY_SIZE: logger.info(f"Limite de {CONVERSATION_HISTORY_SIZE} messages atteinte.") excess_messages = len(conversation_history) - CONVERSATION_HISTORY_SIZE # Supprimer les messages les plus anciens del conversation_history[:excess_messages] save_conversation_history() logger.info(f"{excess_messages} messages les plus anciens ont été supprimés.") @client_discord.slash_command(name="join", description="Le bot rejoint le vocal (réponse voix ou texte)") async def join(ctx: discord.ApplicationContext, mode: str = discord.Option(str, choices=["voice", "texte"], default="voice", description="Mode de réponse du bot")): global reply_mode, reply_text_channel if not ctx.author.voice: await ctx.respond("❌ Tu n'es pas dans un salon vocal.", ephemeral=True) return channel = ctx.author.voice.channel if ctx.guild.voice_client: vc = ctx.guild.voice_client await vc.move_to(channel) else: vc = await channel.connect() # Démarrer l'écoute audio vc.start_recording(STTSink(), lambda sink: None) if mode == "texte": reply_mode = ReplyMode.TEXT reply_text_channel = ctx.channel await ctx.respond("🎧 Connecté au vocal — réponses **en texte** ici.") else: reply_mode = ReplyMode.VOICE reply_text_channel = None await ctx.respond("🎧 Connecté au vocal — réponses **en voix**.") @client_discord.slash_command(name="quit", description="Le bot quitte le salon vocal") async def quit(ctx: discord.ApplicationContext): vc = ctx.guild.voice_client if not vc: await ctx.respond("❌ Je ne suis pas dans un salon vocal.", ephemeral=True) return await vc.disconnect() await ctx.respond("👋 Déconnecté du salon vocal.") @tasks.loop(minutes=DELAY_TASK_UPDATE_STATUS) async def update_status(): try: reply = await call_openai_api_system(PROMPT_STATUS_CHANGE) status = reply.choices[0].message.content logger.info(f"Nouveau status Discord: '{status}'") await client_discord.change_presence(activity=discord.CustomActivity(name=status)) except Exception as e: logger.warning(f"Impossible de changer le status Discord : {e}") @update_status.before_loop async def before_update_status(): await client_discord.wait_until_ready() logger.info("Démarrage de la tâche update_status") # Démarrer le bot Discord client_discord.run(DISCORD_TOKEN)