FMN_bot/src/fedi_api.py

115 lines
3.4 KiB
Python

from config import instance
import time
import json
import requests
from loguru import logger
# Root URL of the instance's REST API (v1); endpoint paths are appended to it.
instance_point = f"https://{instance}/api/v1"

# The access token is read from the ".auth" file in the current working
# directory, with every newline character removed.
with open(".auth", mode="rt") as auth:
    tkn = "".join(auth.read().split('\n'))

# Authorization header passed to every request issued by this module.
headers = dict(Authorization=f"Bearer {tkn}")
def get_notifications():
    """Fetch recent mention notifications.

    Issues a GET to ``/notifications`` using the module-level bearer token.
    The filter (at most 15 entries, type ``mention`` only) is sent as a JSON
    request body, not as query-string parameters.

    Returns:
        The decoded JSON body of the response. The HTTP status code is not
        checked, so an error payload is returned as-is.
    """
    query = dict(limit=15, types=["mention"])
    response = requests.get(
        f"{instance_point}/notifications",
        json=query,
        headers=headers,
    )
    return response.json()
def mark_as_read_notification(id_notification):
    """Dismiss one notification, retrying until the request succeeds.

    POSTs to ``/notifications/{id_notification}/dismiss``. On any error
    (connection failure, non-2xx status, undecodable body) the exception is
    logged, the function sleeps for 30 seconds and tries again. There is no
    retry limit.

    Args:
        id_notification: Identifier of the notification to dismiss; it is
            interpolated into the URL path.

    Returns:
        The decoded JSON body of the first successful response.
    """
    while True:
        try:
            r = requests.post(
                instance_point + f"/notifications/{id_notification}/dismiss",
                headers=headers,
            )
            r.raise_for_status()
            return r.json()
        except Exception:
            # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
            # SystemExit can still break out of this unbounded retry loop.
            logger.exception(f'Error read notification {id_notification}')
            time.sleep(30)
            # The f-prefix is required: without it the literal text
            # "{id_notification}" was logged instead of the actual id.
            logger.info(f'Retrying read notification {id_notification}...')
def get_status_context(status_id):
    """Fetch the thread context of a status, retrying until it succeeds.

    GETs ``/statuses/{status_id}/context``. On any error (connection failure,
    non-2xx status, undecodable body) the exception is logged, the function
    sleeps for 30 seconds and tries again. There is no retry limit.

    Args:
        status_id: Identifier of the status whose context is requested; it is
            interpolated into the URL path.

    Returns:
        The decoded JSON body of the first successful response.
    """
    while True:
        try:
            r = requests.get(
                instance_point + f"/statuses/{status_id}/context",
                headers=headers,
            )
            r.raise_for_status()
            return r.json()
        except Exception:
            # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
            # SystemExit can still break out of this unbounded retry loop.
            # Log text: "Error getting thread context <id>".
            logger.exception(f'Ошибка получения контекста треда {status_id}')
            time.sleep(30)
            # Log text: "Re-requesting the thread...".
            logger.info('Повторный запрос треда...')
def get_status(status_id):
    """Fetch a single status by its identifier.

    Performs one GET to ``/statuses/{status_id}`` with the module-level
    authorization header. No retry is attempted and the HTTP status code is
    not checked.

    Returns:
        The decoded JSON body of the response.
    """
    url = f"{instance_point}/statuses/{status_id}"
    response = requests.get(url, headers=headers)
    return response.json()
def post_status(text, reply_to_status_id=None, poll_options=None, poll_expires=345600, attachments=None):
    """Publish a status, retrying until the server accepts it.

    The status is always posted as ``unlisted``, with content type
    ``text/plain`` and language ``ru``. On any error the exception is logged,
    the function sleeps for 5 seconds and re-sends the same payload. There is
    no retry limit.

    Args:
        text: Body of the status.
        reply_to_status_id: Sent as ``in_reply_to_id``; ``None`` when the
            status is not a reply.
        poll_options: Poll choices. When not ``None`` a multiple-choice poll
            is attached; otherwise ``poll`` is sent as ``None``.
        poll_expires: Sent as the poll's ``expires_in`` value (presumably
            seconds, per the Mastodon API; the default 345600 would be
            4 days).
        attachments: Sent as ``media_ids`` when truthy; the key is omitted
            otherwise.

    Returns:
        The decoded JSON body of the first successful response.
    """
    poll = None
    if poll_options is not None:
        poll = {
            "options": poll_options,
            "expires_in": poll_expires,
            "multiple": True,
        }

    params = {
        "status": text,
        "in_reply_to_id": reply_to_status_id,
        "visibility": "unlisted",
        "content_type": "text/plain",
        "language": "ru",
        "poll": poll,
    }
    if attachments:
        params['media_ids'] = attachments

    while True:
        try:
            r = requests.post(instance_point + "/statuses", json=params, headers=headers)
            r.raise_for_status()
            return r.json()
        except Exception:
            # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
            # SystemExit can still break out of this unbounded retry loop.
            logger.exception('Error send status, retrying...')
            time.sleep(5)
def upload_attachment(file_path):
    """Upload a file as a media attachment and return its identifier.

    POSTs the file to ``/media`` as the ``file`` upload part, together with a
    fixed ``description`` form field. No retry is attempted and the HTTP
    status code is not checked.

    Args:
        file_path: Path of the file to upload; it is opened in binary mode.

    Returns:
        The ``id`` field of the decoded JSON response.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If the response JSON has no ``id`` field.
    """
    params = {
        "description": "Fediverse Movie Night\nВоскресенье, 21:00\nLIVE ON XXIV Production",
    }
    # The context manager closes the file handle once the request has been
    # sent; previously the handle was opened and never closed.
    with open(file_path, mode='rb') as media_file:
        r = requests.post(
            instance_point + "/media",
            data=params,  # was passed positionally, which requests treats as `data`
            files={"file": media_file},
            headers=headers,
        )
    return r.json()['id']
def mute_user(acct_id=str, acct=str, duration=None):
    """Mute an account, retrying until the server accepts the request.

    POSTs to ``/accounts/{acct_id}/mute`` with ``duration`` as form data. On
    any error the exception is logged, the function sleeps for 5 seconds and
    tries again. There is no retry limit.

    NOTE(review): ``acct_id=str`` and ``acct=str`` make the ``str`` type
    object the *default value* of both parameters; they were presumably meant
    as type annotations. The signature is kept unchanged for backward
    compatibility, so callers should always pass both arguments.

    Args:
        acct_id: Identifier of the account to mute; it is interpolated into
            the URL path.
        acct: Account name, used only in log messages.
        duration: Sent as the ``duration`` form field; logged with a "secs"
            suffix.

    Returns:
        None.
    """
    params = {
        "duration": duration,
    }
    while True:
        try:
            r = requests.post(
                instance_point + '/accounts' + f"/{acct_id}/mute",
                data=params,  # was passed positionally, which requests treats as `data`
                headers=headers,
            )
            r.raise_for_status()
            # Log text: "User <acct> was muted for <duration> secs".
            logger.info(f'Пользователь {acct} был заглушен на {duration} secs')
            return
        except Exception:
            # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
            # SystemExit can still break out of this unbounded retry loop.
            # Log text: "Error muting <acct>".
            logger.exception(f'Ошибка глушения {acct}')
            time.sleep(5)
            # Log text: "Muting <acct> again...".
            logger.info(f'Повторное глушение {acct}...')