AnimeEasySync/anime_easy_sync/model.py

367 lines
12 KiB
Python
Raw Permalink Normal View History

2024-02-26 17:39:11 +01:00
"""AnimeEasySync model module"""
from base64 import b64decode
from json import loads
from time import sleep
import requests
from .oauth import refresh_mal_token, refresh_anilist_token, refresh_kitsu_token
ANILIST_TO_STATUS = {
'CURRENT': 'watching',
'COMPLETED': 'completed',
'PAUSED': 'on_hold',
'DROPPED': 'dropped',
'PLANNING': 'plan_to_watch',
'REPEATING': 'watching'
}
KITSU_TO_STATUS = {
'current': 'watching',
'completed': 'completed',
'on_hold': 'on_hold',
'dropped': 'dropped',
'planned': 'plan_to_watch'
}
MAL_API_URL = 'https://api.myanimelist.net/v2'
KITSU_API_URL = 'https://kitsu.io/api/edge'
ANILIST_GRAPHQL_URL = 'https://graphql.anilist.co'
def get_userid_from_token(token):
"""Get userId from Anilist token"""
payload = token['access_token'].split(
'.')[1].replace('-', '+').replace('_', '/')
payload = payload + '='*(len(payload) % 4)
return int(loads(b64decode(payload))['sub'])
class BearerAuth(requests.auth.AuthBase):
"""Bearer Auth header"""
def __init__(self, token):
self.token = token['access_token']
def __call__(self, r):
r.headers["authorization"] = "Bearer " + self.token
return r
def mal_to_entry(mal_entry):
"""Map MAL entry to AES entry"""
return {
'mal_id': str(mal_entry['node']['id']),
'title': mal_entry['node']['title'],
'score': mal_entry['list_status']['score'],
'status': mal_entry['list_status']['status'],
'progress': mal_entry['list_status']['num_episodes_watched'],
'link': f'https://myanimelist.net/anime/{mal_entry["node"]["id"]}'
}
def read_mal(token):
"""Read MAL entries"""
entries = []
url = MAL_API_URL+'/users/@me/animelist?fields=list_status&limit=1000&nsfw=true'
auth = BearerAuth(token)
while True:
response = requests.get(url=url, auth=auth)
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
refresh_mal_token(token)
if not token:
return entries
auth = BearerAuth(token)
continue
mal = response.json().get('data')
if mal is None:
print('read_mal exception')
print(response.json())
return entries
for mal_entry in mal:
entries.append(mal_to_entry(mal_entry))
paging = response.json().get('paging')
if paging:
url = paging.get('next')
if url:
continue
break
return entries
def anilist_to_entry(anilist_entry):
"""Map Anilist entry to AES entry"""
return {
'anilist_id': str(anilist_entry['media']['id']),
'mal_id': str(anilist_entry['media']['idMal']) if anilist_entry['media']['idMal'] else '',
'title': f"{anilist_entry['media']['title']['english']} " +
f"({anilist_entry['media']['title']['romaji']})",
'score': anilist_entry['score'],
'status': ANILIST_TO_STATUS[anilist_entry['status']],
'progress': anilist_entry['progress'],
'link': anilist_entry['media']['siteUrl']
}
def read_anilist(token):
"""Read Anilist entries"""
entries = []
auth = BearerAuth(token)
anilist_query = '''
query ($chunk: Int, $userId: Int) {
MediaListCollection(userId: $userId, type: ANIME, chunk: $chunk, perChunk: 500) {
hasNextChunk
lists {
name
isCustomList
isSplitCompletedList
status
entries {
status
score(format: POINT_10)
progress
media {
id
idMal
title {
english
romaji
}
siteUrl
}
}
}
}
}
'''
chunk = 1
while True:
variables = {
'chunk': chunk,
'userId': get_userid_from_token(token)
}
response = requests.post(
ANILIST_GRAPHQL_URL, json={
'query': anilist_query, 'variables': variables},
auth=auth)
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
refresh_anilist_token(token)
if not token:
return entries
auth = BearerAuth(token)
continue
anilist = response.json().get('data')
if anilist is None:
print('read_anilist exception')
print(response.json())
return entries
for lst in anilist['MediaListCollection']['lists']:
for anilist_entry in lst['entries']:
entries.append(anilist_to_entry(anilist_entry))
if anilist['MediaListCollection']['hasNextChunk']:
chunk += 1
else:
break
return entries
def get_kitsu_user_id(token):
"""Get user's Kitsu user id"""
url = KITSU_API_URL+'/users?filter[self]=true&fields[users]=name'
auth = BearerAuth(token)
while True:
response = requests.get(url, auth=auth)
if response.ok:
user_id = response.json()['data'][0]['id']
return user_id
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
refresh_kitsu_token(token)
if not token:
return None
auth = BearerAuth(token)
continue
print('get_kitsu_user_id exception')
print(response.json())
return None
def kitsu_to_entry(kitsu_entry, kitsu_anime, kitsu_mappings, kitsu_library_entry_id):
"""Map Kitsu entry to AES entry"""
mal_id = next((str(x['attributes']['externalId'])
for x in kitsu_mappings
if x['attributes']['externalSite'] == 'myanimelist/anime'), '')
anilist_id = next((str(x['attributes']['externalId'])
for x in kitsu_mappings
if x['attributes']['externalSite'] == 'anilist/anime'), '')
if not anilist_id:
anilist_id = next((str(x['attributes']['externalId'][6:]
if x['attributes']['externalId'][:6] == 'anime/'
else x['attributes']['externalId'])
if x['attributes']['externalId'] else 0
for x in kitsu_mappings
if x['attributes']['externalSite'] == 'anilist'), '')
return {
'kitsu_id': str(kitsu_anime['id']),
'mal_id': str(mal_id) if mal_id else '',
'anilist_id': str(anilist_id) if anilist_id else '',
'title': kitsu_anime['attributes']['canonicalTitle'],
'score': (int(kitsu_entry['ratingTwenty']) if kitsu_entry['ratingTwenty'] else 0)//2,
'status': KITSU_TO_STATUS[kitsu_entry['status']],
'progress': kitsu_entry['progress'],
'library_entry_id': kitsu_library_entry_id,
'link': f'https://kitsu.io/anime/{kitsu_anime["id"]}'
}
def read_kitsu(token, user_id):
"""Read Kitsu entries"""
entries = []
url = KITSU_API_URL+f"/library-entries?filter[user_id]={user_id}" + \
"&filter[kind]=anime&include=anime,anime.mappings" + \
"&fields[anime]=canonicalTitle,episodeCount,slug,mappings" + \
"&fields[mappings]=externalSite,externalId&page[limit]=500&page[offset]=0"
auth = BearerAuth(token)
retries = 2
while True:
response = requests.get(url, auth=auth)
if response.status_code == requests.codes.unauthorized and token: # pylint: disable=no-member
refresh_kitsu_token(token)
if not token:
return entries
auth = BearerAuth(token)
continue
if response.status_code == requests.codes.server_error and retries > 0: # pylint: disable=no-member
retries -= 1
sleep(1)
continue
kitsu = response.json().get('data')
if kitsu is None:
print('read_kitsu exception')
print(response.json())
return entries
anime = [x for x in response.json()['included']
if x['type'] == 'anime']
mappings = [x for x in response.json()['included']
if x['type'] == 'mappings']
for data in kitsu:
kitsu_anime = next(
y for y in anime if y['id'] == data['relationships']['anime']['data']['id'])
kitsu_mappings = []
for anime_entry in kitsu_anime['relationships']['mappings']['data']:
kitsu_mappings.append(
next(z for z in mappings if anime_entry['id'] == z['id']))
entries.append(kitsu_to_entry(
data['attributes'], kitsu_anime, kitsu_mappings, data['id']))
links = response.json().get('links')
if links:
url = links.get('next')
if url:
retries = 2
continue
break
return entries
def update_mal(token, mal_id, status=None, score=None, progress=None):
"""Update a MAL anime entry on user's list"""
auth = BearerAuth(token)
url = MAL_API_URL+f'/anime/{mal_id}/my_list_status'
data = {}
if status:
data['status'] = status
if score is not None:
data['score'] = score
if progress is not None:
data['num_watched_episodes'] = progress
while True:
response = requests.put(url=url, data=data, auth=auth)
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
refresh_mal_token(token)
if not token:
return False
auth = BearerAuth(token)
continue
if not response.ok:
print(response.json())
return False
return True
def update_kitsu(token, kitsu_library_entry_id, status=None, score=None, progress=None):
"""Update a Kitsu anime entry on user's list"""
auth = BearerAuth(token)
headers = {'Content-Type': 'application/vnd.api+json'}
url = KITSU_API_URL+f'/library-entries/{kitsu_library_entry_id}'
data = {
'data': {
'type': 'libraryEntries',
'id': str(kitsu_library_entry_id),
'attributes': {}
}
}
if status is not None:
data['data']['attributes']['status'] = list(KITSU_TO_STATUS.keys())[
list(KITSU_TO_STATUS.values()).index(status)]
if score is not None:
data['data']['attributes']['ratingTwenty'] = int(score*2)
if progress is not None:
data['data']['attributes']['progress'] = progress
while True:
response = requests.patch(
url=url, headers=headers, auth=auth, json=data)
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
refresh_kitsu_token(token)
if not token:
return False
auth = BearerAuth(token)
continue
if not response.ok:
print(response.json())
return False
return True
def update_anilist(token, anilist_id, status=None, score=None, progress=None):
"""Add or update an Anilist anime entry on user's list"""
auth = BearerAuth(token)
anilist_query = '''
mutation ($id: Int, $status: MediaListStatus, $score: Float, $progress: Int) {
SaveMediaListEntry(id: $id, status: $status, score: $score, progress: $progress) {
id
status
score(format: POINT_10)
progress
}
}
'''
variables = {
'id': int(anilist_id)
}
if status is not None:
variables['status'] = list(ANILIST_TO_STATUS.keys())[
list(ANILIST_TO_STATUS.values()).index(status)]
if score is not None:
variables['score'] = float(score)
if progress is not None:
variables['progress'] = int(progress)
data = {'query': anilist_query, 'variables': variables}
while True:
response = requests.post(url=ANILIST_GRAPHQL_URL, auth=auth, json=data)
if response.status_code == requests.codes.unauthorized: # pylint: disable=no-member
refresh_anilist_token(token)
if not token:
return False
auth = BearerAuth(token)
continue
if not response.ok:
print(response.json())
return False
return True
UPDATE_FUNCS = {
'mal': update_mal,
'anilist': update_anilist,
'kitsu': update_kitsu
}