"""Run a Matrix client and a Telegram bot side by side in one event loop.

Credentials are read from the environment (see ``main``); every incoming
message on either side is currently just dumped to stderr.
"""

import asyncio
import os
import sys

import aiogram as telegram
import nio as matrix


def main() -> None:
    """Build the configuration from environment variables and run the app.

    Raises:
        KeyError: if any of the required environment variables is unset.
    """
    config = Config(
        matrix_homeserver_url=os.environ['MATRIX_HOMESERVER_URL'],
        matrix_full_user_id=os.environ['MATRIX_FULL_USER_ID'],
        matrix_password=os.environ['MATRIX_PASSWORD'],
        telegram_bot_token=os.environ['TELEGRAM_BOT_TOKEN'],
    )
    asyncio.run(Application(config).run())


class Config:
    """Plain holder for the four settings the application needs."""

    def __init__(self, **kwargs: str) -> None:
        """Store the settings passed by keyword.

        Required keywords: ``matrix_homeserver_url``, ``matrix_full_user_id``,
        ``matrix_password``, ``telegram_bot_token``. Any other keyword is
        ignored.

        Raises:
            KeyError: if a required keyword is missing.
        """
        self.matrix_homeserver_url = kwargs['matrix_homeserver_url']
        self.matrix_full_user_id = kwargs['matrix_full_user_id']
        self.matrix_password = kwargs['matrix_password']
        self.telegram_bot_token = kwargs['telegram_bot_token']


class Application:
    """Owns the Matrix and Telegram loops and runs them concurrently."""

    def __init__(self, config: Config) -> None:
        """Create both loops from *config*; no network I/O is awaited here."""
        self.config = config
        self.matrix_loop = MatrixLoop(
            config.matrix_homeserver_url,
            config.matrix_full_user_id,
            config.matrix_password,
        )
        self.telegram_loop = TelegramLoop(config.telegram_bot_token)

    async def run(self) -> None:
        """Log in to Matrix, then run both loops until one of them fails.

        The Matrix client is closed on the way out regardless of how the
        loops ended, including when the login itself fails.
        """
        try:
            await self.matrix_loop.prepare()
            await asyncio.gather(
                self.matrix_loop.run(),
                self.telegram_loop.run(),
            )
        finally:
            # matrix_loop is always created in __init__, so no guard is needed.
            await self.matrix_loop.finish()
            # NOTE(review): the Telegram bot's HTTP session is never closed
            # explicitly; the right call depends on the aiogram version in
            # use - confirm and add the matching cleanup.


class MatrixLoop:
    """Wraps a nio ``AsyncClient``: login, sync loop, and message callback."""

    def __init__(self, homeserver_url: str, full_user_id: str,
                 password: str) -> None:
        """Create the client and register ``on_message`` for room messages."""
        self.password = password
        self.client = matrix.AsyncClient(homeserver_url, full_user_id)
        self.client.add_event_callback(self.on_message, matrix.RoomMessage)

    async def prepare(self) -> None:
        """Log in with the stored password.

        Raises:
            RuntimeError: if the homeserver rejects the login.
        """
        response = await self.client.login(self.password)
        # nio reports a failed login by returning a LoginError instead of
        # raising, so it has to be checked here; otherwise the failure only
        # shows up later as an unrelated error from the sync loop.
        if isinstance(response, matrix.LoginError):
            raise RuntimeError(f'Matrix login failed: {response}')

    async def finish(self) -> None:
        """Close the underlying client."""
        await self.client.close()

    async def run(self) -> None:
        """Sync with the homeserver until cancelled or an error occurs."""
        # The timeout is handed straight to nio (its docs give it in
        # milliseconds, i.e. a 30 s long poll).
        await self.client.sync_forever(timeout=30000)

    async def on_message(self, room, event) -> None:
        """Dump the room and event of an incoming message to stderr."""
        print(room, event, file=sys.stderr)


class TelegramLoop:
    """Wraps an aiogram bot and dispatcher: polling and message handler."""

    def __init__(self, bot_token: str) -> None:
        """Create the bot and dispatcher and register ``on_message``."""
        self.bot = telegram.Bot(token=bot_token)
        self.dispatcher = telegram.Dispatcher(bot=self.bot)
        self.dispatcher.register_message_handler(self.on_message)

    async def run(self) -> None:
        """Poll Telegram for updates via the dispatcher."""
        await self.dispatcher.start_polling()

    async def on_message(self, msg) -> None:
        """Dump an incoming Telegram message to stderr."""
        print(msg, file=sys.stderr)


if __name__ == '__main__':
    main()