"""AnimeEasySync model module""" from base64 import b64decode from json import loads from time import sleep import requests from .oauth import refresh_mal_token, refresh_anilist_token, refresh_kitsu_token ANILIST_TO_STATUS = { 'CURRENT': 'watching', 'COMPLETED': 'completed', 'PAUSED': 'on_hold', 'DROPPED': 'dropped', 'PLANNING': 'plan_to_watch', 'REPEATING': 'watching' } KITSU_TO_STATUS = { 'current': 'watching', 'completed': 'completed', 'on_hold': 'on_hold', 'dropped': 'dropped', 'planned': 'plan_to_watch' } MAL_API_URL = 'https://api.myanimelist.net/v2' KITSU_API_URL = 'https://kitsu.io/api/edge' ANILIST_GRAPHQL_URL = 'https://graphql.anilist.co' def get_userid_from_token(token): """Get userId from Anilist token""" payload = token['access_token'].split( '.')[1].replace('-', '+').replace('_', '/') payload = payload + '='*(len(payload) % 4) return int(loads(b64decode(payload))['sub']) class BearerAuth(requests.auth.AuthBase): """Bearer Auth header""" def __init__(self, token): self.token = token['access_token'] def __call__(self, r): r.headers["authorization"] = "Bearer " + self.token return r def mal_to_entry(mal_entry): """Map MAL entry to AES entry""" return { 'mal_id': str(mal_entry['node']['id']), 'title': mal_entry['node']['title'], 'score': mal_entry['list_status']['score'], 'status': mal_entry['list_status']['status'], 'progress': mal_entry['list_status']['num_episodes_watched'], 'link': f'https://myanimelist.net/anime/{mal_entry["node"]["id"]}' } def read_mal(token): """Read MAL entries""" entries = [] url = MAL_API_URL+'/users/@me/animelist?fields=list_status&limit=1000&nsfw=true' auth = BearerAuth(token) while True: response = requests.get(url=url, auth=auth) if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member refresh_mal_token(token) if not token: return entries auth = BearerAuth(token) continue mal = response.json().get('data') if mal is None: print('read_mal exception') print(response.json()) return entries for mal_entry in mal: entries.append(mal_to_entry(mal_entry)) paging = response.json().get('paging') if paging: url = paging.get('next') if url: continue break return entries def anilist_to_entry(anilist_entry): """Map Anilist entry to AES entry""" return { 'anilist_id': str(anilist_entry['media']['id']), 'mal_id': str(anilist_entry['media']['idMal']) if anilist_entry['media']['idMal'] else '', 'title': f"{anilist_entry['media']['title']['english']} " + f"({anilist_entry['media']['title']['romaji']})", 'score': anilist_entry['score'], 'status': ANILIST_TO_STATUS[anilist_entry['status']], 'progress': anilist_entry['progress'], 'link': anilist_entry['media']['siteUrl'] } def read_anilist(token): """Read Anilist entries""" entries = [] auth = BearerAuth(token) anilist_query = ''' query ($chunk: Int, $userId: Int) { MediaListCollection(userId: $userId, type: ANIME, chunk: $chunk, perChunk: 500) { hasNextChunk lists { name isCustomList isSplitCompletedList status entries { status score(format: POINT_10) progress media { id idMal title { english romaji } siteUrl } } } } } ''' chunk = 1 while True: variables = { 'chunk': chunk, 'userId': get_userid_from_token(token) } response = requests.post( ANILIST_GRAPHQL_URL, json={ 'query': anilist_query, 'variables': variables}, auth=auth) if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member refresh_anilist_token(token) if not token: return entries auth = BearerAuth(token) continue anilist = response.json().get('data') if anilist is None: print('read_anilist exception') print(response.json()) return entries for lst in anilist['MediaListCollection']['lists']: for anilist_entry in lst['entries']: entries.append(anilist_to_entry(anilist_entry)) if anilist['MediaListCollection']['hasNextChunk']: chunk += 1 else: break return entries def get_kitsu_user_id(token): """Get user's Kitsu user id""" url = KITSU_API_URL+'/users?filter[self]=true&fields[users]=name' auth = BearerAuth(token) while True: response = requests.get(url, auth=auth) if response.ok: user_id = response.json()['data'][0]['id'] return user_id if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member refresh_kitsu_token(token) if not token: return None auth = BearerAuth(token) continue print('get_kitsu_user_id exception') print(response.json()) return None def kitsu_to_entry(kitsu_entry, kitsu_anime, kitsu_mappings, kitsu_library_entry_id): """Map Kitsu entry to AES entry""" mal_id = next((str(x['attributes']['externalId']) for x in kitsu_mappings if x['attributes']['externalSite'] == 'myanimelist/anime'), '') anilist_id = next((str(x['attributes']['externalId']) for x in kitsu_mappings if x['attributes']['externalSite'] == 'anilist/anime'), '') if not anilist_id: anilist_id = next((str(x['attributes']['externalId'][6:] if x['attributes']['externalId'][:6] == 'anime/' else x['attributes']['externalId']) if x['attributes']['externalId'] else 0 for x in kitsu_mappings if x['attributes']['externalSite'] == 'anilist'), '') return { 'kitsu_id': str(kitsu_anime['id']), 'mal_id': str(mal_id) if mal_id else '', 'anilist_id': str(anilist_id) if anilist_id else '', 'title': kitsu_anime['attributes']['canonicalTitle'], 'score': (int(kitsu_entry['ratingTwenty']) if kitsu_entry['ratingTwenty'] else 0)//2, 'status': KITSU_TO_STATUS[kitsu_entry['status']], 'progress': kitsu_entry['progress'], 'library_entry_id': kitsu_library_entry_id, 'link': f'https://kitsu.io/anime/{kitsu_anime["id"]}' } def read_kitsu(token, user_id): """Read Kitsu entries""" entries = [] url = KITSU_API_URL+f"/library-entries?filter[user_id]={user_id}" + \ "&filter[kind]=anime&include=anime,anime.mappings" + \ "&fields[anime]=canonicalTitle,episodeCount,slug,mappings" + \ "&fields[mappings]=externalSite,externalId&page[limit]=500&page[offset]=0" auth = BearerAuth(token) retries = 2 while True: response = requests.get(url, auth=auth) if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member refresh_kitsu_token(token) if not token: return entries auth = BearerAuth(token) continue if response.status_code == requests.codes.server_error and retries > 0: # pylint: disable=no-member retries -= 1 sleep(1) continue kitsu = response.json().get('data') if kitsu is None: print('read_kitsu exception') print(response.json()) return entries anime = [x for x in response.json()['included'] if x['type'] == 'anime'] mappings = [x for x in response.json()['included'] if x['type'] == 'mappings'] for data in kitsu: kitsu_anime = next( y for y in anime if y['id'] == data['relationships']['anime']['data']['id']) kitsu_mappings = [] for anime_entry in kitsu_anime['relationships']['mappings']['data']: kitsu_mappings.append( next(z for z in mappings if anime_entry['id'] == z['id'])) entries.append(kitsu_to_entry( data['attributes'], kitsu_anime, kitsu_mappings, data['id'])) links = response.json().get('links') if links: url = links.get('next') if url: retries = 2 continue break return entries def update_mal(token, mal_id, status=None, score=None, progress=None): """Update a MAL anime entry on user's list""" auth = BearerAuth(token) url = MAL_API_URL+f'/anime/{mal_id}/my_list_status' data = {} if status: data['status'] = status if score is not None: data['score'] = score if progress is not None: data['num_watched_episodes'] = progress while True: response = requests.put(url=url, data=data, auth=auth) if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member refresh_mal_token(token) if not token: return False auth = BearerAuth(token) continue if not response.ok: print(response.json()) return False return True def update_kitsu(token, kitsu_library_entry_id, status=None, score=None, progress=None): """Update a Kitsu anime entry on user's list""" auth = BearerAuth(token) headers = {'Content-Type': 'application/vnd.api+json'} url = KITSU_API_URL+f'/library-entries/{kitsu_library_entry_id}' data = { 'data': { 'type': 'libraryEntries', 'id': str(kitsu_library_entry_id), 'attributes': {} } } if status is not None: data['data']['attributes']['status'] = list(KITSU_TO_STATUS.keys())[ list(KITSU_TO_STATUS.values()).index(status)] if score is not None: data['data']['attributes']['ratingTwenty'] = int(score*2) if progress is not None: data['data']['attributes']['progress'] = progress while True: response = requests.patch( url=url, headers=headers, auth=auth, json=data) if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member refresh_kitsu_token(token) if not token: return False auth = BearerAuth(token) continue if not response.ok: print(response.json()) return False return True def update_anilist(token, anilist_id, status=None, score=None, progress=None): """Add or update an Anilist anime entry on user's list""" auth = BearerAuth(token) anilist_query = ''' mutation ($id: Int, $status: MediaListStatus, $score: Float, $progress: Int) { SaveMediaListEntry(id: $id, status: $status, score: $score, progress: $progress) { id status score(format: POINT_10) progress } } ''' variables = { 'id': int(anilist_id) } if status is not None: variables['status'] = list(ANILIST_TO_STATUS.keys())[ list(ANILIST_TO_STATUS.values()).index(status)] if score is not None: variables['score'] = float(score) if progress is not None: variables['progress'] = int(progress) data = {'query': anilist_query, 'variables': variables} while True: response = requests.post(url=ANILIST_GRAPHQL_URL, auth=auth, json=data) if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member refresh_anilist_token(token) if not token: return False auth = BearerAuth(token) continue if not response.ok: print(response.json()) return False return True UPDATE_FUNCS = { 'mal': update_mal, 'anilist': update_anilist, 'kitsu': update_kitsu }