1
0

utils.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import datetime
  2. import re
  3. import urllib.request
  4. from enum import Enum
  5. from typing import List
  6. from bs4 import BeautifulSoup
  7. import myanimebot.globals as globals
  8. class Service(Enum):
  9. MAL=globals.SERVICE_MAL
  10. ANILIST=globals.SERVICE_ANILIST
  11. @staticmethod
  12. def from_str(label: str):
  13. if label.upper() in ('MAL', 'MYANIMELIST', globals.SERVICE_MAL.upper()):
  14. return Service.MAL
  15. elif label.upper() in ('AL', 'ANILIST', globals.SERVICE_ANILIST.upper()):
  16. return Service.ANILIST
  17. else:
  18. raise NotImplementedError('Error: Cannot convert "{}" to a Service'.format(label))
  19. class MediaType(Enum):
  20. ANIME="ANIME"
  21. MANGA="MANGA"
  22. @staticmethod
  23. def from_str(label: str):
  24. if label.upper() in ('ANIME', 'ANIME_LIST'):
  25. return MediaType.ANIME
  26. elif label.upper() in ('MANGA', 'MANGA_LIST'):
  27. return MediaType.MANGA
  28. else:
  29. raise NotImplementedError('Error: Cannot convert "{}" to a MediaType'.format(label))
  30. class User():
  31. data = None
  32. def __init__(self,
  33. id : int,
  34. service_id : int,
  35. name : str,
  36. servers : List[int]):
  37. self.id = id
  38. self.service_id = service_id
  39. self.name = name
  40. self.servers = servers
  41. class Media():
  42. def __init__(self,
  43. name : str,
  44. url : str,
  45. episodes : str,
  46. image : str,
  47. type : MediaType):
  48. self.name = name
  49. self.url = url
  50. self.episodes = episodes
  51. self.image = image
  52. self.type = type
  53. @staticmethod
  54. def get_number_episodes(activity):
  55. media_type = MediaType.from_str(activity["type"])
  56. episodes = '?'
  57. if media_type == MediaType.ANIME:
  58. episodes = activity["media"]["episodes"]
  59. elif media_type == MediaType.MANGA:
  60. episodes = activity["media"]["chapters"]
  61. else:
  62. raise NotImplementedError('Error: Unknown media type "{}"'.format(media_type))
  63. if episodes is None:
  64. episodes = '?'
  65. return episodes
  66. class Feed():
  67. def __init__(self,
  68. service : Service,
  69. date_publication : datetime.datetime,
  70. user : User,
  71. status : str, # TODO Need to change
  72. description : str, # TODO Need to change
  73. media : Media
  74. ):
  75. self.service = service
  76. self.date_publication = date_publication
  77. self.user = user
  78. self.status = status
  79. self.media = media
  80. self.description = description
  81. # Get thumbnail from an URL
  82. def getThumbnail(urlParam):
  83. url = "/".join((urlParam).split("/")[:5])
  84. websource = urllib.request.urlopen(url)
  85. soup = BeautifulSoup(websource.read(), "html.parser")
  86. image = re.search("(?P<url>https?://[^\s]+)", str(soup.find("img", {"itemprop": "image"}))).group("url")
  87. thumbnail = "".join(image.split('"')[:1]).replace('"','')
  88. return thumbnail
  89. def replace_all(text : str, replace_dic : dict) -> str:
  90. ''' Replace multiple substrings from a string '''
  91. for replace_key, replace_value in replace_dic.items():
  92. text = text.replace(replace_key, replace_value)
  93. return text
  94. def filter_name(name : str) -> str:
  95. ''' Escapes special characters from name '''
  96. dic = {
  97. "♥": "\♥",
  98. "♀": "\♀",
  99. "♂": "\♂",
  100. "♪": "\♪",
  101. "☆": "\☆"
  102. }
  103. return replace_all(name, dic)
  104. # Check if the show's name ends with a show type and truncate it
  105. def truncate_end_show(show):
  106. show_types = (
  107. '- TV',
  108. '- Movie',
  109. '- Special',
  110. '- OVA',
  111. '- ONA',
  112. '- Manga',
  113. '- Manhua',
  114. '- Manhwa',
  115. '- Novel',
  116. '- One-Shot',
  117. '- Doujinshi',
  118. '- Music',
  119. '- OEL',
  120. '- Unknown'
  121. )
  122. for show_type in show_types:
  123. if show.endswith(show_type):
  124. new_show = show[:-len(show_type)]
  125. # Check if space at the end
  126. if new_show.endswith(' '):
  127. new_show = new_show[:-1]
  128. return new_show
  129. return show
  130. def get_channels(server_id: int) -> dict:
  131. ''' Returns the registered channels for a server '''
  132. if server_id is None:
  133. return None
  134. # TODO Make generic execute
  135. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  136. cursor.execute("SELECT channel FROM t_servers WHERE server = %s", [server_id])
  137. channels = cursor.fetchall()
  138. cursor.close()
  139. return channels
  140. def is_server_in_db(server_id : str) -> bool:
  141. ''' Checks if server is registered in the database '''
  142. if server_id is None:
  143. return False
  144. cursor = globals.conn.cursor(buffered=True)
  145. cursor.execute("SELECT server FROM t_servers WHERE server=%s", [server_id])
  146. data = cursor.fetchone()
  147. cursor.close()
  148. return data is not None
  149. def get_users() -> List[dict]:
  150. ''' Returns all registered users '''
  151. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  152. cursor.execute('SELECT {}, service, servers FROM t_users'.format(globals.DB_USER_NAME))
  153. users = cursor.fetchall()
  154. cursor.close()
  155. return users
  156. def get_user_servers(user_name : str, service : Service) -> str:
  157. ''' Returns a list of every registered servers for a user of a specific service, as a string '''
  158. if user_name is None or service is None:
  159. return
  160. cursor = globals.conn.cursor(buffered=True, dictionary=True)
  161. cursor.execute("SELECT servers FROM t_users WHERE LOWER({})=%s AND service=%s".format(globals.DB_USER_NAME),
  162. [user_name.lower(), service.value])
  163. user_servers = cursor.fetchone()
  164. cursor.close()
  165. if user_servers is not None:
  166. return user_servers["servers"]
  167. return None
  168. def remove_server_from_servers(server : str, servers : str) -> str:
  169. ''' Removes the server from a comma-separated string containing multiple servers '''
  170. servers_list = servers.split(',')
  171. # If the server is not found, return None
  172. if server not in servers_list:
  173. return None
  174. # Remove every occurence of server
  175. servers_list = [x for x in servers_list if x != server]
  176. # Build server-free string
  177. return ','.join(servers_list)
  178. def delete_user_from_db(user_name : str, service : Service) -> bool:
  179. ''' Removes the user from the database '''
  180. if user_name is None or service is None:
  181. globals.logger.warning("Error while trying to delete user '{}' with service '{}'".format(user_name, service))
  182. return False
  183. cursor = globals.conn.cursor(buffered=True)
  184. cursor.execute("DELETE FROM t_users WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  185. [user_name.lower(), service.value])
  186. globals.conn.commit()
  187. cursor.close()
  188. return True
  189. def update_user_servers_db(user_name : str, service : Service, servers : str) -> bool:
  190. if user_name is None or service is None or servers is None:
  191. globals.logger.warning("Error while trying to update user's servers. User '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  192. return False
  193. cursor = globals.conn.cursor(buffered=True)
  194. cursor.execute("UPDATE t_users SET servers = %s WHERE LOWER({}) = %s AND service=%s".format(globals.DB_USER_NAME),
  195. [servers, user_name.lower(), service.value])
  196. globals.conn.commit()
  197. cursor.close()
  198. return True
  199. def insert_user_into_db(user_name : str, service : Service, servers : str) -> bool:
  200. ''' Add the user to the database '''
  201. if user_name is None or service is None or servers is None:
  202. globals.logger.warning("Error while trying to add user '{}' with service '{}' and servers '{}'".format(user_name, service, servers))
  203. return False
  204. cursor = globals.conn.cursor(buffered=True)
  205. cursor.execute("INSERT INTO t_users ({}, service, servers) VALUES (%s, %s, %s)".format(globals.DB_USER_NAME),
  206. [user_name, service.value, servers])
  207. globals.conn.commit()
  208. cursor.close()
  209. return True