593 lines
27 KiB
Python
593 lines
27 KiB
Python
# Tested on Python 3.10
|
|
# Reqs:
|
|
# python -m pip install requests xmltodict
|
|
|
|
import base64
import os
import queue
import random
import re
import uuid
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from xml.sax.saxutils import escape

import requests
import xmltodict

import settings
from helpers import RID
|
|
|
|
class ModBot:
    """Moderation bot for a PeerTube livechat room, driven over XMPP/BOSH.

    ``connect()`` logs in through the PeerTube HTTP API and opens a BOSH
    session.  ``run()`` then starts two daemon threads (one long-polls the
    BOSH endpoint, one processes the received stanzas) and reads moderator
    commands from stdin.

    NOTE(review): the source this class was recovered from had lost its
    indentation; the nesting below was reconstructed from syntax and context.
    """

    # Timeout (seconds) applied to every HTTP request the bot makes.
    _HTTP_TIMEOUT = 60

    _HELP_TEXT = (
        "List of commands:\n"
        "!users - Show list of online users\n"
        "!usersall - Show list of all known users\n"
        "!mute USERID - Mute user by ID\n"
        "!unmute USERID - Unmute user by ID\n"
        "!timeout USERID SEC - Mute user by ID for SEC seconds\n"
        "!wipe USERID - Retract user's recent messages\n"
        "!ban USERID - Ban user by ID and retract his last messages\n"
        "!mode 0 - Lift mutes, everyone can talk\n"
        "!mode 1 - Mute anons-newcomers, existing and registered users can talk\n"
        "!mode 2 - Mute all anons, registered users can talk\n"
        "!mode 3 (or !shutup) - Siege mode: mute everyone except moderators.\n"
        "!ratelimits M W S - Rewrite ratelimits: allow M messages per W seconds before S seconds timeout.\n"
    )

    def __init__(self, instance, username, password, chatroom) -> None:
        """Store credentials and initialise moderation state (no network I/O).

        Args:
            instance: Host name of the PeerTube instance, without scheme.
            username: PeerTube account used to log in.
            password: Password of that account.
            chatroom: Room identifier used in the webchat URL and the room JID.
        """
        self.peertube_instance = instance
        self.peertube_username = username
        self.peertube_password = password
        self.chat_room = chatroom

        self._prefixed_instance = f"https://{instance}"
        # `.x` is read once per BOSH request (rid) and once per new user (uid);
        # presumably every read yields the next counter value -- TODO confirm in helpers.RID
        self.rid = RID()
        self.uid = RID()
        # nickname -> {"role", "affiliation", "jid", "uid", ...}
        self.users = {}

        # Recent chat messages, kept so they can be retracted on !wipe / !ban
        self.msg_history = []
        self.msg_history_max_len = 500

        # 0..3, see the "!mode" entries of the help text
        self.restricted_mode = 0

        # Timeout length, if !timeout command was sent without 2nd argument
        self.timeout_default = 300

        # Rate limit settings
        # Time window (in seconds) to measure rates
        self.ratelimit_timewindow = 10
        # How many messages one user allowed to send in 'ratelimit_timewindow' seconds
        self.ratelimit_maxmessages = 3
        # Length of timeout (in seconds) that will be applied to user who exceeded the limits
        self.ratelimit_timeout = 60

        self.resource_pref = "modbot.py"
        self.bot_msg_prefix = "/me [ModBot]: "
        # Consecutive-message counter, only used for the console log line
        self.last_msg = {"username": "", "count": 0}

        # Raw BOSH responses handed from the receiver thread to the reader thread
        self.msg_queue = queue.Queue()

        self.banned_usernames = []
        self._bannedusernames_filename = "bannedusernames.txt"
        self._bannedusernames_stamp = 0

        self.banned_words = []
        self._bannedwords_filename = "bannedwords.txt"
        self._bannedwords_stamp = 0

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _xml_text(value):
        """Escape *value* for use as XML character data."""
        return escape(str(value))

    @staticmethod
    def _xml_attr(value):
        """Escape *value* for use inside a double-quoted XML attribute."""
        return escape(str(value), {'"': "&quot;"})

    def _bosh_send(self, payload):
        """POST one BOSH ``<body/>`` document and return the response text."""
        resp = requests.post(
            f'{self._prefixed_instance}{self.bosh_service_url}',
            headers={'Content-Type': 'text/xml; charset=utf-8'},
            timeout=self._HTTP_TIMEOUT,
            data=payload.encode('utf-8'),
        )
        return resp.text

    def _generate_body_headers(self):
        """Return the attribute string shared by every BOSH ``<body>`` sent after session creation."""
        return f'rid="{self.rid.x}" sid="{self.authid}" xmlns="http://jabber.org/protocol/httpbind"'

    def _admin_iq(self, item_attrs):
        """Send a ``muc#admin`` IQ to the room with a single ``<item>`` carrying *item_attrs*."""
        req_body = (
            f'<body {self._generate_body_headers()}>'
            f'<iq id="{uuid.uuid4()}:sendIQ" to="{self._xml_attr(self._internal_room_addr)}" type="set" xmlns="jabber:client">'
            f'<query xmlns="http://jabber.org/protocol/muc#admin">'
            f'<item {item_attrs}><reason/></item></query></iq></body>'
        )
        return self._bosh_send(req_body)

    # ------------------------------------------------------------------
    # Presence tracking
    # ------------------------------------------------------------------

    def _rename_user(self, jid, newnick):
        """Move the entry of the user with bare JID *jid* to key *newnick*."""
        for prev_username, prev_user in list(self.users.items()):
            if prev_user["jid"] != jid:
                continue
            # pop() first: assigning then deleting would lose the user when
            # the old and new nickname are equal.
            user = self.users.pop(prev_username)
            self.users[newnick] = user
            print(f"[{datetime.now().strftime('%H:%M:%S')}] ({user['uid']}|{prev_username}|{user['jid']} changed nickname to {newnick}", end="", flush=True)
            if user["role"] == "visitor":
                # Re-apply the mute under the new nickname
                print(" and was muted too)")
                self.bosh_user_mute(newnick)
            else:
                print(")")
            break

    def update_user(self, presenses):
        """Update ``self.users`` from one presence stanza (dict) or a list of them.

        New and renamed users are tracked, and users are muted according to
        ``restricted_mode`` and ``banned_usernames``.

        Raises:
            KeyError / IndexError: if a presence lacks the ``x/item`` payload
                or a resource part in ``@from``.
        """
        if isinstance(presenses, dict):
            presenses = [presenses]

        for presense in presenses:
            username = presense["@from"].split("/", 1)[1]
            # If you are getting error HERE, your account... got banned from your room???
            x_items = presense["x"]["item"]
            if isinstance(x_items, dict):
                x_items = [x_items]

            for x_item in x_items:
                userdata = {
                    "role": x_item["@role"],
                    "affiliation": x_item["@affiliation"],
                    "jid": x_item["@jid"].split("/", 1)[0],
                }
                new_user = False

                if "@nick" in x_item:
                    # Nickname change: the item carries the new nick
                    self._rename_user(userdata["jid"], x_item["@nick"])
                elif username in self.users:
                    # Overwrites role / affiliation / jid, keeps uid and bot-side state
                    self.users[username].update(userdata)
                else:
                    userdata["uid"] = self.uid.x
                    self.users[username] = userdata
                    new_user = True

                # Already muted (visitor) or offline (none): nothing to enforce
                if userdata["role"] in ("visitor", "none"):
                    continue

                is_anon = "@anon." in userdata["jid"]
                if new_user and self.restricted_mode == 1 and is_anon:
                    self.bosh_user_mute(username)

                if self.restricted_mode == 2 and is_anon:
                    self.bosh_user_mute(username)
                elif self.restricted_mode == 3 and userdata["role"] != "moderator":
                    self.bosh_user_mute(username)

                for banned_username in self.banned_usernames:
                    # An empty entry is a substring of every name; never match on it
                    if banned_username and banned_username in username:
                        print(f"User '{username}' was muted: contains banned username '{banned_username}'")
                        self.bosh_user_mute(username)

    # ------------------------------------------------------------------
    # Outgoing stanzas
    # ------------------------------------------------------------------

    def send_msg(self, msg, cmd=False):
        """Post *msg* to the room, or print it when the command came from the console.

        Args:
            msg: Plain text; it is XML-escaped here, so pass it unescaped.
            cmd: True when replying to a console command (output goes to stdout).

        Returns:
            The BOSH response text when sent to the room, otherwise None.
        """
        if cmd:
            print(msg)
            return None

        someid = uuid.uuid4()
        text = self._xml_text(f"{self.bot_msg_prefix}{msg}")
        req_body = (
            f'<body {self._generate_body_headers()}>'
            f'<message from="{self._xml_attr(self._internal_user_addr)}" id="{someid}" to="{self._xml_attr(self._internal_room_addr)}" type="groupchat" xmlns="jabber:client">'
            f'<body>{text}</body>'
            f'<active xmlns="http://jabber.org/protocol/chatstates"/>'
            f'<origin-id id="{someid}" xmlns="urn:xmpp:sid:0"/></message></body>'
        )
        return self._bosh_send(req_body)

    @staticmethod
    def _page_value(pattern, page, what):
        """Return group 1 of *pattern* in the room page, or exit with a readable error."""
        match = re.search(pattern, page, re.MULTILINE)
        if match is None:
            print(f"Webchat: ERROR: can't find {what} in the room page (does the room even exist?)")
            raise SystemExit(1)
        return match.group(1)

    def connect(self):
        """Log in to PeerTube, open the BOSH session, join the room and announce the bot.

        Raises:
            SystemExit: when login or webchat authorisation fails, or the room
                page lacks the expected configuration values.
        """
        pi = self._prefixed_instance
        room = self.chat_room
        inst = self.peertube_instance
        timeout = self._HTTP_TIMEOUT

        print('Peertube: Getting client tokens...')
        client_tokens = requests.get(f'{pi}/api/v1/oauth-clients/local', timeout=timeout).json()

        print('Peertube: Authorizing and getting user token...')
        token_resp = requests.post(f'{pi}/api/v1/users/token', timeout=timeout, data={
            "client_id": client_tokens["client_id"],
            "client_secret": client_tokens["client_secret"],
            "grant_type": "password",
            "response_type": "code",
            "username": self.peertube_username,
            "password": self.peertube_password,
        }).json()
        if "access_token" not in token_resp:
            print(f"Error while logging in: {token_resp}")
            raise SystemExit(1)
        access_token = token_resp["access_token"]

        print(f"Webchat: Accessing room {room} at instance {inst}")
        page = requests.get(f'{pi}/plugins/livechat/router/webchat/room/{room}', timeout=timeout).text

        self.livechat_version = self._page_value(r"/livechat/(.*)/static/", page, "livechat version")
        print(f'Webchat: Livechat version: {self.livechat_version}')

        self.bosh_service_url = self._page_value(r"boshServiceUrl: '(.*)',", page, "boshServiceUrl")
        self.authentication_url = self._page_value(
            r"authenticationUrl: '(.*)',", page, "authenticationUrl").removeprefix(pi)

        resp = requests.get(f'{pi}{self.authentication_url}',
                            headers={'authorization': f'Bearer {access_token}'}, timeout=timeout)
        if resp.status_code != 200:
            print(f"Webchat: ERROR: UNAUTHORIZED (STATUS:{resp.status_code})")
            raise SystemExit(1)
        creds = resp.json()
        print(f"Webchat: Signed in as '{creds['nickname']}' ({creds['jid']})")
        self.creds = creds

        # SASL PLAIN payload; UTF-8 so non-ASCII nicknames/passwords don't crash
        authb64 = base64.b64encode(
            f'\0{creds["nickname"]}\0{creds["password"]}'.encode('utf-8')).decode()
        resource_id = f'{self.resource_pref}-{random.randint(10000000, 99999999)}'
        self.resource_id = resource_id

        print('boshService: Fetching "authid"...')
        req_body = f'<body content="text/xml; charset=utf-8" hold="1" rid="{self.rid.x}" to="{inst}" ver="1.6" wait="59" xml:lang="en" xmlns="http://jabber.org/protocol/httpbind" xmlns:xmpp="urn:xmpp:xbosh" xmpp:version="1.0"/>'
        # The returned authid is what _generate_body_headers() sends as `sid`
        self.authid = xmltodict.parse(self._bosh_send(req_body))["body"]["@authid"]

        self._internal_user_addr = f'{creds["nickname"]}@{inst}/{resource_id}'
        self._internal_room_addr = f'{room}@room.{inst}'

        print('boshService: Authorizing...')
        self._bosh_send(f'<body {self._generate_body_headers()}><auth mechanism="PLAIN" xmlns="urn:ietf:params:xml:ns:xmpp-sasl">{authb64}</auth></body>')

        print('boshService: Restarting datastream...')
        self._bosh_send(f'<body {self._generate_body_headers()} to="{inst}" xml:lang="en" xmlns:xmpp="urn:xmpp:xbosh" xmpp:restart="true"/>')

        print('boshService: Binding auth...')
        self._bosh_send(f'<body {self._generate_body_headers()}><iq id="_bind_auth_2" type="set" xmlns="jabber:client"><bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"><resource>{resource_id}</resource></bind></iq></body>')

        print('boshService: Getting session...')
        self._bosh_send(f'<body {self._generate_body_headers()}><iq id="_session_auth_2" type="set" xmlns="jabber:client"><session xmlns="urn:ietf:params:xml:ns:xmpp-session"/></iq></body>')

        print('boshService: Joining room, sending presence...')
        self.bosh_update_presenses()

        print('Done, check if there is ModBot message in chat.')
        self.send_msg("Online!")

    def bosh_update_presenses(self):
        """Send the bot's join presence to the room and ingest the presences returned."""
        room_nick = f'{self._internal_room_addr}/{self.creds["nickname"]}'
        req_body = (
            f'<body {self._generate_body_headers()}>'
            f'<presence from="{self._xml_attr(self._internal_user_addr)}" to="{self._xml_attr(room_nick)}" xmlns="jabber:client">'
            f'<x xmlns="http://jabber.org/protocol/muc"><history maxstanzas="0"/></x>'
            f'<c hash="sha-1" node="https://conversejs.org" ver="vFjUiQWh2ew0hsRBxf7LNFK8ol0=" xmlns="http://jabber.org/protocol/caps"/>'
            f'</presence></body>'
        )
        body = xmltodict.parse(self._bosh_send(req_body))["body"] or {}
        if "presence" in body:
            # update_user() accepts both a single dict and a list
            self.update_user(body["presence"])

    def bosh_user_unmute(self, username):
        """Give *username* the ``participant`` role (may speak). Returns the BOSH response."""
        return self._admin_iq(f'nick="{self._xml_attr(username)}" role="participant"')

    def bosh_user_mute(self, username):
        """Give *username* the ``visitor`` role (may not speak). Returns the BOSH response."""
        return self._admin_iq(f'nick="{self._xml_attr(username)}" role="visitor"')

    def bosh_user_ban(self, jid):
        """Set the affiliation of *jid* to ``outcast``. Returns the BOSH response."""
        return self._admin_iq(f'affiliation="outcast" jid="{self._xml_attr(jid)}"')

    def bosh_retract_msg(self, msg_id):
        """Ask the room to retract the message whose stanza id is *msg_id*."""
        req_body = (
            f'<body {self._generate_body_headers()}>'
            f'<iq id="{uuid.uuid4()}:sendIQ" to="{self._xml_attr(self._internal_room_addr)}" type="set" xmlns="jabber:client">'
            f'<apply-to id="{self._xml_attr(msg_id)}" xmlns="urn:xmpp:fasten:0">'
            f'<moderate xmlns="urn:xmpp:message-moderate:0"><retract xmlns="urn:xmpp:message-retract:0"/><reason></reason></moderate>'
            f'</apply-to></iq></body>'
        )
        return self._bosh_send(req_body)

    def wipe_user_msg(self, user_id):
        """Retract every remembered message of *user_id* and drop them from the history."""
        kept = []
        for msg in self.msg_history:
            if msg["user_id"] == user_id:
                self.bosh_retract_msg(msg["msg_id"])
            else:
                kept.append(msg)
        self.msg_history = kept

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def _format_users(self, title, include_offline):
        """Build the reply text for !users / !usersall."""
        lines = [title, "ID) [name] (jid, role, affiliation)"]
        lines.extend(
            f"{user['uid']}) [{username}] ({user['jid']}, {user['role']}, {user['affiliation']})"
            for username, user in self.users.items()
            if include_offline or user['role'] != "none"
        )
        return "\n".join(lines) + "\n"

    def _for_user_id(self, arg, cmd, action):
        """Call ``action(username, user)`` for the user whose numeric ID is *arg*.

        Reports a readable error via send_msg() when *arg* is not a number or
        no such user is known.
        """
        arg = arg.strip()
        # isdecimal(), not isdigit(): the latter accepts e.g. "²", which int() rejects
        if not arg.isdecimal():
            self.send_msg(f"Error: '{arg}' is not integer. Provide numeric user ID (you can list users with !users)", cmd)
            return
        uid = int(arg)
        # Snapshot the matches so actions may safely touch self.users
        matches = [(name, user) for name, user in self.users.items() if user["uid"] == uid]
        if not matches:
            self.send_msg(f"Can't find user with ID={uid}", cmd)
            return
        for name, user in matches:
            action(name, user)

    def _cmd_mute(self, argstr, cmd):
        """!mute USERID"""
        def mute(username, _user):
            self.bosh_user_mute(username)
            self.send_msg(f"'{username}' muted", cmd)
        self._for_user_id(argstr, cmd, mute)

    def _cmd_unmute(self, argstr, cmd):
        """!unmute USERID"""
        def unmute(username, _user):
            self.bosh_user_unmute(username)
            self.send_msg(f"'{username}' unmuted", cmd)
        self._for_user_id(argstr, cmd, unmute)

    def _cmd_timeout(self, argstr, cmd):
        """!timeout USERID [SEC] -- SEC defaults to ``timeout_default``."""
        args = argstr.split()
        seconds = args[1] if len(args) > 1 else str(self.timeout_default)
        if not args or not args[0].isdecimal() or not seconds.isdecimal():
            self.send_msg("Error: malformed command", cmd)
            return
        seconds = int(seconds)

        def timeout(username, user):
            self.bosh_user_mute(username)
            # The reader loop lifts the mute once this moment has passed
            user["timeout_until"] = datetime.now() + timedelta(seconds=seconds)
            self.send_msg(f"'{username}' muted for {seconds} seconds", cmd)
        self._for_user_id(args[0], cmd, timeout)

    def _cmd_ratelimits(self, argstr, cmd):
        """!ratelimits M W S"""
        args = argstr.split()
        if len(args) != 3 or not all(arg.isdecimal() for arg in args):
            self.send_msg("Error: malformed command", cmd)
            return
        msgs, wnd, secs = (int(arg) for arg in args)
        self.ratelimit_maxmessages = msgs
        self.ratelimit_timewindow = wnd
        self.ratelimit_timeout = secs
        self.send_msg(f"Ratelimits updated: users who send >={msgs} messages in {wnd} seconds will be timed out for {secs} seconds.", cmd)

    def _cmd_wipe(self, argstr, cmd):
        """!wipe USERID"""
        def wipe(username, user):
            self.wipe_user_msg(user["uid"])
            self.send_msg(f"'{username}' msgs wiped.", cmd)
        self._for_user_id(argstr, cmd, wipe)

    def _cmd_ban(self, argstr, cmd):
        """!ban USERID"""
        def ban(username, user):
            if user["jid"] == self.creds['jid']:
                self.send_msg(f"Nope, won't ban myself ({self.creds['jid']})", cmd)
                return
            self.bosh_user_ban(user["jid"])
            self.send_msg(f"'{username}' banned.", cmd)
            self.wipe_user_msg(user["uid"])
        self._for_user_id(argstr, cmd, ban)

    def process_command(self, body, cmd=False):
        """Execute a moderator command; text that is not a known command is ignored.

        Args:
            body: Raw message / console line, e.g. ``"!mute 12"``.
            cmd: True when the command came from the console; replies are then
                printed instead of being posted to the room.
        """
        if body == "!help":
            self.send_msg(self._HELP_TEXT, cmd)

        elif body == "!users":
            self.send_msg(self._format_users("List of online users:", False), cmd)

        elif body == "!usersall":
            self.send_msg(self._format_users("List of known users:", True), cmd)

        elif body == "!mode 0":
            for username in list(self.users):
                self.bosh_user_unmute(username)
            self.restricted_mode = 0
            self.send_msg("Code 🟩-0, everyone unmuted. Welcome back!", cmd)

        elif body == "!mode 1":
            self.restricted_mode = 1
            self.send_msg("Code 🟨-0, newcomers will be muted.", cmd)

        elif body == "!mode 2":
            for username, user in list(self.users.items()):
                if "@anon." in user['jid']:
                    self.bosh_user_mute(username)
            self.restricted_mode = 2
            self.send_msg("Code 🟨-1, anon users muted.", cmd)

        elif body in ("!mode 3", "!shutup"):
            for username, user in list(self.users.items()):
                if user["role"] != "moderator":
                    self.bosh_user_mute(username)
            self.restricted_mode = 3
            self.send_msg("Code 🟥-0, everyone muted. Stand by.", cmd)

        else:
            # Dispatch on the exact first word so that ordinary chat such as
            # "!muted ..." is not mistaken for a command.
            command, _, argstr = body.partition(" ")
            handler = {
                "!mute": self._cmd_mute,
                "!unmute": self._cmd_unmute,
                "!timeout": self._cmd_timeout,
                "!ratelimits": self._cmd_ratelimits,
                "!wipe": self._cmd_wipe,
                "!ban": self._cmd_ban,
            }.get(command)
            if handler is not None:
                handler(argstr, cmd)

    # ------------------------------------------------------------------
    # Receiving and moderating
    # ------------------------------------------------------------------

    def _receiver_loop(self):
        """Long-poll the BOSH endpoint forever, queueing every response for the reader."""
        while True:
            req_body = f'<body {self._generate_body_headers()}/>'
            try:
                resp = self._bosh_send(req_body)
            except requests.RequestException as exc:
                # A network hiccup must not kill the polling thread
                print(f"boshService: poll failed ({exc!r}), retrying...")
                sleep(1)
                continue
            self.msg_queue.put(resp)

    def start_receiver_loop(self):
        """Start ``_receiver_loop`` in a daemon thread."""
        Thread(target=self._receiver_loop, daemon=True).start()

    @staticmethod
    def _reload_if_changed(filename, last_stamp, lowercase=False):
        """Re-read a one-entry-per-line list file when its mtime changed.

        Blank lines are dropped: an empty entry would be a substring of every
        message / nickname and mute everybody.

        Returns:
            ``(entries, stamp)`` after a reload, or ``(None, last_stamp)`` when
            the file is unchanged or does not exist (the caller then keeps its
            current list).
        """
        try:
            stamp = os.stat(filename).st_mtime
        except FileNotFoundError:
            return None, last_stamp
        if stamp == last_stamp:
            return None, last_stamp
        with open(filename, 'r', encoding='utf-8') as file:
            entries = [line.strip() for line in file]
        entries = [entry.lower() if lowercase else entry for entry in entries if entry]
        return entries, stamp

    def _refresh_banlists(self):
        """Reload the banned usernames / words files if they were modified."""
        names, self._bannedusernames_stamp = self._reload_if_changed(
            self._bannedusernames_filename, self._bannedusernames_stamp)
        if names is not None:
            self.banned_usernames = names
            print(f"Banned usernames list updated (count: {len(self.banned_usernames)}).")

        # Words are lowercased because messages are matched via body.lower()
        words, self._bannedwords_stamp = self._reload_if_changed(
            self._bannedwords_filename, self._bannedwords_stamp, lowercase=True)
        if words is not None:
            self.banned_words = words
            print(f"Banned words list updated (count: {len(self.banned_words)}).")

    def _lift_expired_timeouts(self, time_now):
        """Unmute every user whose ``timeout_until`` lies before *time_now*."""
        for username, user in list(self.users.items()):
            if "timeout_until" in user and user["timeout_until"] < time_now:
                self.send_msg(f"'{username}' unmuted after timeout.")
                self.bosh_user_unmute(username)
                del user["timeout_until"]

    def _timeout_user(self, username, reason):
        """Mute *username* for ``ratelimit_timeout`` seconds and announce *reason*."""
        self.users[username]["timeout_until"] = datetime.now() + timedelta(seconds=self.ratelimit_timeout)
        self.send_msg(f"'{username}' muted for {self.ratelimit_timeout} seconds. Reason: {reason}")
        self.bosh_user_mute(username)

    def _moderate_message(self, username, body, time_now):
        """Apply duplicate / rate-limit / banned-word rules to a non-moderator message."""
        user = self.users[username]

        # Keep only timestamps inside the window. Built as a new list because
        # removing from a list while iterating it skips elements.
        window_start = time_now - timedelta(seconds=self.ratelimit_timewindow)
        recent = [stamp for stamp in user.get("msgtimes", []) if stamp >= window_start]
        recent.append(time_now)
        user["msgtimes"] = recent

        body_hash = hash(body)
        if user.get("lastmsg") == body_hash:
            # Same text twice in a row
            self._timeout_user(username, "spamming")
        elif len(recent) >= self.ratelimit_maxmessages:
            self._timeout_user(username, "ratelimits")
        user["lastmsg"] = body_hash

        lowered = body.lower()
        if any(word and word in lowered for word in self.banned_words):
            self._timeout_user(username, "banned word")

    def _handle_chat_message(self, chatmsg, time_now):
        """Record, log and moderate one ``<message>`` stanza that has a body."""
        body = chatmsg["body"]
        if isinstance(body, dict):
            # xmltodict yields a dict when <body> carries attributes
            body = body.get("#text")
        if not isinstance(body, str):
            return  # empty <body/>

        _, sep, username = chatmsg.get("@from", "").partition("/")
        if not sep or username not in self.users:
            # Room-level message or a sender we have seen no presence for
            return
        msguser = self.users[username]

        stanza_id = chatmsg.get("stanza-id")
        if isinstance(stanza_id, list):
            stanza_id = stanza_id[0]
        msg_id = stanza_id.get("@id") if isinstance(stanza_id, dict) else None
        if msg_id is not None:
            # Without an id the message could not be retracted later anyway
            self.msg_history.append({
                "user_id": msguser['uid'],
                "body": body,
                "msg_id": msg_id,
            })
            excess = len(self.msg_history) - self.msg_history_max_len
            if excess > 0:
                del self.msg_history[:excess]

        if self.last_msg["username"] == username:
            self.last_msg["count"] += 1
        else:
            self.last_msg["username"] = username
            self.last_msg["count"] = 1

        msgtoprint = (body[:50] + '..') if len(body) > 50 else body
        print(f"[{datetime.now().strftime('%H:%M:%S')}] ({msguser['uid']}|{username}|{msguser['jid']}|x{self.last_msg['count']}) {msgtoprint}")

        if msguser["role"] == "moderator":
            self.process_command(body)
        else:
            self._moderate_message(username, body, time_now)

        # Put some rules or commands down there that apply for any user, regardless of role

    def _handle_response(self, resp):
        """Process one raw BOSH response: banlists, timeouts, presences, messages."""
        msg = xmltodict.parse(resp)["body"] or {}
        time_now = datetime.now()

        self._refresh_banlists()
        self._lift_expired_timeouts(time_now)

        if "presence" in msg:
            self.update_user(msg["presence"])

        if "message" in msg:
            chatmsgs = msg["message"]
            if isinstance(chatmsgs, dict):
                chatmsgs = [chatmsgs]
            for chatmsg in chatmsgs:
                # Stanzas without a body (chat states etc.) are not chat messages
                if "body" in chatmsg:
                    self._handle_chat_message(chatmsg, time_now)

    def _reader_loop(self):
        """Consume queued BOSH responses forever."""
        while True:
            resp = self.msg_queue.get()
            try:
                self._handle_response(resp)
            except Exception as exc:
                # Thread boundary: report and keep moderating instead of letting
                # one malformed stanza silently end the thread.
                print(f"Reader: failed to process response: {exc!r}")

    def start_reader_loop(self):
        """Start ``_reader_loop`` in a daemon thread."""
        Thread(target=self._reader_loop, daemon=True).start()

    def run(self):
        """Connect, start both worker threads, then execute console commands forever."""
        self.connect()
        self.start_receiver_loop()
        self.start_reader_loop()

        while True:
            command = input()
            self.process_command(command, cmd=True)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot from settings.py and run it.
    bot = ModBot(settings.instance, settings.username, settings.password, settings.room)

    # Replace the built-in defaults with the values configured in settings.py
    for option in (
        "timeout_default",
        "ratelimit_timewindow",
        "ratelimit_maxmessages",
        "ratelimit_timeout",
    ):
        setattr(bot, option, getattr(settings, option))

    bot.run()
|