||
- import os
- import logging
- import discord
- import json
- import urllib3
- import base64
- import re
- import asyncio
- import websockets
- import numpy as np
- import time
- from dotenv import load_dotenv
- from openai import AsyncOpenAI, OpenAIError
- from PIL import Image
- from io import BytesIO
- from datetime import datetime, timezone, timedelta
- from charset_normalizer import from_bytes
- from enum import Enum
- from discord.sinks import Sink
- from scipy.signal import resample_poly
- from discord.ext import tasks
- # Charger les variables d'environnement depuis le fichier .env
- load_dotenv()
- DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
- OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
- DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID')
- PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt')
- CONVERSATION_HISTORY_FILE = os.getenv('CONVERSATION_HISTORY_FILE', 'conversation_history.json')
- CONVERSATION_HISTORY_SIZE = int(os.getenv('CONVERSATION_HISTORY_SIZE', '100'))
- BOT_NAME = os.getenv('BOT_NAME', 'ChatBot')
- MODEL = os.getenv('MODEL', 'llama:3.2')
- URL_OPENAI_API = os.getenv('URL_OPENAI_API', 'http://openwebui:8080/v1')
- TEMPERATURE = float(os.getenv('TEMPERATURE', "1.0"))
- BOOT_MESSAGE = os.getenv('BOOT_MESSAGE', "true").lower()
- LOG_LEVEL = os.getenv('LOG_LEVEL', "INFO").upper()
- HISTORY_ANALYSIS_IMAGE = os.getenv('HISTORY_ANALYSIS_IMAGE', "false").lower()
- PROMPT_STATUS_CHANGE = str(os.getenv('PROMPT_STATUS_CHANGE', "Rédige un message court qui sera utilisé en tant que status sur Discord"))
- DELAY_TASK_UPDATE_STATUS = int(os.getenv('DELAY_TASK_UPDATE_STATUS', '30'))
- WHISPER_WS_URL = os.getenv("WHISPER_WS_URL", "ws://whisper-stt:8000/ws/transcribe")
- # Initialiser le client OpenAI asynchrone ici
- openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=URL_OPENAI_API)
- BOT_VERSION = "3.0.0-alpha1"
- # Vérifier que les tokens et le prompt de personnalité sont récupérés
- if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None:
- raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.")
- if not os.path.isfile(PERSONALITY_PROMPT_FILE):
- raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.")
- # Lire le prompt de personnalité depuis le fichier
- with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f:
- PERSONALITY_PROMPT = f.read().strip()
- # Log configuration
- log_format = '%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s'
- logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level=LOG_LEVEL)
- console = logging.StreamHandler()
- console.setLevel(logging.DEBUG)
- console.setFormatter(logging.Formatter(log_format))
- logger = logging.getLogger(BOT_NAME)
- logger.setLevel("DEBUG")
- logging.getLogger('').addHandler(console)
- httpx_logger = logging.getLogger('httpx')
- httpx_logger.setLevel(logging.DEBUG)
- urllib3.disable_warnings()
- update_status_started = False
- # Initialiser les intents
- intents = discord.Intents.default()
- intents.message_content = True # Activer l'intent pour les contenus de message
- intents.voice_states = True
- class ReplyMode(Enum):
- VOICE = "voice"
- TEXT = "text"
- reply_mode = ReplyMode.VOICE
- reply_text_channel = None
- class STTSink(Sink):
- def __init__(self):
- super().__init__()
- self.user_ws = {}
- self.buffers = {}
- self.last_voice = {}
- self.flush_tasks = {}
- async def _get_ws(self, user_id):
- if user_id not in self.user_ws:
- ws = await websockets.connect(WHISPER_WS_URL)
- self.user_ws[user_id] = ws
- asyncio.create_task(self._listen_ws(user_id, ws))
- return self.user_ws[user_id]
- async def _listen_ws(self, user_id, ws):
- try:
- async for msg in ws:
- data = json.loads(msg)
- if data.get("type") == "final":
- text = data["text"].strip()
- if self._ignore_text(text):
- continue # ✅ PAS return
- if reply_mode == ReplyMode.TEXT and reply_text_channel:
- member = reply_text_channel.guild.get_member(user_id)
- name = member.display_name if member else f"User {user_id}"
- await reply_text_channel.send(f"🗣️ **{name}** : {text}")
- else:
- logger.info(f"[STT][{user_id}] {text}")
- except Exception as e:
- logger.warning(f"[STT][{user_id}] WS fermé : {e}")
- def write(self, pcm_bytes: bytes, user_id: int):
- if not pcm_bytes:
- return
- audio = discord_pcm_to_whisper_int16(pcm_bytes)
- if not audio:
- return
- now = time.time()
- self.last_voice[user_id] = now
- if user_id not in self.buffers:
- self.buffers[user_id] = bytearray()
- self.buffers[user_id].extend(audio)
- buffer_sec = len(self.buffers[user_id]) / (16000 * 2)
- if buffer_sec >= 1.0 and user_id not in self.flush_tasks:
- self.flush_tasks[user_id] = asyncio.run_coroutine_threadsafe(
- self._flush_if_silence(user_id),
- MAIN_LOOP
- )
- async def _flush_if_silence(self, user_id):
- await asyncio.sleep(1.2)
- if time.time() - self.last_voice.get(user_id, 0) < 0.6:
- self.flush_tasks.pop(user_id, None)
- return
- chunk = bytes(self.buffers.get(user_id, b""))
- self.buffers[user_id] = bytearray()
- self.flush_tasks.pop(user_id, None)
- if len(chunk) < 16000 * 2 * 2:
- self.buffers[user_id].extend(chunk)
- return
- try:
- ws = await self._get_ws(user_id)
- await ws.send(chunk)
- logger.debug(f"[STT] chunk envoyé user={user_id} bytes={len(chunk)}")
- except Exception as e:
- logger.warning(f"[STT] envoi échoué user={user_id}: {e}")
- self.user_ws.pop(user_id, None)
- def _ignore_text(self, text: str) -> bool:
- BAD = [
- "amara",
- "sous-titres",
- "merci",
- "musique",
- "applaudissements"
- ]
- t = text.lower()
- return len(t) < 3 or any(b in t for b in BAD)
-
- # Liste pour stocker l'historique des conversations
- conversation_history = []
- def discord_pcm_to_whisper_int16(pcm_bytes: bytes) -> bytes:
- audio = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32)
- # normalisation RMS
- rms = np.sqrt(np.mean(audio**2) + 1e-8)
- audio = audio / max(rms, 5000)
- # passe-bas léger avant downsample
- audio = audio / 32768.0
- # resample 48k → 16k
- audio_16k = resample_poly(audio, up=1, down=3, window=('kaiser', 5.0))
- # clip sécurité
- audio_16k = np.clip(audio_16k, -1.0, 1.0)
- return (audio_16k * 32767).astype(np.int16).tobytes()
- def filter_message(message):
- """Filtre le contenu d'un retour de modèle de language, comme pour enlever les pensées dans le cas par exemple de DeepSeek"""
- THOUGHT_TAGS = [
- "think",
- "analysis",
- "response",
- "reasoning"
- ]
- for tag in THOUGHT_TAGS:
- message = re.sub(rf"<{tag}>.*?</{tag}>", "", message, flags=re.DOTALL | re.IGNORECASE)
- message = re.sub(r"</s>$", "", message)
- return message.strip()
- def transform_emote(message: str, output: bool) -> str:
- """Remplace les smileys par les codes Discord correspondant"""
- list_emote = [
- (":hap:", "<:hap:355854929073537026>"),
- (":angryvault:", "<:angryvault:585550568806940672>"),
- (":minou:", "<:minou:358054423462936576>"),
- (":cetaitsur:", "<a:cetaitsur:826102032963469324>"),
- (":eh:", "<:eh:395979132896280576>"),
- (":desu:", "<:desu:388007643077410837>"),
- (":bave2:", "<:bave2:412252920558387221>"),
- (":haptriste:", "<:haptriste:358054014262181889>"),
- (":perplexe:", "<:perplexe:358054891274371082>"),
- (":sueur:", "<:sueur:358051940631838721>"),
- (":chien:", "<:chien:507606737646518293>"),
- (":kemar:", "<:kemar:419607012796792842>"),
- (":ouch2:", "<:ouch2:777984650710745138>"),
- (":coeur:", "<:coeur:355853389399195649>"),
- (":what:", "<:what:587019571207077928>"),
- # Un peu un hack mais flemme de renommer la fonction
- ("@Rika", "<@1284696824804016138>")
- ]
- for smiley, discord_code in list_emote:
- if output:
- message = message.replace(smiley, discord_code)
- else:
- message = message.replace(discord_code, smiley)
- return message
- def split_message(message, max_length=2000):
- """Divise un message en plusieurs segments de longueur maximale spécifiée."""
- if len(message) <= max_length:
- return [message]
-
- parts = []
- current_part = ""
-
- for line in message.split('\n'):
- if len(current_part) + len(line) + 1 > max_length:
- parts.append(current_part)
- current_part = line + '\n'
- else:
- current_part += line + '\n'
-
- if current_part:
- parts.append(current_part)
-
- return parts
- def load_conversation_history():
- global conversation_history
- if os.path.isfile(CONVERSATION_HISTORY_FILE):
- try:
- with open(CONVERSATION_HISTORY_FILE, 'r', encoding='utf-8') as f:
- loaded_history = json.load(f)
- # Exclure uniquement le PERSONALITY_PROMPT
- conversation_history = [
- msg for msg in loaded_history
- if not (msg.get("role") == "system" and msg.get("content") == PERSONALITY_PROMPT)
- ]
- logger.info(f"Historique chargé depuis {CONVERSATION_HISTORY_FILE}")
- except Exception as e:
- logger.error(f"Erreur lors du chargement de l'historique : {e}")
- conversation_history = []
- else:
- logger.info(f"Aucun fichier d'historique trouvé. Un nouveau fichier sera créé à {CONVERSATION_HISTORY_FILE}")
- def has_text(text):
- """Détermine si le texte fourni est non vide après suppression des espaces."""
- return bool(text.strip())
- def resize_image(image_bytes, attachment_filename=None):
- """Redimensionne l'image selon le mode spécifié."""
- with Image.open(BytesIO(image_bytes)) as img:
- original_format = img.format # Stocker le format original
- img.thumbnail((2000, 2000))
- buffer = BytesIO()
- img_format = img.format or _infer_image_format(attachment_filename)
- img.save(buffer, format=img_format)
- return buffer.getvalue()
- async def encode_image_from_attachment(attachment):
- """Encode une image depuis une pièce jointe en base64 après redimensionnement."""
- image_data = await attachment.read()
- resized_image = resize_image(image_data, attachment_filename=attachment.filename)
- return base64.b64encode(resized_image).decode('utf-8')
- def _infer_image_format(filename):
- """Déduit le format de l'image basé sur l'extension du fichier."""
- if filename:
- _, ext = os.path.splitext(filename)
- ext = ext.lower()
- format_mapping = {
- '.jpg': 'JPEG',
- '.jpeg': 'JPEG',
- '.png': 'PNG',
- '.gif': 'GIF',
- '.bmp': 'BMP',
- '.tiff': 'TIFF'
- }
- return format_mapping.get(ext, 'PNG')
- return 'PNG'
- # Fonction de sauvegarde de l'historique
- def save_conversation_history():
- try:
- with open(CONVERSATION_HISTORY_FILE, 'w', encoding='utf-8') as f:
- json.dump(conversation_history, f, ensure_ascii=False, indent=4)
- except Exception as e:
- logger.error(f"Erreur lors de la sauvegarde de l'historique : {e}")
- # Convertir l'ID du channel en entier
- try:
- discord_channel_id = int(DISCORD_CHANNEL_ID)
- except ValueError:
- raise ValueError("L'ID du channel Discord est invalide. Assurez-vous qu'il s'agit d'un entier.")
- # Initialiser le client Discord avec les intents modifiés
- client_discord = discord.Bot(intents=intents)
- MAIN_LOOP = asyncio.get_event_loop()
- # Appeler la fonction pour charger l'historique au démarrage
- load_conversation_history()
- async def call_for_image_analysis(image_data, user_name, user_text=None):
- """Appelle l'API pour analyser une image."""
- user_content = (
- f"{user_name} a envoyé une image avec le messsage : {transform_emote(user_text, False)}."
- if user_text
- else f"{user_name} a envoyé une image."
- )
- message_history = {
- "role": "user",
- "content": [{ "type": "text", "text": user_content }]
- }
- prompt = {
- "role": "system",
- "content": PERSONALITY_PROMPT
- }
-
- message_to_send = {
- "role": "user",
- "content": [
- {
- "type": "text",
- "text": user_content
- },
- {
- "type": "image_url",
- "image_url": {
- "url": f"data:image/jpeg;base64,{image_data}"
- }
- }
- ]
- }
- if HISTORY_ANALYSIS_IMAGE != "false":
- messages = [prompt] + conversation_history + [message_to_send]
- else:
- messages = [prompt] + [message_to_send]
- await add_to_conversation_history(message_history)
- analysis = await openai_client.chat.completions.create(
- model=MODEL,
- stream=False,
- messages=messages,
- temperature=TEMPERATURE
- )
- if not analysis or not getattr(analysis, "choices", None):
- raise RuntimeError("Réponse API invalide ou vide")
- else:
- logger.info(f"Analyse de l'image par l'API : {analysis.choices[0].message.content}")
- await add_to_conversation_history({
- "role": "assistant",
- "content": analysis.choices[0].message.content
- })
- return analysis.choices[0].message.content
- async def read_text_file(attachment):
- """Lit le contenu d'un fichier texte attaché."""
- file_bytes = await attachment.read()
- result = from_bytes(file_bytes).best()
- if result is None:
- raise ValueError("Impossible de détecter l'encodage du fichier")
- return str(result)
- async def call_openai_api(user_text, user_name):
- # Préparer le contenu pour l'appel API
- message_to_send = {
- "role": "user",
- "content": [
- {
- "type": "text",
- "text": f"{user_name} dit : {transform_emote(user_text, False)}"
- }
- ]
- }
- # Assembler les messages avec le prompt de personnalité en premier
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT}
- ] + conversation_history + [message_to_send]
- try:
- response = await openai_client.chat.completions.create(
- model=MODEL,
- stream=False,
- messages=messages,
- temperature=TEMPERATURE
- )
- await add_to_conversation_history(message_to_send)
- # Ajouter la réponse de l'IA directement à l'historique
- await add_to_conversation_history({
- "role": "assistant",
- "content": response.choices[0].message.content
- })
- return response
- except Exception as e:
- logger.error(f"Erreur durant l'appel de l'API : {e}")
- return None
- async def call_openai_api_system(system_text):
- # Préparer le contenu pour l'appel API
- message = {
- "role": "system",
- "content": PERSONALITY_PROMPT + f"\n\nPour cette réponse uniquement : {system_text}"
- }
- response = await openai_client.chat.completions.create(
- model=MODEL,
- stream=False,
- messages=[message],
- temperature=1.2
- )
- return response
- @client_discord.event
- async def on_ready():
- global update_status_started
-
- logger.info(f'{BOT_NAME} connecté en tant que {client_discord.user}')
- logger.info(f'Utilisation du modèle {MODEL}')
- if not update_status_started:
- update_status.start()
- update_status_started = True
- if not conversation_history:
- logger.info("Aucun historique trouvé. L'historique commence vide.")
- # Envoyer un message de version dans le canal Discord
- channel = client_discord.get_channel(discord_channel_id)
- if channel:
- if BOOT_MESSAGE != "false":
- try:
- embed = discord.Embed(
- title=f"Bot Démarré",
- description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}\nUtilisation du modèle: **{MODEL}**",
- color=0x2222aa # Bleu
- )
- await channel.send(embed=embed)
- logger.info(f"Message de connexion envoyé dans le canal ID {discord_channel_id}")
- except discord.Forbidden:
- logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {discord_channel_id}.")
- except discord.HTTPException as e:
- logger.error(f"Erreur lors de l'envoi du message de connexion : {e}")
- else:
- logger.error(f"Canal avec ID {discord_channel_id} non trouvé.")
- @client_discord.event
- async def on_message(message):
- global conversation_history
- # Vérifier si le message provient du canal autorisé
- if message.channel.id != discord_channel_id:
- return
- # Ignorer les messages du bot lui-même
- if message.author == client_discord.user:
- return
- user_text = message.content.strip()
- file_content = None
- attachment_filename = None
- # Vérifier si le message est la commande de réinitialisation
- if user_text.lower() == "!reset_history":
- # Vérifier si l'utilisateur a les permissions administratives
- if not message.author.guild_permissions.administrator:
- await message.channel.send("❌ Vous n'avez pas la permission d'utiliser cette commande.")
- return
- conversation_history = []
- save_conversation_history()
- await message.channel.send("✅ L'historique des conversations a été réinitialisé.")
- logger.info(f"Historique des conversations réinitialisé par {message.author}.")
- return # Arrêter le traitement du message après la réinitialisation
- # Variable pour stocker si le message contient un fichier
- has_file = False
- image_data = None
- # Vérifier s'il y a une pièce jointe
- if message.attachments:
- for attachment in message.attachments:
- # Vérifier si c'est un fichier avec une extension autorisée
- if attachment.content_type and attachment.content_type.startswith('image/'):
- try:
- image_data = await encode_image_from_attachment(attachment)
- break
- except Exception as e:
- await message.channel.send("Il semble qu'il y ai un souci avec ton image, je ne peux pas l'ouvrir.")
- logger.error(f"Erreur lors de la conversion de l'image : {e}")
- else:
- try:
- file_content = await read_text_file(attachment)
- attachment_filename = attachment.filename
- break
- except Exception as e:
- await message.channel.send("Désolé, je n'ai pas pu lire ton fichier.")
- logger.error(f"Erreur lors de la lecture d'une pièce jointe : {e}")
- # Traitement des images
- if image_data:
- logger.debug(image_data)
- has_user_text = has_text(user_text)
- user_text_to_use = user_text if has_user_text else None
- temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*")
- try:
- # Analyser l'image avec l'API
- analysis = await call_for_image_analysis(image_data, message.author.name, user_text=user_text_to_use)
- reply = filter_message(analysis)
- reply = transform_emote(analysis, True)
- if analysis:
- await temp_msg.delete()
- await message.channel.send(reply)
- if has_user_text:
- user_message_text = f"{message.author.name} a envoyé une image avec le messsage : \"{user_text}\"."
- else:
- user_message_text = f"{message.author.name} a envoyé une image."
- user_message = {
- "role": "user",
- "content": f"{user_message_text}"
- }
- assistant_message = {
- "role": "assistant",
- "content": analysis
- }
- await add_to_conversation_history(user_message)
- await add_to_conversation_history(assistant_message)
- else:
- await temp_msg.delete()
- await message.channel.send("Désolé, je n'ai pas pu analyser l'image.")
- except Exception as e:
- await temp_msg.delete()
- await message.channel.send("Une erreur est survenue lors du traitement de l'image.")
- logger.error(f"Erreur lors du traitement de l'image: {e}")
- return # Ne pas continuer le traitement après une image
- # Ajouter le contenu du fichier à la requête si présent
- if file_content:
- user_text += f"\nPièce jointe ajoutée au message, contenu du fichier '{attachment.filename}':\n{file_content}"
- # Vérifier si le texte n'est pas vide après ajout du contenu du fichier
- if not has_text(user_text):
- return # Ne pas appeler l'API si le texte est vide
- async with message.channel.typing():
- try:
- # Appeler l'API
- result = await call_openai_api(user_text, message.author.name)
- if result:
- reply = result.choices[0].message.content
- reply = filter_message(reply)
- reply = transform_emote(reply, True)
- message_parts = split_message(reply)
- for part in message_parts:
- await message.channel.send(part)
- # Afficher dans la console
- logging.info(f"Réponse envoyée. ({len(message_parts)} message(s))")
- except Exception as e:
- await message.channel.send("Franchement, je sais pas quoi te répondre. <:haptriste:358054014262181889>")
- logger.error(f"Erreur lors du traitement du texte: {e}")
- async def add_to_conversation_history(new_message):
- global conversation_history
- conversation_history.append(new_message)
- save_conversation_history()
- logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}")
- if len(conversation_history) > CONVERSATION_HISTORY_SIZE:
- logger.info(f"Limite de {CONVERSATION_HISTORY_SIZE} messages atteinte.")
- excess_messages = len(conversation_history) - CONVERSATION_HISTORY_SIZE
- # Supprimer les messages les plus anciens
- del conversation_history[:excess_messages]
- save_conversation_history()
- logger.info(f"{excess_messages} messages les plus anciens ont été supprimés.")
- @client_discord.slash_command(name="join", description="Le bot rejoint le vocal (réponse voix ou texte)")
- async def join(ctx: discord.ApplicationContext, mode: str = discord.Option(str, choices=["voice", "texte"], default="voice", description="Mode de réponse du bot")):
- global reply_mode, reply_text_channel
- if not ctx.author.voice:
- await ctx.respond("❌ Tu n'es pas dans un salon vocal.", ephemeral=True)
- return
- channel = ctx.author.voice.channel
- if ctx.guild.voice_client:
- vc = ctx.guild.voice_client
- await vc.move_to(channel)
- else:
- vc = await channel.connect()
- # Démarrer l'écoute audio
- vc.start_recording(STTSink(), lambda sink: None)
- if mode == "texte":
- reply_mode = ReplyMode.TEXT
- reply_text_channel = ctx.channel
- await ctx.respond("🎧 Connecté au vocal — réponses **en texte** ici.")
- else:
- reply_mode = ReplyMode.VOICE
- reply_text_channel = None
- await ctx.respond("🎧 Connecté au vocal — réponses **en voix**.")
- @client_discord.slash_command(name="quit", description="Le bot quitte le salon vocal")
- async def quit(ctx: discord.ApplicationContext):
- vc = ctx.guild.voice_client
- if not vc:
- await ctx.respond("❌ Je ne suis pas dans un salon vocal.", ephemeral=True)
- return
- await vc.disconnect()
- await ctx.respond("👋 Déconnecté du salon vocal.")
- @tasks.loop(minutes=DELAY_TASK_UPDATE_STATUS)
- async def update_status():
- try:
- reply = await call_openai_api_system(PROMPT_STATUS_CHANGE)
- status = reply.choices[0].message.content
- logger.info(f"Nouveau status Discord: '{status}'")
- await client_discord.change_presence(activity=discord.CustomActivity(name=status))
- except Exception as e:
- logger.warning(f"Impossible de changer le status Discord : {e}")
- @update_status.before_loop
- async def before_update_status():
- await client_discord.wait_until_ready()
- logger.info("Démarrage de la tâche update_status")
- # Démarrer le bot Discord
- client_discord.run(DISCORD_TOKEN)
|