1
0

utils.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
# Auxiliary utility functions.
  2. import re
  3. import os
  4. import base64
  5. from io import BytesIO
  6. from PIL import Image
  7. from logger import logger
  8. import tiktoken
  9. def has_text(text):
  10. """
  11. Détermine si le texte fourni est non vide après suppression des espaces.
  12. """
  13. return bool(text.strip())
  14. def resize_image(image_bytes, mode='high', attachment_filename=None):
  15. try:
  16. with Image.open(BytesIO(image_bytes)) as img:
  17. original_format = img.format # Stocker le format original
  18. if mode == 'high':
  19. # Redimensionner pour le mode haute fidélité
  20. img.thumbnail((2000, 2000))
  21. if min(img.size) < 768:
  22. scale = 768 / min(img.size)
  23. new_size = tuple(int(x * scale) for x in img.size)
  24. img = img.resize(new_size, Image.Resampling.LANCZOS)
  25. elif mode == 'low':
  26. # Redimensionner pour le mode basse fidélité
  27. img = img.resize((512, 512))
  28. buffer = BytesIO()
  29. img_format = img.format
  30. if not img_format:
  31. if attachment_filename:
  32. _, ext = os.path.splitext(attachment_filename)
  33. ext = ext.lower()
  34. format_mapping = {
  35. '.jpg': 'JPEG',
  36. '.jpeg': 'JPEG',
  37. '.png': 'PNG',
  38. '.gif': 'GIF',
  39. '.bmp': 'BMP',
  40. '.tiff': 'TIFF'
  41. }
  42. img_format = format_mapping.get(ext, 'PNG')
  43. else:
  44. img_format = 'PNG'
  45. img.save(buffer, format=img_format)
  46. return buffer.getvalue()
  47. except Exception as e:
  48. logger.error(f"Error resizing image: {e}")
  49. raise
  50. def extract_text_from_message(message):
  51. content = message.get("content", "")
  52. if isinstance(content, list):
  53. # Extraire le texte de chaque élément de la liste
  54. texts = []
  55. for part in content:
  56. if isinstance(part, dict):
  57. text = part.get("text", "")
  58. if text:
  59. texts.append(text)
  60. return ' '.join(texts)
  61. elif isinstance(content, str):
  62. return content
  63. else:
  64. return ""
  65. def calculate_cost(usage, model='gpt-4o-mini'):
  66. input_tokens = usage.get('prompt_tokens', 0)
  67. output_tokens = usage.get('completion_tokens', 0)
  68. # Définir les tarifs par modèle
  69. model_costs = {
  70. 'gpt-4o': {
  71. 'input_rate': 5.00 / 1_000_000, # 5$ pour 1M tokens d'entrée
  72. 'output_rate': 15.00 / 1_000_000 # 15$ pour 1M tokens de sortie
  73. },
  74. 'gpt-4o-mini': {
  75. 'input_rate': 0.150 / 1_000_000, # 0.150$ pour 1M tokens d'entrée
  76. 'output_rate': 0.600 / 1_000_000 # 0.600$ pour 1M tokens de sortie
  77. }
  78. }
  79. # Obtenir les tarifs du modèle spécifié
  80. if model not in model_costs:
  81. logger.warning(f"Modèle inconnu '{model}'. Utilisation des tarifs par défaut pour 'gpt-4o-mini'.")
  82. model = 'gpt-4o-mini'
  83. input_rate = model_costs[model]['input_rate']
  84. output_rate = model_costs[model]['output_rate']
  85. # Calculer les coûts
  86. input_cost = input_tokens * input_rate
  87. output_cost = output_tokens * output_rate
  88. total_cost = input_cost + output_cost
  89. return input_tokens, output_tokens, total_cost
  90. async def read_text_file(attachment):
  91. file_bytes = await attachment.read()
  92. return file_bytes.decode('utf-8')
  93. async def encode_image_from_attachment(attachment, mode='high'):
  94. image_data = await attachment.read()
  95. resized_image = resize_image(image_data, mode=mode, attachment_filename=attachment.filename)
  96. return base64.b64encode(resized_image).decode('utf-8')