utils.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. import datetime
  2. from enum import Enum
  3. from typing import List
  4. import myanimebot.globals as globals
  5. # TODO Redo all of the desc/status system
  6. class Service(Enum):
  7. MAL=globals.SERVICE_MAL
  8. ANILIST=globals.SERVICE_ANILIST
  9. @staticmethod
  10. def from_str(label: str):
  11. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  12. return Service.MAL
  13. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  14. return Service.ANILIST
  15. else:
  16. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  17. class MediaType(Enum):
  18. ANIME="ANIME"
  19. MANGA="MANGA"
  20. @staticmethod
  21. def from_str(label: str):
  22. if label.upper() in ('ANIME', 'ANIME_LIST'):
  23. return MediaType.ANIME
  24. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  25. return MediaType.MANGA
  26. else:
  27. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  28. class User():
  29. data = None
  30. def __init__(self,
  31. id : int,
  32. service_id : int,
  33. name : str,
  34. servers : List[int]):
  35. self.id = id
  36. self.service_id = service_id
  37. self.name = name
  38. self.servers = servers
  39. class Media():
  40. def __init__(self,
  41. name : str,
  42. url : str,
  43. episodes : str,
  44. image : str,
  45. type : MediaType):
  46. self.name = name
  47. self.url = url
  48. self.episodes = episodes
  49. self.image = image
  50. self.type = type
  51. @staticmethod
  52. def get_number_episodes(activity): # TODO Dont work for MAL
  53. media_type = MediaType.from_str(activity["type"])
  54. episodes = '?'
  55. if media_type == MediaType.ANIME:
  56. episodes = activity["media"]["episodes"]
  57. elif media_type == MediaType.MANGA:
  58. episodes = activity["media"]["chapters"]
  59. else:
  60. raise NotImplementedError('Error: Unknown media type "{}"'.format(media_type))
  61. if episodes is None:
  62. episodes = '?'
  63. return episodes
  64. class Feed():
  65. def __init__(self,
  66. service : Service,
  67. date_publication : datetime.datetime,
  68. user : User,
  69. status : str, # TODO Need to change
  70. description : str, # TODO Need to change
  71. media : Media
  72. ):
  73. self.service = service
  74. self.date_publication = date_publication
  75. self.user = user
  76. self.status = status
  77. self.media = media
  78. self.description = description
  79. def replace_all(text : str, replace_dic : dict) -> str:
  80. ''' Replace multiple substrings from a string '''
  81. for replace_key, replace_value in replace_dic.items():
  82. text = text.replace(replace_key, replace_value)
  83. return text
  84. def filter_name(name : str) -> str:
  85. ''' Escapes special characters from name '''
  86. dic = {
  87. "♥": "\♥",
  88. "♀": "\♀",
  89. "♂": "\♂",
  90. "♪": "\♪",
  91. "☆": "\☆"
  92. }
  93. return replace_all(name, dic)
  94. # Check if the show's name ends with a show type and truncate it
  95. def truncate_end_show(show):
  96. show_types = (
  97. '- TV',
  98. '- Movie',
  99. '- Special',
  100. '- OVA',
  101. '- ONA',
  102. '- Manga',
  103. '- Manhua',
  104. '- Manhwa',
  105. '- Novel',
  106. '- One-Shot',
  107. '- Doujinshi',
  108. '- Music',
  109. '- OEL',
  110. '- Unknown'
  111. )
  112. for show_type in show_types:
  113. if show.endswith(show_type):
  114. new_show = show[:-len(show_type)]
  115. # Check if space at the end
  116. if new_show.endswith(' '):
  117. new_show = new_show[:-1]
  118. return new_show
  119. return show
  120. def get_channels(server_id: int) -> dict:
  121. ''' Returns the registered channels for a server '''
  122. if server_id is None:
  123. return None
  124. # TODO Make generic execute
  125. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  126. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  127. channels = cursor.fetchall()
  128. cursor.close()
  129. return channels
  130. def is_server_in_db(server_id : str) -> bool:
  131. ''' Checks if server is registered in the database '''
  132. if server_id is None:
  133. return False
  134. cursor = globals.conn.cursor(buffered=True)
  135. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  136. data = cursor.fetchone()
  137. cursor.close()
  138. return data is not None
  139. def get_users() -> List[dict]:
  140. ''' Returns all registered users '''
  141. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  142. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  143. users = cursor.fetchall()
  144. cursor.close()
  145. return users
  146. def get_user_servers(user_name : str, service : Service) -> str:
  147. ''' Returns a list of every registered servers for a user of a specific service, as a string '''
  148. if user_name is None or service is None:
  149. return
  150. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  151. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  152. [user_name.lower(), service.value])
  153. user_servers = cursor.fetchone()
  154. cursor.close()
  155. if user_servers is not None:
  156. return user_servers["servers"]
  157. return None
  158. def remove_server_from_servers(server : str, servers : str) -> str:
  159. ''' Removes the server from a comma-separated string containing multiple servers '''
  160. servers_list = servers.split(',')
  161. # If the server is not found, return None
  162. if server not in servers_list:
  163. return None
  164. # Remove every occurence of server
  165. servers_list = [x for x in servers_list if x != server]
  166. # Build server-free string
  167. return ','.join(servers_list)
  168. def delete_user_from_db(user_name : str, service : Service) -> bool:
  169. ''' Removes the user from the database '''
  170. if user_name is None or service is None:
  171. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  172. return False
  173. cursor = globals.conn.cursor(buffered=True)
  174. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  175. [user_name.lower(), service.value])
  176. globals.conn.commit()
  177. cursor.close()
  178. return True
  179. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  180. if user_name is None or service is None or servers is None:
  181. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  182. return False
  183. cursor = globals.conn.cursor(buffered=True)
  184. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  185. [servers, user_name.lower(), service.value])
  186. globals.conn.commit()
  187. cursor.close()
  188. return True
  189. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  190. ''' Add the user to the database '''
  191. if user_name is None or service is None or servers is None:
  192. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  193. return False
  194. cursor = globals.conn.cursor(buffered=True)
  195. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  196. [user_name, service.value, servers])
  197. globals.conn.commit()
  198. cursor.close()
  199. return True
  200. # TODO Create a Feed class instead of sending a lot of parameters