'''Console front-end for playing tracks through a single shared mpv instance.

The module creates one ``mpv.MPV`` player at import time, registers
observers that draw a small status display with ``print_there`` and exposes
``play_track`` / ``player_menu`` as entry points.
'''
import src.fw_api
from src.utils import download_track, print_there
from src.settings import get_config
from loguru import logger
from pyfzf.pyfzf import FzfPrompt
from shutil import get_terminal_size
import mpv
import time
import sys
import shlex

fzf = FzfPrompt()

player = mpv.MPV(cache=True, demuxer_max_bytes=25*1024*1024)
player.ytdl = False  # Prevent attempts load track with yt-dlp
player.volume = get_config('mpv_volume')
player.prefetch_playlist = get_config('prefetch_playlist')
show_like_button = get_config('show_like_button')
track_activity_history = get_config('track_activity_history')


class player_fw_storage:
    '''Namespace holding metadata of the tracks handed to the player.'''
    # Maps the uuid extracted from a listen url (see track_url_to_uuid)
    # to the track dict that was passed to play_track / player_menu.
    storage = {}


@logger.catch
def track_url_to_uuid(listen_url=None):
    '''Attempt get uuid from track listen url or current playing url.

    The uuid is taken to be the second-to-last ``/``-separated component of
    the url. When ``listen_url`` is empty, the url currently opened by the
    player is used instead.
    '''
    url = listen_url if listen_url else player.stream_open_filename
    return url.split('/')[-2]


if track_activity_history:
    @player.property_observer('time-pos')
    @logger.catch
    def time_observer(_name, value):
        '''Record the playing track in the listening history at ~30 seconds.'''
        # value is either None if nothing is playing or a float containing
        # fractional seconds since the beginning of the file.
        if not (value and player.http_header_fields != []
                and player.pause is False):
            return
        if not 30.0 <= value <= 30.1:
            return
        # detect 30 secs for reporting listen activity
        track = player_fw_storage.storage.get(track_url_to_uuid())
        # The track may be unknown to the storage; treat that like a
        # missing id instead of failing on None.get().
        track_id = track.get('id') if track else None
        if track_id:
            src.fw_api.record_track_in_history(track_id)
        else:
            logger.error("Can't write track to history: No track id")
        # NOTE(review): presumably keeps further time-pos updates inside the
        # 30.0-30.1 window from reporting the same listen twice - confirm.
        time.sleep(1)


@player.property_observer('filtered-metadata')
@logger.catch
def osd_observer(_name, value):
    '''Simulate osd playing message in console.'''
    if not value:
        return
    osd_message = [
        text for tag, text in value.items()
        if tag in ('Artist', 'Album', 'Title')
    ]
    term_len = get_terminal_size().columns
    # Blank the whole row first so that leftovers of a longer previous
    # message (e.g. "Loading track...") do not remain visible.
    print_there(0, 0, '\r' + ' ' * term_len)
    osd_string = ' — '.join(osd_message)
    print_there(0, 0, '\r' + osd_string[:term_len])


@player.event_callback('start-file')
@logger.catch
def starting_file_handler(value):
    '''just show loading state'''
    print_there(0, 0, '\rLoading track...')


@player.property_observer('percent-pos')
@logger.catch
def universal_observer(_name, value):
    '''Redraw the status row: position, bitrate, progress, size, cache speed.'''
    if not value:
        return
    percent = int(value)

    # Read every player property once: its value can change between reads.
    bitrate = player.audio_bitrate
    kbps = int(bitrate / 1024) if bitrate else '?'

    file_size = player.file_size
    track_size = round(file_size / 1024 / 1024, 1) if file_size else '?'

    speed_load = player.cache_speed
    if not speed_load:
        cache_speed = ''
    elif speed_load >= 3*1024*1024:
        cache_speed = '| <<<'
    elif speed_load >= 1*1024*1024:
        cache_speed = '| <<*'
    else:
        cache_speed = '| <=>'

    if player.playlist_count > -1:
        player_pos = f'{player.playlist_pos_1}/{player.playlist_count}'
    else:
        player_pos = '-/-'

    print_there(2, 2, '\r' + ' ' * get_terminal_size().columns)
    print_there(
        2, 2,
        f'\r{player_pos} | {kbps} kbps | {percent}% | {track_size}MB {cache_speed}')
    time.sleep(1)


def _current_track():
    '''Return the stored metadata of the playing track, or None if unknown.'''
    return player_fw_storage.storage.get(track_url_to_uuid())


def _print_track_info(track):
    '''Print the string fields of a track plus its album and artist names.'''
    for field, content in track.items():
        if field in ('album', 'artist') and isinstance(content, dict):
            name_aa = content.get('name') or content.get('title')
            if name_aa:
                print(f'{field}: {name_aa}')
        elif content and isinstance(content, str):
            print(f'{field}: {content}')


@logger.catch
def player_menu(header='', storage=None):
    '''Run the interactive fzf menu that controls the player.

    header: text shown as the fzf header.
    storage: optional mapping of uuid -> track dict that is merged into
        player_fw_storage.storage before the menu starts.

    Loops until 'Exit' is chosen (which also clears the playlist, stops
    playback and empties the storage) or until KeyboardInterrupt.
    '''
    if storage:
        player_fw_storage.storage.update(storage)
    player.volume = get_config("mpv_volume")
    # pyfzf passes the options through a shell, and the header usually
    # contains a track title, so it has to be quoted properly.
    fzf_header = f'--header={shlex.quote(header)}'
    while True:
        try:
            player_items_menu = [
                'Next', 'Prev',
                'Play' if player.pause else 'Pause',
                'Download', 'Info',
            ]
            if show_like_button:
                player_items_menu.append('Like')
            player_items_menu.extend(['Hide artist', 'Exit'])

            select = fzf.prompt(player_items_menu, fzf_header)[0]

            if select == 'Next':
                try:
                    player.playlist_next()
                except Exception:
                    print('No more next tracks')
            elif select == 'Prev':
                try:
                    player.playlist_prev()
                except Exception:
                    print('No more previous tracks')
            elif select in ('Pause', 'Play'):
                player.pause = not player.pause
            elif select == 'Download':
                download_track(player.stream_open_filename)
            elif select == 'Info':
                track = _current_track()
                if track:
                    _print_track_info(track)
                print(f'Direct link: {player.stream_open_filename}')
                input()  # wait for Enter before the menu is redrawn
            elif select == 'Like':
                track = _current_track()
                track_id = track.get('id') if track else None
                if track_id is not None:
                    src.fw_api.favorite_track(track_id)
                else:
                    logger.error("Can't like track: No track id")
            elif select == 'Hide artist':
                track = _current_track()
                artist = track.get('artist') if track else None
                artist_id = (
                    artist.get('id') if isinstance(artist, dict) else None)
                if artist_id is not None:
                    src.fw_api.hide_content(
                        {'target': {'id': artist_id, 'type': 'artist'}})
                else:
                    logger.error("Can't hide artist: No artist id")
            elif select == 'Exit':
                player.playlist_clear()
                player.stop()
                player_fw_storage.storage = {}
                break
        except KeyboardInterrupt:
            break


def play_track(track, multi=False):
    '''Queue a track in the player and, for a single track, open the menu.

    track: track dict; 'listen_url' is required, 'title' is used for the
        menu header.
    multi: when true the track is only appended to the playlist so that the
        caller can queue several tracks and open the menu itself.
    '''
    listen_url = src.fw_api.get_audio_file(track['listen_url'], True)
    player_fw_storage.storage[track_url_to_uuid(listen_url)] = track
    # Both modes load the track the same way ('append-play').
    player.loadfile(listen_url, 'append-play')
    # NOTE(review): the menu is assumed to belong to the single-track branch
    # only; confirm against callers that pass multi=True.
    if not multi:
        track_name = track.get('title')
        player_menu(f"{track_name} playing...", player_fw_storage.storage)