81 lines
2.6 KiB
Python
81 lines
2.6 KiB
Python
|
"""AnimeEasySync OAuth module"""
|
||
|
from requests import post, codes
|
||
|
from requests.auth import HTTPBasicAuth
|
||
|
from flask import current_app
|
||
|
|
||
|
# OAuth2 token endpoints, one per supported service. Each is passed as the
# ``url`` argument when refreshing that service's token.
MAL_TOKEN_URL = 'https://myanimelist.net/v1/oauth2/token'
ANILIST_TOKEN_URL = 'https://anilist.co/api/v2/oauth/token'
KITSU_TOKEN_URL = 'https://kitsu.io/api/oauth/token'
|
||
|
|
||
|
# Request fields recognised for each supported grant type.
#   required: names the caller must supply as keyword arguments to
#             get_oauth_token; the request is abandoned if one is absent.
#   optional: names copied into the request only when the caller supplies them.
# The refresh token itself is not listed here: get_oauth_token receives it as
# a dedicated positional argument.
OAUTH_PARAMS = {
    'authorization_code': dict(
        required=['client_id', 'code', 'redirect_uri'],
        optional=['code_verifier', 'client_secret'],
    ),
    'password': dict(
        required=['username', 'password'],
        optional=[],
    ),
    'refresh_token': dict(
        required=[],
        optional=[],
    ),
}
|
||
|
|
||
|
|
||
|
def get_oauth_token(url, grant_type, refresh_token, **kwargs):
    """Request an OAuth2 token from ``url`` and return the decoded JSON reply.

    Args:
        url: Token endpoint to POST the form-encoded request to.
        grant_type: Grant to perform; must be a key of ``OAUTH_PARAMS``.
        refresh_token: For the ``refresh_token`` grant, the token sent in the
            request. For any other grant it is only a fallback: if the server
            answers 401, one ``refresh_token`` grant is attempted with it.
        **kwargs: Values for the field names listed under
            ``OAUTH_PARAMS[grant_type]``. Two extra keys are recognised:
            ``auth`` is forwarded to ``requests.post`` unchanged, and
            ``timeout`` (seconds, default 30) bounds the HTTP call. No value
            may be ``None``.

    Returns:
        The parsed JSON body on HTTP 200, otherwise ``None`` (invalid
        arguments, missing required field, or a non-200 reply).

    Raises:
        Whatever ``requests.post`` raises for transport failures, including
        a timeout error once ``timeout`` elapses.
    """
    # Local import so this function does not depend on edits to the module
    # header.
    import logging

    log = logging.getLogger(__name__)
    default_timeout = 30  # seconds; without a timeout requests can block forever

    if not url or grant_type not in OAUTH_PARAMS or None in kwargs.values():
        return None

    data = {'grant_type': grant_type}
    fallback_refresh_token = None
    if grant_type == 'refresh_token':
        if not refresh_token:
            # A refresh grant without a refresh token cannot succeed; do not
            # bother the server with it.
            return None
        data['refresh_token'] = refresh_token
    else:
        # Kept aside for a single retry if the server rejects this grant.
        fallback_refresh_token = refresh_token

    fields = OAUTH_PARAMS[grant_type]
    for name in fields['required']:
        if name not in kwargs:
            log.warning('OAuth %s grant is missing required field %r',
                        grant_type, name)
            return None
        data[name] = kwargs[name]
    for name in fields['optional']:
        if name in kwargs:
            data[name] = kwargs[name]

    request_kwargs = {'timeout': kwargs.get('timeout', default_timeout)}
    if 'auth' in kwargs:
        request_kwargs['auth'] = kwargs['auth']

    # Log field *names* only: the values include passwords, client secrets,
    # authorization codes and refresh tokens.
    log.debug('OAuth token request: url=%s grant_type=%s fields=%s',
              url, grant_type, sorted(data))

    response = post(url=url, data=data, **request_kwargs)

    if response.status_code == codes.ok:  # pylint: disable=no-member
        return response.json()
    if response.status_code == codes.unauthorized and fallback_refresh_token:  # pylint: disable=no-member
        return get_oauth_token(url, 'refresh_token', fallback_refresh_token, **kwargs)

    # Use the raw text: error bodies are not guaranteed to be JSON, and
    # decoding them here would turn a failed request into an exception.
    log.warning('OAuth token request to %s failed: HTTP %s: %s',
                url, response.status_code, response.text)
    return None
|
||
|
|
||
|
|
||
|
def refresh_oauth_token(token, url, **kwargs):
    """Refresh ``token`` in place using its ``refresh_token`` entry.

    Args:
        token: Mutable mapping holding the current token fields. It is
            emptied and, on success, refilled with the server's reply.
        url: Token endpoint to send the ``refresh_token`` grant to.
        **kwargs: Forwarded unchanged to ``get_oauth_token`` (for example
            ``auth``).

    Returns:
        None. On failure, or when ``token`` has no refresh token, ``token``
        is left empty.
    """
    # .get() rather than indexing: a token emptied by an earlier failed
    # refresh has no 'refresh_token' key and must not raise KeyError here.
    refresh_token = token.get('refresh_token')

    new_token = None
    if refresh_token:
        new_token = get_oauth_token(url, 'refresh_token', refresh_token, **kwargs)

    token.clear()
    if new_token:
        token.update(new_token)
        # The server may omit a new refresh token from its reply; in that
        # case the one just used stays valid and has to be kept.
        token.setdefault('refresh_token', refresh_token)
|
||
|
|
||
|
|
||
|
def refresh_mal_token(token):
    """Refresh a MyAnimeList token in place.

    The request is authenticated with HTTP Basic credentials built from the
    ``MAL_CLIENT_ID`` and ``MAL_CLIENT_SECRET`` entries of the current Flask
    application's config.
    """
    app_config = current_app.config
    client_id = app_config['MAL_CLIENT_ID']
    client_secret = app_config['MAL_CLIENT_SECRET']
    credentials = HTTPBasicAuth(client_id, client_secret)
    refresh_oauth_token(token, MAL_TOKEN_URL, auth=credentials)
|
||
|
|
||
|
|
||
|
def refresh_anilist_token(token):
    """Refresh an AniList token in place via ``ANILIST_TOKEN_URL``."""
    refresh_oauth_token(token, url=ANILIST_TOKEN_URL)
|
||
|
|
||
|
|
||
|
def refresh_kitsu_token(token):
    """Refresh a Kitsu token in place via ``KITSU_TOKEN_URL``."""
    refresh_oauth_token(token, url=KITSU_TOKEN_URL)
|