from config import instance import time import requests from loguru import logger instance_point = f"https://{instance}/api/v1" with open(".auth", mode='rt') as auth: tkn = auth.read().replace('\n', '') s = requests.Session() s.headers.update({ "Authorization": "Bearer " + tkn, "Accept-encoding": 'gzip' }) def get_notifications(): params = { "limit": 15, "types": ["mention"] } success = 0 while success == 0: try: r = s.get(instance_point + "/notifications", json=params) r.raise_for_status() success = 1 return r.json() except: logger.exception('Error get notificatios') time.sleep(30) logger.info('Retrying get notificatios...') def mark_as_read_notification(id_notification): success = 0 while success == 0: try: r = s.post(instance_point + f"/notifications/{id_notification}/dismiss") r.raise_for_status() success = 1 return r.json() except: logger.exception(f'Error read notification {id_notification}') time.sleep(30) logger.info(f'Retrying read notification {id_notification}...') def get_status_context(status_id): retry = 0 success = 0 while success == 0: try: r = s.get(instance_point + f"/statuses/{status_id}/context", timeout=30) r.raise_for_status() success = 1 return r.json() except Exception as E: logger.exception(f'Ошибка получения контекста треда {status_id}') time.sleep(30) logger.info('Повторный запрос треда...') retry += 1 if retry > 5: raise IOError(f'Фетчинг треда поломан! {E}') def get_status(status_id): success = 0 while success == 0: try: r = s.get(instance_point + f"/statuses/{status_id}") r.raise_for_status() success = 1 return r.json() except: logger.exception(f'Error get status {status_id}') time.sleep(30) logger.info(f'Retrying get status {status_id}') def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None, visibility='unlisted'): poll = None if poll_options is not None: poll = { "options": poll_options, "expires_in": poll_expires, "multiple": True } params = { "status": text, "in_reply_to_id": reply_to_status_id, "visibility": visibility, "content_type": "text/plain", "language": "ru", "poll": poll } if attachments: params['media_ids'] = attachments success = 0 while success == 0: try: r = s.post(instance_point + "/statuses", json=params) r.raise_for_status() success = 1 return r.json() except: logger.exception('Error send status, retrying...') time.sleep(5) def upload_attachment(file_path): file = { "file": open(file_path, mode='rb') } params = { "description": "Fediverse Movie Night\nВоскресенье, 21:00\nLIVE ON XXIV Production", } r = s.post(instance_point + "/media", params, files=file, timeout=30) r.raise_for_status() return r.json()['id'] def mute_user(acct_id=str, acct=str, duration=None): params = { "duration": duration } success = 0 while success == 0: try: r = s.post(instance_point + '/accounts' + f"/{acct_id}/mute", params) r.raise_for_status() logger.info(f'Пользователь {acct} был заглушен на {duration} secs') success = 1 except: logger.exception(f'Ошибка глушения {acct}') time.sleep(5) logger.info(f'Повторное глушение {acct}...')