This repository has been archived on 2023-01-19. You can view files and clone it, but cannot push or open issues or pull requests.
MirrorTea/mirrortea/__main__.py

116 lines
3.4 KiB
Python

import asyncio
import os
import sqlite3
import sys
import aiogram as telegram
import nio as matrix
# DDL for the link table that pairs a Telegram user row with a Matrix chat row.
# Both link columns are declared as foreign keys: telegram_user_id points at
# telegram_users(id) and matrix_chat_id points at matrix_chats(id).
# NOTE(review): no CREATE TABLE statement for matrix_chats is visible in this
# module -- confirm where that table is created.
# This statement is executed by SqliteAdapter._create_tables().
TELEGRAM_USER_MATRIX_CHATS_SQL = '''
CREATE TABLE IF NOT EXISTS telegram_user_matrix_chats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
telegram_user_id INTEGER NOT NULL,
matrix_chat_id INTEGER NOT NULL,
FOREIGN KEY(telegram_user_id) REFERENCES telegram_users(id),
FOREIGN KEY(matrix_chat_id) REFERENCES matrix_chats(id)
);
'''
# DDL for the table of Telegram users.
#
# Fixes over the previous text, which SQLite could not parse:
#   * the CHECK on last_name was missing its closing parenthesis;
#   * the last column definition was followed by a stray comma.
# The CHECK constraints also compared the *values* with the bound (e.g.
# ``first_name < 50``), which rejects ordinary names; they now bound the
# length of each value via length().
# NOTE(review): the length()-based reading (ids shorter than 15 digits, names
# shorter than 50 characters) is inferred from the bounds -- confirm intent.
# last_name and username stay nullable: length(NULL) is NULL, and a CHECK that
# evaluates to NULL does not reject the row.
TELEGRAM_USERS_SQL = '''
CREATE TABLE IF NOT EXISTS telegram_users
(
    id INTEGER PRIMARY KEY NOT NULL CHECK(length(id) < 15),
    first_name TEXT NOT NULL CHECK(length(first_name) < 50),
    last_name TEXT CHECK(length(last_name) < 50),
    username TEXT CHECK(length(username) < 50)
);
'''
def main():
    """Build the configuration from environment variables and run the app.

    Each required variable is read with ``os.environ[...]``, so a missing
    variable raises ``KeyError``. The resulting ``Config`` is handed to
    ``Application`` and its ``run()`` coroutine is driven by ``asyncio.run``.
    """
    # Environment variable names; the matching Config keyword is the same
    # name in lower case. Order is kept so the first missing variable that
    # gets reported is unchanged.
    env_names = (
        'DB_PATH',
        'MATRIX_BOT_ID',
        'MATRIX_HOMESERVER_URL',
        'MATRIX_OWNER_ID',
        'MATRIX_PASSWORD',
        'TELEGRAM_BOT_TOKEN',
    )
    settings = {env_name.lower(): os.environ[env_name] for env_name in env_names}
    application = Application(Config(**settings))
    asyncio.run(application.run())
class Config:
    """Plain holder for the application's settings.

    All settings are passed as keyword arguments and stored as attributes
    of the same name. A missing setting raises ``KeyError``; keyword
    arguments other than the ones listed below are ignored.
    """

    def __init__(self, **kwargs):
        # Copied one by one, in this order, straight from the keyword arguments.
        field_names = (
            'db_path',
            'matrix_bot_id',
            'matrix_homeserver_url',
            'matrix_owner_id',
            'matrix_password',
            'telegram_bot_token',
        )
        for field_name in field_names:
            setattr(self, field_name, kwargs[field_name])
class Application:
    """Top-level object that owns the config, the database adapter and both loops."""

    def __init__(self, config):
        self.config = config
        # The components receive this Application instance, so ``config``
        # is assigned before any of them is constructed.
        self.sqlite_adapter = SqliteAdapter(self, config.db_path)
        self.matrix_loop = MatrixLoop(self)
        self.telegram_loop = TelegramLoop(self)

    async def run(self):
        """Prepare the Matrix loop, then run both loops concurrently.

        Whatever happens, the Matrix loop's ``finish()`` is awaited on the
        way out (provided the loop object is truthy).
        """
        try:
            await self.matrix_loop.prepare()
            runners = (self.matrix_loop, self.telegram_loop)
            await asyncio.gather(*(runner.run() for runner in runners))
        finally:
            matrix_loop = self.matrix_loop
            if matrix_loop:
                await matrix_loop.finish()
class MatrixLoop:
    """Matrix side of the application, built around a ``nio`` ``AsyncClient``."""

    def __init__(self, app):
        """Create the client for the configured homeserver and bot id.

        ``on_message`` is registered as the callback for
        ``matrix.RoomMessage`` events.
        """
        self.app = app
        self.client = matrix.AsyncClient(
            app.config.matrix_homeserver_url,
            app.config.matrix_bot_id,
        )
        self.client.add_event_callback(self.on_message, matrix.RoomMessage)

    async def prepare(self):
        """Log in with the configured password.

        Raises:
            RuntimeError: if the homeserver rejects the login.
        """
        response = await self.client.login(self.app.config.matrix_password)
        # matrix-nio reports a failed login by *returning* a LoginError
        # instead of raising, so the result has to be checked explicitly;
        # otherwise the sync loop would start without a valid session.
        if isinstance(response, matrix.LoginError):
            raise RuntimeError(f'Matrix login failed: {response}')

    async def finish(self):
        """Close the client."""
        await self.client.close()

    async def run(self):
        """Sync with the homeserver until cancelled (timeout value 30000)."""
        await self.client.sync_forever(timeout=30000)

    async def on_message(self, room, event):
        """Print the room and the event to stderr."""
        print(room, event, file=sys.stderr)
class TelegramLoop:
    """Telegram side of the application: a bot plus its dispatcher."""

    def __init__(self, app):
        """Create the bot from the configured token and hook up ``on_message``."""
        self.app = app
        self.bot = bot = telegram.Bot(token=app.config.telegram_bot_token)
        self.dispatcher = dispatcher = telegram.Dispatcher(bot=bot)
        dispatcher.register_message_handler(self.on_message)

    async def run(self):
        """Await the dispatcher's ``start_polling()``."""
        await self.dispatcher.start_polling()

    async def on_message(self, msg):
        """Print the received message to stderr."""
        print(msg, file=sys.stderr)
class SqliteAdapter:
    """Owns the SQLite connection and makes sure the schema exists."""

    def __init__(self, app, path):
        """Open the database at ``path`` and create the tables right away."""
        self.app, self.path = app, path
        self.conn = sqlite3.connect(path)
        self._create_tables()

    def _create_tables(self):
        """Run the schema DDL and commit it."""
        # NOTE(review): only TELEGRAM_USER_MATRIX_CHATS_SQL is executed here;
        # TELEGRAM_USERS_SQL is defined in this module but never run.
        connection = self.conn
        connection.execute(TELEGRAM_USER_MATRIX_CHATS_SQL)
        connection.commit()
# Script entry point: call main() only when this module is executed directly.
if __name__ == '__main__':
    main()