utils.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import datetime
  2. import re
  3. import urllib.request
  4. from enum import Enum
  5. from typing import List
  6. from bs4 import BeautifulSoup
  7. import myanimebot.globals as globals
  8. class Service(Enum):
  9. MAL=globals.SERVICE_MAL
  10. ANILIST=globals.SERVICE_ANILIST
  11. @staticmethod
  12. def from_str(label: str):
  13. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  14. return Service.MAL
  15. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  16. return Service.ANILIST
  17. else:
  18. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  19. class MediaType(Enum):
  20. ANIME="ANIME"
  21. MANGA="MANGA"
  22. @staticmethod
  23. def from_str(label: str):
  24. if label.upper() in ('ANIME', 'ANIME_LIST'):
  25. return MediaType.ANIME
  26. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  27. return MediaType.MANGA
  28. else:
  29. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  30. class User():
  31. data = None
  32. def __init__(self,
  33. id : int,
  34. service_id : int,
  35. name : str,
  36. servers : List[int]):
  37. self.id = id
  38. self.service_id = service_id
  39. self.name = name
  40. self.servers = servers
  41. class Media():
  42. def __init__(self,
  43. name : str,
  44. url : str,
  45. episodes : str,
  46. image : str,
  47. type : MediaType):
  48. self.name = name
  49. self.url = url
  50. self.episodes = episodes
  51. self.image = image
  52. self.type = type
  53. @staticmethod
  54. def get_number_episodes(activity):
  55. media_type = MediaType.from_str(activity["type"])
  56. episodes = '?'
  57. if media_type == MediaType.ANIME:
  58. episodes = activity["media"]["episodes"]
  59. elif media_type == MediaType.MANGA:
  60. episodes = activity["media"]["chapters"]
  61. else:
  62. raise NotImplementedError('Error: Unknown media type "{}"'.format(media_type))
  63. if episodes is None:
  64. episodes = '?'
  65. return episodes
  66. class Feed():
  67. def __init__(self,
  68. service : Service,
  69. date_publication : datetime.datetime,
  70. user : User,
  71. status : str, # TODO Need to change
  72. description : str, # TODO Need to change
  73. media : Media
  74. ):
  75. self.service = service
  76. self.date_publication = date_publication
  77. self.user = user
  78. self.status = status
  79. self.media = media
  80. self.description = description
  81. # Get thumbnail from an URL
  82. def getThumbnail(urlParam):
  83. url = "/".join((urlParam).split("/")[:5])
  84. websource = urllib.request.urlopen(url)
  85. soup = BeautifulSoup(websource.read(), "html.parser")
  86. image = re.search("(?P<url>https?://[^\s]+)", str(soup.find("img", {"itemprop": "image"}))).group("url")
  87. thumbnail = "".join(image.split('"')[:1]).replace('"','')
  88. return thumbnail
  89. # Replace multiple substrings from a string
  90. def replace_all(text : str, dic : dict) -> str:
  91. for i, j in dic.items():
  92. text = text.replace(i, j)
  93. return text
  94. # Escape special characters
  95. def filter_name(name):
  96. dic = {
  97. "♥": "\♥",
  98. "♀": "\♀",
  99. "♂": "\♂",
  100. "♪": "\♪",
  101. "☆": "\☆"
  102. }
  103. return replace_all(name, dic)
  104. # Check if the show's name ends with a show type and truncate it
  105. def truncate_end_show(show):
  106. SHOW_TYPES = (
  107. '- TV',
  108. '- Movie',
  109. '- Special',
  110. '- OVA',
  111. '- ONA',
  112. '- Manga',
  113. '- Manhua',
  114. '- Manhwa',
  115. '- Novel',
  116. '- One-Shot',
  117. '- Doujinshi',
  118. '- Music',
  119. '- OEL',
  120. '- Unknown'
  121. )
  122. if show.endswith(SHOW_TYPES):
  123. return show[:show.rindex('-') - 1]
  124. return show
  125. def get_channels(server_id: int) -> dict:
  126. ''' Returns the registered channels for a server '''
  127. if server_id is None:
  128. return None
  129. # TODO Make generic execute
  130. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  131. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  132. channels = cursor.fetchall()
  133. cursor.close()
  134. return channels
  135. def is_server_in_db(server_id : str) -> bool:
  136. ''' Checks if server is registered in the database '''
  137. if server_id is None:
  138. return False
  139. cursor = globals.conn.cursor(buffered=True)
  140. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  141. data = cursor.fetchone()
  142. cursor.close()
  143. return data is not None
  144. def get_users() -> List[dict]:
  145. ''' Returns all registered users '''
  146. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  147. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  148. users = cursor.fetchall()
  149. cursor.close()
  150. return users
  151. def get_user_servers(user_name : str, service : Service) -> str:
  152. ''' Returns a list of every registered servers for a user of a specific service, as a string '''
  153. if user_name is None or service is None:
  154. return
  155. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  156. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  157. [user_name.lower(), service.value])
  158. user_servers = cursor.fetchone()
  159. cursor.close()
  160. if user_servers is not None:
  161. return user_servers["servers"]
  162. return None
  163. def remove_server_from_servers(server : str, servers : str) -> str:
  164. ''' Removes the server from a comma-separated string containing multiple servers '''
  165. servers_list = servers.split(',')
  166. # If the server is not found, return None
  167. if server not in servers_list:
  168. return None
  169. # Remove every occurence of server
  170. servers_list = [x for x in servers_list if x != server]
  171. # Build server-free string
  172. return ','.join(servers_list)
  173. def delete_user_from_db(user_name : str, service : Service) -> bool:
  174. ''' Removes the user from the database '''
  175. if user_name is None or service is None:
  176. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  177. return False
  178. cursor = globals.conn.cursor(buffered=True)
  179. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  180. [user_name.lower(), service.value])
  181. globals.conn.commit()
  182. cursor.close()
  183. return True
  184. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  185. if user_name is None or service is None or servers is None:
  186. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  187. return False
  188. cursor = globals.conn.cursor(buffered=True)
  189. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  190. [servers, user_name.lower(), service.value])
  191. globals.conn.commit()
  192. cursor.close()
  193. return True
  194. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  195. ''' Add the user to the database '''
  196. if user_name is None or service is None or servers is None:
  197. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  198. return False
  199. cursor = globals.conn.cursor(buffered=True)
  200. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  201. [user_name, service.value, servers])
  202. globals.conn.commit()
  203. cursor.close()
  204. return True