367 lines
12 KiB
Python
367 lines
12 KiB
Python
"""AnimeEasySync model module"""
|
|
from base64 import b64decode
|
|
from json import loads
|
|
from time import sleep
|
|
import requests
|
|
from .oauth import refresh_mal_token, refresh_anilist_token, refresh_kitsu_token
|
|
|
|
|
|
ANILIST_TO_STATUS = {
|
|
'CURRENT': 'watching',
|
|
'COMPLETED': 'completed',
|
|
'PAUSED': 'on_hold',
|
|
'DROPPED': 'dropped',
|
|
'PLANNING': 'plan_to_watch',
|
|
'REPEATING': 'watching'
|
|
}
|
|
|
|
KITSU_TO_STATUS = {
|
|
'current': 'watching',
|
|
'completed': 'completed',
|
|
'on_hold': 'on_hold',
|
|
'dropped': 'dropped',
|
|
'planned': 'plan_to_watch'
|
|
}
|
|
|
|
MAL_API_URL = 'https://api.myanimelist.net/v2'
|
|
KITSU_API_URL = 'https://kitsu.io/api/edge'
|
|
ANILIST_GRAPHQL_URL = 'https://graphql.anilist.co'
|
|
|
|
|
|
def get_userid_from_token(token):
|
|
"""Get userId from Anilist token"""
|
|
payload = token['access_token'].split(
|
|
'.')[1].replace('-', '+').replace('_', '/')
|
|
payload = payload + '='*(len(payload) % 4)
|
|
return int(loads(b64decode(payload))['sub'])
|
|
|
|
|
|
class BearerAuth(requests.auth.AuthBase):
|
|
"""Bearer Auth header"""
|
|
|
|
def __init__(self, token):
|
|
self.token = token['access_token']
|
|
|
|
def __call__(self, r):
|
|
r.headers["authorization"] = "Bearer " + self.token
|
|
return r
|
|
|
|
|
|
def mal_to_entry(mal_entry):
|
|
"""Map MAL entry to AES entry"""
|
|
return {
|
|
'mal_id': str(mal_entry['node']['id']),
|
|
'title': mal_entry['node']['title'],
|
|
'score': mal_entry['list_status']['score'],
|
|
'status': mal_entry['list_status']['status'],
|
|
'progress': mal_entry['list_status']['num_episodes_watched'],
|
|
'link': f'https://myanimelist.net/anime/{mal_entry["node"]["id"]}'
|
|
}
|
|
|
|
|
|
def read_mal(token):
|
|
"""Read MAL entries"""
|
|
entries = []
|
|
url = MAL_API_URL+'/users/@me/animelist?fields=list_status&limit=1000&nsfw=true'
|
|
auth = BearerAuth(token)
|
|
while True:
|
|
response = requests.get(url=url, auth=auth)
|
|
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
|
|
refresh_mal_token(token)
|
|
if not token:
|
|
return entries
|
|
auth = BearerAuth(token)
|
|
continue
|
|
mal = response.json().get('data')
|
|
if mal is None:
|
|
print('read_mal exception')
|
|
print(response.json())
|
|
return entries
|
|
for mal_entry in mal:
|
|
entries.append(mal_to_entry(mal_entry))
|
|
paging = response.json().get('paging')
|
|
if paging:
|
|
url = paging.get('next')
|
|
if url:
|
|
continue
|
|
break
|
|
return entries
|
|
|
|
|
|
def anilist_to_entry(anilist_entry):
|
|
"""Map Anilist entry to AES entry"""
|
|
return {
|
|
'anilist_id': str(anilist_entry['media']['id']),
|
|
'mal_id': str(anilist_entry['media']['idMal']) if anilist_entry['media']['idMal'] else '',
|
|
'title': f"{anilist_entry['media']['title']['english']} " +
|
|
f"({anilist_entry['media']['title']['romaji']})",
|
|
'score': anilist_entry['score'],
|
|
'status': ANILIST_TO_STATUS[anilist_entry['status']],
|
|
'progress': anilist_entry['progress'],
|
|
'link': anilist_entry['media']['siteUrl']
|
|
}
|
|
|
|
|
|
def read_anilist(token):
|
|
"""Read Anilist entries"""
|
|
entries = []
|
|
auth = BearerAuth(token)
|
|
anilist_query = '''
|
|
query ($chunk: Int, $userId: Int) {
|
|
MediaListCollection(userId: $userId, type: ANIME, chunk: $chunk, perChunk: 500) {
|
|
hasNextChunk
|
|
lists {
|
|
name
|
|
isCustomList
|
|
isSplitCompletedList
|
|
status
|
|
entries {
|
|
status
|
|
score(format: POINT_10)
|
|
progress
|
|
media {
|
|
id
|
|
idMal
|
|
title {
|
|
english
|
|
romaji
|
|
}
|
|
siteUrl
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
'''
|
|
chunk = 1
|
|
while True:
|
|
variables = {
|
|
'chunk': chunk,
|
|
'userId': get_userid_from_token(token)
|
|
}
|
|
response = requests.post(
|
|
ANILIST_GRAPHQL_URL, json={
|
|
'query': anilist_query, 'variables': variables},
|
|
auth=auth)
|
|
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
|
|
refresh_anilist_token(token)
|
|
if not token:
|
|
return entries
|
|
auth = BearerAuth(token)
|
|
continue
|
|
anilist = response.json().get('data')
|
|
if anilist is None:
|
|
print('read_anilist exception')
|
|
print(response.json())
|
|
return entries
|
|
for lst in anilist['MediaListCollection']['lists']:
|
|
for anilist_entry in lst['entries']:
|
|
entries.append(anilist_to_entry(anilist_entry))
|
|
if anilist['MediaListCollection']['hasNextChunk']:
|
|
chunk += 1
|
|
else:
|
|
break
|
|
return entries
|
|
|
|
|
|
def get_kitsu_user_id(token):
|
|
"""Get user's Kitsu user id"""
|
|
url = KITSU_API_URL+'/users?filter[self]=true&fields[users]=name'
|
|
auth = BearerAuth(token)
|
|
while True:
|
|
response = requests.get(url, auth=auth)
|
|
if response.ok:
|
|
user_id = response.json()['data'][0]['id']
|
|
return user_id
|
|
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
|
|
refresh_kitsu_token(token)
|
|
if not token:
|
|
return None
|
|
auth = BearerAuth(token)
|
|
continue
|
|
print('get_kitsu_user_id exception')
|
|
print(response.json())
|
|
return None
|
|
|
|
|
|
def kitsu_to_entry(kitsu_entry, kitsu_anime, kitsu_mappings, kitsu_library_entry_id):
|
|
"""Map Kitsu entry to AES entry"""
|
|
mal_id = next((str(x['attributes']['externalId'])
|
|
for x in kitsu_mappings
|
|
if x['attributes']['externalSite'] == 'myanimelist/anime'), '')
|
|
anilist_id = next((str(x['attributes']['externalId'])
|
|
for x in kitsu_mappings
|
|
if x['attributes']['externalSite'] == 'anilist/anime'), '')
|
|
if not anilist_id:
|
|
anilist_id = next((str(x['attributes']['externalId'][6:]
|
|
if x['attributes']['externalId'][:6] == 'anime/'
|
|
else x['attributes']['externalId'])
|
|
if x['attributes']['externalId'] else 0
|
|
for x in kitsu_mappings
|
|
if x['attributes']['externalSite'] == 'anilist'), '')
|
|
return {
|
|
'kitsu_id': str(kitsu_anime['id']),
|
|
'mal_id': str(mal_id) if mal_id else '',
|
|
'anilist_id': str(anilist_id) if anilist_id else '',
|
|
'title': kitsu_anime['attributes']['canonicalTitle'],
|
|
'score': (int(kitsu_entry['ratingTwenty']) if kitsu_entry['ratingTwenty'] else 0)//2,
|
|
'status': KITSU_TO_STATUS[kitsu_entry['status']],
|
|
'progress': kitsu_entry['progress'],
|
|
'library_entry_id': kitsu_library_entry_id,
|
|
'link': f'https://kitsu.io/anime/{kitsu_anime["id"]}'
|
|
}
|
|
|
|
|
|
def read_kitsu(token, user_id):
|
|
"""Read Kitsu entries"""
|
|
entries = []
|
|
url = KITSU_API_URL+f"/library-entries?filter[user_id]={user_id}" + \
|
|
"&filter[kind]=anime&include=anime,anime.mappings" + \
|
|
"&fields[anime]=canonicalTitle,episodeCount,slug,mappings" + \
|
|
"&fields[mappings]=externalSite,externalId&page[limit]=500&page[offset]=0"
|
|
auth = BearerAuth(token)
|
|
retries = 2
|
|
while True:
|
|
response = requests.get(url, auth=auth)
|
|
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
|
|
refresh_kitsu_token(token)
|
|
if not token:
|
|
return entries
|
|
auth = BearerAuth(token)
|
|
continue
|
|
if response.status_code == requests.codes.server_error and retries > 0: # pylint: disable=no-member
|
|
retries -= 1
|
|
sleep(1)
|
|
continue
|
|
kitsu = response.json().get('data')
|
|
if kitsu is None:
|
|
print('read_kitsu exception')
|
|
print(response.json())
|
|
return entries
|
|
anime = [x for x in response.json()['included']
|
|
if x['type'] == 'anime']
|
|
mappings = [x for x in response.json()['included']
|
|
if x['type'] == 'mappings']
|
|
for data in kitsu:
|
|
kitsu_anime = next(
|
|
y for y in anime if y['id'] == data['relationships']['anime']['data']['id'])
|
|
kitsu_mappings = []
|
|
for anime_entry in kitsu_anime['relationships']['mappings']['data']:
|
|
kitsu_mappings.append(
|
|
next(z for z in mappings if anime_entry['id'] == z['id']))
|
|
entries.append(kitsu_to_entry(
|
|
data['attributes'], kitsu_anime, kitsu_mappings, data['id']))
|
|
links = response.json().get('links')
|
|
if links:
|
|
url = links.get('next')
|
|
if url:
|
|
retries = 2
|
|
continue
|
|
break
|
|
return entries
|
|
|
|
|
|
def update_mal(token, mal_id, status=None, score=None, progress=None):
|
|
"""Update a MAL anime entry on user's list"""
|
|
auth = BearerAuth(token)
|
|
url = MAL_API_URL+f'/anime/{mal_id}/my_list_status'
|
|
data = {}
|
|
if status:
|
|
data['status'] = status
|
|
if score is not None:
|
|
data['score'] = score
|
|
if progress is not None:
|
|
data['num_watched_episodes'] = progress
|
|
|
|
while True:
|
|
response = requests.put(url=url, data=data, auth=auth)
|
|
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
|
|
refresh_mal_token(token)
|
|
if not token:
|
|
return False
|
|
auth = BearerAuth(token)
|
|
continue
|
|
if not response.ok:
|
|
print(response.json())
|
|
return False
|
|
return True
|
|
|
|
|
|
def update_kitsu(token, kitsu_library_entry_id, status=None, score=None, progress=None):
|
|
"""Update a Kitsu anime entry on user's list"""
|
|
auth = BearerAuth(token)
|
|
headers = {'Content-Type': 'application/vnd.api+json'}
|
|
url = KITSU_API_URL+f'/library-entries/{kitsu_library_entry_id}'
|
|
data = {
|
|
'data': {
|
|
'type': 'libraryEntries',
|
|
'id': str(kitsu_library_entry_id),
|
|
'attributes': {}
|
|
}
|
|
}
|
|
if status is not None:
|
|
data['data']['attributes']['status'] = list(KITSU_TO_STATUS.keys())[
|
|
list(KITSU_TO_STATUS.values()).index(status)]
|
|
if score is not None:
|
|
data['data']['attributes']['ratingTwenty'] = int(score*2)
|
|
if progress is not None:
|
|
data['data']['attributes']['progress'] = progress
|
|
while True:
|
|
response = requests.patch(
|
|
url=url, headers=headers, auth=auth, json=data)
|
|
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
|
|
refresh_kitsu_token(token)
|
|
if not token:
|
|
return False
|
|
auth = BearerAuth(token)
|
|
continue
|
|
if not response.ok:
|
|
print(response.json())
|
|
return False
|
|
return True
|
|
|
|
|
|
def update_anilist(token, anilist_id, status=None, score=None, progress=None):
|
|
"""Add or update an Anilist anime entry on user's list"""
|
|
auth = BearerAuth(token)
|
|
anilist_query = '''
|
|
mutation ($id: Int, $status: MediaListStatus, $score: Float, $progress: Int) {
|
|
SaveMediaListEntry(id: $id, status: $status, score: $score, progress: $progress) {
|
|
id
|
|
status
|
|
score(format: POINT_10)
|
|
progress
|
|
}
|
|
}
|
|
'''
|
|
variables = {
|
|
'id': int(anilist_id)
|
|
}
|
|
if status is not None:
|
|
variables['status'] = list(ANILIST_TO_STATUS.keys())[
|
|
list(ANILIST_TO_STATUS.values()).index(status)]
|
|
if score is not None:
|
|
variables['score'] = float(score)
|
|
if progress is not None:
|
|
variables['progress'] = int(progress)
|
|
data = {'query': anilist_query, 'variables': variables}
|
|
while True:
|
|
response = requests.post(url=ANILIST_GRAPHQL_URL, auth=auth, json=data)
|
|
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
|
|
refresh_anilist_token(token)
|
|
if not token:
|
|
return False
|
|
auth = BearerAuth(token)
|
|
continue
|
|
if not response.ok:
|
|
print(response.json())
|
|
return False
|
|
return True
|
|
|
|
|
|
UPDATE_FUNCS = {
|
|
'mal': update_mal,
|
|
'anilist': update_anilist,
|
|
'kitsu': update_kitsu
|
|
}
|