refactor: delete some shitcode

This commit is contained in:
def 2022-10-23 16:12:35 +04:00
parent 4fa50a3bd0
commit d8c8fe9c54
2 changed files with 442 additions and 227 deletions

223
pymaster.py Executable file → Normal file
View File

@ -12,160 +12,181 @@ from time import time
from server_entry import ServerEntry
from protocol import MasterProtocol
LOG_FILENAME = 'pymaster.log'
LOG_FILENAME = "pymaster.log"
MAX_SERVERS_FOR_IP = 14
def logPrint( msg ):
logging.debug( msg )
class PyMaster:
def __init__(self, ip, port):
self.serverList = []
self.sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
self.sock.bind( (ip, port) )
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((ip, port))
logPrint("Welcome to PyMaster!")
logPrint("I ask you again, are you my master?")
logPrint("Running on %s:%d" % (ip, port))
logging.debug("Welcome to PyMaster!")
logging.debug("I ask you again, are you my master? @-@")
logging.debug("Running on %s:%d" % (ip, port))
def serverLoop(self):
def server_loop(self):
data, addr = self.sock.recvfrom(1024)
data = data.decode('latin_1')
data = data.decode("latin_1")
if( data[0] == MasterProtocol.clientQuery ):
self.clientQuery(data, addr)
elif( data[0] == MasterProtocol.challengeRequest ):
self.sendChallengeToServer(data, addr)
elif( data[0] == MasterProtocol.addServer ):
self.addServerToList(data, addr)
elif( data[0] == MasterProtocol.removeServer ):
self.removeServerFromList(data, addr)
else:
logPrint("Unknown message: {0} from {1}:{2}".format(data, addr[0], addr[1]))
match data[0]:
case MasterProtocol.clientQuery:
self.client_query(data, addr)
case MasterProtocol.challengeRequest:
self.send_challenge_to_server(data, addr)
case MasterProtocol.addServer:
self.add_server_to_list(data, addr)
case MasterProtocol.removeServer:
self.remove_server_from_list(data, addr)
case other:
logging.debug("Unknown message: {0} from {1}:{2}".format(data, addr[0], addr[1]))
def clientQuery(self, data, addr):
def client_query(self, data, addr):
region = data[1] # UNUSED
data = data.strip('1' + region)
data = data.strip("1" + region)
try:
query = data.split('\0')
query = data.split("\0")
except ValueError:
logPrint(traceback.format_exc())
logging.debug(traceback.format_exc())
return
queryAddr = query[0] # UNUSED
rawFilter = query[1]
# Remove first \ character
rawFilter = rawFilter.strip('\\')
split = rawFilter.split('\\')
rawFilter = rawFilter.strip("\\")
split = rawFilter.split("\\")
# Use NoneType as undefined
gamedir = 'valve' # halflife, by default
gamedir = "valve" # halflife, by default
clver = None
nat = 0
for i in range( 0, len(split), 2 ):
for i in range(0, len(split), 2):
try:
key = split[i + 1]
if( split[i] == 'gamedir' ):
if split[i] == "gamedir":
gamedir = key.lower() # keep gamedir in lowercase
elif( split[i] == 'nat' ):
elif split[i] == "nat":
nat = int(key)
elif( split[i] == 'clver' ):
elif split[i] == "clver":
clver = key
else:
logPrint('Unhandled info string entry: {0}/{1}. Infostring was: {2}'.format(split[i], key, split))
logging.debug(
"Unhandled info string entry: {0}/{1}. Infostring was: {2}".format(
split[i], key, split
)
)
except IndexError:
pass
if( clver == None ): # Probably an old vulnerable version
self.fakeInfoForOldVersions( gamedir, addr )
if clver is None: # Probably an old vulnerable version
self.fake_info_for_old_versions(gamedir, addr)
return
packet = MasterProtocol.queryPacketHeader
for i in self.serverList:
if( time() > i.die ):
if time() > i.die:
self.serverList.remove(i)
continue
if( not i.check ):
if not i.check:
continue
if( nat != i.nat ):
if nat != i.nat:
continue
if( gamedir != None ):
if( gamedir != i.gamedir ):
if gamedir is not None and gamedir != i.gamedir:
continue
if( nat ):
reply = '\xff\xff\xff\xffc {0}:{1}'.format( addr[0], addr[1] )
data = reply.encode( 'latin_1' )
if nat:
reply = "\xff\xff\xff\xffc {0}:{1}".format(addr[0], addr[1])
data = reply.encode("latin_1")
# Tell server to send info reply
self.sock.sendto( data, i.addr )
self.sock.sendto(data, i.addr)
# Use pregenerated address string
packet += i.queryAddr
packet += b'\0\0\0\0\0\0' # Fill last IP:Port with \0
packet += b"\0\0\0\0\0\0" # Fill last IP:Port with \0
self.sock.sendto(packet, addr)
def fakeInfoForOldVersions(self, gamedir, addr):
def sendFakeInfo(sock, warnmsg, gamedir, addr):
baseReply = b"\xff\xff\xff\xffinfo\n\host\\" + warnmsg.encode('utf-8') + b"\map\\update\dm\\0\\team\\0\coop\\0\\numcl\\32\maxcl\\32\\gamedir\\" + gamedir.encode('latin-1') + b"\\"
def _send_fake_info(sock, warnmsg, gamedir, addr):
baseReply = (
b"\xff\xff\xff\xffinfo\n\host\\"
+ warnmsg.encode("utf-8")
+ b"\map\\update\dm\\0\\team\\0\coop\\0\\numcl\\32\maxcl\\32\\gamedir\\"
+ gamedir.encode("latin-1")
+ b"\\"
)
sock.sendto(baseReply, addr)
sendFakeInfo(self.sock, "This version is not", gamedir, addr)
sendFakeInfo(self.sock, "supported anymore", gamedir, addr)
sendFakeInfo(self.sock, "Please update Xash3DFWGS", gamedir, addr)
sendFakeInfo(self.sock, "From GooglePlay or GitHub", gamedir, addr)
sendFakeInfo(self.sock, "Эта версия", gamedir, addr)
sendFakeInfo(self.sock, "устарела", gamedir, addr)
sendFakeInfo(self.sock, "Обновите Xash3DFWGS c", gamedir, addr)
sendFakeInfo(self.sock, "GooglePlay или GitHub", gamedir, addr)
def removeServerFromList(self, data, addr):
for i in self.serverList:
if (i.addr == addr):
logPrint("Remove Server: from {0}:{1}".format(addr[0], addr[1]))
self.serverList.remove(i)
def fake_info_for_old_versions(self, gamedir, addr):
error_message = [
"This version is not",
"supported anymore",
"Please update Xash3DFWGS",
"From GooglePlay or GitHub",
"Эта версия",
"устарела",
"Обновите Xash3DFWGS c",
"GooglePlay или GitHub",
]
def sendChallengeToServer(self, data, addr):
logPrint("Challenge Request: from {0}:{1}".format(addr[0], addr[1]))
for string in error_message:
_send_fake_info(self.sock, string, gamedir, addr)
def remove_server_from_list(self, addr):
for server in self.serverList:
if server.addr == addr:
logging.debug("Remove Server: from {0}:{1}".format(addr[0], addr[1]))
self.serverList.remove(server)
def send_challenge_to_server(self, addr):
logging.debug("Challenge Request: from {0}:{1}".format(addr[0], addr[1]))
# At first, remove old server- data from list
#self.removeServerFromList(None, addr)
# self.removeServerFromList(None, addr)
count = 0
for i in self.serverList:
if ( i.addr[0] == addr[0] ):
if( i.addr[1] == addr[1] ):
if i.addr[0] == addr[0]:
if i.addr[1] == addr[1]:
self.serverList.remove(i)
else:
count += 1
if( count > MAX_SERVERS_FOR_IP ):
if count > MAX_SERVERS_FOR_IP:
return
# Generate a 32 bit challenge number
challenge = random.randint(0, 2**32-1)
challenge = random.randint(0, 2**32 - 1)
# Add server to list
self.serverList.append(ServerEntry(addr, challenge))
# And send him a challenge
packet = MasterProtocol.challengePacketHeader
packet += pack('I', challenge)
packet += pack("I", challenge)
self.sock.sendto(packet, addr)
def addServerToList(self, data, addr):
logPrint("Add Server: from {0}:{1}".format(addr[0], addr[1]))
def add_server_to_list(self, data, addr):
logging.debug("Add Server: from {0}:{1}".format(addr[0], addr[1]))
# Remove the header. Just for better parsing.
serverInfo = data.strip('\x30\x0a\x5c')
serverInfo = data.strip("\x30\x0a\x5c")
# Find a server with same address
for serverEntry in self.serverList:
if( serverEntry.addr == addr ):
if serverEntry.addr == addr:
break
serverEntry.setInfoString( serverInfo )
serverEntry.setInfoString(serverInfo)
def spawn_pymaster(verbose, ip, port):
if verbose:
@ -176,28 +197,56 @@ def spawn_pymaster(verbose, ip, port):
masterMain = PyMaster(ip, port)
while True:
try:
masterMain.serverLoop()
masterMain.server_loop()
except Exception:
logPrint(traceback.format_exc())
pass
logging.debug(traceback.format_exc())
if __name__ == "__main__":
parser = OptionParser()
parser.add_option('-i', '--ip', action='store', dest='ip', default='0.0.0.0',
help='ip to listen [default: %default]')
parser.add_option('-p', '--port', action='store', dest='port', type='int', default=27010,
help='port to listen [default: %default]')
parser.add_option('-d', '--daemonize', action='store_true', dest='daemonize', default=False,
help='run in background, argument is uid [default: %default]')
parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True,
help='don\'t print to stdout [default: %default]')
parser.add_option(
"-i",
"--ip",
action="store",
dest="ip",
default="0.0.0.0",
help="ip to listen [default: %default]",
)
parser.add_option(
"-p",
"--port",
action="store",
dest="port",
type="int",
default=27010,
help="port to listen [default: %default]",
)
parser.add_option(
"-d",
"--daemonize",
action="store_true",
dest="daemonize",
default=False,
help="run in background, argument is uid [default: %default]",
)
parser.add_option(
"-q",
"--quiet",
action="store_false",
dest="verbose",
default=True,
help="don't print to stdout [default: %default]",
)
(options, args) = parser.parse_args()
if options.daemonize != 0:
from daemon import pidfile, DaemonContext
from daemon import DaemonContext
with DaemonContext(stdout=sys.stdout, stderr=sys.stderr, working_directory=os.getcwd()) as context:
with DaemonContext(
stdout=sys.stdout, stderr=sys.stderr, working_directory=os.getcwd()
) as context:
spawn_pymaster(options.verbose, options.ip, options.port)
else:
sys.exit(spawn_pymaster(options.verbose, options.ip, options.port))

View File

@ -1,83 +1,249 @@
from time import time
from struct import pack
#!/usr/bin/env python3
import socket
import random
import sys
import traceback
import logging
import os
class ServerEntry:
challenge2 = 0
gamedir = ''
protocol = 0
players = 0
maxplayers = 0
bots = 0
gamemap = ''
version = '0'
servtype = 'd'
password = 0
os = 'l'
secure = 0
lan = 0
region = 255
product = ''
from optparse import OptionParser
from struct import pack
from time import time
from server_entry import ServerEntry
from protocol import MasterProtocol
LOG_FILENAME = "pymaster.log"
MAX_SERVERS_FOR_IP = 14
class PyMaster:
def __init__(self, ip, port):
self.serverList = []
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((ip, port))
logging.debug("Welcome to PyMaster!")
logging.debug("I ask you again, are you my master? @-@")
logging.debug("Running on %s:%d" % (ip, port))
def server_loop(self):
data, addr = self.sock.recvfrom(1024)
data = data.decode("latin_1")
match data[0]:
case MasterProtocol.clientQuery:
self.client_query(data, addr)
case MasterProtocol.challengeRequest:
self.send_challenge_to_server(data, addr)
case MasterProtocol.addServer:
self.add_server_to_list(data, addr)
case MasterProtocol.removeServer:
self.remove_server_from_list(data, addr)
case other:
logging.debug(
"Unknown message: {0} from {1}:{2}".format(data, addr[0], addr[1])
)
def client_query(self, data, addr):
region = data[1] # UNUSED
data = data.strip("1" + region)
try:
query = data.split("\0")
except ValueError:
logging.debug(traceback.format_exc())
return
queryAddr = query[0] # UNUSED
rawFilter = query[1]
# Remove first \ character
rawFilter = rawFilter.strip("\\")
split = rawFilter.split("\\")
# Use NoneType as undefined
gamedir = "valve" # halflife, by default
clver = None
nat = 0
def setInfoString(self, data):
infostring = data.replace('\n', '').replace('\r', '').replace('\0', '')
split = infostring.split('\\')
for i in range(0, len(split), 2):
try:
key = split[i + 1]
if( split[i] == 'challenge' ):
self.challenge2 = int(key)
elif( split[i] == 'gamedir' ):
self.gamedir = key.lower() # keep gamedir lowercase
elif( split[i] == 'protocol' ):
self.protocol = int(key)
elif( split[i] == 'players' ):
self.players = int(key)
elif( split[i] == 'max' ):
self.maxplayers = int(key.split('.')[0])
elif( split[i] == 'bots' ):
self.bots = int(key)
elif( split[i] == 'map' ):
self.gamemap = key
elif( split[i] == 'version' ):
self.version = key
elif( split[i] == 'type' ):
self.servtype = key
elif( split[i] == 'password' ):
self.password = key
elif( split[i] == 'os' ):
self.os = key
elif( split[i] == 'secure' ):
self.secure = key
elif( split[i] == 'lan' ):
self.lan = key
elif( split[i] == 'region' ):
self.region = key
elif( split[i] == 'product' ):
self.product = key
elif( split[i] == 'nat' ):
self.nat = int(key)
if split[i] == "gamedir":
gamedir = key.lower() # keep gamedir in lowercase
elif split[i] == "nat":
nat = int(key)
elif split[i] == "clver":
clver = key
else:
logging.debug(
"Unhandled info string entry: {0}/{1}. Infostring was: {2}".format(
split[i], key, split
)
)
except IndexError:
pass
self.check = self.challenge == self.challenge2
def __init__(self, addr, challenge):
# Address
self.addr = addr
if clver is None: # Probably an old vulnerable version
self.fake_info_for_old_versions(gamedir, addr)
return
# Shortcuts for generating query
self.queryAddr = b''
for i in addr[0].split('.'):
self.queryAddr += pack('!B', int(i))
self.queryAddr += pack('!H', int(addr[1]))
packet = MasterProtocol.queryPacketHeader
# Random number that server must return
self.challenge = challenge
for i in self.serverList:
if time() > i.die:
self.serverList.remove(i)
continue
if not i.check:
continue
if nat != i.nat:
continue
if gamedir is not None and gamedir != i.gamedir:
continue
if nat:
reply = "\xff\xff\xff\xffc {0}:{1}".format(addr[0], addr[1])
data = reply.encode("latin_1")
# Tell server to send info reply
self.sock.sendto(data, i.addr)
# This server is not checked
# So it will not get into queries
self.check = False
# Use pregenerated address string
packet += i.queryAddr
# Remove server after this time.
# This maybe not instant
self.die = time() + 600.0
packet += b"\0\0\0\0\0\0" # Fill last IP:Port with \0
self.sock.sendto(packet, addr)
def _send_fake_info(sock, warnmsg, gamedir, addr):
baseReply = (
b"\xff\xff\xff\xffinfo\n\host\\"
+ warnmsg.encode("utf-8")
+ b"\map\\update\dm\\0\\team\\0\coop\\0\\numcl\\32\maxcl\\32\\gamedir\\"
+ gamedir.encode("latin-1")
+ b"\\"
)
sock.sendto(baseReply, addr)
def fake_info_for_old_versions(self, gamedir, addr):
error_message = [
"This version is not",
"supported anymore",
"Please update Xash3DFWGS",
"From GooglePlay or GitHub",
"Эта версия",
"устарела",
"Обновите Xash3DFWGS c",
"GooglePlay или GitHub",
]
for string in error_message:
_send_fake_info(self.sock, string, gamedir, addr)
def remove_server_from_list(self, addr):
for server in self.serverList:
if server.addr == addr:
logging.debug("Remove Server: from {0}:{1}".format(addr[0], addr[1]))
self.serverList.remove(server)
def send_challenge_to_server(self, addr):
logging.debug("Challenge Request: from {0}:{1}".format(addr[0], addr[1]))
# At first, remove old server- data from list
# self.removeServerFromList(None, addr)
count = 0
for i in self.serverList:
if i.addr[0] == addr[0]:
if i.addr[1] == addr[1]:
self.serverList.remove(i)
else:
count += 1
if count > MAX_SERVERS_FOR_IP:
return
challenge = random.randint(0, 2**32 - 1)
# Add server to list
self.serverList.append(ServerEntry(addr, challenge))
# And send him a challenge
packet = MasterProtocol.challengePacketHeader
packet += pack("I", challenge)
self.sock.sendto(packet, addr)
def add_server_to_list(self, data, addr):
logging.debug("Add Server: from {0}:{1}".format(addr[0], addr[1]))
# Remove the header. Just for better parsing.
serverInfo = data.strip("\x30\x0a\x5c")
# Find a server with same address
for serverEntry in self.serverList:
if serverEntry.addr == addr:
break
serverEntry.setInfoString(serverInfo)
def spawn_pymaster(verbose, ip, port):
if verbose:
logging.getLogger().addHandler(logging.StreamHandler())
logging.getLogger().addHandler(logging.FileHandler(LOG_FILENAME))
logging.getLogger().setLevel(logging.DEBUG)
masterMain = PyMaster(ip, port)
while True:
try:
masterMain.server_loop()
except Exception:
logging.debug(traceback.format_exc())
if __name__ == "__main__":
parser = OptionParser()
parser.add_option(
"-i",
"--ip",
action="store",
dest="ip",
default="0.0.0.0",
help="ip to listen [default: %default]",
)
parser.add_option(
"-p",
"--port",
action="store",
dest="port",
type="int",
default=27010,
help="port to listen [default: %default]",
)
parser.add_option(
"-d",
"--daemonize",
action="store_true",
dest="daemonize",
default=False,
help="run in background, argument is uid [default: %default]",
)
parser.add_option(
"-q",
"--quiet",
action="store_false",
dest="verbose",
default=True,
help="don't print to stdout [default: %default]",
)
(options, args) = parser.parse_args()
if options.daemonize != 0:
from daemon import DaemonContext
with DaemonContext(
stdout=sys.stdout, stderr=sys.stderr, working_directory=os.getcwd()
) as context:
spawn_pymaster(options.verbose, options.ip, options.port)
else:
sys.exit(spawn_pymaster(options.verbose, options.ip, options.port))