1
0

utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. import datetime
  2. from enum import Enum
  3. from typing import List
  4. import myanimebot.globals as globals
  5. # TODO Redo all of the desc/status system
  class Service(Enum):
      ''' Tracking services known to the bot; member values come from the globals module '''

      MAL = globals.SERVICE_MAL
      ANILIST = globals.SERVICE_ANILIST

      @staticmethod
      def from_str(label: str):
          '''
          Convert a case-insensitive service name into a Service member.

          Raises TypeError when label is None and NotImplementedError when
          the label matches no known service alias.
          '''

          if label is None:
              raise TypeError

          wanted = label.upper()

          mal_aliases = ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper())
          if wanted in mal_aliases:
              return Service.MAL

          # Only evaluated when the label is not a MyAnimeList alias
          anilist_aliases = ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper())
          if wanted in anilist_aliases:
              return Service.ANILIST

          raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  class MediaType(Enum):
      ''' Kind of media an entry refers to '''

      ANIME = "ANIME"
      MANGA = "MANGA"

      @staticmethod
      def from_str(label: str):
          '''
          Convert a case-insensitive label into a MediaType member.

          Raises TypeError when label is None and NotImplementedError when
          the label is not a known anime/manga name.
          '''

          if label is None:
              raise TypeError

          wanted = label.upper()
          if wanted in ('ANIME', 'ANIME_LIST'):
              return MediaType.ANIME
          if wanted in ('MANGA', 'MANGA_LIST'):
              return MediaType.MANGA

          raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))

      def get_media_count_type(self):
          ''' Returns the unit used to count progress: 'episodes' for anime, 'chapters' for manga '''

          if self == MediaType.ANIME:
              return 'episodes'
          if self == MediaType.MANGA:
              return 'chapters'

          raise NotImplementedError('Unknown MediaType "{}"'.format(self))
  class MediaStatus(Enum):
      ''' Progress state of a media entry in a user's list '''

      CURRENT = 0
      PLANNING = 1
      COMPLETED = 2
      DROPPED = 3
      PAUSED = 4
      REPEATING = 5

      @staticmethod
      def from_str(label: str):
          '''
          Convert a status text into a MediaStatus member.

          Only the first whitespace-separated word of label is examined,
          case-insensitively (e.g. 'Plans to watch' -> PLANNING).

          Raises TypeError when label is None and NotImplementedError when
          the first word is not a known status keyword (including when the
          label is empty or blank).
          '''

          if label is None:
              raise TypeError

          # split() without argument ignores leading whitespace and also splits on
          # tabs/newlines, so a padded label such as '  watching' is still recognised
          words = label.split()
          first_word = words[0].upper() if words else ''

          # The mapping lives inside the method: a class-level attribute of an
          # Enum would be turned into an extra member
          keywords = (
              (MediaStatus.CURRENT, ('READ', 'READING', 'WATCHED', 'WATCHING')),
              (MediaStatus.PLANNING, ('PLANS', 'PLAN')),
              (MediaStatus.COMPLETED, ('COMPLETED',)),
              (MediaStatus.DROPPED, ('DROPPED',)),
              (MediaStatus.PAUSED, ('PAUSED', 'ON-HOLD')),
              (MediaStatus.REPEATING, ('REREAD', 'REREADING', 'REWATCHED', 'REWATCHING',
                                       'RE-READING', 'RE-WATCHING', 'RE-READ', 'RE-WATCHED')),
          )
          for status, names in keywords:
              if first_word in names:
                  return status

          raise NotImplementedError('Error: Cannot convert "{}" to a MediaStatus'.format(label))
  class User():
      ''' A registered user: its id, the id of its service, its name and the servers it is attached to '''

      # Class-level default, shared by all instances until overridden
      data = None

      def __init__(self, id : int, service_id : int, name : str, servers : List[int]):
          self.id, self.service_id = id, service_id
          self.name, self.servers = name, servers
  class Media():
      ''' A media entry: its name, url, episodes value, image and MediaType '''

      def __init__(self, name : str, url : str, episodes : str, image : str, type : MediaType):
          self.name, self.url = name, url
          self.episodes, self.image = episodes, image
          self.type = type
  class Feed():
      ''' One feed entry: service, publication date, user, status, description, progress and media '''

      def __init__(self,
                   service : Service,
                   date_publication : datetime.datetime,
                   user : User,
                   status : MediaStatus,
                   description : str, # TODO Need to change
                   progress : str,
                   media : Media
                   ):
          self.service, self.date_publication = service, date_publication
          self.user, self.status = user, status
          self.media = media
          self.description, self.progress = description, progress

      def _label_for_media_type(self, anime_label, manga_label):
          ''' Returns the label matching this feed's media type, or raises NotImplementedError '''

          media_type = self.media.type
          if media_type == MediaType.ANIME:
              return anime_label
          if media_type == MediaType.MANGA:
              return manga_label
          raise NotImplementedError('Unknown MediaType: {}'.format(media_type))

      def get_status_str(self):
          '''
          Returns a human-readable label for the feed's status.

          The label depends on the media type for CURRENT, REPEATING and
          PLANNING; NotImplementedError is raised on an unknown status, or on
          an unknown media type for one of those three statuses.
          '''

          status = self.status

          if status in (MediaStatus.CURRENT, MediaStatus.REPEATING):
              activity = self._label_for_media_type('Watching', 'Reading')
              # Add prefix if rewatching
              if status == MediaStatus.REPEATING:
                  return 'Re-{}'.format(activity.lower())
              return activity

          if status == MediaStatus.COMPLETED:
              return 'Completed'
          if status == MediaStatus.PAUSED:
              return 'Paused'
          if status == MediaStatus.DROPPED:
              return 'Dropped'

          if status == MediaStatus.PLANNING:
              return 'Plans to {}'.format(self._label_for_media_type('watch', 'read'))

          raise NotImplementedError('Unknown MediaStatus: {}'.format(status))
  def replace_all(text : str, replace_dic : dict) -> str:
      '''
      Replace multiple substrings from a string.

      Each key of replace_dic is replaced by its value, one pair after the
      other in the dict's iteration order. text is returned unchanged when
      either argument is None.
      '''

      nothing_to_do = text is None or replace_dic is None
      if not nothing_to_do:
          for needle, substitute in replace_dic.items():
              text = text.replace(needle, substitute)
      return text
  def filter_name(name : str) -> str:
      '''
      Escapes special characters from name.

      Each of the characters ♥ ♀ ♂ ♪ ☆ is prefixed with a backslash.
      Returns None when name is None.
      '''

      if name is None:
          return None

      # The backslashes are written as '\\': a bare backslash in front of these
      # symbols is an invalid escape sequence, which newer Python versions
      # warn about.
      escapes = str.maketrans({
          "♥": "\\♥",
          "♀": "\\♀",
          "♂": "\\♂",
          "♪": "\\♪",
          "☆": "\\☆",
      })

      # Single pass over the string; no replacement contains another key, so
      # this matches replacing the characters one after the other
      return name.translate(escapes)
  148. # Check if the show's name ends with a show type and truncate it
  def truncate_end_show(media_name : str):
      '''
      Check if a show's name ends with a show type and truncate it.

      The first matching suffix (e.g. '- TV', '- Manga') is removed, together
      with at most one space directly before it. Names without such a suffix
      are returned unchanged, and None is returned for None.
      '''

      if media_name is None:
          return None

      show_types = (
          '- TV', '- Movie', '- Special', '- OVA', '- ONA',
          '- Manga', '- Manhua', '- Manhwa', '- Novel', '- One-Shot',
          '- Doujinshi', '- Music', '- OEL', '- Unknown'
      )

      for suffix in show_types:
          if not media_name.endswith(suffix):
              continue

          trimmed = media_name[:len(media_name) - len(suffix)]
          # Drop a single separating space, if any
          return trimmed[:-1] if trimmed.endswith(' ') else trimmed

      return media_name
  def build_description_string(feed : Feed):
      ''' Build and returns a string describing the feed, shaped as "<status> | <progress> of <episodes> <count type>" '''

      # The count type is resolved first, then the status label
      count_label = feed.media.type.get_media_count_type()
      fields = (feed.get_status_str(), feed.progress, feed.media.episodes, count_label)

      return '{} | {} of {} {}'.format(*fields)
  def get_channels(server_id: int) -> List[dict]:
      '''
      Returns the registered channels for a server.

      Runs a SELECT on t_servers and returns whatever fetchall() yields on the
      dictionary cursor (presumably a list of {'channel': ...} rows - confirm
      against the database driver). Returns None when server_id is None.
      '''

      if server_id is None:
          return None

      # TODO Make generic execute
      cursor = globals.conn.cursor(buffered=True, dictionary=True)
      try:
          cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
          return cursor.fetchall()
      finally:
          # Release the cursor even when the query raises
          cursor.close()
  def is_server_in_db(server_id : str) -> bool:
      '''
      Checks if server is registered in the database.

      Returns True when t_servers holds at least one row for server_id, and
      False otherwise or when server_id is None.
      '''

      if server_id is None:
          return False

      cursor = globals.conn.cursor(buffered=True)
      try:
          cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
          return cursor.fetchone() is not None
      finally:
          # Release the cursor even when the query raises
          cursor.close()
  def get_users() -> List[dict]:
      '''
      Returns all registered users.

      Selects the user name column (globals.DB_USER_NAME), service and servers
      from t_users and returns the rows fetched on the dictionary cursor.
      '''

      cursor = globals.conn.cursor(buffered=True, dictionary=True)
      try:
          cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
          return cursor.fetchall()
      finally:
          # Release the cursor even when the query raises
          cursor.close()
  def get_user_servers(user_name : str, service : Service) -> str:
      '''
      Returns a list of every registered servers for a user of a specific service, as a string.

      The user name is matched case-insensitively. Returns None when either
      argument is None or when no matching row exists in t_users.
      '''

      if user_name is None or service is None:
          return None

      cursor = globals.conn.cursor(buffered=True, dictionary=True)
      try:
          cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
                         [user_name.lower(), service.value])
          user_servers = cursor.fetchone()
      finally:
          # Release the cursor even when the query raises
          cursor.close()

      if user_servers is not None:
          return user_servers["servers"]
      return None
  def remove_server_from_servers(server : str, servers : str) -> str:
      '''
      Removes the server from a comma-separated string containing multiple servers.

      Returns the remaining servers joined by commas (an empty string when
      none is left), or None when server is not part of servers or when
      servers is None.
      '''

      # get_user_servers() yields None for an unknown user; treat it as "not found"
      if servers is None:
          return None

      servers_list = servers.split(',')

      # If the server is not found, return None
      if server not in servers_list:
          return None

      # Remove every occurrence of server and build the server-free string
      return ','.join(x for x in servers_list if x != server)
  def delete_user_from_db(user_name : str, service : Service) -> bool:
      '''
      Removes the user from the database.

      Deletes the t_users rows whose name matches user_name (case-insensitive)
      for the given service, then commits. Returns False, after logging a
      warning, when either argument is None; returns True otherwise.
      '''

      if user_name is None or service is None:
          globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
          return False

      cursor = globals.conn.cursor(buffered=True)
      try:
          cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
                         [user_name.lower(), service.value])
          globals.conn.commit()
      finally:
          # Release the cursor even when the statement or the commit raises
          cursor.close()

      return True
  def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
      '''
      Updates the servers of a user in the database.

      Sets the servers column of the t_users rows whose name matches
      user_name (case-insensitive) for the given service, then commits.
      Returns False, after logging a warning, when any argument is None;
      returns True otherwise.
      '''

      if user_name is None or service is None or servers is None:
          globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
          return False

      cursor = globals.conn.cursor(buffered=True)
      try:
          cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
                         [servers, user_name.lower(), service.value])
          globals.conn.commit()
      finally:
          # Release the cursor even when the statement or the commit raises
          cursor.close()

      return True
  def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
      '''
      Add the user to the database.

      Inserts a (name, service, servers) row into t_users, then commits.
      Returns False, after logging a warning, when any argument is None;
      returns True otherwise.
      '''

      if user_name is None or service is None or servers is None:
          globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
          return False

      cursor = globals.conn.cursor(buffered=True)
      try:
          cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
                         [user_name, service.value, servers])
          globals.conn.commit()
      finally:
          # Release the cursor even when the statement or the commit raises
          cursor.close()

      return True
  255. cursor = globals.conn.cursor(buffered=True)
  256. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  257. [user_name, service.value, servers])
  258. globals.conn.commit()
  259. cursor.close()
  260. return True