1
0

utils.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. import datetime
  2. from enum import Enum
  3. from typing import List
  4. import myanimebot.globals as globals
  5. # TODO Redo all of the desc/status system
  6. class Service(Enum):
  7. MAL=globals.SERVICE_MAL
  8. ANILIST=globals.SERVICE_ANILIST
  9. @staticmethod
  10. def from_str(label: str):
  11. if label is None: raise TypeError
  12. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  13. return Service.MAL
  14. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  15. return Service.ANILIST
  16. else:
  17. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  18. class MediaType(Enum):
  19. ANIME="ANIME"
  20. MANGA="MANGA"
  21. @staticmethod
  22. def from_str(label: str):
  23. if label is None: raise TypeError
  24. if label.upper() in ('ANIME', 'ANIME_LIST'):
  25. return MediaType.ANIME
  26. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  27. return MediaType.MANGA
  28. else:
  29. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  30. class MediaStatus(Enum):
  31. CURRENT=0
  32. PLANNING=1
  33. COMPLETED=2
  34. DROPPED=3
  35. PAUSED=4
  36. REPEATING=5
  37. @staticmethod
  38. def from_str(label: str):
  39. if label is None: raise TypeError
  40. first_word = label.split(' ')[0].upper()
  41. if first_word in ['READ', 'READING', 'WATCHED', 'WATCHING']:
  42. return MediaStatus.CURRENT
  43. elif first_word in ['PLANS', 'PLAN']:
  44. return MediaStatus.PLANNING
  45. elif first_word in ['COMPLETED']:
  46. return MediaStatus.COMPLETED
  47. elif first_word in ['DROPPED']:
  48. return MediaStatus.DROPPED
  49. elif first_word in ['PAUSED', 'ON-HOLD']:
  50. return MediaStatus.PAUSED
  51. elif first_word in ['REREAD', 'REREADING', 'REWATCHED', 'REWATCHING']:
  52. return MediaStatus.REPEATING
  53. else:
  54. raise NotImplementedError('Error: Cannot convert "{}" to a MediaStatus'.format(label))
  55. class User():
  56. data = None
  57. def __init__(self,
  58. id : int,
  59. service_id : int,
  60. name : str,
  61. servers : List[int]):
  62. self.id = id
  63. self.service_id = service_id
  64. self.name = name
  65. self.servers = servers
  66. class Media():
  67. def __init__(self,
  68. name : str,
  69. url : str,
  70. episodes : str,
  71. image : str,
  72. type : MediaType):
  73. self.name = name
  74. self.url = url
  75. self.episodes = episodes
  76. self.image = image
  77. self.type = type
  78. @staticmethod
  79. def get_number_episodes(activity): # TODO Dont work for MAL
  80. media_type = MediaType.from_str(activity["type"])
  81. episodes = '?'
  82. if media_type == MediaType.ANIME:
  83. episodes = activity["media"]["episodes"]
  84. elif media_type == MediaType.MANGA:
  85. episodes = activity["media"]["chapters"]
  86. else:
  87. raise NotImplementedError('Error: Unknown media type "{}"'.format(media_type))
  88. if episodes is None:
  89. episodes = '?'
  90. return episodes
  91. class Feed():
  92. def __init__(self,
  93. service : Service,
  94. date_publication : datetime.datetime,
  95. user : User,
  96. status : MediaStatus,
  97. description : str, # TODO Need to change
  98. media : Media
  99. ):
  100. self.service = service
  101. self.date_publication = date_publication
  102. self.user = user
  103. self.status = status
  104. self.media = media
  105. self.description = description
  106. def replace_all(text : str, replace_dic : dict) -> str:
  107. ''' Replace multiple substrings from a string '''
  108. if text is None or replace_dic is None:
  109. return text
  110. for replace_key, replace_value in replace_dic.items():
  111. text = text.replace(replace_key, replace_value)
  112. return text
  113. def filter_name(name : str) -> str:
  114. ''' Escapes special characters from name '''
  115. dic = {
  116. "♥": "\♥",
  117. "♀": "\♀",
  118. "♂": "\♂",
  119. "♪": "\♪",
  120. "☆": "\☆"
  121. }
  122. return replace_all(name, dic)
  123. # Check if the show's name ends with a show type and truncate it
  124. def truncate_end_show(media_name : str):
  125. if media_name is None: return media_name
  126. show_types = (
  127. '- TV',
  128. '- Movie',
  129. '- Special',
  130. '- OVA',
  131. '- ONA',
  132. '- Manga',
  133. '- Manhua',
  134. '- Manhwa',
  135. '- Novel',
  136. '- One-Shot',
  137. '- Doujinshi',
  138. '- Music',
  139. '- OEL',
  140. '- Unknown'
  141. )
  142. for show_type in show_types:
  143. if media_name.endswith(show_type):
  144. new_show = media_name[:-len(show_type)]
  145. # Check if space at the end
  146. if new_show.endswith(' '):
  147. new_show = new_show[:-1]
  148. return new_show
  149. return media_name
  150. def get_channels(server_id: int) -> dict:
  151. ''' Returns the registered channels for a server '''
  152. if server_id is None:
  153. return None
  154. # TODO Make generic execute
  155. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  156. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  157. channels = cursor.fetchall()
  158. cursor.close()
  159. return channels
  160. def is_server_in_db(server_id : str) -> bool:
  161. ''' Checks if server is registered in the database '''
  162. if server_id is None:
  163. return False
  164. cursor = globals.conn.cursor(buffered=True)
  165. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  166. data = cursor.fetchone()
  167. cursor.close()
  168. return data is not None
  169. def get_users() -> List[dict]:
  170. ''' Returns all registered users '''
  171. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  172. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  173. users = cursor.fetchall()
  174. cursor.close()
  175. return users
  176. def get_user_servers(user_name : str, service : Service) -> str:
  177. ''' Returns a list of every registered servers for a user of a specific service, as a string '''
  178. if user_name is None or service is None:
  179. return
  180. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  181. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  182. [user_name.lower(), service.value])
  183. user_servers = cursor.fetchone()
  184. cursor.close()
  185. if user_servers is not None:
  186. return user_servers["servers"]
  187. return None
  188. def remove_server_from_servers(server : str, servers : str) -> str:
  189. ''' Removes the server from a comma-separated string containing multiple servers '''
  190. servers_list = servers.split(',')
  191. # If the server is not found, return None
  192. if server not in servers_list:
  193. return None
  194. # Remove every occurence of server
  195. servers_list = [x for x in servers_list if x != server]
  196. # Build server-free string
  197. return ','.join(servers_list)
  198. def delete_user_from_db(user_name : str, service : Service) -> bool:
  199. ''' Removes the user from the database '''
  200. if user_name is None or service is None:
  201. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  202. return False
  203. cursor = globals.conn.cursor(buffered=True)
  204. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  205. [user_name.lower(), service.value])
  206. globals.conn.commit()
  207. cursor.close()
  208. return True
  209. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  210. if user_name is None or service is None or servers is None:
  211. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  212. return False
  213. cursor = globals.conn.cursor(buffered=True)
  214. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  215. [servers, user_name.lower(), service.value])
  216. globals.conn.commit()
  217. cursor.close()
  218. return True
  219. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  220. ''' Add the user to the database '''
  221. if user_name is None or service is None or servers is None:
  222. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  223. return False
  224. cursor = globals.conn.cursor(buffered=True)
  225. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  226. [user_name, service.value, servers])
  227. globals.conn.commit()
  228. cursor.close()
  229. return True
  230. # TODO Create a Feed class instead of sending a lot of parameters