'''Thin wrappers around an instance's HTTP API (``/api/v1/...``).

All requests go through the shared module-level ``requests`` session ``s``.
Importing this module reads (or creates) the local auth file and configures
the session and the player HTTP headers for the configured instance.

Functions decorated with ``@logger.catch`` have their exceptions caught and
logged by loguru instead of propagating to the caller.
'''
import json
import os
import time
import urllib.parse

import requests
from loguru import logger

from src.mpv_control import set_http_header
from src.settings import get_config

# Maps an instance host name to its bearer token.
auth_file = '.auth.json'

if os.path.exists(auth_file):
    with open(auth_file, 'rt') as f:
        auth = json.load(f)
else:
    # The default umask is 0o22 which turns off write permission of group
    # and others. Clear it only while the file is created, then restore the
    # previous value so later file creation in this process is not affected.
    previous_umask = os.umask(0)
    try:
        descriptor = os.open(
            path=auth_file,
            flags=(
                os.O_WRONLY    # access mode: write only
                | os.O_CREAT   # create if not exists
                | os.O_TRUNC   # truncate the file to zero
            ),
            mode=0o600)
    finally:
        os.umask(previous_umask)
    with open(descriptor, 'wt') as f:
        f.write('{}')
    auth = {}

s = requests.Session()
instance = get_config('instance')
token = auth.get(instance)

if token:
    s.headers.update({
        "Authorization": "Bearer " + token,
        "Accept-encoding": 'gzip'
    })
    set_http_header(['Authorization: ' + 'Bearer ' + token])
else:
    s.headers.update({"Accept-encoding": 'gzip'})
    # Get cookies from unauthorized instance for working some functionality (radios)
    s.get(f'https://{instance}/')
    set_http_header()


def select_instance(new_instance=None):
    '''Switch the module to another instance and reload its credentials.

    Rebinds the module-level ``instance``, re-reads the auth file, clears the
    previous ``Authorization`` header from both the session and the player,
    and, when a token is stored for the new instance, installs it on both.
    '''
    global instance
    instance = new_instance
    with open(auth_file, 'rt') as f:
        stored_auth = json.load(f)
    new_token = stored_auth.get(instance)

    # Drop the credentials of the previously selected instance first.
    s.headers.update({"Authorization": None, "Accept-encoding": 'gzip'})
    set_http_header()

    if new_token:
        # Request issued before the new Authorization header is installed.
        s.get(f'https://{instance}')
        s.headers.update({
            "Authorization": "Bearer " + new_token,
            "Accept-encoding": 'gzip'
        })
        # Use the token of the *new* instance; the module-level ``token``
        # belongs to the instance selected at import time (and may be None).
        set_http_header(['Authorization: ' + 'Bearer ' + new_token])


@logger.catch
def get_audio_file(track_uuid, listen_url=False, download=False,
                   transcoding=get_config('enable_server_transcoding'),
                   to='ogg'):
    '''Build the URL used to listen to (or download) a track.

    When ``listen_url`` is true, ``track_uuid`` is treated as a path that is
    appended directly to the instance host; otherwise it is inserted into
    ``/api/v1/listen/<track_uuid>``. ``download`` and ``to`` are added as
    query parameters. The ``transcoding`` default is read from the config
    once, when this module is imported.
    '''
    if not transcoding:
        # NOTE(review): urlencode renders None as the literal text "None".
        to = None
    params = {
        "download": download,
        "to": to
    }
    if listen_url:
        url = f'https://{instance}{track_uuid}?'
    else:
        url = f'https://{instance}/api/v1/listen/{track_uuid}?'
    return url + urllib.parse.urlencode(params)


@logger.catch
def get_tracks(page=None, q=None, artist=None, album=None,
               favourites=None, include_channels=None, pg=None):
    '''Get tracks by params.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'album': album,
        'favourites': favourites,
        'include_channels': include_channels
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/tracks', params=params)
    return r.json()


@logger.catch
def get_favorires_tracks(page=None, q=None, scope=None,
                         include_channels=None, pg=None):
    '''Get favorites tracks (not only for user).

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'q': q,
        'scope': scope,
        'include_channels': include_channels
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/favorites/tracks/',
                  params=params)
    return r.json()


@logger.catch
def get_artists(page=None, q=None, artist=None, album=None,
                favourites=None, refresh=False, pg=None):
    '''Get artists by params.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'album': album,
        'favourites': favourites,
        'refresh': refresh
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/artists', params=params)
    return r.json()


@logger.catch
def get_albums(page=None, q=None, artist=None, include_channels=None,
               refresh=False, pg=None):
    '''Get albums by params.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'q': q,
        'artist': artist,
        'include_channels': include_channels,
        'refresh': refresh
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/albums', params=params)
    return r.json()


@logger.catch
def get_channels(page=None, q=None, tag=None, pg=None):
    '''Get channels by params.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'q': q,
        'tag': tag
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/channels', params=params)
    return r.json()


@logger.catch
def get_playlists(page=None, page_size=None, q=None,
                  ordering='-modification_date', pg=None):
    '''List playlists.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. An HTTP error status raises via
    ``raise_for_status``. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'page_size': page_size,
        'q': q,
        'ordering': ordering
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/playlists', params=params)
    r.raise_for_status()
    return r.json()


@logger.catch
def get_playlist_tracks(playlist_id, pg=None):
    '''Retrieve tracks in the playlist.

    If ``pg`` (a full URL) is given it is requested instead of the
    playlist's tracks endpoint. Returns the decoded JSON body.
    '''
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/playlists/{playlist_id}/tracks')
    return r.json()


@logger.catch
def list_libraries(page=None, page_size=None, q=None, scope='all', pg=None):
    '''List libraries.

    If ``pg`` (a full URL) is given it is requested as-is and the other
    arguments are ignored. Returns the decoded JSON body.
    '''
    params = {
        'page': page,
        'page_size': page_size,
        'q': q,
        'scope': scope,
    }
    if pg:
        r = s.get(pg)
    else:
        r = s.get(f'https://{instance}/api/v1/libraries', params=params)
    return r.json()


@logger.catch
def federate_search_by_url(object):
    '''POST ``object`` to the federation fetches endpoint.

    Returns the decoded JSON body. The parameter name shadows the builtin
    but is kept because callers may pass it by keyword.
    '''
    params = {
        'object': object
    }
    r = s.post(f'https://{instance}/api/v1/federation/fetches', json=params)
    return r.json()


@logger.catch
def favorite_track(track_id):
    '''Add the track with the given id to favorites.

    An HTTP error status raises via ``raise_for_status``.
    '''
    r = s.post(f'https://{instance}/api/v1/favorites/tracks',
               json={'track': int(track_id)})
    r.raise_for_status()
    # NOTE(review): returns the bound ``json`` method, not the decoded body.
    # Kept as-is so existing callers see the same value — confirm intent.
    return r.json


@logger.catch
def unfavorite_track(track_id):
    '''Remove the track with the given id from favorites.

    An HTTP error status raises via ``raise_for_status``.
    '''
    r = s.post(f'https://{instance}/api/v1/favorites/tracks/delete',
               json={'track': int(track_id)})
    r.raise_for_status()
    # NOTE(review): returns the bound ``json`` method, not the decoded body.
    # Kept as-is so existing callers see the same value — confirm intent.
    return r.json


@logger.catch
def hide_content(content):
    '''Hide content (write permission).

    Posts ``content`` as JSON to the moderation content-filters endpoint.
    An HTTP error status raises via ``raise_for_status``.
    '''
    r = s.post(f'https://{instance}/api/v1/moderation/content-filters/',
               json=content)
    r.raise_for_status()
    # NOTE(review): returns the bound ``json`` method, not the decoded body.
    # Kept as-is so existing callers see the same value — confirm intent.
    return r.json


# [FunkWhale radios]
def get_radios():
    '''Fetch the radios list and return the decoded JSON body.'''
    r = s.get(f'https://{instance}/api/v1/radios/radios/')
    return r.json()


def post_radio_session(requested_radio):
    '''POST ``requested_radio`` as JSON to the radio sessions endpoint.

    Returns the decoded JSON body.
    '''
    r = s.post(f'https://{instance}/api/v1/radios/sessions/',
               json=requested_radio)
    return r.json()


@logger.catch
def get_track_radio(radio_session):
    '''POST ``radio_session`` as JSON to the radio tracks endpoint.

    Returns the decoded JSON body.
    '''
    r = s.post(f'https://{instance}/api/v1/radios/tracks/',
               json=radio_session)
    return r.json()