1
0

chatbot.py 17 KB


  1. import discord
  2. from discord.ext import commands
  3. import requests
  4. import json
  5. import os
  6. import random
  7. import re
  8. from dotenv import load_dotenv
  9. import logging
  10. from logging.handlers import RotatingFileHandler
  11. # Configuration du logger
  12. logger = logging.getLogger('discord_bot')
  13. logger.setLevel(logging.INFO)
  14. formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
  15. # Créer un gestionnaire de fichier avec rotation
  16. file_handler = RotatingFileHandler('bot.log', maxBytes=5*1024*1024, backupCount=2) # 5 Mo par fichier, garder 2 sauvegardes
  17. file_handler.setFormatter(formatter)
  18. logger.addHandler(file_handler)
  19. # Optionnel : ajouter un gestionnaire de console pour afficher les logs dans la console
  20. console_handler = logging.StreamHandler()
  21. console_handler.setFormatter(formatter)
  22. logger.addHandler(console_handler)
  23. # Charger les variables d'environnement
  24. load_dotenv()
  25. # Version du bot
  26. VERSION = "4.2.0" # Modifiable selon la version actuelle
  27. # Récupérer les variables d'environnement avec validation
  28. def get_env_variable(var_name, is_critical=True, default=None, var_type=str):
  29. value = os.getenv(var_name)
  30. if value is None:
  31. if is_critical:
  32. logger.error(f"Variable d'environnement critique manquante: {var_name}")
  33. if default is not None:
  34. logger.warning(f"Utilisation de la valeur par défaut pour {var_name}")
  35. return default
  36. else:
  37. raise ValueError(f"La variable d'environnement {var_name} est requise mais non définie.")
  38. else:
  39. logger.warning(f"Variable d'environnement non critique manquante: {var_name}. Utilisation de la valeur par défaut: {default}")
  40. return default
  41. if var_type == int:
  42. try:
  43. return int(value)
  44. except ValueError:
  45. logger.error(f"La variable d'environnement {var_name} doit être un entier. Valeur actuelle: {value}")
  46. if default is not None:
  47. return default
  48. else:
  49. raise ValueError(f"La variable d'environnement {var_name} doit être un entier.")
  50. return value
  51. try:
  52. # Variables d'environnement critiques
  53. MISTRAL_API_KEY = get_env_variable('MISTRAL_API_KEY')
  54. DISCORD_TOKEN = get_env_variable('DISCORD_TOKEN')
  55. CHANNEL_ID = get_env_variable('CHANNEL_ID', var_type=int)
  56. # Variables d'environnement non critiques avec valeurs par défaut
  57. MAX_HISTORY_LENGTH = get_env_variable('MAX_HISTORY_LENGTH', is_critical=False, default=10, var_type=int)
  58. HISTORY_FILE = get_env_variable('HISTORY_FILE', is_critical=False, default="conversation_history.json")
  59. logger.info("Toutes les variables d'environnement critiques ont été chargées avec succès.")
  60. except ValueError as e:
  61. logger.error(f"Erreur lors du chargement des variables d'environnement: {e}")
  62. # Si une variable critique est manquante, le bot ne peut pas fonctionner correctement.
  63. # Il est donc préférable de quitter le programme avec un code d'erreur.
  64. exit(1)
  65. # Endpoint API Mistral
  66. MISTRAL_API_URL = "https://api.mistral.ai/v1/chat/completions"
  67. def load_history():
  68. """Charge l'historique depuis un fichier JSON."""
  69. if os.path.exists(HISTORY_FILE):
  70. with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
  71. try:
  72. data = json.load(f)
  73. # Vérifier et limiter la taille de chaque historique
  74. for channel_id in data:
  75. if "messages" in data[channel_id]:
  76. if len(data[channel_id]["messages"]) > MAX_HISTORY_LENGTH:
  77. data[channel_id]["messages"] = data[channel_id]["messages"][-MAX_HISTORY_LENGTH:]
  78. return data
  79. except json.JSONDecodeError:
  80. logger.error("Erreur de lecture du fichier d'historique. Création d'un nouveau fichier.")
  81. return {}
  82. return {}
  83. def save_history(history):
  84. """Sauvegarde l'historique dans un fichier JSON."""
  85. with open(HISTORY_FILE, 'w', encoding='utf-8') as f:
  86. json.dump(history, f, ensure_ascii=False, indent=4)
  87. def get_personality_prompt():
  88. try:
  89. with open('personality_prompt.txt', 'r', encoding='utf-8') as file:
  90. return file.read().strip()
  91. except FileNotFoundError:
  92. logger.error("Le fichier personality_prompt.txt n'a pas été trouvé. Utilisation d'un prompt par défaut.")
  93. return """Tu es un assistant utile et poli qui peut analyser des images.
  94. Quand on te montre une image, décris-la et donne ton avis si on te le demande.
  95. Réponds toujours en français avec un ton naturel et amical.
  96. Lorsque tu analyses une image, décris d'abord ce que tu vois en détail,
  97. puis réponds à la question si elle est posée. Utilise un langage clair et accessible."""
  98. # Charger l'historique au démarrage
  99. conversation_history = load_history()
  100. intents = discord.Intents.default()
  101. intents.messages = True
  102. intents.message_content = True
  103. intents.presences = True
  104. bot = commands.Bot(command_prefix='!', intents=intents)
  105. @bot.event
  106. async def on_ready():
  107. logger.info(f'Le bot est connecté en tant que {bot.user}')
  108. global conversation_history
  109. conversation_history = load_history()
  110. # Récupérer le canal spécifié
  111. channel = bot.get_channel(CHANNEL_ID)
  112. if channel is not None:
  113. # Trouver la guilde (serveur) à laquelle appartient le canal
  114. guild = channel.guild
  115. if guild is not None:
  116. # Récupérer le membre du bot sur cette guilde
  117. bot_member = guild.me
  118. # Récupérer le pseudo du bot sur cette guilde
  119. bot_nickname = bot_member.display_name
  120. else:
  121. bot_nickname = bot.user.name # Utiliser le nom global si la guilde n'est pas trouvée
  122. # Créer un embed avec le pseudo du bot
  123. embed = discord.Embed(
  124. title="Bot en ligne",
  125. description=f"{bot_nickname} est désormais en ligne. Version {VERSION}.",
  126. color=discord.Color.green()
  127. )
  128. # Envoyer l'embed dans le canal
  129. await channel.send(embed=embed)
  130. def call_mistral_api(prompt, history, image_url=None, user_id=None, username=None):
  131. headers = {
  132. "Content-Type": "application/json",
  133. "Authorization": f"Bearer {MISTRAL_API_KEY}"
  134. }
  135. personality_prompt = get_personality_prompt()
  136. # Vérifier si la structure messages existe
  137. if "messages" not in history:
  138. history["messages"] = []
  139. # Création du message utilisateur selon qu'il y a une image ou non
  140. if image_url:
  141. # Format multimodal pour les messages avec image
  142. user_content = [
  143. {"type": "text", "text": f"{username}: {prompt}" if username else prompt},
  144. {
  145. "type": "image_url",
  146. "image_url": {
  147. "url": image_url,
  148. "detail": "high" # Demander une analyse détaillée de l'image
  149. }
  150. }
  151. ]
  152. user_message = {
  153. "role": "user",
  154. "content": user_content
  155. }
  156. else:
  157. # Format standard pour les messages texte seulement
  158. user_content = f"{username}: {prompt}" if username else prompt
  159. user_message = {"role": "user", "content": user_content}
  160. # Ajouter le message utilisateur à l'historique
  161. history["messages"].append(user_message)
  162. # Limiter l'historique à MAX_HISTORY_LENGTH messages
  163. if len(history["messages"]) > MAX_HISTORY_LENGTH:
  164. history["messages"] = history["messages"][-MAX_HISTORY_LENGTH:]
  165. # Préparer les messages pour l'API
  166. messages = []
  167. # Ajouter le message système en premier
  168. messages.append({"role": "system", "content": personality_prompt})
  169. # Ajouter l'historique des messages (en gardant le format)
  170. for msg in history["messages"]:
  171. if isinstance(msg["content"], list): # C'est un message multimodal
  172. messages.append({
  173. "role": msg["role"],
  174. "content": msg["content"]
  175. })
  176. else: # C'est un message texte standard
  177. messages.append({
  178. "role": msg["role"],
  179. "content": msg["content"]
  180. })
  181. data = {
  182. "model": "pixtral-large-latest",
  183. "messages": messages,
  184. "max_tokens": 1000
  185. }
  186. try:
  187. response = requests.post(MISTRAL_API_URL, headers=headers, data=json.dumps(data))
  188. response.raise_for_status() # Lève une exception pour les erreurs HTTP
  189. if response.status_code == 200:
  190. response_data = response.json()
  191. # Vérifier si la réponse contient bien le champ attendu
  192. if 'choices' in response_data and len(response_data['choices']) > 0:
  193. assistant_response = response_data['choices'][0]['message']['content']
  194. # Ajouter la réponse de l'assistant à l'historique
  195. history["messages"].append({"role": "assistant", "content": assistant_response})
  196. # Limiter à nouveau après avoir ajouté la réponse
  197. if len(history["messages"]) > MAX_HISTORY_LENGTH:
  198. history["messages"] = history["messages"][-MAX_HISTORY_LENGTH:]
  199. # Sauvegarder l'historique après chaque modification
  200. save_history(conversation_history)
  201. return assistant_response
  202. else:
  203. logger.error(f"Réponse API inattendue: {response_data}")
  204. return "Désolé, je n'ai pas reçu de réponse valide de l'API."
  205. else:
  206. return f"Erreur API: {response.status_code}"
  207. except requests.exceptions.RequestException as e:
  208. logger.error(f"Erreur lors de l'appel API: {e}")
  209. return "Désolé, une erreur réseau est survenue lors de la communication avec l'API."
  210. @bot.command(name='reset')
  211. async def reset_history(ctx):
  212. channel_id = str(ctx.channel.id)
  213. if channel_id in conversation_history:
  214. # Conserver le même ID de conversation mais vider les messages
  215. if "messages" in conversation_history[channel_id]:
  216. conversation_history[channel_id]["messages"] = []
  217. else:
  218. conversation_history[channel_id] = {
  219. "conversation_id": conversation_history[channel_id].get("conversation_id", str(len(conversation_history) + 1)),
  220. "messages": []
  221. }
  222. save_history(conversation_history)
  223. await ctx.send("L'historique de conversation a été réinitialisé.")
  224. else:
  225. conversation_id = str(len(conversation_history) + 1)
  226. conversation_history[channel_id] = {
  227. "conversation_id": conversation_id,
  228. "messages": []
  229. }
  230. save_history(conversation_history)
  231. await ctx.send("Aucun historique de conversation trouvé pour ce channel. Créé un nouvel historique.")
  232. @bot.event
  233. async def on_message(message):
  234. # Ignorer les messages du bot lui-même
  235. if message.author == bot.user:
  236. return
  237. # Vérifier si le message provient du channel spécifique
  238. if message.channel.id != CHANNEL_ID:
  239. return
  240. # Vérifier si le message contient des stickers
  241. if message.stickers:
  242. # Obtenir le serveur (guild) du message
  243. guild = message.guild
  244. if guild:
  245. # Obtenir la liste des stickers personnalisés du serveur
  246. stickers = guild.stickers
  247. if stickers:
  248. # Mélanger la liste des stickers pour essayer dans un ordre aléatoire
  249. random_stickers = random.sample(stickers, len(stickers))
  250. for sticker in random_stickers:
  251. try:
  252. logger.info(f"Envoi du sticker: {sticker.name} (ID: {sticker.id})")
  253. await message.channel.send(stickers=[sticker])
  254. break # Si ça marche, on sort de la boucle
  255. except discord.errors.Forbidden as e:
  256. logger.error(f"Erreur lors de l'envoi du sticker: {sticker.name} (ID: {sticker.id}). Erreur: {e}")
  257. continue
  258. else:
  259. # Si aucun sticker n'a pu être envoyé
  260. logger.error("Aucun sticker utilisable trouvé sur ce serveur.")
  261. await message.channel.send("Aucun sticker utilisable trouvé sur ce serveur.")
  262. else:
  263. await message.channel.send("Aucun sticker personnalisé trouvé sur ce serveur.")
  264. else:
  265. await message.channel.send("Ce message ne provient pas d'un serveur.")
  266. return
  267. # Vérifier si le message contient uniquement un emoji personnalisé
  268. emoji_pattern = re.compile(r'^<a?:\w+:\d+>$|^[\u2600-\u27BF\u1F300-\u1F6FF\u1F900-\u1F9FF]+$', re.UNICODE)
  269. content = message.content.strip()
  270. if emoji_pattern.match(content):
  271. guild = message.guild
  272. if guild and guild.emojis:
  273. # Choisir un emoji aléatoire parmi ceux du serveur
  274. random_emoji = random.choice(guild.emojis)
  275. try:
  276. await message.channel.send(str(random_emoji))
  277. return # Retourner pour éviter que le reste du code ne s'exécute
  278. except discord.errors.Forbidden as e:
  279. logger.error(f"Erreur lors de l'envoi de l'emoji: {random_emoji.name} (ID: {random_emoji.id}). Erreur: {e}")
  280. await message.channel.send("Je n'ai pas pu envoyer d'emoji en réponse.")
  281. else:
  282. await message.channel.send("Aucun emoji personnalisé trouvé sur ce serveur.")
  283. return
  284. # Si le message commence par le préfixe du bot, traiter comme une commande
  285. if message.content.startswith('!'):
  286. await bot.process_commands(message)
  287. return
  288. # Résolution des mentions dans le message
  289. resolved_content = message.content
  290. for user in message.mentions:
  291. # Remplacer chaque mention par le nom d'utilisateur
  292. resolved_content = resolved_content.replace(f"<@{user.id}>", f"@{user.display_name}")
  293. # Vérifier les pièces jointes pour les images
  294. if message.attachments:
  295. image_count = 0
  296. too_large_images = []
  297. max_size = 5 * 1024 * 1024 # 5 Mo en octets
  298. for attachment in message.attachments:
  299. if attachment.content_type and attachment.content_type.startswith('image/'):
  300. image_count += 1
  301. if attachment.size > max_size:
  302. too_large_images.append(attachment.filename)
  303. # Vérifier le nombre d'images
  304. if image_count > 1:
  305. await message.channel.send("Erreur : Vous ne pouvez pas envoyer plus de trois images dans un seul message. Veuillez diviser votre envoi en plusieurs messages.")
  306. return
  307. # Vérifier la taille des images
  308. if too_large_images:
  309. image_list = ", ".join(too_large_images)
  310. await message.channel.send(f"Erreur : Les images suivantes dépassent la limite de 5 Mo : {image_list}. Veuillez envoyer des images plus petites.")
  311. return
  312. # Récupérer ou initialiser l'historique pour ce channel
  313. channel_id = str(message.channel.id)
  314. global conversation_history
  315. # Charger l'historique actuel
  316. conversation_history = load_history()
  317. if channel_id not in conversation_history:
  318. conversation_id = str(len(conversation_history) + 1)
  319. conversation_history[channel_id] = {
  320. "conversation_id": conversation_id,
  321. "messages": []
  322. }
  323. # Assurer que la clé messages existe
  324. if "messages" not in conversation_history[channel_id]:
  325. conversation_history[channel_id]["messages"] = []
  326. # Traitement des images dans le message
  327. image_url = None
  328. if message.attachments:
  329. for attachment in message.attachments:
  330. if attachment.content_type and attachment.content_type.startswith('image/'):
  331. image_url = attachment.url
  332. break
  333. # Utiliser le contenu résolu (avec les mentions remplacées)
  334. prompt = resolved_content
  335. # Indiquer que le bot est en train de taper
  336. async with message.channel.typing():
  337. try:
  338. # Appeler l'API Mistral avec l'historique de conversation, l'image si présente, et les infos de l'utilisateur
  339. response = call_mistral_api(
  340. prompt,
  341. conversation_history[channel_id],
  342. image_url,
  343. user_id=str(message.author.id),
  344. username=message.author.display_name
  345. )
  346. await message.channel.send(response)
  347. except Exception as e:
  348. logger.error(f"Erreur lors de l'appel à l'API: {e}")
  349. await message.channel.send("Désolé, une erreur est survenue lors du traitement de votre demande.")
  350. # Assurer que les autres gestionnaires d'événements reçoivent également le message
  351. await bot.process_commands(message)
  352. bot.run(DISCORD_TOKEN)