mirror of
http://gitea.phreedom.club/localhost_frssoft/funkwlmpv
synced 2024-11-24 09:41:27 +00:00
125 lines
5 KiB
Python
Executable file
125 lines
5 KiB
Python
Executable file
#!/bin/env python3
|
|
import requests
|
|
import concurrent.futures
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
tracks_stor = []
|
|
with open('instances') as instances:
|
|
instances = instances.read().strip().split('\n')
|
|
|
|
|
|
parser = argparse.ArgumentParser(
|
|
prog='funkwlplay',
|
|
description='Create playlist from query or just random playlist tracks from funkwhale instances')
|
|
parser.add_argument('-s', '--search', help='This global search on funkwhale instances, it matches artists, albums, tracks, etc...')
|
|
parser.add_argument('-t', '--tag', help='This tag search, use this as genre search')
|
|
parser.add_argument('-i', '--instance', help='Specify instance, by default search on all instances in instances file')
|
|
parser.add_argument('-r', '--recursion', type=int, default=0, help='Use recursion if instance contain more than 50 tracks')
|
|
parser.add_argument('-d', '--depth', type=int, default=5, help='Depth of recursion, default is 5 pages, 250 tracks')
|
|
args = parser.parse_args()
|
|
if args.instance:
|
|
instances = [args.instance]
|
|
|
|
|
|
def create_playlist_file(track_list):
|
|
filename = 'playlist.m3u8'
|
|
with open(filename, 'w') as file:
|
|
file.write('#EXTM3U\n')
|
|
for i in track_list:
|
|
file.write('\n' + i)
|
|
print(f'Playlist saved as {filename}')
|
|
|
|
|
|
def filter_tracks(tracks):
|
|
def remove_unreach_tracks(track):
|
|
try:
|
|
r = requests.head(track['listen_url'], timeout=2, headers={'Content-Encoding': 'gzip'})
|
|
r.raise_for_status()
|
|
return 1
|
|
except:
|
|
return 0
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
|
|
res = [executor.submit(remove_unreach_tracks, track) for track in tracks]
|
|
concurrent.futures.wait(res)
|
|
avalaible = []
|
|
for idx, track in enumerate(tracks):
|
|
is_avalaible = res[idx].result()
|
|
if is_avalaible == 1:
|
|
avalaible.append(track)
|
|
tracks = avalaible
|
|
|
|
Path('filter_tags').touch()
|
|
Path('filter_artists').touch()
|
|
Path('filter_raw_urls').touch()
|
|
with open('filter_tags') as tags_file:
|
|
block_tags = tags_file.read().strip().split('\n')
|
|
|
|
with open('filter_artists') as artists_file:
|
|
block_artists = artists_file.read().strip().split('\n')
|
|
|
|
with open('filter_raw_urls') as raw_urls_file:
|
|
block_raw_urls = raw_urls_file.read().strip().split('\n')
|
|
filtred_tracks = []
|
|
for i in tracks:
|
|
if [tag.lower() for tag in i['tags']] in block_tags:
|
|
continue
|
|
if i['artist']['name'].lower() in block_artists:
|
|
continue
|
|
if i['listen_url'].lower() in block_raw_urls:
|
|
continue
|
|
filtred_tracks.append(i)
|
|
return filtred_tracks
|
|
|
|
|
|
def search_tracks_on_instance(instance, tag='', query='', recursion=args.recursion):
|
|
r = requests.get(f'https://{instance}/api/v1/tracks', params={'tag': tag, 'q': query,
|
|
'local': True, 'playable': True,
|
|
'ordering': 'random', 'scope': 'all'},
|
|
timeout=10, headers={'Content-Encoding': 'gzip'})
|
|
r.raise_for_status()
|
|
tracks = r.json()
|
|
|
|
count = tracks['count']
|
|
print(f'found {count} tracks on {instance}')
|
|
if recursion == 1:
|
|
recursion_limit = 0
|
|
while tracks['next']:
|
|
try:
|
|
if recursion_limit >= args.depth:
|
|
break
|
|
new_tracks = requests.get(tracks['next']).json()
|
|
tracks['results'] += new_tracks['results']
|
|
tracks['next'] = new_tracks['next']
|
|
recursion_limit += 1
|
|
except Exception as E:
|
|
print(E)
|
|
tracks_replacer = []
|
|
for track in tracks['results']:
|
|
track['listen_url'] = f'https://{instance}' + track['listen_url']
|
|
tracks_replacer.append(track)
|
|
tracks['results'] = tracks_replacer
|
|
return tracks
|
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
|
|
res = [executor.submit(search_tracks_on_instance, instance, args.tag, args.search) for instance in instances]
|
|
concurrent.futures.wait(res)
|
|
playlist_files = []
|
|
for idx, instance in enumerate(instances):
|
|
try:
|
|
tracks = res[idx].result()
|
|
before_filter = len(tracks['results'])
|
|
filtred_tracks = filter_tracks(tracks['results'])
|
|
after_filter = before_filter - len(filtred_tracks)
|
|
print(f'{after_filter} tracks filtred on {instance}')
|
|
tracks_stor += filtred_tracks
|
|
except Exception as E:
|
|
print(E)
|
|
for track in tracks_stor:
|
|
artist, album, title, play_url, track_duration, fid = track['artist']['name'], track['album']['title'], track['title'], track['listen_url'], track.get('duration'), track['fid']
|
|
if not track_duration:
|
|
track_duration = -1
|
|
playlist_files.append(f'#EXTINF:{track_duration},{artist} - {album} - {title} url="{fid}"\n{play_url}')
|
|
create_playlist_file(playlist_files)
|