1
0

utils.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. import datetime
  2. from enum import Enum
  3. from typing import List
  4. import myanimebot.globals as globals
  5. # TODO Redo all of the desc/status system
  6. # Media Status colors
  7. CURRENT_COLOR = "0x00CC00"
  8. PLANNING_COLOR = "0xBFBFBF"
  9. COMPLETED_COLOR = "0x000088"
  10. DROPPED_COLOR = "0xCC0000"
  11. PAUSED_COLOR = "0xDDDD00"
  12. REPEATING_COLOR = "0x007700"
  13. class Service(Enum):
  14. MAL=globals.SERVICE_MAL
  15. ANILIST=globals.SERVICE_ANILIST
  16. @staticmethod
  17. def from_str(label: str):
  18. if label is None: raise TypeError
  19. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  20. return Service.MAL
  21. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  22. return Service.ANILIST
  23. else:
  24. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  25. class MediaType(Enum):
  26. ANIME="ANIME"
  27. MANGA="MANGA"
  28. @staticmethod
  29. def from_str(label: str):
  30. if label is None: raise TypeError
  31. if label.upper() in ('ANIME', 'ANIME_LIST'):
  32. return MediaType.ANIME
  33. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  34. return MediaType.MANGA
  35. else:
  36. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  37. def get_media_count_type(self):
  38. if self == MediaType.ANIME:
  39. return 'episodes'
  40. elif self == MediaType.MANGA:
  41. return 'chapters'
  42. else:
  43. raise NotImplementedError('Unknown MediaType "{}"'.format(self))
  44. class MediaStatus(Enum):
  45. CURRENT = CURRENT_COLOR
  46. PLANNING = PLANNING_COLOR
  47. COMPLETED = COMPLETED_COLOR
  48. DROPPED = DROPPED_COLOR
  49. PAUSED = PAUSED_COLOR
  50. REPEATING = REPEATING_COLOR
  51. @staticmethod
  52. def from_str(label: str):
  53. if label is None: raise TypeError
  54. first_word = label.split(' ')[0].upper()
  55. if first_word in ['READ', 'READING', 'WATCHED', 'WATCHING']:
  56. return MediaStatus.CURRENT
  57. elif first_word in ['PLANS', 'PLAN']:
  58. return MediaStatus.PLANNING
  59. elif first_word in ['COMPLETED']:
  60. return MediaStatus.COMPLETED
  61. elif first_word in ['DROPPED']:
  62. return MediaStatus.DROPPED
  63. elif first_word in ['PAUSED', 'ON-HOLD']:
  64. return MediaStatus.PAUSED
  65. elif first_word in ['REREAD', 'REREADING', 'REWATCHED', 'REWATCHING', 'RE-READING', 'RE-WATCHING', 'RE-READ', 'RE-WATCHED']:
  66. return MediaStatus.REPEATING
  67. else:
  68. raise NotImplementedError('Error: Cannot convert "{}" to a MediaStatus'.format(label))
  69. class User():
  70. data = None
  71. def __init__(self,
  72. id : int,
  73. service_id : int,
  74. name : str,
  75. servers : List[int]):
  76. self.id = id
  77. self.service_id = service_id
  78. self.name = name
  79. self.servers = servers
  80. class Media():
  81. def __init__(self,
  82. id : int,
  83. name : str,
  84. url : str,
  85. episodes : str,
  86. image : str,
  87. type : MediaType):
  88. self.id = id
  89. self.name = name
  90. self.url = url
  91. self.episodes = episodes
  92. self.image = image
  93. self.type = type
  94. class Feed():
  95. def __init__(self,
  96. service : Service,
  97. date_publication : datetime.datetime,
  98. user : User,
  99. status : MediaStatus,
  100. description : str, # TODO Need to change
  101. progress : str,
  102. media : Media,
  103. score : float,
  104. score_format : str
  105. ):
  106. self.service = service
  107. self.date_publication = date_publication
  108. self.user = user
  109. self.status = status
  110. self.media = media
  111. self.description = description
  112. self.progress = progress
  113. self.score = score
  114. self.score_format = score_format
  115. def get_status_str(self):
  116. if self.status == MediaStatus.CURRENT \
  117. or self.status == MediaStatus.REPEATING:
  118. if self.media.type == MediaType.ANIME:
  119. status_str = 'Watching'
  120. elif self.media.type == MediaType.MANGA:
  121. status_str = 'Reading'
  122. else:
  123. raise NotImplementedError('Unknown MediaType: {}'.format(self.media.type))
  124. # Add prefix if rewatching
  125. if self.status == MediaStatus.REPEATING:
  126. status_str = 'Re-{}'.format(status_str.lower())
  127. elif self.status == MediaStatus.COMPLETED:
  128. status_str = 'Completed'
  129. elif self.status == MediaStatus.PAUSED:
  130. status_str = 'Paused'
  131. elif self.status == MediaStatus.DROPPED:
  132. status_str = 'Dropped'
  133. elif self.status == MediaStatus.PLANNING:
  134. if self.media.type == MediaType.ANIME:
  135. media_type_label = 'watch'
  136. elif self.media.type == MediaType.MANGA:
  137. media_type_label = 'read'
  138. else:
  139. raise NotImplementedError('Unknown MediaType: {}'.format(self.media.type))
  140. status_str = 'Plans to {}'.format(media_type_label)
  141. else:
  142. raise NotImplementedError('Unknown MediaStatus: {}'.format(self.status))
  143. return status_str
  144. def replace_all(text : str, replace_dic : dict) -> str:
  145. '''Replace multiple substrings from a string'''
  146. if text is None or replace_dic is None:
  147. return text
  148. for replace_key, replace_value in replace_dic.items():
  149. text = text.replace(replace_key, replace_value)
  150. return text
  151. def filter_name(name : str) -> str:
  152. '''Escapes special characters from name'''
  153. dic = {
  154. "♥": "\♥",
  155. "♀": "\♀",
  156. "♂": "\♂",
  157. "♪": "\♪",
  158. "☆": "\☆"
  159. }
  160. return replace_all(name, dic)
  161. # Check if the show's name ends with a show type and truncate it
  162. def truncate_end_show(media_name : str):
  163. '''Check if a show's name ends with a show type and truncate it'''
  164. if media_name is None: return None
  165. show_types = (
  166. '- TV',
  167. '- Movie',
  168. '- Special',
  169. '- OVA',
  170. '- ONA',
  171. '- Manga',
  172. '- Manhua',
  173. '- Manhwa',
  174. '- Novel',
  175. '- One-Shot',
  176. '- Doujinshi',
  177. '- Music',
  178. '- OEL',
  179. '- Unknown',
  180. '- Light Novel'
  181. )
  182. for show_type in show_types:
  183. if media_name.endswith(show_type):
  184. new_show = media_name[:-len(show_type)]
  185. # Check if space at the end
  186. if new_show.endswith(' '):
  187. new_show = new_show[:-1]
  188. return new_show
  189. return media_name
  190. def build_score_string(score_format : str):
  191. if score_format == "POINT_100":
  192. return "100"
  193. elif score_format == "POINT_10" or score_format == "POINT_10_DECIMAL":
  194. return "10"
  195. elif score_format == "POINT_5":
  196. return "5"
  197. elif score_format == "POINT_3":
  198. return "3"
  199. else:
  200. return "?"
  201. def build_description_string(feed : Feed):
  202. '''Build and returns a string describing the feed'''
  203. media_type_count = feed.media.type.get_media_count_type()
  204. status_str = feed.get_status_str()
  205. # Build the string
  206. desc = '{} | {} of {} {}'.format(status_str, feed.progress, feed.media.episodes, media_type_count)
  207. globals.logger.debug("Feed media score {}".format(feed.score))
  208. if feed.score is not None:
  209. desc += '\nScore: {} / {}'.format(feed.score, build_score_string(feed.score_format))
  210. return desc
  211. def get_channels(server_id: int) -> dict:
  212. '''Returns the registered channels for a server'''
  213. if server_id is None: return None
  214. # TODO Make generic execute
  215. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  216. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  217. channels = cursor.fetchall()
  218. cursor.close()
  219. return channels
  220. def is_server_in_db(server_id : str) -> bool:
  221. '''Checks if server is registered in the database'''
  222. if server_id is None:
  223. return False
  224. cursor = globals.conn.cursor(buffered=True)
  225. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  226. data = cursor.fetchone()
  227. cursor.close()
  228. return data is not None
  229. def get_users() -> List[dict]:
  230. '''Returns all registered users'''
  231. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  232. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  233. users = cursor.fetchall()
  234. cursor.close()
  235. return users
  236. def get_user_servers(user_name : str, service : Service) -> str:
  237. '''Returns a list of every registered servers for a user of a specific service, as a string'''
  238. if user_name is None or service is None:
  239. return
  240. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  241. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  242. [user_name.lower(), service.value])
  243. user_servers = cursor.fetchone()
  244. cursor.close()
  245. if user_servers is not None:
  246. return user_servers["servers"]
  247. return None
  248. def remove_server_from_servers(server : str, servers : str) -> str:
  249. '''Removes the server from a comma-separated string containing multiple servers'''
  250. servers_list = servers.split(',')
  251. # If the server is not found, return None
  252. if server not in servers_list:
  253. return None
  254. # Remove every occurence of server
  255. servers_list = [x for x in servers_list if x != server]
  256. # Build server-free string
  257. return ','.join(servers_list)
  258. def delete_user_from_db(user_name : str, service : Service) -> bool:
  259. '''Removes the user from the database'''
  260. if user_name is None or service is None:
  261. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  262. return False
  263. cursor = globals.conn.cursor(buffered=True)
  264. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  265. [user_name.lower(), service.value])
  266. globals.conn.commit()
  267. cursor.close()
  268. return True
  269. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  270. if user_name is None or service is None or servers is None:
  271. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  272. return False
  273. cursor = globals.conn.cursor(buffered=True)
  274. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  275. [servers, user_name.lower(), service.value])
  276. globals.conn.commit()
  277. cursor.close()
  278. return True
  279. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  280. '''Add the user to the database'''
  281. if user_name is None or service is None or servers is None:
  282. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  283. return False
  284. cursor = globals.conn.cursor(buffered=True)
  285. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  286. [user_name, service.value, servers])
  287. globals.conn.commit()
  288. cursor.close()
  289. return True
  290. def get_allowed_role(server : int) -> int:
  291. '''Return the allowed role for a given server'''
  292. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  293. cursor.execute("SELECT admin_group FROM t_servers WHERE server=%s LIMIT 1", [str(server)])
  294. allowedRole = cursor.fetchone()
  295. cursor.close()
  296. if allowedRole is None:
  297. return None
  298. return allowedRole["admin_group"]
  299. def get_role_name(provided_role_id : int, server) -> str :
  300. ''' Convert a role ID into a displayable name '''
  301. role_name = None
  302. if provided_role_id is not None:
  303. for role in server.roles:
  304. if str(role.id) == str(provided_role_id):
  305. role_name = role.name
  306. if role_name is None:
  307. role_name = "[DELETED ROLE]"
  308. return role_name