||
- import os
- import openai
- import discord
- import aiohttp
- import asyncio
- import base64
- import logging
- import re
- from dotenv import load_dotenv
- # Charger les variables d'environnement depuis le fichier .env
- load_dotenv()
- DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
- OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
- DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID')
- # Chemin vers le fichier de prompt de personnalité
- PERSONALITY_PROMPT_FILE = os.getenv('PERSONALITY_PROMPT_FILE', 'personality_prompt.txt')
- # Vérifier que les tokens et le prompt de personnalité sont récupérés
- if DISCORD_TOKEN is None or OPENAI_API_KEY is None or DISCORD_CHANNEL_ID is None:
- raise ValueError("Les tokens ou l'ID du canal ne sont pas définis dans les variables d'environnement.")
- if not os.path.isfile(PERSONALITY_PROMPT_FILE):
- raise FileNotFoundError(f"Le fichier de prompt de personnalité '{PERSONALITY_PROMPT_FILE}' est introuvable.")
- # Lire le prompt de personnalité depuis le fichier
- with open(PERSONALITY_PROMPT_FILE, 'r', encoding='utf-8') as f:
- PERSONALITY_PROMPT = f.read().strip()
- # Log configuration
- log_format='%(asctime)-13s : %(name)-15s : %(levelname)-8s : %(message)s'
- logging.basicConfig(handlers=[logging.FileHandler("./chatbot.log", 'a', 'utf-8')], format=log_format, level="INFO")
- console = logging.StreamHandler()
- console.setLevel(logging.INFO)
- console.setFormatter(logging.Formatter(log_format))
- logger = logging.getLogger("chatbot")
- logger.setLevel("INFO")
- logging.getLogger('').addHandler(console)
- # Initialiser les intents
- intents = discord.Intents.default()
- intents.message_content = True # Activer l'intent pour les contenus de message
- # Initialiser le client Discord avec les intents modifiés
- client_discord = discord.Client(intents=intents)
- # Initialiser l'API OpenAI avec un client
- client_openai = openai.OpenAI(api_key=OPENAI_API_KEY)
- # Liste pour stocker l'historique des conversations
- conversation_history = []
- # Convertir l'ID du channel en entier
- chatgpt_channel_id = int(DISCORD_CHANNEL_ID)
- def is_ascii_art(text):
- # Compter les caractères spéciaux et le nombre total de caractères
- special_char_count = len(re.findall(r'[^\w\s]', text))
- total_chars = len(text)
- # Définir des critères pour détecter des dessins ASCII
- density_threshold = 0.2 # Proportion minimale de caractères spéciaux
- min_lines = 3 # Nombre minimum de lignes pour considérer comme un dessin ASCII
- # Vérifier la densité des caractères spéciaux
- if total_chars > 0 and (special_char_count / total_chars) > density_threshold:
- # Vérifier la structure des lignes
- lines = text.split('\n')
- if len(lines) >= min_lines:
- average_length = sum(len(line) for line in lines) / len(lines)
- similar_length_lines = sum(1 for line in lines if abs(len(line) - average_length) < 5)
- # Si la plupart des lignes ont une longueur similaire, c'est probablement un dessin ASCII
- if similar_length_lines >= len(lines) * 0.8:
- return True
- return False
- def is_long_special_text(text):
- # Définir un seuil pour considérer le texte comme long et contenant beaucoup de caractères spéciaux
- special_char_count = len(re.findall(r'[^\w\s]', text))
- if len(text) > 800 or special_char_count > 50:
- return True
- return False
- def calculate_cost(usage):
- input_tokens = usage.get('prompt_tokens', 0)
- output_tokens = usage.get('completion_tokens', 0)
- # Coûts estimés
- input_cost = input_tokens / 1_000_000 * 5.00 # 5$ pour 1M tokens d'entrée
- output_cost = output_tokens / 1_000_000 * 15.00 # 15$ pour 1M tokens de sortie
- total_cost = input_cost + output_cost
- return input_tokens, output_tokens, total_cost
- async def read_text_file(attachment):
- # Télécharger et lire le contenu du fichier texte
- async with aiohttp.ClientSession() as session:
- async with session.get(attachment.url) as resp:
- return await resp.text()
- async def encode_image_from_attachment(attachment):
- async with aiohttp.ClientSession() as session:
- async with session.get(attachment.url) as resp:
- image_data = await resp.read()
- return base64.b64encode(image_data).decode('utf-8')
- async def call_openai_api(user_text, user_name, image_data=None):
- # Préparer le contenu pour l'appel API
- message_to_send = {
- "role": "user",
- "content": [{"type": "text", "text": f"{user_name} dit : {user_text}"}]
- }
- # Inclure l'image dans l'appel API courant
- if image_data:
- message_to_send["content"].append({
- "type": "image_url",
- "image_url": {"url": f"data:image/jpeg;base64,{image_data}"}
- })
- if not conversation_history:
- conversation_history.append({
- "role": "system",
- "content": PERSONALITY_PROMPT
- })
- # Ajouter le message de l'utilisateur à l'historique global, mais uniquement s'il ne s'agit pas d'une image ou d'ASCII art
- if image_data is None and not is_ascii_art(user_text):
- conversation_history.append(message_to_send)
- payload = {
- "model": "gpt-4o",
- "messages": conversation_history,
- "max_tokens": 500
- }
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {OPENAI_API_KEY}"
- }
- try:
- async with aiohttp.ClientSession() as session:
- async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as resp:
- result = await resp.json()
- if resp.status != 200:
- raise ValueError(f"API Error: {result.get('error', {}).get('message', 'Unknown error')}")
- # Calculer les coûts
- usage = result.get('usage', {})
- input_tokens, output_tokens, total_cost = calculate_cost(usage)
- # Afficher dans la console
- logging.info(f"Estimated Cost: ${total_cost:.4f} / Input Tokens: {input_tokens} / Output Tokens: {output_tokens} / Total Tokens: {input_tokens + output_tokens}")
- return result
- except Exception as e:
- logger.error(f"Error calling OpenAI API: {e}")
- return None
- @client_discord.event
- async def on_ready():
- logger.info(f'Bot connecté en tant que {client_discord.user}')
- # Ajouter la personnalité de l'IA à l'historique au démarrage
- if not conversation_history:
- conversation_history.append({
- "role": "system",
- "content": PERSONALITY_PROMPT
- })
- @client_discord.event
- async def on_message(message):
- # Vérifier si le message provient du canal autorisé
- if message.channel.id != chatgpt_channel_id:
- return
- # Vérifier si l'auteur du message est le bot lui-même
- if message.author == client_discord.user:
- return
- user_text = message.content.strip()
- image_data = None
- file_content = None
- # Extensions de fichiers autorisées
- allowed_extensions = ['.txt', '.py', '.html', '.css', '.js']
- # Vérifier s'il y a une pièce jointe
- if message.attachments:
- for attachment in message.attachments:
- # Vérifier si c'est un fichier avec une extension autorisée
- if any(attachment.filename.endswith(ext) for ext in allowed_extensions):
- file_content = await read_text_file(attachment)
- break
- # Vérifier si c'est une image
- elif attachment.content_type.startswith('image/'):
- image_data = await encode_image_from_attachment(attachment)
- break
- # Ajouter le contenu du fichier à la requête si présent
- if file_content:
- user_text += f"\nContenu du fichier {attachment.filename}:\n{file_content}"
- # Appeler l'API OpenAI
- result = await call_openai_api(user_text, message.author.name, image_data)
- if result:
- reply = result['choices'][0]['message']['content']
- await message.channel.send(reply)
- # Ajouter la réponse du modèle à l'historique
- # Ne pas ajouter à l'historique si c'est un dessin ASCII ou une image
- if image_data is None and not is_ascii_art(user_text):
- add_to_conversation_history({
- "role": "assistant",
- "content": reply
- })
- MAX_HISTORY_LENGTH = 50 # Nombre maximum de messages à conserver
- # Liste pour stocker les indices des messages longs et spéciaux
- temporary_messages = []
- def add_to_conversation_history(new_message):
- # Ajouter la personnalité de l'IA en tant que premier message
- if not conversation_history:
- conversation_history.append({
- "role": "system",
- "content": PERSONALITY_PROMPT
- })
- # Ajouter le message à l'historique
- conversation_history.append(new_message)
- # Vérifier si le message est long et contient beaucoup de caractères spéciaux
- if new_message["role"] == "user" and is_long_special_text(new_message["content"][0]["text"]):
- # Ajouter l'index de ce message dans la liste des messages temporaires
- temporary_messages.append(len(conversation_history) - 1)
- # Limiter la taille de l'historique
- if len(conversation_history) > MAX_HISTORY_LENGTH:
- # Garder le premier message de personnalité et les messages les plus récents
- conversation_history[:] = conversation_history[:1] + conversation_history[-MAX_HISTORY_LENGTH:]
- # Supprimer les messages temporaires après dix messages
- if len(temporary_messages) > 0:
- for index in reversed(temporary_messages):
- # Supprimer le message s'il a été dans l'historique pendant dix messages ou plus
- if len(conversation_history) - index > 10:
- del conversation_history[index]
- temporary_messages.remove(index)
- # Démarrer le bot Discord
- client_discord.run(DISCORD_TOKEN)
|