# Tested on Python 3.10
# Reqs:
# python -m pip install requests xmltodict
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
import requests
import re
import base64
import uuid
import random
import settings
import xmltodict
import queue
import os
from helpers import RID
class ModBot:
def __init__(self, instance, username, password, chatroom) -> None:
    """Remember the account/room details and reset all bot state to defaults.

    Args:
        instance: Host name of the PeerTube instance; "https://" is
            prepended to build the base URL.
        username: PeerTube account name used when logging in.
        password: Password for that account.
        chatroom: Identifier of the livechat room to join.
    """
    # --- Where and as whom to connect ---
    self.peertube_instance = instance
    self.peertube_username, self.peertube_password = username, password
    self.chat_room = chatroom
    self._prefixed_instance = f"https://{instance}"

    # Two separate RID objects: `rid.x` ends up in the request attributes,
    # `uid.x` supplies the numeric ID given to each newly seen user.
    self.rid = RID()
    self.uid = RID()

    # --- Room state ---
    self.users = {}
    self.msg_history = []
    self.msg_history_max_len = 500
    self.restricted_mode = 0
    self.last_msg = {"username": "", "count": 0}
    self.msg_queue = queue.Queue()

    # Seconds a user stays muted when !timeout is sent without a duration.
    self.timeout_default = 300

    # --- Rate limiting ---
    # Length (in seconds) of the window over which messages are counted.
    self.ratelimit_timewindow = 10
    # Messages a single user may send within one window.
    self.ratelimit_maxmessages = 3
    # Seconds of timeout applied to a user who exceeds the limit.
    self.ratelimit_timeout = 60

    # --- Identity of the bot in the chat ---
    self.resource_pref = "modbot.py"
    self.bot_msg_prefix = "/me [ModBot]: "

    # --- Ban lists: in-memory entries, backing file name, last-seen stamp ---
    self.banned_usernames, self.banned_words = [], []
    self._bannedusernames_filename = "bannedusernames.txt"
    self._bannedwords_filename = "bannedwords.txt"
    self._bannedusernames_stamp = 0
    self._bannedwords_stamp = 0
def _bosh_send(self, str):
    """POST one request body to the BOSH service and return the reply text.

    Args:
        str: Request body; it is UTF-8 encoded before sending.

    Returns:
        The response body as text.
    """
    endpoint = f'{self._prefixed_instance}{self.bosh_service_url}'
    content_headers = {'Content-Type': 'text/xml; charset=utf-8'}
    payload = str.encode('utf-8')
    response = requests.post(endpoint, headers=content_headers, timeout=60, data=payload)
    return response.text
def _generate_body_headers(self):
return f'rid="{self.rid.x}" sid="{self.authid}" xmlns="http://jabber.org/protocol/httpbind"'
def update_user(self, presenses):
# Fold parsed presence entries into self.users and apply the mute rules.
# `presenses` is either one presence dict or a list of them; each is expected
# to carry "@from" and an "x" -> "item" entry (one dict or a list of dicts)
# with "@role", "@affiliation" and "@jid" keys.
# NOTE(review): indentation is missing from this copy of the file, so the exact
# nesting of the `else:` after the nickname loop, of the restricted_mode checks
# and of the banned-username loop cannot be read off the text — confirm against
# the upstream file before restructuring.
# A single presence is wrapped so the loop below always sees a list.
if isinstance(presenses, dict):
presenses = [presenses, ]
for presense in presenses:
# The nickname is whatever follows the first "/" in the sender address.
username = presense["@from"].split("/", 1)[1]
# If you are getting error HERE, your account... got banned from your room???
x_items = presense["x"]["item"]
# Same single-dict-versus-list normalisation for the items.
if isinstance(x_items, dict):
x_items = [x_items, ]
for x_item in x_items:
# The stored jid drops everything after the first "/".
userdata = {
"role": x_item["@role"],
"affiliation": x_item["@affiliation"],
"jid": x_item["@jid"].split("/", 1)[0],
}
new_user = False
# An item with "@nick" is handled as a nickname change: the self.users entry
# whose jid matches is moved under the new nickname.
if "@nick" in x_item:
for prev_username, prev_user in self.users.items():
if prev_user["jid"] == userdata["jid"]:
newnick = x_item['@nick']
self.users[newnick] = self.users[prev_username]
del self.users[prev_username]
user = self.users[newnick]
print(f"[{datetime.now().strftime('%H:%M:%S')}] ({user['uid']}|{prev_username}|{user['jid']} changed nickname to {newnick}", end="", flush=True)
# A user whose stored role is "visitor" is muted again under the new nickname.
if self.users[newnick]["role"] == "visitor":
print(f" and was muted too)")
self.bosh_user_mute(newnick)
else: print(")")
# self.users was modified above, so the iteration must stop here.
break
else:
# Known nickname: refresh role, affiliation and jid. Unknown nickname: store
# the new record and give it the next value of self.uid.x as its ID.
if username in self.users:
self.users[username]["role"] = userdata["role"]
self.users[username]["affiliation"] = userdata["affiliation"]
self.users[username]["jid"] = userdata["jid"]
else:
self.users[username] = userdata
self.users[username]["uid"] = self.uid.x
new_user = True
# Users whose role is already "visitor" or "none" are skipped by this check.
if not userdata["role"] in ("visitor", "none"):
if new_user:
# restricted_mode 1: mute when the jid contains "@anon." (new users).
if self.restricted_mode == 1 and "@anon." in userdata['jid']:
self.bosh_user_mute(username)
# restricted_mode 2: mute when the jid contains "@anon.".
if self.restricted_mode == 2 and "@anon." in userdata['jid']:
self.bosh_user_mute(username)
# restricted_mode 3: mute every role other than "moderator".
elif self.restricted_mode == 3 and userdata["role"] != "moderator":
self.bosh_user_mute(username)
# Substring match: any banned entry contained in the nickname triggers a mute.
for banned_username in self.banned_usernames:
if banned_username in username:
print(f"User '{username}' was muted: contains banned username '{banned_username}'")
self.bosh_user_mute(username)
def send_msg(self, msg, cmd=False):
# Deliver a bot reply. With `cmd` left False the text, prefixed with
# self.bot_msg_prefix, is posted through _bosh_send and the response text is
# returned; with any other `cmd` value the raw `msg` is only printed locally.
# NOTE(review): the req_body literal below opens on one line and closes on the
# next with no markup around the text, and `someid` is never referenced — the
# message stanza looks truncated in this copy; confirm against the upstream file.
if cmd == False:
someid = uuid.uuid4()
req_body = f'
{self.bot_msg_prefix}{msg}'
return self._bosh_send(req_body)
else:
print(msg)
def connect(self):
    """Log in to PeerTube, read the livechat room page and open the chat session.

    Steps, in order: fetch the local OAuth client, exchange the configured
    username/password for an access token, read the livechat version, BOSH
    service URL and authentication URL out of the room page, fetch the chat
    credentials, send three session requests through `_bosh_send`, call
    `bosh_update_presenses` and finally announce "Online!".

    Sets `livechat_version`, `bosh_service_url`, `authentication_url`,
    `creds` and `resource_id` on the instance.

    Raises:
        SystemExit: With status 1 when the token request is rejected, the
            room page lacks an expected setting, or the credentials request
            does not answer with HTTP 200.
        requests.RequestException: On connection failures or timeouts.
    """
    _pi = self._prefixed_instance
    _room = self.chat_room
    _inst = self.peertube_instance
    # Same limit `_bosh_send` applies; without it a silent server would
    # block start-up indefinitely.
    http_timeout = 60

    def page_setting(pattern, page, label):
        # Report a missing setting explicitly instead of failing with
        # AttributeError on `None.group(1)`.
        found = re.search(pattern, page, re.MULTILINE)
        if found is None:
            print(f"Webchat: ERROR: {label} not found in the page of room {_room}. Does the room exist?")
            raise SystemExit(1)
        return found.group(1)

    print('Peertube: Getting client tokens...')
    req = requests.get(f'{_pi}/api/v1/oauth-clients/local', timeout=http_timeout)
    client_tokens = req.json()

    print('Peertube: Authorizing and getting user token...')
    req = requests.post(f'{_pi}/api/v1/users/token', data={
        "client_id": client_tokens["client_id"],
        "client_secret": client_tokens["client_secret"],
        "grant_type": "password",
        "response_type": "code",
        "username": self.peertube_username,
        "password": self.peertube_password,
    }, timeout=http_timeout)
    token_resp = req.json()
    if "access_token" not in token_resp:
        print(f"Error while logging in: {token_resp}")
        # `raise SystemExit` does not depend on the `site`-provided exit().
        raise SystemExit(1)
    access_token = token_resp["access_token"]

    print(f"Webchat: Accessing room {_room} at instance {_inst}")
    req = requests.get(f'{_pi}/plugins/livechat/router/webchat/room/{_room}', timeout=http_timeout)
    room_page = req.text
    self.livechat_version = page_setting(r"/livechat/(.*)/static/", room_page, "livechat version")
    print(f'Webchat: Livechat version: {self.livechat_version}')
    self.bosh_service_url = page_setting(r"boshServiceUrl: '(.*)',", room_page, "boshServiceUrl")
    # Stored relative to the instance so it can be appended to `_pi` below.
    self.authentication_url = page_setting(
        r"authenticationUrl: '(.*)',", room_page, "authenticationUrl").removeprefix(_pi)

    headers = {'authorization': f'Bearer {access_token}'}
    req = requests.get(f'{_pi}{self.authentication_url}', headers=headers, timeout=http_timeout)
    if req.status_code != 200:
        print(f"Webchat: ERROR: UNAUTHORIZED (STATUS:{req.status_code})")
        raise SystemExit(1)
    creds = req.json()
    print(f"Webchat: Signed in as '{creds['nickname']}' ({creds['jid']})")
    self.creds = creds

    # NUL-separated "<empty>\0nickname\0password" credential string. It is
    # encoded as UTF-8 so non-ASCII nicknames or passwords no longer raise
    # UnicodeEncodeError; ASCII input yields the same bytes as before.
    authb64 = base64.b64encode(
        f'\0{creds["nickname"]}\0{creds["password"]}'.encode('utf-8')).decode()
    resource_id = f'{self.resource_pref}-{random.randint(10000000, 99999999)}'
    self.resource_id = resource_id

    # NOTE(review): the three request bodies below contain no markup in this
    # copy of the file, and nothing in this method assigns `self.authid`,
    # which `_generate_body_headers` reads — confirm against the upstream file.
    print('boshService: Fetching "authid"...')
    req_body = f'{authb64}'
    self._bosh_send(req_body)
    print('boshService: Restarting datastream...')
    req_body = f'{resource_id}'
    self._bosh_send(req_body)
    print('boshService: Getting session...')
    req_body = f''
    self._bosh_send(req_body)

    print('boshService: Joining room, sending presence...')
    self.bosh_update_presenses()
    print('Done, check if there is ModBot message in chat.')
    self.send_msg("Online!")
def bosh_update_presenses(self):
    """Send one request and hand every presence in the reply to update_user.

    The reply is parsed with xmltodict; when its "body" holds no "presence"
    key nothing else happens.

    NOTE(review): the request body literal is empty in this copy of the
    file — its markup appears to have been stripped; confirm upstream.
    """
    reply_xml = self._bosh_send(f'')
    reply_body = xmltodict.parse(reply_xml)["body"]
    if "presence" not in reply_body:
        return
    entries = reply_body["presence"]
    # A single presence comes back as a dict rather than a list.
    if isinstance(entries, dict):
        entries = [entries]
    for entry in entries:
        self.update_user(entry)
def bosh_user_unmute(self, username):
    """Post the unmute request via _bosh_send and return the reply text.

    NOTE(review): the request literal holds a single space and never uses
    `username` in this copy of the file — the stanza markup appears to have
    been stripped; confirm against the upstream file.
    """
    stanza = f' '
    reply_text = self._bosh_send(stanza)
    return reply_text
def bosh_user_mute(self, username):
    """Post the mute request via _bosh_send and return the reply text.

    NOTE(review): the request literal holds a single space and never uses
    `username` in this copy of the file — the stanza markup appears to have
    been stripped; confirm against the upstream file.
    """
    stanza = f' '
    reply_text = self._bosh_send(stanza)
    return reply_text
def bosh_user_ban(self, jid):
    """Post the ban request via _bosh_send and return the reply text.

    NOTE(review): the request literal holds a single space and never uses
    `jid` in this copy of the file — the stanza markup appears to have been
    stripped; confirm against the upstream file.
    """
    stanza = f' '
    reply_text = self._bosh_send(stanza)
    return reply_text
def bosh_retract_msg(self, msg_id):
    """Post the message-retraction request via _bosh_send and return the reply text.

    NOTE(review): the request literal is empty and never uses `msg_id` in
    this copy of the file — the stanza markup appears to have been stripped;
    confirm against the upstream file.
    """
    stanza = f''
    reply_text = self._bosh_send(stanza)
    return reply_text
def wipe_user_msg(self, user_id):
newmsg = []
for msg in self.msg_history:
if msg["user_id"] == user_id:
self.bosh_retract_msg(msg["msg_id"])
else:
newmsg.append(msg)
self.msg_history = newmsg
def process_command(self, body, cmd=False):
if body == "!help":
self.send_msg("List of commands:\n" +
"!users - Show list of online users\n" +
"!usersall - Show list of all known users\n" +
"!mute USERID - Mute user by ID\n" +
"!unmute USERID - Unmute user by ID\n"
"!timeout USERID SEC - Mute user by ID for SEC seconds\n"
"!wipe USERID - Retract user's recent messages\n"
"!ban USERID - Ban user by ID and retract his last messages\n"
"!mode 0 - Lift mutes, everyone can talk\n"
"!mode 1 - Mute anons-newcomers, existing and registered users can talk\n"
"!mode 2 - Mute all anons, registered users can talk\n"
"!mode 3 (or !shutup) - Siege mode: mute everyone except moderators.\n"
"!ratelimits M W S - Rewrite ratelimits: allow M messages per W seconds before S seconds timeout.\n", cmd)
elif body == "!users":
usersstr = "List of online users:\n"
usersstr += f"ID) [name] (jid, role, affiliation)\n"
for username, user in self.users.items():
if user['role'] != "none":
usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
self.send_msg(usersstr, cmd)
elif body == "!usersall":
usersstr = "List of known users:\n"
usersstr += f"ID) [name] (jid, role, affiliation)\n"
for username, user in self.users.items():
usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
self.send_msg(usersstr, cmd)
elif body.startswith("!mute"):
_id = body[6:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_mute(username)
self.send_msg(f"'{username}' muted", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!unmute"):
_id = body[8:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_unmute(username)
self.send_msg(f"'{username}' unmuted", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!timeout"):
args = body[9:].split(" ")
_id = args[0]
_time = args[1] if len(args) > 1 else str(self.timeout_default)
if _id.isdigit() and _time.isdigit():
_id = int(_id)
_time = int(_time)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_mute(username)
self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=_time)
self.send_msg(f"'{username}' muted for {_time} seconds", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg("Error: malformed command", cmd)
elif body.startswith("!ratelimits"):
args = body[12:].split(" ")
if len(args) == 3:
_msgs = args[0]
_wnd = args[1]
_time = args[2]
if _msgs.isdigit() and _wnd.isdigit() and _time.isdigit():
_msgs = int(_msgs)
_wnd = int(_wnd)
_time = int(_time)
self.ratelimit_maxmessages = _msgs
self.ratelimit_timewindow = _wnd
self.ratelimit_timeout = _time
self.send_msg(f"Ratelimits updated: users who send >={_msgs} messages in {_wnd} seconds will be timed out for {_time} seconds.", cmd)
else:
self.send_msg("Error: malformed command", cmd)
else:
self.send_msg("Error: malformed command", cmd)
elif body == "!mode 0":
for username, user in self.users.items():
self.bosh_user_unmute(username)
self.restricted_mode = 0
self.send_msg("Code 🟩-0, everyone unmuted. Welcome back!", cmd)
elif body == "!mode 1":
self.restricted_mode = 1
self.send_msg("Code 🟨-0, newcomers will be muted.", cmd)
elif body == "!mode 2":
for username, user in self.users.items():
if "@anon." in user['jid']:
self.bosh_user_mute(username)
self.restricted_mode = 2
self.send_msg("Code 🟨-1, anon users muted.", cmd)
elif body == "!mode 3" or body == "!shutup":
for username, user in self.users.items():
if user["role"] != "moderator":
self.bosh_user_mute(username)
self.restricted_mode = 3
self.send_msg("Code 🟥-0, everyone muted. Stand by.", cmd)
elif body.startswith("!wipe"):
_id = body[6:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.wipe_user_msg(_id)
self.send_msg(f"'{username}' msgs wiped.", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!ban"):
_id = body[5:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
if user["jid"] == self.creds['jid']:
self.send_msg(f"Nope, won't ban myself ({self.creds['jid']})", cmd)
else:
self.bosh_user_ban(user["jid"])
self.send_msg(f"'{username}' banned.", cmd)
self.wipe_user_msg(_id)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
def _receiver_loop(self):
while True:
req_body = f'