modbot.py/modbot/modbot.py

593 lines
27 KiB
Python

# Tested on Python 3.10
# Reqs:
# python -m pip install requests
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
import requests
import re
import base64
import uuid
import random
import settings
import xmltodict
import queue
import os
from helpers import RID
class ModBot:
def __init__(self, instance, username, password, chatroom) -> None:
self.peertube_instance = instance
self.peertube_username = username
self.peertube_password = password
self.chat_room = chatroom
self._prefixed_instance = f"https://{instance}"
self.rid = RID()
self.uid = RID()
self.users = {}
self.msg_history = []
self.msg_history_max_len = 500
self.restricted_mode = 0
# Timeout length, if !timeout command was send without 2nd argument
self.timeout_default = 300
# Rate limit settings
# Time window (in seconds) to measure rates
self.ratelimit_timewindow = 10
# How many messages one user allowed to send in 'ratelimit_timewindow' seconds
self.ratelimit_maxmessages = 3
# Length of timeout (in seconds) that will be applied to user who exceeded the limits
self.ratelimit_timeout = 60
self.resource_pref = "modbot.py"
self.bot_msg_prefix = "/me [ModBot]: "
self.last_msg = {"username": "", "count": 0}
self.msg_queue = queue.Queue()
self.banned_usernames = []
self._bannedusernames_filename = "bannedusernames.txt"
self._bannedusernames_stamp = 0
self.banned_words = []
self._bannedwords_filename = "bannedwords.txt"
self._bannedwords_stamp = 0
def _bosh_send(self, str):
req = requests.post(f'{self._prefixed_instance}{self.bosh_service_url}',
headers={'Content-Type': f'text/xml; charset=utf-8'}, timeout=60, data=str.encode('utf-8'))
#print(f"REQUEST: {str}\nRESPONSE: {req.text}\n")
return req.text
def _generate_body_headers(self):
return f'rid="{self.rid.x}" sid="{self.authid}" xmlns="http://jabber.org/protocol/httpbind"'
def update_user(self, presenses):
if isinstance(presenses, dict):
presenses = [presenses, ]
for presense in presenses:
username = presense["@from"].split("/", 1)[1]
# If you are getting error HERE, your account... got banned from your room???
x_items = presense["x"]["item"]
if isinstance(x_items, dict):
x_items = [x_items, ]
for x_item in x_items:
userdata = {
"role": x_item["@role"],
"affiliation": x_item["@affiliation"],
"jid": x_item["@jid"].split("/", 1)[0],
}
new_user = False
if "@nick" in x_item:
for prev_username, prev_user in self.users.items():
if prev_user["jid"] == userdata["jid"]:
newnick = x_item['@nick']
self.users[newnick] = self.users[prev_username]
del self.users[prev_username]
user = self.users[newnick]
print(f"[{datetime.now().strftime('%H:%M:%S')}] ({user['uid']}|{prev_username}|{user['jid']} changed nickname to {newnick}", end="", flush=True)
if self.users[newnick]["role"] == "visitor":
print(f" and was muted too)")
self.bosh_user_mute(newnick)
else: print(")")
break
else:
if username in self.users:
self.users[username]["role"] = userdata["role"]
self.users[username]["affiliation"] = userdata["affiliation"]
self.users[username]["jid"] = userdata["jid"]
else:
self.users[username] = userdata
self.users[username]["uid"] = self.uid.x
new_user = True
if not userdata["role"] in ("visitor", "none"):
if new_user:
if self.restricted_mode == 1 and "@anon." in userdata['jid']:
self.bosh_user_mute(username)
if self.restricted_mode == 2 and "@anon." in userdata['jid']:
self.bosh_user_mute(username)
elif self.restricted_mode == 3 and userdata["role"] != "moderator":
self.bosh_user_mute(username)
for banned_username in self.banned_usernames:
if banned_username in username:
print(f"User '{username}' was muted: contains banned username '{banned_username}'")
self.bosh_user_mute(username)
def send_msg(self, msg, cmd=False):
if cmd == False:
someid = uuid.uuid4()
req_body = f'<body {self._generate_body_headers()}><message from="{self._internal_user_addr}" id="{someid}" to="{self._internal_room_addr}" type="groupchat" xmlns="jabber:client"><body>{self.bot_msg_prefix}{msg}</body><active xmlns="http://jabber.org/protocol/chatstates"/><origin-id id="{someid}" xmlns="urn:xmpp:sid:0"/></message></body>'
return self._bosh_send(req_body)
else:
print(msg)
def connect(self):
_pi = self._prefixed_instance
_room = self.chat_room
_inst = self.peertube_instance
print(f'Peertube: Getting client tokens...')
req = requests.get(f'{_pi}/api/v1/oauth-clients/local')
client_tokens = req.json()
print(f'Peertube: Authorizing and getting user token...')
req = requests.post(f'{_pi}/api/v1/users/token', data={
"client_id": client_tokens["client_id"],
"client_secret": client_tokens["client_secret"],
"grant_type": "password",
"response_type": "code",
"username": self.peertube_username,
"password": self.peertube_password,
})
token_resp = req.json()
if "access_token" in token_resp:
access_token = req.json()["access_token"]
else:
print(f"Error while logging in: {token_resp}")
exit(1)
print(f"Webchat: Accessing room {_room} at instance {_inst}")
req = requests.get(f'{_pi}/plugins/livechat/router/webchat/room/{_room}')
# Getting errors there? Does the room even exist?
self.livechat_version = re.search(
r"/livechat/(.*)/static/", req.text, re.MULTILINE).group(1)
print(f'Webchat: Livechat version: {self.livechat_version}')
self.bosh_service_url = re.search(r"boshServiceUrl: '(.*)',",
req.text, re.MULTILINE).group(1)
self.authentication_url = re.search(r"authenticationUrl: '(.*)',",
req.text, re.MULTILINE).group(1).removeprefix(_pi)
headers = {'authorization': f'Bearer {access_token}'}
req = requests.get(f'{_pi}{self.authentication_url}', headers=headers)
creds = {}
if (req.status_code == 200):
creds = req.json()
print(f"Webchat: Signed in as '{creds['nickname']}' ({creds['jid']})")
else:
print(f"Webchat: ERROR: UNAUTHORIZED (STATUS:{req.status_code})")
exit(1)
self.creds = creds
headers = {'Content-Type': f'text/xml; charset=utf-8'}
authb64 = base64.b64encode(f'\0{creds["nickname"]}\0{creds["password"]}'.encode('ASCII')).decode()
resource_id = f'{self.resource_pref}-{random.randint(10000000, 99999999)}'
self.resource_id = resource_id
print(f'boshService: Fetching "authid"...')
req_body = f'<body content="text/xml; charset=utf-8" hold="1" rid="{self.rid.x}" to="{_inst}" ver="1.6" wait="59" xml:lang="en" xmlns="http://jabber.org/protocol/httpbind" xmlns:xmpp="urn:xmpp:xbosh" xmpp:version="1.0"/>'
req = self._bosh_send(req_body)
authid = xmltodict.parse(req)["body"]["@authid"]
self.authid = authid
self._internal_user_addr = f'{creds["nickname"]}@{_inst}/{resource_id}'
self._internal_room_addr = f'{_room}@room.{_inst}'
print(f'boshService: Authorizing...')
req_body = f'<body {self._generate_body_headers()}><auth mechanism="PLAIN" xmlns="urn:ietf:params:xml:ns:xmpp-sasl">{authb64}</auth></body>'
req = self._bosh_send(req_body)
print(f'boshService: Restarting datastream...')
req_body = f'<body {self._generate_body_headers()} to="{_inst}" xml:lang="en" xmlns:xmpp="urn:xmpp:xbosh" xmpp:restart="true"/>'
req = self._bosh_send(req_body)
print(f'boshService: Binding auth...')
req_body = f'<body {self._generate_body_headers()}><iq id="_bind_auth_2" type="set" xmlns="jabber:client"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><resource>{resource_id}</resource></bind></iq></body>'
req = self._bosh_send(req_body)
print(f'boshService: Getting session...')
req_body = f'<body {self._generate_body_headers()}><iq id="_session_auth_2" type="set" xmlns="jabber:client"><session xmlns="urn:ietf:params:xml:ns:xmpp-session"/></iq></body>'
req = self._bosh_send(req_body)
print(f'boshService: Joining room, sending presence...')
self.bosh_update_presenses()
print(f'Done, check if there is ModBot message in chat.')
self.send_msg("Online!")
def bosh_update_presenses(self):
req_body = f'<body {self._generate_body_headers()}><presence from="{self._internal_user_addr}" to="{self._internal_room_addr}/{self.creds["nickname"]}" xmlns="jabber:client"><x xmlns="http://jabber.org/protocol/muc"><history maxstanzas="0"/></x><c hash="sha-1" node="https://conversejs.org" ver="vFjUiQWh2ew0hsRBxf7LNFK8ol0=" xmlns="http://jabber.org/protocol/caps"/></presence></body>'
req = self._bosh_send(req_body)
presenses = xmltodict.parse(req)["body"]
if "presence" in presenses:
presenses = presenses["presence"]
if isinstance(presenses, dict):
presenses = [presenses, ]
for presense in presenses:
self.update_user(presense)
def bosh_user_unmute(self, username):
req_body = f'<body {self._generate_body_headers()}><iq id="{uuid.uuid4()}:sendIQ" to="{self._internal_room_addr}" type="set" xmlns="jabber:client"><query xmlns="http://jabber.org/protocol/muc#admin"><item nick="{username}" role="participant"><reason/></item></query></iq></body>'
return self._bosh_send(req_body)
def bosh_user_mute(self, username):
req_body = f'<body {self._generate_body_headers()}><iq id="{uuid.uuid4()}:sendIQ" to="{self._internal_room_addr}" type="set" xmlns="jabber:client"><query xmlns="http://jabber.org/protocol/muc#admin"><item nick="{username}" role="visitor"><reason/></item></query></iq></body>'
return self._bosh_send(req_body)
def bosh_user_ban(self, jid):
req_body = f'<body {self._generate_body_headers()}><iq id="{uuid.uuid4()}:sendIQ" to="{self._internal_room_addr}" type="set" xmlns="jabber:client"><query xmlns="http://jabber.org/protocol/muc#admin"><item affiliation="outcast" jid="{jid}"><reason/></item></query></iq></body>'
return self._bosh_send(req_body)
def bosh_retract_msg(self, msg_id):
req_body = f'<body {self._generate_body_headers()}><iq id="{uuid.uuid4()}:sendIQ" to="{self._internal_room_addr}" type="set" xmlns="jabber:client"><apply-to id="{msg_id}" xmlns="urn:xmpp:fasten:0"><moderate xmlns="urn:xmpp:message-moderate:0"><retract xmlns="urn:xmpp:message-retract:0"/><reason></reason></moderate></apply-to></iq></body>'
return self._bosh_send(req_body)
def wipe_user_msg(self, user_id):
newmsg = []
for msg in self.msg_history:
if msg["user_id"] == user_id:
self.bosh_retract_msg(msg["msg_id"])
else:
newmsg.append(msg)
self.msg_history = newmsg
def process_command(self, body, cmd=False):
if body == "!help":
self.send_msg("List of commands:\n" +
"!users - Show list of online users\n" +
"!usersall - Show list of all known users\n" +
"!mute USERID - Mute user by ID\n" +
"!unmute USERID - Unmute user by ID\n"
"!timeout USERID SEC - Mute user by ID for SEC seconds\n"
"!wipe USERID - Retract user's recent messages\n"
"!ban USERID - Ban user by ID and retract his last messages\n"
"!mode 0 - Lift mutes, everyone can talk\n"
"!mode 1 - Mute anons-newcomers, existing and registered users can talk\n"
"!mode 2 - Mute all anons, registered users can talk\n"
"!mode 3 (or !shutup) - Siege mode: mute everyone except moderators.\n"
"!ratelimits M W S - Rewrite ratelimits: allow M messages per W seconds before S seconds timeout.\n", cmd)
elif body == "!users":
usersstr = "List of online users:\n"
usersstr += f"ID) [name] (jid, role, affiliation)\n"
for username, user in self.users.items():
if user['role'] != "none":
usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
self.send_msg(usersstr, cmd)
elif body == "!usersall":
usersstr = "List of known users:\n"
usersstr += f"ID) [name] (jid, role, affiliation)\n"
for username, user in self.users.items():
usersstr += f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})\n"
self.send_msg(usersstr, cmd)
elif body.startswith("!mute"):
_id = body[6:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_mute(username)
self.send_msg(f"'{username}' muted", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!unmute"):
_id = body[8:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_unmute(username)
self.send_msg(f"'{username}' unmuted", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!timeout"):
args = body[9:].split(" ")
_id = args[0]
_time = args[1] if len(args) > 1 else str(self.timeout_default)
if _id.isdigit() and _time.isdigit():
_id = int(_id)
_time = int(_time)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.bosh_user_mute(username)
self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=_time)
self.send_msg(f"'{username}' muted for {_time} seconds", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg("Error: malformed command", cmd)
elif body.startswith("!ratelimits"):
args = body[12:].split(" ")
if len(args) == 3:
_msgs = args[0]
_wnd = args[1]
_time = args[2]
if _msgs.isdigit() and _wnd.isdigit() and _time.isdigit():
_msgs = int(_msgs)
_wnd = int(_wnd)
_time = int(_time)
self.ratelimit_maxmessages = _msgs
self.ratelimit_timewindow = _wnd
self.ratelimit_timeout = _time
self.send_msg(f"Ratelimits updated: users who send >={_msgs} messages in {_wnd} seconds will be timed out for {_time} seconds.", cmd)
else:
self.send_msg("Error: malformed command", cmd)
else:
self.send_msg("Error: malformed command", cmd)
elif body == "!mode 0":
for username, user in self.users.items():
self.bosh_user_unmute(username)
self.restricted_mode = 0
self.send_msg("Code 🟩-0, everyone unmuted. Welcome back!", cmd)
elif body == "!mode 1":
self.restricted_mode = 1
self.send_msg("Code 🟨-0, newcomers will be muted.", cmd)
elif body == "!mode 2":
for username, user in self.users.items():
if "@anon." in user['jid']:
self.bosh_user_mute(username)
self.restricted_mode = 2
self.send_msg("Code 🟨-1, anon users muted.", cmd)
elif body == "!mode 3" or body == "!shutup":
for username, user in self.users.items():
if user["role"] != "moderator":
self.bosh_user_mute(username)
self.restricted_mode = 3
self.send_msg("Code 🟥-0, everyone muted. Stand by.", cmd)
elif body.startswith("!wipe"):
_id = body[6:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
self.wipe_user_msg(_id)
self.send_msg(f"'{username}' msgs wiped.", cmd)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
elif body.startswith("!ban"):
_id = body[5:]
if _id.isdigit():
_id = int(_id)
found = False
for username, user in self.users.items():
if user["uid"] == _id:
found = True
if user["jid"] == self.creds['jid']:
self.send_msg(f"Nope, won't ban myself ({self.creds['jid']})", cmd)
else:
self.bosh_user_ban(user["jid"])
self.send_msg(f"'{username}' banned.", cmd)
self.wipe_user_msg(_id)
if not found:
self.send_msg(f"Can't find user with ID={_id}", cmd)
else:
self.send_msg(f"Error: '{_id}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
def _receiver_loop(self):
while True:
req_body = f'<body {self._generate_body_headers()}/>'
resp = self._bosh_send(req_body)
self.msg_queue.put(resp)
def start_receiver_loop(self):
t = Thread(target=self._receiver_loop)
t.daemon = True
t.start()
def _reader_loop(self):
while True:
resp = self.msg_queue.get()
#self.bosh_update_presenses()
msg = xmltodict.parse(resp)["body"]
#print(self.users)
#print(json.dumps(msg, indent=2) + "\n")
time_now = datetime.now()
stamp = os.stat(self._bannedusernames_filename).st_mtime
if self._bannedusernames_stamp != stamp:
self._bannedusernames_stamp = stamp
with open(self._bannedusernames_filename, 'r', encoding='utf-8') as file:
self.banned_usernames = [line.strip() for line in file]
print(f"Banned usernames list updated (count: {len(self.banned_usernames)}).")
stamp = os.stat(self._bannedwords_filename).st_mtime
if self._bannedwords_stamp != stamp:
self._bannedwords_stamp = stamp
with open(self._bannedwords_filename, 'r', encoding='utf-8') as file:
self.banned_words = [line.strip() for line in file]
print(f"Banned words list updated (count: {len(self.banned_words)}).")
for username, user in self.users.items():
if "timeout_until" in user:
if user["timeout_until"] < time_now:
self.send_msg(f"'{username}' unmuted after timeout.")
self.bosh_user_unmute(username)
del self.users[username]["timeout_until"]
# should probably be removed since there is a bosh_update_presenses() call
if "presence" in msg:
self.update_user(msg["presence"])
if "message" in msg:
chatmsgs = msg["message"]
if isinstance(chatmsgs, dict):
chatmsgs = [chatmsgs, ]
for chatmsg in chatmsgs:
if "body" in chatmsg:
# If it's actually a message in the chat
body = chatmsg["body"]
username = chatmsg["@from"].split("/", 1)[1]
msguser = self.users[username]
self.msg_history.append({
"user_id": msguser['uid'],
"body": body,
"msg_id": chatmsg["stanza-id"]["@id"]
})
if (len(self.msg_history) > self.msg_history_max_len):
self.msg_history.pop(0)
if self.last_msg["username"] == username:
self.last_msg["count"] += 1
else:
self.last_msg["username"] = username
self.last_msg["count"] = 1
msgtoprint = (body[:50] + '..') if len(body) > 50 else body
print(f"[{datetime.now().strftime('%H:%M:%S')}] ({msguser['uid']}|{username}|{msguser['jid']}|x{self.last_msg['count']}) {msgtoprint}")
if self.users[username]["role"] == "moderator":
self.process_command(body)
else:
# Start of ratelimit maneuvers
user = self.users[username]
if not "msgtimes" in user:
self.users[username]["msgtimes"] = []
rl_tw = timedelta(seconds=self.ratelimit_timewindow)
for timestamp in user["msgtimes"]:
if timestamp < time_now - rl_tw:
self.users[username]["msgtimes"].remove(timestamp)
self.users[username]["msgtimes"].append(time_now)
msg_duplicate = False
if "lastmsg" in self.users[username]:
if self.users[username]["lastmsg"] == hash(body):
msg_duplicate = True
msgcount = 0
for timestamp in self.users[username]["msgtimes"]:
if timestamp >= time_now - rl_tw:
if msg_duplicate:
self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=self.ratelimit_timeout)
self.send_msg(f"'{username}' muted for {self.ratelimit_timeout} seconds. Reason: spamming")
self.bosh_user_mute(username)
break
else:
msgcount += 1
if msgcount >= self.ratelimit_maxmessages:
self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=self.ratelimit_timeout)
self.send_msg(f"'{username}' muted for {self.ratelimit_timeout} seconds. Reason: ratelimits")
self.bosh_user_mute(username)
self.users[username]["lastmsg"] = hash(body)
# End of ratelimit maneuvers
for banned_word in self.banned_words:
if banned_word in body.lower():
self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=self.ratelimit_timeout)
self.send_msg(f"'{username}' muted for {self.ratelimit_timeout} seconds. Reason: banned word")
self.bosh_user_mute(username)
#print(json.dumps(xmltodict.parse(req)["body"], indent=2) + "\n")
# Put some rules or commands down there that apply for any user, regardless of role
else:
pass
#if not "urn:xmpp:hints" in req:
#print(json.dumps(xmltodict.parse(req)["body"], indent=2) + "\n")
else:
pass
#if not "urn:xmpp:hints" in req:
#print(json.dumps(xmltodict.parse(req)["body"], indent=2) + "\n")
def start_reader_loop(self):
t = Thread(target=self._reader_loop)
t.daemon = True
t.start()
def run(self):
self.connect()
self.start_receiver_loop()
self.start_reader_loop()
while True:
command = input()
self.process_command(command, cmd=True)
if __name__ == "__main__":
bot = ModBot(
settings.instance,
settings.username,
settings.password,
settings.room
)
bot.timeout_default = settings.timeout_default
bot.ratelimit_timewindow = settings.ratelimit_timewindow
bot.ratelimit_maxmessages = settings.ratelimit_maxmessages
bot.ratelimit_timeout = settings.ratelimit_timeout
#req = requests.get(f"https://xxivproduction.video/w/bcsWRyy1XgNdd9RdMriM9F")
#print(req.text)
bot.run()