# Tested on Python 3.10
# Reqs:
# python -m pip install requests xmltodict
import base64
import os
import queue
import random
import re
import sys
import uuid
from datetime import datetime, timedelta
from threading import Thread
from time import sleep

import requests
import xmltodict

import settings
from helpers import RID

# Timeout (in seconds) applied to every HTTP request made by the bot.
HTTP_TIMEOUT = 60


class ModBot:
    """Moderation bot for a PeerTube livechat room, talking to the chat over BOSH.

    Known users are kept in ``self.users`` (nickname -> dict with ``role``,
    ``affiliation``, ``jid``, ``uid`` and optional ``timeout_until``,
    ``msgtimes``, ``lastmsg``).  Moderators drive the bot with ``!commands``
    sent in chat or typed on the console (see ``process_command``).

    NOTE(review): every BOSH request body in this file is an empty (or nearly
    empty) f-string, ``_generate_body_headers`` is never called, and the
    arguments of ``bosh_user_mute`` / ``bosh_user_unmute`` / ``bosh_user_ban`` /
    ``bosh_retract_msg`` are never used.  Presumably the XML stanza templates
    were lost from this copy of the file -- confirm against the upstream
    source before deploying.
    """

    def __init__(self, instance, username, password, chatroom) -> None:
        """Store connection settings and initialise the moderation state.

        Args:
            instance: Host name of the PeerTube instance (without scheme).
            username: PeerTube account name used to log in.
            password: Password of that account.
            chatroom: Room identifier used in the webchat URL and room address.
        """
        self.peertube_instance = instance
        self.peertube_username = username
        self.peertube_password = password
        self.chat_room = chatroom
        self._prefixed_instance = f"https://{instance}"

        # Counters from helpers.RID; the bot reads their ``.x`` attribute.
        # Presumably each read yields a fresh value -- confirm in helpers.RID.
        self.rid = RID()
        self.uid = RID()

        self.users = {}
        self.msg_history = []
        self.msg_history_max_len = 500
        self.restricted_mode = 0

        # Timeout length, if !timeout command was send without 2nd argument
        self.timeout_default = 300

        # Rate limit settings
        # Time window (in seconds) to measure rates
        self.ratelimit_timewindow = 10
        # How many messages one user allowed to send in 'ratelimit_timewindow' seconds
        self.ratelimit_maxmessages = 3
        # Length of timeout (in seconds) that will be applied to user who exceeded the limits
        self.ratelimit_timeout = 60

        self.resource_pref = "modbot.py"
        self.bot_msg_prefix = "/me [ModBot]: "
        self.last_msg = {"username": "", "count": 0}
        self.msg_queue = queue.Queue()

        # Ban lists are (re)loaded from these files whenever their mtime changes.
        self.banned_usernames = []
        self._bannedusernames_filename = "bannedusernames.txt"
        self._bannedusernames_stamp = 0
        self.banned_words = []
        self._bannedwords_filename = "bannedwords.txt"
        self._bannedwords_stamp = 0

    # ------------------------------------------------------------------
    # Low-level BOSH helpers
    # ------------------------------------------------------------------

    def _bosh_send(self, str):
        """POST ``str`` (UTF-8 encoded) to the BOSH service and return the response text.

        The parameter keeps its original name (which shadows the builtin) so
        that existing callers are not affected.
        """
        req = requests.post(
            f'{self._prefixed_instance}{self.bosh_service_url}',
            headers={'Content-Type': 'text/xml; charset=utf-8'},
            timeout=HTTP_TIMEOUT,
            data=str.encode('utf-8'),
        )
        return req.text

    def _generate_body_headers(self):
        """Return the rid/sid/xmlns attribute string for a BOSH ``body`` element."""
        return f'rid="{self.rid.x}" sid="{self.authid}" xmlns="http://jabber.org/protocol/httpbind"'

    # ------------------------------------------------------------------
    # Presence / user tracking
    # ------------------------------------------------------------------

    def _rename_user(self, jid, newnick):
        """Move the known user with ``jid`` to the key ``newnick`` and log the change.

        A user whose role is "visitor" (i.e. muted) is muted again under the
        new nickname.  Does nothing when no known user has that jid.
        """
        prev_username = next(
            (name for name, known in self.users.items() if known["jid"] == jid),
            None,
        )
        if prev_username is None:
            return
        # pop-then-assign keeps the entry even when the nickname is unchanged.
        user = self.users.pop(prev_username)
        self.users[newnick] = user
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ({user['uid']}|{prev_username}|{user['jid']} changed nickname to {newnick}", end="", flush=True)
        if user["role"] == "visitor":
            print(" and was muted too)")
            self.bosh_user_mute(newnick)
        else:
            print(")")

    def update_user(self, presenses):
        """Update ``self.users`` from one presence dict or a list of them.

        After updating, users that are not already "visitor"/"none" are muted
        according to ``restricted_mode`` (1: anonymous newcomers, 2: all
        anonymous users, 3: everyone except moderators) and the banned
        username list.

        NOTE(review): the nesting of the restriction checks was reconstructed
        from a whitespace-collapsed source; confirm against upstream.
        """
        if isinstance(presenses, dict):
            presenses = [presenses]

        for presense in presenses:
            # If you are getting error HERE, your account... got banned from your room???
            username = presense["@from"].split("/", 1)[1]
            x_items = presense["x"]["item"]
            if isinstance(x_items, dict):
                x_items = [x_items]

            for x_item in x_items:
                userdata = {
                    "role": x_item["@role"],
                    "affiliation": x_item["@affiliation"],
                    "jid": x_item["@jid"].split("/", 1)[0],
                }
                new_user = False

                if "@nick" in x_item:
                    self._rename_user(userdata["jid"], x_item["@nick"])
                elif username in self.users:
                    self.users[username].update(userdata)
                else:
                    userdata["uid"] = self.uid.x
                    self.users[username] = userdata
                    new_user = True

                if userdata["role"] in ("visitor", "none"):
                    continue

                anonymous = "@anon." in userdata["jid"]
                if new_user and self.restricted_mode == 1 and anonymous:
                    self.bosh_user_mute(username)
                if self.restricted_mode == 2 and anonymous:
                    self.bosh_user_mute(username)
                elif self.restricted_mode == 3 and userdata["role"] != "moderator":
                    self.bosh_user_mute(username)

                for banned_username in self.banned_usernames:
                    if banned_username in username:
                        print(f"User '{username}' was muted: contains banned username '{banned_username}'")
                        self.bosh_user_mute(username)
                        break  # one mute is enough

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def send_msg(self, msg, cmd=False):
        """Send ``msg`` to the chat, or print it when ``cmd`` is truthy.

        ``cmd`` marks replies to console commands, which go to stdout instead
        of the room.  Returns the BOSH response text when sent, else ``None``.
        """
        if cmd:
            print(msg)
            return None
        someid = uuid.uuid4()  # unused here; presumably belongs to the missing stanza template
        req_body = f'{self.bot_msg_prefix}{msg}'
        return self._bosh_send(req_body)

    # ------------------------------------------------------------------
    # Connection
    # ------------------------------------------------------------------

    def connect(self):
        """Log in to PeerTube, open the BOSH session and join the chat room.

        Exits the process with status 1 when the PeerTube login or the
        webchat authentication fails.
        """
        _pi = self._prefixed_instance
        _room = self.chat_room
        _inst = self.peertube_instance

        print('Peertube: Getting client tokens...')
        req = requests.get(f'{_pi}/api/v1/oauth-clients/local', timeout=HTTP_TIMEOUT)
        client_tokens = req.json()

        print('Peertube: Authorizing and getting user token...')
        req = requests.post(f'{_pi}/api/v1/users/token', timeout=HTTP_TIMEOUT, data={
            "client_id": client_tokens["client_id"],
            "client_secret": client_tokens["client_secret"],
            "grant_type": "password",
            "response_type": "code",
            "username": self.peertube_username,
            "password": self.peertube_password,
        })
        token_resp = req.json()
        if "access_token" not in token_resp:
            print(f"Error while logging in: {token_resp}")
            sys.exit(1)
        access_token = token_resp["access_token"]

        print(f"Webchat: Accessing room {_room} at instance {_inst}")
        req = requests.get(f'{_pi}/plugins/livechat/router/webchat/room/{_room}', timeout=HTTP_TIMEOUT)
        # Getting errors there? Does the room even exist?
        self.livechat_version = re.search(
            r"/livechat/(.*)/static/", req.text, re.MULTILINE).group(1)
        print(f'Webchat: Livechat version: {self.livechat_version}')
        self.bosh_service_url = re.search(r"boshServiceUrl: '(.*)',", req.text, re.MULTILINE).group(1)
        self.authentication_url = re.search(r"authenticationUrl: '(.*)',", req.text, re.MULTILINE).group(1).removeprefix(_pi)

        headers = {'authorization': f'Bearer {access_token}'}
        req = requests.get(f'{_pi}{self.authentication_url}', headers=headers, timeout=HTTP_TIMEOUT)
        if req.status_code != 200:
            print(f"Webchat: ERROR: UNAUTHORIZED (STATUS:{req.status_code})")
            sys.exit(1)
        creds = req.json()
        print(f"Webchat: Signed in as '{creds['nickname']}' ({creds['jid']})")
        self.creds = creds

        # "\0user\0password" has the shape of a SASL PLAIN message, which is
        # defined over UTF-8; UTF-8 also avoids a crash on non-ASCII credentials.
        authb64 = base64.b64encode(f'\0{creds["nickname"]}\0{creds["password"]}'.encode('utf-8')).decode()
        resource_id = f'{self.resource_pref}-{random.randint(10000000, 99999999)}'
        self.resource_id = resource_id

        print('boshService: Fetching "authid"...')
        req_body = f''
        req = self._bosh_send(req_body)
        authid = xmltodict.parse(req)["body"]["@authid"]
        self.authid = authid

        self._internal_user_addr = f'{creds["nickname"]}@{_inst}/{resource_id}'
        self._internal_room_addr = f'{_room}@room.{_inst}'

        print('boshService: Authorizing...')
        req_body = f'{authb64}'
        req = self._bosh_send(req_body)

        print('boshService: Restarting datastream...')
        req_body = f''
        req = self._bosh_send(req_body)

        print('boshService: Binding auth...')
        req_body = f'{resource_id}'
        req = self._bosh_send(req_body)

        print('boshService: Getting session...')
        req_body = f''
        req = self._bosh_send(req_body)

        print('boshService: Joining room, sending presence...')
        self.bosh_update_presenses()

        print('Done, check if there is ModBot message in chat.')
        self.send_msg("Online!")

    # ------------------------------------------------------------------
    # BOSH actions
    # ------------------------------------------------------------------

    def bosh_update_presenses(self):
        """Send a presence request and feed any returned presences to ``update_user``."""
        req_body = f''
        req = self._bosh_send(req_body)
        response_body = xmltodict.parse(req)["body"]
        if "presence" not in response_body:
            return
        presenses = response_body["presence"]
        if isinstance(presenses, dict):
            presenses = [presenses]
        for presense in presenses:
            self.update_user(presense)

    def bosh_user_unmute(self, username):
        """Send the unmute request for ``username`` and return the response text."""
        req_body = f''
        return self._bosh_send(req_body)

    def bosh_user_mute(self, username):
        """Send the mute request for ``username`` and return the response text."""
        req_body = f''
        return self._bosh_send(req_body)

    def bosh_user_ban(self, jid):
        """Send the ban request for ``jid`` and return the response text."""
        req_body = f''
        return self._bosh_send(req_body)

    def bosh_retract_msg(self, msg_id):
        """Send the retraction request for message ``msg_id`` and return the response text."""
        req_body = f''
        return self._bosh_send(req_body)

    def wipe_user_msg(self, user_id):
        """Retract every remembered message of ``user_id`` and drop it from the history."""
        kept = []
        for entry in self.msg_history:
            if entry["user_id"] == user_id:
                self.bosh_retract_msg(entry["msg_id"])
            else:
                kept.append(entry)
        self.msg_history = kept

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def _users_snapshot(self):
        """Return ``self.users`` items as a list.

        Console commands run on the main thread while the reader thread may
        add or rename users; iterating a snapshot avoids "dictionary changed
        size during iteration".
        """
        return list(self.users.items())

    def _users_with_uid(self, uid):
        """Return ``(username, user)`` pairs whose ``uid`` equals ``uid``."""
        return [(name, user) for name, user in self._users_snapshot() if user["uid"] == uid]

    def _resolve_uid_arg(self, raw_id, cmd):
        """Parse a numeric user-ID argument and look the user up.

        Reports problems through ``send_msg``.  Returns ``None`` when
        ``raw_id`` is not numeric, otherwise ``(uid, matches)`` where
        ``matches`` may be empty.
        """
        if not raw_id.isdigit():
            self.send_msg(f"Error: '{raw_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
            return None
        uid = int(raw_id)
        matches = self._users_with_uid(uid)
        if not matches:
            self.send_msg(f"Can't find user with ID={uid}", cmd)
        return uid, matches

    def process_command(self, body, cmd=False):
        """Execute the moderator command contained in ``body``, if any.

        Args:
            body: Raw command text, e.g. ``"!mute 3"``.  Unknown text is ignored.
            cmd: True when the command came from the console; replies are then
                printed instead of being sent to the chat.
        """
        if body == "!help":
            self.send_msg(
                "List of commands:\n"
                "!users - Show list of online users\n"
                "!usersall - Show list of all known users\n"
                "!mute USERID - Mute user by ID\n"
                "!unmute USERID - Unmute user by ID\n"
                "!timeout USERID SEC - Mute user by ID for SEC seconds\n"
                "!wipe USERID - Retract user's recent messages\n"
                "!ban USERID - Ban user by ID and retract his last messages\n"
                "!mode 0 - Lift mutes, everyone can talk\n"
                "!mode 1 - Mute anons-newcomers, existing and registered users can talk\n"
                "!mode 2 - Mute all anons, registered users can talk\n"
                "!mode 3 (or !shutup) - Siege mode: mute everyone except moderators.\n"
                "!ratelimits M W S - Rewrite ratelimits: allow M messages per W seconds before S seconds timeout.\n",
                cmd)

        elif body == "!users":
            lines = ["List of online users:\n", "ID) [name] (jid, role, affiliation)\n"]
            lines.extend(
                f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
                for username, user in self._users_snapshot()
                if user['role'] != "none"
            )
            self.send_msg("".join(lines), cmd)

        elif body == "!usersall":
            lines = ["List of known users:\n", "ID) [name] (jid, role, affiliation)\n"]
            lines.extend(
                f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
                for username, user in self._users_snapshot()
            )
            self.send_msg("".join(lines), cmd)

        elif body.startswith("!mute"):
            resolved = self._resolve_uid_arg(body[6:], cmd)
            if resolved is not None:
                for username, _user in resolved[1]:
                    self.bosh_user_mute(username)
                    self.send_msg(f"'{username}' muted", cmd)

        elif body.startswith("!unmute"):
            resolved = self._resolve_uid_arg(body[8:], cmd)
            if resolved is not None:
                for username, _user in resolved[1]:
                    self.bosh_user_unmute(username)
                    self.send_msg(f"'{username}' unmuted", cmd)

        elif body.startswith("!timeout"):
            args = body[9:].split(" ")
            _id = args[0]
            _time = args[1] if len(args) > 1 else str(self.timeout_default)
            if _id.isdigit() and _time.isdigit():
                _id = int(_id)
                _time = int(_time)
                matches = self._users_with_uid(_id)
                for username, user in matches:
                    self.bosh_user_mute(username)
                    user["timeout_until"] = datetime.now() + timedelta(seconds=_time)
                    self.send_msg(f"'{username}' muted for {_time} seconds", cmd)
                if not matches:
                    self.send_msg(f"Can't find user with ID={_id}", cmd)
            else:
                self.send_msg("Error: malformed command", cmd)

        elif body.startswith("!ratelimits"):
            args = body[12:].split(" ")
            if len(args) == 3 and all(arg.isdigit() for arg in args):
                _msgs, _wnd, _time = (int(arg) for arg in args)
                self.ratelimit_maxmessages = _msgs
                self.ratelimit_timewindow = _wnd
                self.ratelimit_timeout = _time
                self.send_msg(f"Ratelimits updated: users who send >={_msgs} messages in {_wnd} seconds will be timed out for {_time} seconds.", cmd)
            else:
                self.send_msg("Error: malformed command", cmd)

        elif body == "!mode 0":
            for username, _user in self._users_snapshot():
                self.bosh_user_unmute(username)
            self.restricted_mode = 0
            self.send_msg("Code 🟩-0, everyone unmuted. Welcome back!", cmd)

        elif body == "!mode 1":
            self.restricted_mode = 1
            self.send_msg("Code 🟨-0, newcomers will be muted.", cmd)

        elif body == "!mode 2":
            for username, user in self._users_snapshot():
                if "@anon." in user['jid']:
                    self.bosh_user_mute(username)
            self.restricted_mode = 2
            self.send_msg("Code 🟨-1, anon users muted.", cmd)

        elif body == "!mode 3" or body == "!shutup":
            for username, user in self._users_snapshot():
                if user["role"] != "moderator":
                    self.bosh_user_mute(username)
            self.restricted_mode = 3
            self.send_msg("Code 🟥-0, everyone muted. Stand by.", cmd)

        elif body.startswith("!wipe"):
            resolved = self._resolve_uid_arg(body[6:], cmd)
            if resolved is not None:
                uid, matches = resolved
                for username, _user in matches:
                    self.wipe_user_msg(uid)
                    self.send_msg(f"'{username}' msgs wiped.", cmd)

        elif body.startswith("!ban"):
            resolved = self._resolve_uid_arg(body[5:], cmd)
            if resolved is not None:
                uid, matches = resolved
                for username, user in matches:
                    if user["jid"] == self.creds['jid']:
                        self.send_msg(f"Nope, won't ban myself ({self.creds['jid']})", cmd)
                    else:
                        self.bosh_user_ban(user["jid"])
                        self.send_msg(f"'{username}' banned.", cmd)
                        self.wipe_user_msg(uid)

    # ------------------------------------------------------------------
    # Receiver thread: polls the BOSH service and queues raw responses
    # ------------------------------------------------------------------

    def _receiver_loop(self):
        """Forever: send a request to the BOSH service and queue the response text."""
        while True:
            req_body = f''
            resp = self._bosh_send(req_body)
            self.msg_queue.put(resp)

    def start_receiver_loop(self):
        """Run ``_receiver_loop`` in a daemon thread."""
        t = Thread(target=self._receiver_loop)
        t.daemon = True
        t.start()

    # ------------------------------------------------------------------
    # Reader thread: consumes queued responses and moderates
    # ------------------------------------------------------------------

    @staticmethod
    def _read_list_if_changed(filename, known_stamp, *, lowercase=False):
        """Reload a one-entry-per-line list file when its mtime changed.

        Returns ``None`` when nothing changed, otherwise ``(entries, stamp)``.
        Blank lines are dropped: an empty entry would be a substring of every
        username/message and mute everybody.  A missing file yields an empty
        list with stamp ``None`` (reported once) instead of an exception.
        """
        try:
            stamp = os.stat(filename).st_mtime
        except FileNotFoundError:
            stamp = None
        if stamp == known_stamp:
            return None
        if stamp is None:
            print(f"Warning: '{filename}' not found, using an empty list.")
            return [], None
        with open(filename, 'r', encoding='utf-8') as file:
            entries = [line.strip() for line in file]
        entries = [entry.lower() if lowercase else entry for entry in entries if entry]
        return entries, stamp

    def _refresh_ban_lists(self):
        """Reload the banned username / banned word lists if their files changed."""
        loaded = self._read_list_if_changed(self._bannedusernames_filename, self._bannedusernames_stamp)
        if loaded is not None:
            self.banned_usernames, self._bannedusernames_stamp = loaded
            print(f"Banned usernames list updated (count: {len(self.banned_usernames)}).")

        # Words are matched against the lowercased message, so store them lowercased.
        loaded = self._read_list_if_changed(self._bannedwords_filename, self._bannedwords_stamp, lowercase=True)
        if loaded is not None:
            self.banned_words, self._bannedwords_stamp = loaded
            print(f"Banned words list updated (count: {len(self.banned_words)}).")

    def _expire_timeouts(self, time_now):
        """Unmute every user whose ``timeout_until`` lies before ``time_now``."""
        for username, user in list(self.users.items()):
            expires = user.get("timeout_until")
            if expires is not None and expires < time_now:
                self.send_msg(f"'{username}' unmuted after timeout.")
                self.bosh_user_unmute(username)
                del user["timeout_until"]

    def _timeout_user(self, username, reason):
        """Mute ``username`` for ``ratelimit_timeout`` seconds and announce ``reason``."""
        self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=self.ratelimit_timeout)
        self.send_msg(f"'{username}' muted for {self.ratelimit_timeout} seconds. Reason: {reason}")
        self.bosh_user_mute(username)

    def _enforce_ratelimit(self, username, body, time_now):
        """Apply the duplicate-message and messages-per-window limits to one message."""
        user = self.users[username]
        cutoff = time_now - timedelta(seconds=self.ratelimit_timewindow)

        # Rebuild the list instead of calling .remove() while iterating it,
        # which skips elements.
        msgtimes = [stamp for stamp in user.get("msgtimes", []) if stamp >= cutoff]
        msgtimes.append(time_now)
        user["msgtimes"] = msgtimes

        body_hash = hash(body)
        is_duplicate = user.get("lastmsg") == body_hash
        recent = sum(1 for stamp in msgtimes if stamp >= cutoff)

        # NOTE(review): the message just recorded counts as "recent", so a
        # repeat of the user's previous message is treated as spam no matter
        # how long ago that previous message was sent -- confirm this is intended.
        if is_duplicate:
            if recent:
                self._timeout_user(username, "spamming")
        elif recent >= self.ratelimit_maxmessages:
            self._timeout_user(username, "ratelimits")

        user["lastmsg"] = body_hash

    def _enforce_banned_words(self, username, body):
        """Time the user out when ``body`` contains any banned word (case-insensitive)."""
        lowered = body.lower()
        if any(banned_word in lowered for banned_word in self.banned_words):
            self._timeout_user(username, "banned word")

    def _handle_chat_message(self, chatmsg, time_now):
        """Record one chat message, log it and run commands or moderation on it."""
        body = chatmsg["body"]
        # xmltodict yields a dict with "#text" for elements that carry
        # attributes and None for empty elements.
        if isinstance(body, dict):
            body = body.get("#text")
        if not isinstance(body, str):
            return

        # The nickname is the resource part of the sender address.
        _room_addr, separator, username = chatmsg.get("@from", "").partition("/")
        if not separator or username not in self.users:
            # Unknown sender (no presence seen); skipping keeps the reader
            # thread alive instead of dying on a KeyError/IndexError.
            print(f"Skipping message from unknown sender '{chatmsg.get('@from', '')}'")
            return
        msguser = self.users[username]

        self.msg_history.append({
            "user_id": msguser['uid'],
            "body": body,
            "msg_id": chatmsg["stanza-id"]["@id"]
        })
        if len(self.msg_history) > self.msg_history_max_len:
            self.msg_history.pop(0)

        # Count consecutive messages from the same user (shown in the log line).
        if self.last_msg["username"] == username:
            self.last_msg["count"] += 1
        else:
            self.last_msg["username"] = username
            self.last_msg["count"] = 1

        msgtoprint = (body[:50] + '..') if len(body) > 50 else body
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ({msguser['uid']}|{username}|{msguser['jid']}|x{self.last_msg['count']}) {msgtoprint}")

        if msguser["role"] == "moderator":
            self.process_command(body)
        else:
            self._enforce_ratelimit(username, body, time_now)
            self._enforce_banned_words(username, body)

        # Put some rules or commands down there that apply for any user, regardless of role

    def _reader_loop(self):
        """Forever: take a queued BOSH response and act on its presences and messages."""
        while True:
            resp = self.msg_queue.get()
            msg = xmltodict.parse(resp)["body"]
            time_now = datetime.now()

            self._refresh_ban_lists()
            self._expire_timeouts(time_now)

            # should probably be removed since there is a bosh_update_presenses() call
            if "presence" in msg:
                self.update_user(msg["presence"])

            if "message" in msg:
                chatmsgs = msg["message"]
                if isinstance(chatmsgs, dict):
                    chatmsgs = [chatmsgs]
                for chatmsg in chatmsgs:
                    if "body" in chatmsg:  # If it's actually a message in the chat
                        self._handle_chat_message(chatmsg, time_now)

    def start_reader_loop(self):
        """Run ``_reader_loop`` in a daemon thread."""
        t = Thread(target=self._reader_loop)
        t.daemon = True
        t.start()

    def run(self):
        """Connect, start both worker threads and process console commands forever."""
        self.connect()
        self.start_receiver_loop()
        self.start_reader_loop()
        while True:
            command = input()
            self.process_command(command, cmd=True)


if __name__ == "__main__":
    bot = ModBot(
        settings.instance,
        settings.username,
        settings.password,
        settings.room
    )
    bot.timeout_default = settings.timeout_default
    bot.ratelimit_timewindow = settings.ratelimit_timewindow
    bot.ratelimit_maxmessages = settings.ratelimit_maxmessages
    bot.ratelimit_timeout = settings.ratelimit_timeout
    bot.run()