import sys from messengers import Matrix as MatrixLoop, Telegram as TelegramLoop from config_dataclass import Config CONFIG_FILE_NAME = "config.yaml" def main(): config = Config.from_yaml_config(CONFIG_FILE_NAME) telegram.run(Application(config).run()) class TelegramLopp: def __init__(self, app): self.app = app self.bot = telegram.Bot(token=app.config.telegram_bot_token) self.dispatcher = telegram.Dispatcher(bot=self.bot) self.dispatcher.register_message_handler(self.on_message) async def run(self) -> None: await self.dispatcher.start_polling() async def on_message(self, msg): print(msg, file=sys.stderr) class MatrixLoop: def __init__(self, app): self.app = app self.client = matrix.AsyncClient( app.config.matrix_homeserver_url, app.config.matrix_full_bot_id, ) self.client.add_event_callback(self.on_message, matrix.RoomMessage) async def prepare(self): await self.client.login(self.app.config.matrix_bot_password) async def finish(self): await self.client.close() async def run(self): await self.client.sync_forever(timeout=30000) async def on_message(self, room, event): print(room, event, file=sys.stderr) def upgrade_room(self, room, telegram_nickname): event_dict = matrix.event_builders.event_builder.EventBuilder( name=telegram_nickname ).as_dict() client.room_send( room_id=room, message_type=event_dict["type"], content=event_dict["content"], ) # предположу что оно так работает # https://matrix-nio.readthedocs.io/en/latest/nio.html#module-nio.event_builders.state_events class Application: def __init__(self, config): self.config = config self.matrix_loop = MatrixLoop(self) self.telegram_loop = TelegramLoop(self) async def run(self): try: await self.matrix_loop.prepare() await asyncio.gather( self.matrix_loop.run(), self.telegram_loop.run(), ) finally: if self.matrix_loop: await self.matrix_loop.finish() if __name__ == "__main__": main()