1
0

utils.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. import datetime
  2. import discord
  3. import pytz
  4. from enum import Enum
  5. from typing import List
  6. import myanimebot.globals as globals
  7. class Service(Enum):
  8. MAL=globals.SERVICE_MAL
  9. ANILIST=globals.SERVICE_ANILIST
  10. @staticmethod
  11. def from_str(label: str):
  12. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  13. return Service.MAL
  14. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  15. return Service.ANILIST
  16. else:
  17. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  18. class MediaType(Enum):
  19. ANIME="ANIME"
  20. MANGA="MANGA"
  21. @staticmethod
  22. def from_str(label: str):
  23. if label.upper() in ('ANIME', 'ANIME_LIST'):
  24. return MediaType.ANIME
  25. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  26. return MediaType.MANGA
  27. else:
  28. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  29. class User():
  30. data = None
  31. def __init__(self,
  32. id : int,
  33. service_id : int,
  34. name : str,
  35. servers : List[int]):
  36. self.id = id
  37. self.service_id = service_id
  38. self.name = name
  39. self.servers = servers
  40. class Media():
  41. def __init__(self,
  42. name : str,
  43. url : str,
  44. episodes : str,
  45. image : str,
  46. type : MediaType):
  47. self.name = name
  48. self.url = url
  49. self.episodes = episodes
  50. self.image = image
  51. self.type = type
  52. @staticmethod
  53. def get_number_episodes(activity):
  54. media_type = MediaType.from_str(activity["type"])
  55. episodes = '?'
  56. if media_type == MediaType.ANIME:
  57. episodes = activity["media"]["episodes"]
  58. elif media_type == MediaType.MANGA:
  59. episodes = activity["media"]["chapters"]
  60. else:
  61. raise NotImplementedError('Error: Unknown media type "{}"'.format(media_type))
  62. if episodes is None:
  63. episodes = '?'
  64. return episodes
  65. class Feed():
  66. def __init__(self,
  67. service : Service,
  68. date_publication : datetime.datetime,
  69. user : User,
  70. status : str, # TODO Need to change
  71. description : str, # TODO Need to change
  72. media : Media
  73. ):
  74. self.service = service
  75. self.date_publication = date_publication
  76. self.user = user
  77. self.status = status
  78. self.media = media
  79. self.description = description
  80. def replace_all(text : str, replace_dic : dict) -> str:
  81. ''' Replace multiple substrings from a string '''
  82. for replace_key, replace_value in replace_dic.items():
  83. text = text.replace(replace_key, replace_value)
  84. return text
  85. def filter_name(name : str) -> str:
  86. ''' Escapes special characters from name '''
  87. dic = {
  88. "♥": "\♥",
  89. "♀": "\♀",
  90. "♂": "\♂",
  91. "♪": "\♪",
  92. "☆": "\☆"
  93. }
  94. return replace_all(name, dic)
  95. # Check if the show's name ends with a show type and truncate it
  96. def truncate_end_show(show):
  97. show_types = (
  98. '- TV',
  99. '- Movie',
  100. '- Special',
  101. '- OVA',
  102. '- ONA',
  103. '- Manga',
  104. '- Manhua',
  105. '- Manhwa',
  106. '- Novel',
  107. '- One-Shot',
  108. '- Doujinshi',
  109. '- Music',
  110. '- OEL',
  111. '- Unknown'
  112. )
  113. for show_type in show_types:
  114. if show.endswith(show_type):
  115. new_show = show[:-len(show_type)]
  116. # Check if space at the end
  117. if new_show.endswith(' '):
  118. new_show = new_show[:-1]
  119. return new_show
  120. return show
  121. def get_channels(server_id: int) -> dict:
  122. ''' Returns the registered channels for a server '''
  123. if server_id is None:
  124. return None
  125. # TODO Make generic execute
  126. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  127. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  128. channels = cursor.fetchall()
  129. cursor.close()
  130. return channels
  131. def is_server_in_db(server_id : str) -> bool:
  132. ''' Checks if server is registered in the database '''
  133. if server_id is None:
  134. return False
  135. cursor = globals.conn.cursor(buffered=True)
  136. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  137. data = cursor.fetchone()
  138. cursor.close()
  139. return data is not None
  140. def get_users() -> List[dict]:
  141. ''' Returns all registered users '''
  142. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  143. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  144. users = cursor.fetchall()
  145. cursor.close()
  146. return users
  147. def get_user_servers(user_name : str, service : Service) -> str:
  148. ''' Returns a list of every registered servers for a user of a specific service, as a string '''
  149. if user_name is None or service is None:
  150. return
  151. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  152. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  153. [user_name.lower(), service.value])
  154. user_servers = cursor.fetchone()
  155. cursor.close()
  156. if user_servers is not None:
  157. return user_servers["servers"]
  158. return None
  159. def remove_server_from_servers(server : str, servers : str) -> str:
  160. ''' Removes the server from a comma-separated string containing multiple servers '''
  161. servers_list = servers.split(',')
  162. # If the server is not found, return None
  163. if server not in servers_list:
  164. return None
  165. # Remove every occurence of server
  166. servers_list = [x for x in servers_list if x != server]
  167. # Build server-free string
  168. return ','.join(servers_list)
  169. def delete_user_from_db(user_name : str, service : Service) -> bool:
  170. ''' Removes the user from the database '''
  171. if user_name is None or service is None:
  172. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  173. return False
  174. cursor = globals.conn.cursor(buffered=True)
  175. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  176. [user_name.lower(), service.value])
  177. globals.conn.commit()
  178. cursor.close()
  179. return True
  180. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  181. if user_name is None or service is None or servers is None:
  182. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  183. return False
  184. cursor = globals.conn.cursor(buffered=True)
  185. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  186. [servers, user_name.lower(), service.value])
  187. globals.conn.commit()
  188. cursor.close()
  189. return True
  190. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  191. ''' Add the user to the database '''
  192. if user_name is None or service is None or servers is None:
  193. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  194. return False
  195. cursor = globals.conn.cursor(buffered=True)
  196. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  197. [user_name, service.value, servers])
  198. globals.conn.commit()
  199. cursor.close()
  200. return True
  201. # TODO Move those functions somewhere else (E.g. discord.py)
  202. # TODO Create a Feed class instead of sending a lot of parameters
  203. def build_embed(user, item_title, item_link, item_description, pub_date, image, service: Service):
  204. ''' Build the embed message related to the anime's status '''
  205. # Get service
  206. if service == Service.MAL:
  207. service_name = 'MyAnimeList'
  208. profile_url = "{}{}".format(globals.MAL_PROFILE_URL, user)
  209. icon_url = globals.MAL_ICON_URL
  210. elif service == Service.ANILIST:
  211. service_name = 'AniList'
  212. profile_url = "{}{}".format(globals.ANILIST_PROFILE_URL, user)
  213. icon_url = globals.ANILIST_ICON_URL
  214. else:
  215. raise NotImplementedError('Unknown service {}'.format(service))
  216. description = "[{}]({})\n```{}```".format(filter_name(item_title), item_link, item_description)
  217. profile_url_label = "{}'s {}".format(user, service_name)
  218. try:
  219. embed = discord.Embed(colour=0xEED000, url=item_link, description=description, timestamp=pub_date.astimezone(pytz.timezone("utc")))
  220. embed.set_thumbnail(url=image)
  221. embed.set_author(name=profile_url_label, url=profile_url, icon_url=icon_url)
  222. embed.set_footer(text="MyAnimeBot", icon_url=globals.iconBot)
  223. return embed
  224. except Exception as e:
  225. globals.logger.error("Error when generating the message: " + str(e))
  226. return
  227. # Function used to send the embed
  228. async def send_embed_wrapper(asyncioloop, channelid, client, embed):
  229. channel = client.get_channel(int(channelid))
  230. try:
  231. await channel.send(embed=embed)
  232. globals.logger.info("Message sent in channel: " + channelid)
  233. except Exception as e:
  234. globals.logger.debug("Impossible to send a message on '" + channelid + "': " + str(e))
  235. return