diff --git a/bannedusernames.txt b/bannedusernames.txt
new file mode 100644
index 0000000..5e6500b
--- /dev/null
+++ b/bannedusernames.txt
@@ -0,0 +1 @@
+Бэдыч
diff --git a/bannedwords.txt b/bannedwords.txt
new file mode 100644
index 0000000..e69de29
diff --git a/helpers.py b/helpers.py
new file mode 100644
index 0000000..1e347f5
--- /dev/null
+++ b/helpers.py
@@ -0,0 +1,10 @@
+class RID(object):
+ def __init__(self):
+ self._id = 0
+ @property
+ def x(self):
+ self._id += 1
+ return self._id
+ @x.setter
+ def x(self, value):
+ self._id = value
diff --git a/modbot.py b/modbot.py
new file mode 100644
index 0000000..bacb8fd
--- /dev/null
+++ b/modbot.py
@@ -0,0 +1,592 @@
+# Tested on Python 3.10
+# Reqs:
+# python -m pip install requests
+
+from datetime import datetime, timedelta
+from threading import Thread
+from time import sleep
+import requests
+import re
+import base64
+import uuid
+import random
+import settings
+import xmltodict
+import queue
+import os
+from helpers import RID
+
+class ModBot:
+
+ def __init__(self, instance, username, password, chatroom) -> None:
+ self.peertube_instance = instance
+ self.peertube_username = username
+ self.peertube_password = password
+ self.chat_room = chatroom
+
+ self._prefixed_instance = f"https://{instance}"
+ self.rid = RID()
+ self.uid = RID()
+ self.users = {}
+
+ self.msg_history = []
+ self.msg_history_max_len = 500
+
+ self.restricted_mode = 0
+
+ # Timeout length, if !timeout command was send without 2nd argument
+ self.timeout_default = 300
+
+ # Rate limit settings
+ # Time window (in seconds) to measure rates
+ self.ratelimit_timewindow = 10
+ # How many messages one user allowed to send in 'ratelimit_timewindow' seconds
+ self.ratelimit_maxmessages = 3
+ # Length of timeout (in seconds) that will be applied to user who exceeded the limits
+ self.ratelimit_timeout = 60
+
+ self.resource_pref = "modbot.py"
+ self.bot_msg_prefix = "/me [ModBot]: "
+ self.last_msg = {"username": "", "count": 0}
+
+ self.msg_queue = queue.Queue()
+
+ self.banned_usernames = []
+ self._bannedusernames_filename = "bannedusernames.txt"
+ self._bannedusernames_stamp = 0
+
+ self.banned_words = []
+ self._bannedwords_filename = "bannedwords.txt"
+ self._bannedwords_stamp = 0
+
+ def _bosh_send(self, str):
+ req = requests.post(f'{self._prefixed_instance}{self.bosh_service_url}',
+ headers={'Content-Type': f'text/xml; charset=utf-8'}, timeout=60, data=str.encode('utf-8'))
+ #print(f"REQUEST: {str}\nRESPONSE: {req.text}\n")
+ return req.text
+
+ def _generate_body_headers(self):
+ return f'rid="{self.rid.x}" sid="{self.authid}" xmlns="http://jabber.org/protocol/httpbind"'
+
+ def update_user(self, presenses):
+ if isinstance(presenses, dict):
+ presenses = [presenses, ]
+
+ for presense in presenses:
+ username = presense["@from"].split("/", 1)[1]
+ # If you are getting error HERE, your account... got banned from your room???
+ x_items = presense["x"]["item"]
+ if isinstance(x_items, dict):
+ x_items = [x_items, ]
+
+ for x_item in x_items:
+
+
+ userdata = {
+ "role": x_item["@role"],
+ "affiliation": x_item["@affiliation"],
+ "jid": x_item["@jid"].split("/", 1)[0],
+ }
+
+ new_user = False
+
+ if "@nick" in x_item:
+ for prev_username, prev_user in self.users.items():
+ if prev_user["jid"] == userdata["jid"]:
+ newnick = x_item['@nick']
+ self.users[newnick] = self.users[prev_username]
+ del self.users[prev_username]
+ user = self.users[newnick]
+ print(f"[{datetime.now().strftime('%H:%M:%S')}] ({user['uid']}|{prev_username}|{user['jid']} changed nickname to {newnick}", end="", flush=True)
+
+ if self.users[newnick]["role"] == "visitor":
+ print(f" and was muted too)")
+ self.bosh_user_mute(newnick)
+ else: print(")")
+ break
+ else:
+ if username in self.users:
+ self.users[username]["role"] = userdata["role"]
+ self.users[username]["affiliation"] = userdata["affiliation"]
+ self.users[username]["jid"] = userdata["jid"]
+ else:
+ self.users[username] = userdata
+ self.users[username]["uid"] = self.uid.x
+ new_user = True
+
+ if not userdata["role"] in ("visitor", "none"):
+
+ if new_user:
+ if self.restricted_mode == 1 and "@anon." in userdata['jid']:
+ self.bosh_user_mute(username)
+
+ if self.restricted_mode == 2 and "@anon." in userdata['jid']:
+ self.bosh_user_mute(username)
+ elif self.restricted_mode == 3 and userdata["role"] != "moderator":
+ self.bosh_user_mute(username)
+
+ for banned_username in self.banned_usernames:
+ if banned_username in username:
+ print(f"User '{username}' was muted: contains banned username '{banned_username}'")
+ self.bosh_user_mute(username)
+
+ def send_msg(self, msg, cmd=False):
+ if cmd == False:
+ someid = uuid.uuid4()
+ req_body = f'
{self.bot_msg_prefix}{msg}'
+ return self._bosh_send(req_body)
+ else:
+ print(msg)
+
+ def connect(self):
+ _pi = self._prefixed_instance
+ _room = self.chat_room
+ _inst = self.peertube_instance
+
+ print(f'Peertube: Getting client tokens...')
+ req = requests.get(f'{_pi}/api/v1/oauth-clients/local')
+ client_tokens = req.json()
+
+ print(f'Peertube: Authorizing and getting user token...')
+ req = requests.post(f'{_pi}/api/v1/users/token', data={
+ "client_id": client_tokens["client_id"],
+ "client_secret": client_tokens["client_secret"],
+ "grant_type": "password",
+ "response_type": "code",
+ "username": self.peertube_username,
+ "password": self.peertube_password,
+ })
+ token_resp = req.json()
+ if "access_token" in token_resp:
+ access_token = req.json()["access_token"]
+ else:
+ print(f"Error while logging in: {token_resp}")
+ exit(1)
+
+ print(f"Webchat: Accessing room {_room} at instance {_inst}")
+ req = requests.get(f'{_pi}/plugins/livechat/router/webchat/room/{_room}')
+
+ # Getting errors there? Does the room even exist?
+ self.livechat_version = re.search(
+ r"/livechat/(.*)/static/", req.text, re.MULTILINE).group(1)
+ print(f'Webchat: Livechat version: {self.livechat_version}')
+
+ self.bosh_service_url = re.search(r"boshServiceUrl: '(.*)',",
+ req.text, re.MULTILINE).group(1)
+ self.authentication_url = re.search(r"authenticationUrl: '(.*)',",
+ req.text, re.MULTILINE).group(1).removeprefix(_pi)
+
+ headers = {'authorization': f'Bearer {access_token}'}
+ req = requests.get(f'{_pi}{self.authentication_url}', headers=headers)
+ creds = {}
+ if (req.status_code == 200):
+ creds = req.json()
+ print(f"Webchat: Signed in as '{creds['nickname']}' ({creds['jid']})")
+ else:
+ print(f"Webchat: ERROR: UNAUTHORIZED (STATUS:{req.status_code})")
+ exit(1)
+
+ self.creds = creds
+
+ headers = {'Content-Type': f'text/xml; charset=utf-8'}
+ authb64 = base64.b64encode(f'\0{creds["nickname"]}\0{creds["password"]}'.encode('ASCII')).decode()
+ resource_id = f'{self.resource_pref}-{random.randint(10000000, 99999999)}'
+ self.resource_id = resource_id
+
+ print(f'boshService: Fetching "authid"...')
+ req_body = f'{authb64}'
+ req = self._bosh_send(req_body)
+
+ print(f'boshService: Restarting datastream...')
+ req_body = f'{resource_id}'
+ req = self._bosh_send(req_body)
+
+ print(f'boshService: Getting session...')
+ req_body = f''
+ req = self._bosh_send(req_body)
+
+ print(f'boshService: Joining room, sending presence...')
+ self.bosh_update_presenses()
+
+ print(f'Done, check if there is ModBot message in chat.')
+ self.send_msg("Online!")
+
+ def bosh_update_presenses(self):
+ req_body = f''
+ req = self._bosh_send(req_body)
+ presenses = xmltodict.parse(req)["body"]
+ if "presence" in presenses:
+ presenses = presenses["presence"]
+ if isinstance(presenses, dict):
+ presenses = [presenses, ]
+ for presense in presenses:
+ self.update_user(presense)
+
+ def bosh_user_unmute(self, username):
+ req_body = f' '
+ return self._bosh_send(req_body)
+
+ def bosh_user_mute(self, username):
+ req_body = f' '
+ return self._bosh_send(req_body)
+
+ def bosh_user_ban(self, jid):
+ req_body = f' '
+ return self._bosh_send(req_body)
+
+ def bosh_retract_msg(self, msg_id):
+ req_body = f''
+ return self._bosh_send(req_body)
+
+ def wipe_user_msg(self, user_id):
+ newmsg = []
+ for msg in self.msg_history:
+ if msg["user_id"] == user_id:
+ self.bosh_retract_msg(msg["msg_id"])
+ else:
+ newmsg.append(msg)
+ self.msg_history = newmsg
+
+ def process_command(self, body, cmd=False):
+ if body == "!help":
+ self.send_msg("List of commands:\n" +
+ "!users - Show list of online users\n" +
+ "!usersall - Show list of all known users\n" +
+ "!mute USERID - Mute user by ID\n" +
+ "!unmute USERID - Unmute user by ID\n"
+ "!timeout USERID SEC - Mute user by ID for SEC seconds\n"
+ "!wipe USERID - Retract user's recent messages\n"
+ "!ban USERID - Ban user by ID and retract his last messages\n"
+ "!mode 0 - Lift mutes, everyone can talk\n"
+ "!mode 1 - Mute anons-newcomers, existing and registered users can talk\n"
+ "!mode 2 - Mute all anons, registered users can talk\n"
+ "!mode 3 (or !shutup) - Siege mode: mute everyone except moderators.\n"
+ "!ratelimits M W S - Rewrite ratelimits: allow M messages per W seconds before S seconds timeout.\n", cmd)
+
+ elif body == "!users":
+ usersstr = "List of online users:\n"
+ usersstr += f"ID) [name] (jid, role, affiliation)\n"
+ for username, user in self.users.items():
+ if user['role'] != "none":
+ usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
+ self.send_msg(usersstr, cmd)
+
+ elif body == "!usersall":
+ usersstr = "List of known users:\n"
+ usersstr += f"ID) [name] (jid, role, affiliation)\n"
+ for username, user in self.users.items():
+ usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
+ self.send_msg(usersstr, cmd)
+
+ elif body.startswith("!mute"):
+ _id = body[6:]
+ if _id.isdigit():
+ _id = int(_id)
+ found = False
+ for username, user in self.users.items():
+ if user["uid"] == _id:
+ found = True
+ self.bosh_user_mute(username)
+ self.send_msg(f"'{username}' muted", cmd)
+ if not found:
+ self.send_msg(f"Can't find user with ID={_id}", cmd)
+ else:
+ self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
+
+ elif body.startswith("!unmute"):
+ _id = body[8:]
+ if _id.isdigit():
+ _id = int(_id)
+ found = False
+ for username, user in self.users.items():
+ if user["uid"] == _id:
+ found = True
+ self.bosh_user_unmute(username)
+ self.send_msg(f"'{username}' unmuted", cmd)
+ if not found:
+ self.send_msg(f"Can't find user with ID={_id}", cmd)
+ else:
+ self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
+
+ elif body.startswith("!timeout"):
+ args = body[9:].split(" ")
+ _id = args[0]
+ _time = args[1] if len(args) > 1 else str(self.timeout_default)
+ if _id.isdigit() and _time.isdigit():
+ _id = int(_id)
+ _time = int(_time)
+ found = False
+ for username, user in self.users.items():
+ if user["uid"] == _id:
+ found = True
+ self.bosh_user_mute(username)
+ self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=_time)
+ self.send_msg(f"'{username}' muted for {_time} seconds", cmd)
+ if not found:
+ self.send_msg(f"Can't find user with ID={_id}", cmd)
+ else:
+ self.send_msg("Error: malformed command", cmd)
+
+ elif body.startswith("!ratelimits"):
+ args = body[12:].split(" ")
+ if len(args) == 3:
+ _msgs = args[0]
+ _wnd = args[1]
+ _time = args[2]
+ if _msgs.isdigit() and _wnd.isdigit() and _time.isdigit():
+ _msgs = int(_msgs)
+ _wnd = int(_wnd)
+ _time = int(_time)
+ self.ratelimit_maxmessages = _msgs
+ self.ratelimit_timewindow = _wnd
+ self.ratelimit_timeout = _time
+ self.send_msg(f"Ratelimits updated: users who send >={_msgs} messages in {_wnd} seconds will be timed out for {_time} seconds.", cmd)
+ else:
+ self.send_msg("Error: malformed command", cmd)
+ else:
+ self.send_msg("Error: malformed command", cmd)
+
+ elif body == "!mode 0":
+ for username, user in self.users.items():
+ self.bosh_user_unmute(username)
+ self.restricted_mode = 0
+ self.send_msg("Code 🟩-0, everyone unmuted. Welcome back!", cmd)
+
+ elif body == "!mode 1":
+ self.restricted_mode = 1
+ self.send_msg("Code 🟨-0, newcomers will be muted.", cmd)
+
+ elif body == "!mode 2":
+ for username, user in self.users.items():
+ if "@anon." in user['jid']:
+ self.bosh_user_mute(username)
+ self.restricted_mode = 2
+ self.send_msg("Code 🟨-1, anon users muted.", cmd)
+
+ elif body == "!mode 3" or body == "!shutup":
+ for username, user in self.users.items():
+ if user["role"] != "moderator":
+ self.bosh_user_mute(username)
+ self.restricted_mode = 3
+ self.send_msg("Code 🟥-0, everyone muted. Stand by.", cmd)
+
+ elif body.startswith("!wipe"):
+ _id = body[6:]
+ if _id.isdigit():
+ _id = int(_id)
+ found = False
+ for username, user in self.users.items():
+ if user["uid"] == _id:
+ found = True
+ self.wipe_user_msg(_id)
+ self.send_msg(f"'{username}' msgs wiped.", cmd)
+ if not found:
+ self.send_msg(f"Can't find user with ID={_id}", cmd)
+ else:
+ self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
+
+ elif body.startswith("!ban"):
+ _id = body[5:]
+ if _id.isdigit():
+ _id = int(_id)
+ found = False
+ for username, user in self.users.items():
+ if user["uid"] == _id:
+ found = True
+ if user["jid"] == self.creds['jid']:
+ self.send_msg(f"Nope, won't ban myself ({self.creds['jid']})", cmd)
+ else:
+ self.bosh_user_ban(user["jid"])
+ self.send_msg(f"'{username}' banned.", cmd)
+ self.wipe_user_msg(_id)
+ if not found:
+ self.send_msg(f"Can't find user with ID={_id}", cmd)
+ else:
+ self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
+
+ def _receiver_loop(self):
+ while True:
+ req_body = f'