FMN_bot/src/listener_mention.py

85 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from src.fedi_api import get_notifications, mark_as_read_notification, post_status, upload_attachment
from src.fmn_states_db import add_state, get_state
from config import admins_bot, limit_movies_per_user, limit_all_movies_poll, hour_poll_posting, fmn_next_watching_hour
import threading, time
from datetime import datetime
from dateutil.parser import parse as dateutilparse
from dateutil.relativedelta import relativedelta, TU, SU
from loguru import logger
def get_control_mention():
while True:
time.sleep(30)
time_now = datetime.now()
now_week = time_now.weekday()
now_hour = time_now.hour
if now_week not in (0, 6):
continue
if now_week == 6 and now_hour < fmn_next_watching_hour: # Предотвращение работы в холстую до начала сеанса
continue
post_exists = get_state('last_thread_id')
if post_exists:
continue
logger.debug('Wait for from admin mention...')
notif = get_notifications()
for i in notif:
if i['type'] != "mention":
continue
seen = i['pleroma']['is_seen']
acct_mention = i['account']['acct']
reply_to_id = i['status']['in_reply_to_id']
if acct_mention in admins_bot and seen == False and reply_to_id == None and now_week in (0, 6):
logger.success(f'Найдено упоминание от {acct_mention}')
st_id = i['status']['id']
st_date = i['status']['created_at']
thread_created_at = dateutilparse(st_date)
delta = relativedelta(hour=hour_poll_posting, minute=0, second=0, weekday=TU(1))
stop_thread_scan = thread_created_at + delta
movies_accept_time = stop_thread_scan.strftime('%H:%M %d.%m.%Y по Москве')
stop_thread_scan = time.mktime(time.struct_time(stop_thread_scan.timetuple()))
if now_week == 6: # Фикс стыков двух недель. Если вс, то расчитываем на следующую неделю
next_week = 2
else:
next_week = 1
next_movie_watching_delta = relativedelta(hour=fmn_next_watching_hour, minute=0, second=0, weekday=SU(next_week))
next_movie_watching = time_now + next_movie_watching_delta
max_mute_time = time.mktime(time.struct_time(next_movie_watching.timetuple())) # Глушение до следующего сеанса FMN.
next_movie_watching = next_movie_watching.strftime('%d.%m.%Y')
post_status(start_collect_movies_text(movies_accept_time, next_movie_watching), st_id, attachments=[upload_attachment('src/FMN.png')])
time.sleep(0.2)
mark_as_read_notification(i['id'])
add_state('max_mute_time', int(max_mute_time))
add_state('stop_thread_scan', int(stop_thread_scan))
add_state('last_thread_id', st_id)
break
time.sleep(30)
def start_collect_movies_text(movies_accept_time=str, next_movie_watching=str):
text = f'''
Начинаем прием заявок на следующий вечерний киносеанс, запланированный на {next_movie_watching} в 21:00 по Москве.
Напоминаем правила:
- Мы принимаем на просмотр полнометражные художественные фильмы;
- Прием варианта осуществляется путем публикации ссылки на этот фильм на IMDB (libremdb) или Кинопоиске в этом треде;
- Нам не подходят: сериалы, короткометражные и документальные фильмы;
- Максимальное количество вариантов, предложенных одним человеком не должно превышать {limit_movies_per_user};
- Всего может быть собрано до {limit_all_movies_poll} фильмов;
- Заявки принимаются до крайнего срока, после чего будет объявлено голосование по собранным вариантам.
Крайний срок подачи заявки - {movies_accept_time}.
Желаем удачи.
'''.replace('\t', '')
return text
def run_scan_notif():
scan_notif = threading.Thread(target=get_control_mention, daemon=True)
scan_notif.start()