| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228 |
- import os
- import json
- import logging
- import base64
- from io import BytesIO
- import asyncio
- import random
- import mysql.connector
- import pytz
- from mysql.connector import Error
- from PIL import Image
- import tiktoken
- import discord
- from discord.ext import commands
- from discord import app_commands
- from datetime import datetime, timezone, timedelta
- from dotenv import load_dotenv
- from openai import AsyncOpenAI, OpenAIError
- from discord.utils import get
- # =================================
- # Configuration et Initialisation
- # =================================
- # Charger les variables d'environnement depuis le fichier .env
- load_dotenv()
- DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
- OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
- DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID')
- PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt')
- IMAGE_ANALYSIS_PROMPT_FILE = os.getenv('IMAGE_ANALYSIS_PROMPT_FILE', 'image_analysis_prompt.txt')
- BOT_NAME = os.getenv('BOT_NAME', 'ChatBot')
- BOT_VERSION = "3.0.0"
- GUILD_ID = os.getenv('GUILD_ID')
- SPECIFIC_ROLE_NAME = os.getenv('SPECIFIC_ROLE_NAME')
- # Validation des variables d'environnement
- required_env_vars = {
- 'DISCORD_TOKEN': DISCORD_TOKEN,
- 'OPENAI_API_KEY': OPENAI_API_KEY,
- 'DISCORD_CHANNEL_ID': DISCORD_CHANNEL_ID,
- 'IMAGE_ANALYSIS_PROMPT_FILE': IMAGE_ANALYSIS_PROMPT_FILE,
- 'GUILD_ID': GUILD_ID,
- 'SPECIFIC_ROLE_NAME': SPECIFIC_ROLE_NAME
- }
- missing_vars = [var for var, val in required_env_vars.items() if val is None]
- if missing_vars:
- raise ValueError(f"Les variables d'environnement suivantes ne sont pas définies: {', '.join(missing_vars)}")
- # Convertir l'ID du canal Discord en entier
- try:
- chatgpt_channel_id = int(DISCORD_CHANNEL_ID)
- except ValueError:
- raise ValueError("L'ID du canal Discord est invalide. Assurez-vous qu'il s'agit d'un entier.")
- # Convertir l'ID de la guild en entier
- try:
- GUILD_ID = int(GUILD_ID)
- except ValueError:
- raise ValueError("L'ID de la guild Discord est invalide. Assurez-vous qu'il s'agit d'un entier.")
- # Vérification de l'existence des fichiers de prompt
- for file_var, file_path in [('PERSONALITY_PROMPT_FILE', PERSONALITY_PROMPT_FILE),
- ('IMAGE_ANALYSIS_PROMPT_FILE', IMAGE_ANALYSIS_PROMPT_FILE)]:
- if not os.path.isfile(file_path):
- raise FileNotFoundError(f"Le fichier de prompt '{file_var}' '{file_path}' est introuvable.")
- # Lire les prompts depuis les fichiers
- with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f:
- PERSONALITY_PROMPT = f.read().strip()
- with open(IMAGE_ANALYSIS_PROMPT_FILE, 'r', encoding='utf-8') as f:
- IMAGE_ANALYSIS_PROMPT = f.read().strip()
- # Configurer les logs
- LOG_FORMAT = '%(asctime)s : %(name)s : %(levelname)s : %(message)s'
- logging.basicConfig(
- handlers=[
- logging.FileHandler("./chatbot.log", mode='a', encoding='utf-8'),
- logging.StreamHandler()
- ],
- format=LOG_FORMAT,
- level=logging.INFO
- )
- logger = logging.getLogger(BOT_NAME)
- logger.setLevel(logging.INFO)
- logging.getLogger('httpx').setLevel(logging.WARNING) # Réduire le niveau de log pour 'httpx'
- # Initialiser les intents Discord
- intents = discord.Intents.default()
- intents.message_content = True
- intents.members = True
- intents.presences = True
- # Initialiser le client OpenAI asynchrone
- openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
- # ============================
- # Commande Décorateur Admin
- # ============================
- def admin_command(func):
- """Décorateur pour marquer une commande comme réservée aux administrateurs."""
- func.is_admin = True
- return func
- # =====================================
- # Gestion de la Base de Données MariaDB
- # =====================================
- class DatabaseManager:
- def __init__(self):
- self.connection = self.create_db_connection()
- def create_db_connection(self):
- try:
- connection = mysql.connector.connect(
- host=os.getenv('DB_HOST'),
- user=os.getenv('DB_USER'),
- password=os.getenv('DB_PASSWORD'),
- database=os.getenv('DB_NAME'),
- charset='utf8mb4',
- collation='utf8mb4_unicode_ci'
- )
- if connection.is_connected():
- logger.info("Connexion réussie à MariaDB")
- return connection
- except Error as e:
- logger.error(f"Erreur de connexion à MariaDB: {e}")
- return None
- def load_conversation_history(self):
- global conversation_history
- try:
- with self.connection.cursor(dictionary=True) as cursor:
- cursor.execute("SELECT role, content FROM conversation_history ORDER BY id ASC")
- rows = cursor.fetchall()
- conversation_history = [
- row for row in rows
- if not (row['role'] == "system" and row['content'] == PERSONALITY_PROMPT)
- ]
- logger.info("Historique chargé depuis la base de données")
- except Error as e:
- logger.error(f"Erreur lors du chargement de l'historique depuis la base de données: {e}")
- conversation_history = []
- def save_message(self, role, content):
- try:
- with self.connection.cursor() as cursor:
- sql = "INSERT INTO conversation_history (role, content) VALUES (%s, %s)"
- cursor.execute(sql, (role, json.dumps(content, ensure_ascii=False) if isinstance(content, (dict, list)) else content))
- self.connection.commit()
- logger.debug(f"Message sauvegardé dans la base de données: {role} - {content[:50]}...")
- except Error as e:
- logger.error(f"Erreur lors de la sauvegarde du message dans la base de données: {e}")
- def delete_old_image_analyses(self):
- try:
- with self.connection.cursor() as cursor:
- cursor.execute("DELETE FROM conversation_history WHERE role = 'system' AND content LIKE '__IMAGE_ANALYSIS__:%'")
- self.connection.commit()
- logger.info("Toutes les anciennes analyses d'image ont été supprimées de la base de données.")
- except Error as e:
- logger.error(f"Erreur lors de la suppression des analyses d'image: {e}")
- def reset_history(self):
- try:
- with self.connection.cursor() as cursor:
- cursor.execute("DELETE FROM conversation_history")
- self.connection.commit()
- logger.info("Historique des conversations réinitialisé.")
- except Error as e:
- logger.error(f"Erreur lors de la réinitialisation de l'historique: {e}")
- def delete_old_messages(self, limit):
- try:
- with self.connection.cursor() as cursor:
- cursor.execute("DELETE FROM conversation_history ORDER BY id ASC LIMIT %s", (limit,))
- self.connection.commit()
- logger.debug(f"{limit} messages les plus anciens ont été supprimés de la base de données pour maintenir l'historique à 150 messages.")
- except Error as e:
- logger.error(f"Erreur lors de la suppression des anciens messages: {e}")
- def close_connection(self):
- if self.connection and self.connection.is_connected():
- self.connection.close()
- logger.info("Connexion à la base de données fermée.")
- def add_reminder(self, user_id, channel_id, remind_at, content):
- try:
- with self.connection.cursor() as cursor:
- sql = """
- INSERT INTO reminders (user_id, channel_id, remind_at, content)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(sql, (user_id, channel_id, remind_at, content))
- self.connection.commit()
- logger.info(f"Rappel ajouté pour l'utilisateur {user_id} à {remind_at}")
- except Error as e:
- logger.error(f"Erreur lors de l'ajout du rappel: {e}")
- def get_due_reminders(self, current_time):
- try:
- with self.connection.cursor(dictionary=True) as cursor:
- sql = "SELECT * FROM reminders WHERE remind_at <= %s"
- cursor.execute(sql, (current_time,))
- reminders = cursor.fetchall()
- return reminders
- except Error as e:
- logger.error(f"Erreur lors de la récupération des rappels: {e}")
- return []
- def delete_reminder(self, reminder_id):
- try:
- with self.connection.cursor() as cursor:
- sql = "DELETE FROM reminders WHERE id = %s"
- cursor.execute(sql, (reminder_id,))
- self.connection.commit()
- logger.info(f"Rappel ID {reminder_id} supprimé")
- except Error as e:
- logger.error(f"Erreur lors de la suppression du rappel ID {reminder_id}: {e}")
- def get_user_reminders(self, user_id):
- """Récupère tous les rappels futurs pour un utilisateur spécifique."""
- try:
- with self.connection.cursor(dictionary=True) as cursor:
- sql = """
- SELECT * FROM reminders
- WHERE user_id = %s AND remind_at > NOW()
- ORDER BY remind_at ASC
- """
- cursor.execute(sql, (user_id,))
- reminders = cursor.fetchall()
- logger.info(f"{len(reminders)} rappels récupérés pour l'utilisateur {user_id}.")
- return reminders
- except Error as e:
- logger.error(f"Erreur lors de la récupération des rappels de l'utilisateur {user_id}: {e}")
- return []
- def get_reminder_by_id(self, reminder_id):
- """Récupère un rappel spécifique par son ID."""
- try:
- with self.connection.cursor(dictionary=True) as cursor:
- sql = "SELECT * FROM reminders WHERE id = %s"
- cursor.execute(sql, (reminder_id,))
- reminder = cursor.fetchone()
- return reminder
- except Error as e:
- logger.error(f"Erreur lors de la récupération du rappel ID {reminder_id}: {e}")
- return None
- # ===============================
- # Gestion de l'Historique des Messages
- # ===============================
- conversation_history = []
- last_analysis_index = None
- messages_since_last_analysis = 0
- # ====================
- # Fonctions Utilitaires
- # ====================
- def split_message(message, max_length=2000):
- """Divise un message en plusieurs segments de longueur maximale spécifiée."""
- if len(message) <= max_length:
- return [message]
-
- parts = []
- current_part = ""
-
- for line in message.split('\n'):
- if len(current_part) + len(line) + 1 > max_length:
- parts.append(current_part)
- current_part = line + '\n'
- else:
- current_part += line + '\n'
-
- if current_part:
- parts.append(current_part)
-
- return parts
- def has_text(text):
- """Détermine si le texte fourni est non vide après suppression des espaces."""
- return bool(text.strip())
- def resize_image(image_bytes, mode='high', attachment_filename=None):
- """Redimensionne l'image selon le mode spécifié."""
- try:
- with Image.open(BytesIO(image_bytes)) as img:
- original_format = img.format # Stocker le format original
- if mode == 'high':
- img.thumbnail((2000, 2000))
- if min(img.size) < 768:
- scale = 768 / min(img.size)
- new_size = tuple(int(x * scale) for x in img.size)
- img = img.resize(new_size, Image.Resampling.LANCZOS)
- elif mode == 'low':
- img = img.resize((512, 512))
- buffer = BytesIO()
- img_format = img.format or _infer_image_format(attachment_filename)
- img.save(buffer, format=img_format)
- return buffer.getvalue()
- except Exception as e:
- logger.error(f"Erreur lors du redimensionnement de l'image : {e}")
- raise
- def _infer_image_format(filename):
- """Déduit le format de l'image basé sur l'extension du fichier."""
- if filename:
- _, ext = os.path.splitext(filename)
- ext = ext.lower()
- format_mapping = {
- '.jpg': 'JPEG',
- '.jpeg': 'JPEG',
- '.png': 'PNG',
- '.gif': 'GIF',
- '.bmp': 'BMP',
- '.tiff': 'TIFF'
- }
- return format_mapping.get(ext, 'PNG')
- return 'PNG'
- def extract_text_from_message(message):
- """Extrait le texte du message."""
- content = message.get("content", "")
- if isinstance(content, list):
- texts = [part.get("text", "") for part in content if isinstance(part, dict) and part.get("text")]
- return ' '.join(texts)
- elif isinstance(content, str):
- return content
- return ""
- def calculate_cost(usage, model='gpt-4o-mini'):
- """Calcule le coût basé sur l'utilisation des tokens."""
- input_tokens = usage.get('prompt_tokens', 0)
- output_tokens = usage.get('completion_tokens', 0)
- model_costs = {
- 'gpt-4o': {
- 'input_rate': 5.00 / 1_000_000, # 5$ pour 1M tokens d'entrée
- 'output_rate': 15.00 / 1_000_000 # 15$ pour 1M tokens de sortie
- },
- 'gpt-4o-mini': {
- 'input_rate': 0.150 / 1_000_000, # 0.150$ pour 1M tokens d'entrée
- 'output_rate': 0.600 / 1_000_000 # 0.600$ pour 1M tokens de sortie
- }
- }
- rates = model_costs.get(model, model_costs['gpt-4o-mini'])
- input_cost = input_tokens * rates['input_rate']
- output_cost = output_tokens * rates['output_rate']
- total_cost = input_cost + output_cost
- if model not in model_costs:
- logger.warning(f"Modèle inconnu '{model}'. Utilisation des tarifs par défaut pour 'gpt-4o-mini'.")
- return input_tokens, output_tokens, total_cost
- async def read_text_file(attachment):
- """Lit le contenu d'un fichier texte attaché."""
- file_bytes = await attachment.read()
- return file_bytes.decode('utf-8')
- async def encode_image_from_attachment(attachment, mode='high'):
- """Encode une image depuis une pièce jointe en base64 après redimensionnement."""
- image_data = await attachment.read()
- resized_image = resize_image(image_data, mode=mode, attachment_filename=attachment.filename)
- return base64.b64encode(resized_image).decode('utf-8')
- # =================================
- # Interaction avec OpenAI
- # =================================
- # Charger l'encodeur pour le modèle GPT-4o mini
- encoding = tiktoken.get_encoding("o200k_base")
- async def call_openai_model(model, messages, max_tokens, temperature=0.8):
- """Appelle un modèle OpenAI avec les paramètres spécifiés et gère la réponse."""
- try:
- response = await openai_client.chat.completions.create(
- model=model,
- messages=messages,
- max_tokens=max_tokens,
- temperature=temperature
- )
-
- if response and response.choices:
- reply = response.choices[0].message.content
- # Ne pas logger les réponses de 'gpt-4o-mini' et 'gpt-4o'
- if model not in ["gpt-4o-mini", "gpt-4o"]:
- logger.info(f"Réponse de {model}: {reply[:100]}...")
- if hasattr(response, 'usage') and response.usage:
- usage = {
- 'prompt_tokens': response.usage.prompt_tokens,
- 'completion_tokens': response.usage.completion_tokens
- }
- _, _, total_cost = calculate_cost(usage, model=model)
- # Log avec les tokens d'entrée et de sortie
- logger.info(f"Coût de l'utilisation de {model}: ${total_cost:.4f} / Input: {usage['prompt_tokens']} / Output: {usage['completion_tokens']}")
- else:
- logger.warning(f"Informations d'utilisation non disponibles pour {model}.")
-
- return reply
- except OpenAIError as e:
- logger.error(f"Erreur lors de l'appel à l'API OpenAI avec {model}: {e}")
- except Exception as e:
- logger.error(f"Erreur inattendue lors de l'appel à l'API OpenAI avec {model}: {e}")
-
- return None
- async def call_gpt4o_for_image_analysis(image_data, user_text=None, detail='high'):
- """Appelle GPT-4o pour analyser une image."""
- prompt = IMAGE_ANALYSIS_PROMPT
- if user_text:
- prompt += f" Voici ce que l'on te décrit : \"{user_text}\"."
-
- message_to_send = {
- "role": "user",
- "content": [
- {"type": "text", "text": prompt},
- {
- "type": "image_url",
- "image_url": {
- "url": f"data:image/jpeg;base64,{image_data}",
- "detail": detail
- }
- }
- ]
- }
-
- # Obtenir la date et l'heure actuelles
- tz = pytz.timezone('Europe/Paris')
- current_datetime = datetime.now(tz).strftime('%d/%m/%Y %H:%M:%S %Z')
-
- # Créer un message système avec la date et l'heure
- date_message = {
- "role": "system",
- "content": f"Date et heure actuelles : {current_datetime}"
- }
- # Construire la liste des messages avec le message de date ajouté
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- date_message # Ajout du message de date et heure
- ] + conversation_history + [message_to_send]
- analysis = await call_openai_model(
- model="gpt-4o",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
-
- if analysis:
- logger.info(f"Analyse de l'image par GPT-4o : {analysis}")
- return analysis
- async def call_gpt4o_mini_with_analysis(analysis_text, user_name, user_question, has_text_flag):
- """Appelle GPT-4o Mini pour générer une réponse basée sur l'analyse de l'image."""
- system_messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- {
- "role": "system",
- "content": f"L'analyse de l'image fournie est la suivante :\n{analysis_text}\n\n"
- }
- ]
- if has_text_flag:
- user_content = (
- f"{user_name} a posté un message contenant une image et a écrit avec : '{user_question}'. "
- "Réponds à l'utilisateur en te basant sur l'analyse, avec ta personnalité. "
- "Ne mentionne pas explicitement que l'analyse est pré-existante, fais comme si tu l'avais faite toi-même."
- )
- else:
- user_content = (
- f"{user_name} a partagé une image sans texte additionnel. "
- "Commente l'image en te basant sur l'analyse, avec ta personnalité. "
- "Ne mentionne pas que l'analyse a été fournie à l'avance, réagis comme si tu l'avais toi-même effectuée."
- )
- user_message = {"role": "user", "content": user_content}
- messages = system_messages + conversation_history + [user_message]
-
- reply = await call_openai_model(
- model="gpt-4o-mini",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
-
- return reply
- async def call_openai_api(user_text, user_name, image_data=None, detail='high'):
- """Appelle l'API OpenAI pour générer une réponse basée sur le texte et/ou l'image."""
- text = f"{user_name} dit : {user_text}"
- if image_data:
- text += " (a posté une image.)"
- message_to_send = {
- "role": "user",
- "content": [
- {"type": "text", "text": text}
- ]
- }
- if image_data:
- message_to_send["content"].append({
- "type": "image_url",
- "image_url": {
- "url": f"data:image/jpeg;base64,{image_data}",
- "detail": detail
- }
- })
- # Obtenir la date et l'heure actuelles dans le fuseau horaire 'Europe/Paris'
- tz = pytz.timezone('Europe/Paris')
- current_datetime = datetime.now(tz).strftime('%d/%m/%Y %H:%M:%S %Z')
- # Créer un message système avec la date et l'heure
- date_message = {
- "role": "system",
- "content": f"Date et heure actuelles : {current_datetime}"
- }
- # Construire la liste des messages avec le message de date ajouté
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- date_message # Ajout du message de date et heure
- ] + conversation_history + [message_to_send]
- reply = await call_openai_model(
- model="gpt-4o-mini",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
-
- return reply
- # =====================================
- # Gestion du Contenu de l'Historique
- # =====================================
- async def remove_old_image_analyses(db_manager, new_analysis=False):
- """Supprime les anciennes analyses d'images de l'historique."""
- global conversation_history, last_analysis_index, messages_since_last_analysis
- if new_analysis:
- logger.debug("Nouvelle analyse détectée. Suppression des anciennes analyses.")
- conversation_history = [
- msg for msg in conversation_history
- if not (msg.get("role") == "system" and msg.get("content", "").startswith("__IMAGE_ANALYSIS__:"))
- ]
- last_analysis_index = len(conversation_history)
- messages_since_last_analysis = 0
- # Supprimer les analyses d'images de la base de données
- db_manager.delete_old_image_analyses()
- else:
- # Exemple de logique additionnelle si nécessaire
- pass
- async def add_to_conversation_history(db_manager, new_message):
- global conversation_history, last_analysis_index, messages_since_last_analysis
- # Exclure le PERSONALITY_PROMPT de l'historique
- if new_message.get("role") == "system" and new_message.get("content") == PERSONALITY_PROMPT:
- logger.debug("PERSONALITY_PROMPT système non ajouté à l'historique.")
- return
- # Gérer les analyses d'images
- if new_message.get("role") == "system" and new_message.get("content", "").startswith("__IMAGE_ANALYSIS__:"):
- await remove_old_image_analyses(db_manager, new_analysis=True)
- # Ajouter le message à l'historique en mémoire
- conversation_history.append(new_message)
- # Sauvegarder dans la base de données
- db_manager.save_message(new_message.get("role"), new_message.get("content"))
- logger.debug(f"Message ajouté à l'historique. Taille actuelle : {len(conversation_history)}")
- # Mettre à jour les indices pour les analyses d'images
- if new_message.get("role") == "system" and new_message.get("content", "").startswith("__IMAGE_ANALYSIS__:"):
- last_analysis_index = len(conversation_history) - 1
- messages_since_last_analysis = 0
- else:
- await remove_old_image_analyses(db_manager, new_analysis=False)
- # Limiter l'historique à 50 messages
- if len(conversation_history) > 50:
- excess = len(conversation_history) - 50
- conversation_history = conversation_history[excess:]
- # Supprimer les messages les plus anciens de la base de données
- db_manager.delete_old_messages(excess)
- # =====================================
- # Gestion des Événements Discord
- # =====================================
- class MyDiscordBot(commands.Bot):
- def __init__(self, db_manager, **kwargs):
- super().__init__(**kwargs)
- self.db_manager = db_manager
- self.message_queue = asyncio.Queue()
- self.reminder_task = None
- self.random_message_delay = 240
- self.inactivity_task = None
- self.last_activity = datetime.now(pytz.timezone('Europe/Paris'))
- self.guild_id = GUILD_ID
- async def setup_hook(self):
- """Hook d'initialisation asynchrone pour configurer des tâches supplémentaires."""
- self.processing_task = asyncio.create_task(self.process_messages())
- self.reminder_task = asyncio.create_task(self.process_reminders())
- self.inactivity_task = asyncio.create_task(self.monitor_inactivity())
- # Charger les commandes slash
- await self.add_cog(AdminCommands(self, self.db_manager))
- await self.add_cog(ReminderCommands(self, self.db_manager))
- await self.add_cog(HelpCommands(self))
- await self.tree.sync() # Synchroniser les commandes slash
- async def close(self):
- if openai_client:
- await openai_client.close()
- self.db_manager.close_connection()
- self.processing_task.cancel()
- if self.reminder_task:
- self.reminder_task.cancel()
- if self.inactivity_task:
- self.inactivity_task.cancel()
- await super().close()
- async def get_personalized_reminder(self, content, user):
- """Utilise l'API OpenAI pour personnaliser le contenu du rappel."""
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- {"role": "user", "content": f"Personnalise le rappel suivant pour {user.name} : {content}"}
- ]
- reply = await call_openai_model(
- model="gpt-4o-mini",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
- return reply if reply else content
- async def on_ready(self):
- """Événement déclenché lorsque le bot est prêt."""
- logger.info(f'{BOT_NAME} connecté en tant que {self.user}')
- if not conversation_history:
- logger.info("Aucun historique trouvé. L'historique commence vide.")
- # Envoyer un message de version dans le canal Discord
- channel = self.get_channel(chatgpt_channel_id)
- if channel:
- try:
- embed = discord.Embed(
- title="Bot Démarré",
- description=f"🎉 {BOT_NAME} est en ligne ! Version {BOT_VERSION}",
- color=0x00ff00 # Vert
- )
- await channel.send(embed=embed)
- logger.info(f"Message de connexion envoyé dans le canal ID {chatgpt_channel_id}")
- except discord.Forbidden:
- logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal ID {chatgpt_channel_id}.")
- except discord.HTTPException as e:
- logger.error(f"Erreur lors de l'envoi du message de connexion : {e}")
- else:
- logger.error(f"Canal avec ID {chatgpt_channel_id} non trouvé.")
- async def on_message(self, message):
- """Événement déclenché lorsqu'un message est envoyé dans un canal suivi."""
- # Ignorer les messages provenant d'autres canaux ou du bot lui-même
- if message.channel.id != chatgpt_channel_id or message.author == self.user:
- return
- # Mettre à jour le dernier temps d'activité
- self.last_activity = datetime.now(pytz.timezone('Europe/Paris'))
- await self.message_queue.put(message)
- async def monitor_inactivity(self):
- """Tâche en arrière-plan pour surveiller l'inactivité et envoyer des messages aléatoires."""
- await self.wait_until_ready()
- while not self.is_closed():
- try:
- # Calculer le temps écoulé depuis la dernière activité
- now = datetime.now(pytz.timezone('Europe/Paris'))
- elapsed = (now - self.last_activity).total_seconds() / 60 # en minutes
- if elapsed >= self.random_message_delay:
- # Vérifier si on est en dehors des heures silencieuses (minuit à 7h)
- if not (now.hour >= 0 and now.hour < 7):
- await self.perform_random_action()
- # Réinitialiser le dernier temps d'activité
- self.last_activity = now
- await asyncio.sleep(60) # Vérifier toutes les minutes
- except Exception as e:
- logger.error(f"Erreur dans la tâche de surveillance d'inactivité: {e}")
- await asyncio.sleep(60)
- async def perform_random_action(self):
- """Effectue l'action aléatoire de réagir à l'activité d'un membre."""
- guild = self.get_guild(self.guild_id) # Assurez-vous que self.guild_id est défini
- if not guild:
- logger.error("Guild non trouvée.")
- return
- # Obtenir les membres avec le rôle spécifique
- specific_role = get(guild.roles, name=SPECIFIC_ROLE_NAME)
- if not specific_role:
- logger.error(f"Rôle '{SPECIFIC_ROLE_NAME}' non trouvé dans la guild.")
- return
- active_members = [member for member in guild.members if specific_role in member.roles and member.activity]
- if active_members:
- # Sélectionner un membre aléatoire
- selected_member = random.choice(active_members)
- activity = selected_member.activity
- # Récupérer les informations d'activité
- # Vérifier si l'activité est de type Spotify
- if isinstance(activity, discord.Spotify):
- activity_details = (
- f"Spotify - {activity.title} by {', '.join(activity.artists)}"
- f" from the album {activity.album}"
- )
- else:
- # Pour d'autres types d'activités
- activity_details = f"{activity.type.name} - {activity.name}" if activity else "Aucune activité spécifique."
- # Préparer le message à envoyer à OpenAI
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- {"role": "user", "content": f"L'utilisateur {selected_member.mention} est actuellement actif: {activity_details}. Réagis à ce que l'utilisateur est en train de faire en t'adressant à lui et en le citant."}
- ]
- reply = await call_openai_model(
- model="gpt-4o-mini",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
- if reply:
- channel = self.get_channel(chatgpt_channel_id)
- if channel:
- await channel.send(reply)
- self.db_manager.save_message('assistant', reply)
- conversation_history.append({
- "role": "assistant",
- "content": reply
- })
- logger.info(f"Message aléatoire posté par le bot.")
- else:
- logger.warning("OpenAI n'a pas généré de réponse pour l'activité du membre.")
- else:
- # Aucun membre actif, envoyer un message de boredom
- messages = [
- {"role": "system", "content": PERSONALITY_PROMPT},
- {"role": "user", "content": "Personne ne fait quoi que ce soit et on s'ennuie ici. Génère un message approprié avec ta personnalité."}
- ]
- reply = await call_openai_model(
- model="gpt-4o-mini",
- messages=messages,
- max_tokens=4096,
- temperature=1.0
- )
- if reply:
- channel = self.get_channel(chatgpt_channel_id)
- if channel:
- await channel.send(reply)
- self.db_manager.save_message('assistant', reply)
- conversation_history.append({
- "role": "assistant",
- "content": reply
- })
- logger.info(f"Message d'ennui posté par le bot.")
- else:
- logger.warning("OpenAI n'a pas généré de réponse pour l'état d'ennui.")
- # Actualiser le délai aléatoire
- self.random_message_delay = random.randint(180, 360)
- logger.info(f"`random_message_delay` mis à jour à {self.random_message_delay} minutes.")
- async def process_reminders(self):
- """Tâche en arrière-plan pour vérifier et envoyer les rappels."""
- await self.wait_until_ready()
- while not self.is_closed():
- try:
- now = datetime.now(pytz.timezone('Europe/Paris')) # Utiliser le même fuseau horaire
- reminders = self.db_manager.get_due_reminders(now.strftime('%Y-%m-%d %H:%M:%S'))
- for reminder in reminders:
- try:
- user = await self.fetch_user(int(reminder['user_id']))
- except discord.NotFound:
- logger.error(f"Utilisateur avec l'ID {reminder['user_id']} non trouvé.")
- continue
- except discord.HTTPException as e:
- logger.error(f"Erreur lors de la récupération de l'utilisateur {reminder['user_id']}: {e}")
- continue
- try:
- channel = await self.fetch_channel(int(reminder['channel_id']))
- except discord.NotFound:
- logger.error(f"Canal avec l'ID {reminder['channel_id']} non trouvé.")
- continue
- except discord.HTTPException as e:
- logger.error(f"Erreur lors de la récupération du canal {reminder['channel_id']}: {e}")
- continue
- if channel and user:
- personalized_content = await self.get_personalized_reminder(reminder['content'], user)
- try:
- reminder_message = f"{user.mention} 🕒 Rappel : {personalized_content}"
- await channel.send(reminder_message)
- logger.info(f"Rappel envoyé à {user} dans le canal {channel}.")
- self.db_manager.save_message('assistant', reminder_message)
- conversation_history.append({
- "role": "assistant",
- "content": reminder_message
- })
- except discord.Forbidden:
- logger.error(f"Permissions insuffisantes pour envoyer des messages dans le canal {channel}.")
- except discord.HTTPException as e:
- logger.error(f"Erreur lors de l'envoi du message dans le canal {channel}: {e}")
- else:
- logger.warning(f"Canal ou utilisateur introuvable pour le rappel ID {reminder['id']}.")
- # Supprimer le rappel après envoi
- self.db_manager.delete_reminder(reminder['id'])
- await asyncio.sleep(60) # Vérifier toutes les minutes
- except Exception as e:
- logger.error(f"Erreur dans la tâche de rappels: {e}")
- await asyncio.sleep(60)
- async def process_messages(self):
- """Tâche en arrière-plan pour traiter les messages séquentiellement."""
- while True:
- message = await self.message_queue.get()
- try:
- await self.handle_message(message)
- except Exception as e:
- logger.error(f"Erreur lors du traitement du message : {e}")
- try:
- await message.channel.send("Une erreur est survenue lors du traitement de votre message.")
- except Exception as send_error:
- logger.error(f"Erreur lors de l'envoi du message d'erreur : {send_error}")
- finally:
- self.message_queue.task_done()
- async def handle_message(self, message):
- """Fonction pour traiter un seul message."""
- global conversation_history, last_analysis_index, messages_since_last_analysis
- user_text = message.content.strip()
- # Traiter les pièces jointes
- image_data = None
- file_content = None
- attachment_filename = None
- allowed_extensions = ['.txt', '.py', '.html', '.css', '.js']
- if message.attachments:
- for attachment in message.attachments:
- if any(attachment.filename.lower().endswith(ext) for ext in allowed_extensions):
- file_content = await read_text_file(attachment)
- attachment_filename = attachment.filename
- break
- elif attachment.content_type and attachment.content_type.startswith('image/'):
- image_data = await encode_image_from_attachment(attachment, mode='high')
- break
- # Traitement des images
- if image_data:
- has_user_text = has_text(user_text)
- user_text_to_use = user_text if has_user_text else None
- temp_msg = await message.channel.send(f"*{BOT_NAME} observe l'image...*")
- try:
- # Analyser l'image avec GPT-4o
- analysis = await call_gpt4o_for_image_analysis(image_data, user_text=user_text_to_use)
- if analysis:
- # Ajouter l'analyse à l'historique
- analysis_message = {
- "role": "system",
- "content": f"__IMAGE_ANALYSIS__:{analysis}"
- }
- await add_to_conversation_history(self.db_manager, analysis_message)
- # Générer une réponse basée sur l'analyse
- reply = await call_gpt4o_mini_with_analysis(analysis, message.author.name, user_text, has_user_text)
- if reply:
- await temp_msg.delete()
- await message.channel.send(reply)
- # Construire et ajouter les messages à l'historique
- user_message_text = f"{user_text} (a posté une image.)" if has_user_text else (
- "Une image a été postée, mais elle n'est pas disponible pour analyse directe. Veuillez vous baser uniquement sur l'analyse fournie."
- )
- user_message = {
- "role": "user",
- "content": f"{message.author.name} dit : {user_message_text}"
- }
- assistant_message = {
- "role": "assistant",
- "content": reply
- }
- await add_to_conversation_history(self.db_manager, user_message)
- await add_to_conversation_history(self.db_manager, assistant_message)
- else:
- await temp_msg.delete()
- await message.channel.send("Désolé, je n'ai pas pu générer une réponse.")
- else:
- await temp_msg.delete()
- await message.channel.send("Désolé, je n'ai pas pu analyser l'image.")
- except Exception as e:
- await temp_msg.delete()
- await message.channel.send("Une erreur est survenue lors du traitement de l'image.")
- logger.error(f"Erreur lors du traitement de l'image: {e}")
- return # Ne pas continuer le traitement après une image
- # Ajouter le contenu du fichier au texte de l'utilisateur si un fichier est présent
- if file_content:
- user_text += f"\nContenu du fichier {attachment_filename}:\n{file_content}"
- # Vérifier si le texte n'est pas vide
- if not has_text(user_text):
- return # Ne pas appeler l'API si le texte est vide
- async with message.channel.typing():
- try:
- # Appeler l'API OpenAI pour le texte
- reply = await call_openai_api(user_text, message.author.name)
- if reply:
- # Diviser le message en plusieurs parties si nécessaire
- message_parts = split_message(reply)
- for part in message_parts:
- await message.channel.send(part)
- # Construire et ajouter les messages à l'historique
- user_message = {
- "role": "user",
- "content": f"{message.author.name} dit : {user_text}"
- }
- assistant_message = {
- "role": "assistant",
- "content": reply
- }
- await add_to_conversation_history(self.db_manager, user_message)
- await add_to_conversation_history(self.db_manager, assistant_message)
- else:
- await message.channel.send("Désolé, je n'ai pas pu générer une réponse.")
- except Exception as e:
- await message.channel.send("Une erreur est survenue lors de la génération de la réponse.")
- logger.error(f"Erreur lors du traitement du texte: {e}")
- # ============================
- # Commandes Slash via Cogs
- # ============================
- class AdminCommands(commands.Cog):
- """Cog pour les commandes administratives."""
- def __init__(self, bot: commands.Bot, db_manager):
- self.bot = bot
- self.db_manager = db_manager
- @app_commands.command(name="reset_history", description="Réinitialise l'historique des conversations.")
- @app_commands.checks.has_permissions(administrator=True)
- @admin_command
- async def reset_history(self, interaction: discord.Interaction):
- """Réinitialise l'historique des conversations."""
- global conversation_history
- conversation_history = []
- self.db_manager.reset_history()
- await interaction.response.send_message("✅ L'historique des conversations a été réinitialisé.")
- @reset_history.error
- async def reset_history_error(self, interaction: discord.Interaction, error):
- """Gère les erreurs de la commande reset_history."""
- if isinstance(error, app_commands.CheckFailure):
- await interaction.response.send_message("❌ Vous n'avez pas la permission d'utiliser cette commande.")
- else:
- logger.error(f"Erreur lors de l'exécution de la commande reset_history: {error}")
- await interaction.response.send_message("Une erreur est survenue lors de l'exécution de la commande.")
- class ReminderCommands(commands.Cog):
- """Cog pour les commandes de rappel."""
- def __init__(self, bot: commands.Bot, db_manager: DatabaseManager):
- self.bot = bot
- self.db_manager = db_manager
- @app_commands.command(name="rappel", description="Créer un rappel")
- @app_commands.describe(date="Date du rappel (DD/MM/YYYY)")
- @app_commands.describe(time="Heure du rappel (HH:MM, 24h)")
- @app_commands.describe(content="Contenu du rappel")
- async def rappel(self, interaction: discord.Interaction, date: str, time: str, content: str):
- """Commande pour créer un rappel."""
- user = interaction.user
- channel = interaction.channel
- # Valider et parser la date et l'heure
- try:
- remind_datetime_str = f"{date} {time}"
- remind_datetime = datetime.strptime(remind_datetime_str, "%d/%m/%Y %H:%M")
- # Vous pouvez ajuster le fuseau horaire selon vos besoins
- tz = pytz.timezone('Europe/Paris') # Exemple de fuseau horaire
- remind_datetime = tz.localize(remind_datetime)
- now = datetime.now(tz)
- if remind_datetime <= now:
- await interaction.response.send_message("❌ La date et l'heure doivent être dans le futur.", ephemeral=True)
- return
- except ValueError:
- await interaction.response.send_message("❌ Format de date ou d'heure invalide. Utilisez DD/MM/YYYY pour la date et HH:MM pour l'heure.", ephemeral=True)
- return
- # Ajouter le rappel à la base de données
- self.db_manager.add_reminder(
- user_id=str(user.id),
- channel_id=str(channel.id),
- remind_at=remind_datetime.strftime('%Y-%m-%d %H:%M:%S'),
- content=content
- )
- # Créer un embed pour la confirmation
- embed = discord.Embed(
- title="Rappel Créé ✅",
- description=(
- f"**Date et Heure** : {remind_datetime.strftime('%d/%m/%Y %H:%M')}\n"
- f"**Contenu** : {content}"
- ),
- color=0x00ff00, # Vert
- timestamp=datetime.now(timezone.utc)
- )
- embed.set_footer(text=f"Créé par {user}", icon_url=user.display_avatar.url if user.avatar else user.default_avatar.url)
- # Envoyer l'embed de confirmation
- await interaction.response.send_message(embed=embed)
- @rappel.error
- async def rappel_error(self, interaction: discord.Interaction, error):
- """Gère les erreurs de la commande rappel."""
- logger.error(f"Erreur lors de l'exécution de la commande rappel: {error}")
- await interaction.response.send_message("❌ Une erreur est survenue lors de la création du rappel.")
- @app_commands.command(name="mes_rappels", description="Voir tous vos rappels enregistrés à venir.")
- async def mes_rappels(self, interaction: discord.Interaction):
- """Commande pour voir tous les rappels de l'utilisateur."""
- user = interaction.user
- reminders = self.db_manager.get_user_reminders(str(user.id))
- if not reminders:
- await interaction.response.send_message(
- "🕒 Vous n'avez aucun rappel enregistré à venir.",
- ephemeral=True
- )
- return
- # Créer l'embed
- embed = discord.Embed(
- title="📋 Vos Rappels à Venir",
- description=f"Voici la liste de vos rappels enregistrés :",
- color=0x00ff00, # Vert
- timestamp=datetime.now(timezone.utc)
- )
- embed.set_footer(text=f"Demandé par {user}", icon_url=user.display_avatar.url if user.avatar else user.default_avatar.url)
- # Ajouter chaque rappel comme un champ dans l'embed
- for reminder in reminders:
- remind_at = reminder['remind_at']
- remind_at_formatted = remind_at.strftime('%d/%m/%Y %H:%M')
- embed.add_field(
- name=f"ID {reminder['id']} - {remind_at_formatted}",
- value=reminder['content'],
- inline=False
- )
- await interaction.response.send_message(embed=embed)
- @app_commands.command(name="supprimer_rappel", description="Supprime un de vos rappels à venir en utilisant son ID.")
- @app_commands.describe(id="L'ID du rappel à supprimer")
- async def supprimer_rappel(self, interaction: discord.Interaction, id: int):
- """Commande pour supprimer un rappel spécifique."""
- user = interaction.user
- reminder_id = id
- # Récupérer le rappel par ID
- reminder = self.db_manager.get_reminder_by_id(reminder_id)
- if not reminder:
- await interaction.response.send_message(
- f"❌ Aucun rappel trouvé avec l'ID `{reminder_id}`.",
- ephemeral=True
- )
- return
- # Vérifier si le rappel appartient à l'utilisateur
- if reminder['user_id'] != str(user.id):
- await interaction.response.send_message(
- "❌ Vous ne pouvez supprimer que vos propres rappels.",
- ephemeral=True
- )
- return
- # Supprimer le rappel
- self.db_manager.delete_reminder(reminder_id)
- # Confirmer la suppression à l'utilisateur
- embed = discord.Embed(
- title="Rappel Supprimé ✅",
- description=f"Le rappel avec l'ID `{reminder_id}` et le contenu \"{reminder['content']}\" a été supprimé avec succès.",
- color=0xff0000, # Rouge
- timestamp=datetime.now(timezone.utc)
- )
- embed.set_footer(text=f"Supprimé par {user}", icon_url=user.display_avatar.url if user.avatar else user.default_avatar.url)
- await interaction.response.send_message(embed=embed)
- @supprimer_rappel.error
- async def supprimer_rappel_error(self, interaction: discord.Interaction, error):
- """Gère les erreurs de la commande supprimer_rappel."""
- logger.error(f"Erreur lors de l'exécution de la commande supprimer_rappel: {error}")
- await interaction.response.send_message("❌ Une erreur est survenue lors de la suppression du rappel.", ephemeral=True)
- class HelpCommands(commands.Cog):
- """Cog pour la commande /help."""
- def __init__(self, bot: commands.Bot):
- self.bot = bot
- @app_commands.command(name="help", description="Liste toutes les commandes disponibles.")
- async def help(self, interaction: discord.Interaction):
- """Commande /help qui liste toutes les commandes disponibles dans un embed."""
- general_commands = []
- admin_commands = []
- # Parcourir toutes les commandes de l'arbre de commandes du bot
- for command in self.bot.tree.get_commands():
- # Ignorer la commande /help elle-même pour éviter l'auto-inclusion
- if command.name == "help":
- continue
- # Déterminer si la commande est réservée aux administrateurs en vérifiant l'attribut personnalisé
- is_admin = getattr(command.callback, 'is_admin', False)
- # Ajouter la commande à la liste appropriée
- if is_admin:
- admin_commands.append((command.name, command.description))
- else:
- general_commands.append((command.name, command.description))
- # Créer l'embed
- embed = discord.Embed(
- title="📚 Liste des Commandes",
- description="Voici la liste des commandes disponibles :",
- color=0x00ff00
- )
- if general_commands:
- general_desc = "\n".join([f"`/{name}` - {desc}" for name, desc in general_commands])
- embed.add_field(name="Commandes Générales", value=general_desc, inline=False)
- if admin_commands:
- admin_desc = "\n".join([f"`/{name}` - {desc} *(Admin)*" for name, desc in admin_commands])
- embed.add_field(name="Commandes Administratives", value=admin_desc, inline=False)
- embed.set_footer(text=f"Demandé par {interaction.user}", icon_url=interaction.user.display_avatar.url if interaction.user.avatar else interaction.user.default_avatar.url)
- # Envoyer l'embed en réponse
- await interaction.response.send_message(embed=embed)
- async def setup(bot: commands.Bot):
- await bot.add_cog(HelpCommands(bot))
- # ============================
- # Démarrage du Bot Discord
- # ============================
- def main():
- db_manager = DatabaseManager()
- if not db_manager.connection:
- logger.error("Le bot ne peut pas démarrer sans connexion à la base de données.")
- return
- db_manager.load_conversation_history()
- # Initialiser le bot avec le préfixe "!" et les intents définis
- bot = MyDiscordBot(command_prefix="!", db_manager=db_manager, intents=intents)
- # Démarrer le bot
- try:
- bot.run(DISCORD_TOKEN)
- except Exception as e:
- logger.error(f"Erreur lors du démarrage du bot Discord: {e}")
- finally:
- db_manager.close_connection()
- if __name__ == "__main__":
- main()
|