mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-23 09:16:51 +00:00
Merge remote-tracking branch 'origin/master' into api-logs
This commit is contained in:
commit
9f5f0507e3
|
@ -27,7 +27,7 @@ def move_service(service_id: str, volume_name: str) -> Job:
|
|||
job = Jobs.add(
|
||||
type_id=f"services.{service.get_id()}.move",
|
||||
name=f"Move {service.get_display_name()}",
|
||||
description=f"Moving {service.get_display_name()} data to {volume.name}",
|
||||
description=f"Moving {service.get_display_name()} data to {volume.get_display_name().lower()}",
|
||||
)
|
||||
|
||||
move_service_task(service, volume, job)
|
||||
|
|
|
@ -27,4 +27,4 @@ async def get_token_header(
|
|||
|
||||
def get_api_version() -> str:
|
||||
"""Get API version"""
|
||||
return "3.2.1"
|
||||
return "3.2.2"
|
||||
|
|
|
@ -16,6 +16,10 @@ class IsAuthenticated(BasePermission):
|
|||
token = info.context["request"].headers.get("Authorization")
|
||||
if token is None:
|
||||
token = info.context["request"].query_params.get("token")
|
||||
if token is None:
|
||||
connection_params = info.context.get("connection_params")
|
||||
if connection_params is not None:
|
||||
token = connection_params.get("Authorization")
|
||||
if token is None:
|
||||
return False
|
||||
return is_token_valid(token.replace("Bearer ", ""))
|
||||
|
|
|
@ -14,6 +14,7 @@ class DnsProvider(Enum):
|
|||
class ServerProvider(Enum):
|
||||
HETZNER = "HETZNER"
|
||||
DIGITALOCEAN = "DIGITALOCEAN"
|
||||
OTHER = "OTHER"
|
||||
|
||||
|
||||
@strawberry.enum
|
||||
|
|
|
@ -6,7 +6,7 @@ import shutil
|
|||
from pydantic import BaseModel
|
||||
from selfprivacy_api.jobs import Job, JobStatus, Jobs
|
||||
from selfprivacy_api.services.bitwarden import Bitwarden
|
||||
from selfprivacy_api.services.gitea import Gitea
|
||||
from selfprivacy_api.services.forgejo import Forgejo
|
||||
from selfprivacy_api.services.mailserver import MailServer
|
||||
from selfprivacy_api.services.nextcloud import Nextcloud
|
||||
from selfprivacy_api.services.pleroma import Pleroma
|
||||
|
@ -230,7 +230,7 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
status_text="Migrating Gitea.",
|
||||
)
|
||||
|
||||
Gitea().stop()
|
||||
Forgejo().stop()
|
||||
|
||||
if not pathlib.Path("/volumes/sda1/gitea").exists():
|
||||
if not pathlib.Path("/volumes/sdb/gitea").exists():
|
||||
|
@ -241,7 +241,7 @@ def migrate_to_binds(config: BindMigrationConfig, job: Job):
|
|||
group="gitea",
|
||||
)
|
||||
|
||||
Gitea().start()
|
||||
Forgejo().start()
|
||||
|
||||
# Perform migration of Mail server
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import typing
|
||||
from selfprivacy_api.services.bitwarden import Bitwarden
|
||||
from selfprivacy_api.services.gitea import Gitea
|
||||
from selfprivacy_api.services.forgejo import Forgejo
|
||||
from selfprivacy_api.services.jitsimeet import JitsiMeet
|
||||
from selfprivacy_api.services.mailserver import MailServer
|
||||
from selfprivacy_api.services.nextcloud import Nextcloud
|
||||
|
@ -13,7 +13,7 @@ import selfprivacy_api.utils.network as network_utils
|
|||
|
||||
services: list[Service] = [
|
||||
Bitwarden(),
|
||||
Gitea(),
|
||||
Forgejo(),
|
||||
MailServer(),
|
||||
Nextcloud(),
|
||||
Pleroma(),
|
||||
|
|
53
selfprivacy_api/services/flake_service_manager.py
Normal file
53
selfprivacy_api/services/flake_service_manager.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
import re
|
||||
from typing import Tuple, Optional
|
||||
|
||||
FLAKE_CONFIG_PATH = "/etc/nixos/sp-modules/flake.nix"
|
||||
|
||||
|
||||
class FlakeServiceManager:
|
||||
def __enter__(self) -> "FlakeServiceManager":
|
||||
self.services = {}
|
||||
|
||||
with open(FLAKE_CONFIG_PATH, "r") as file:
|
||||
for line in file:
|
||||
service_name, url = self._extract_services(input_string=line)
|
||||
if service_name and url:
|
||||
self.services[service_name] = url
|
||||
|
||||
return self
|
||||
|
||||
def _extract_services(
|
||||
self, input_string: str
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
pattern = r"inputs\.([\w-]+)\.url\s*=\s*([\S]+);"
|
||||
match = re.search(pattern, input_string)
|
||||
|
||||
if match:
|
||||
variable_name = match.group(1)
|
||||
url = match.group(2)
|
||||
return variable_name, url
|
||||
else:
|
||||
return None, None
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback) -> None:
|
||||
with open(FLAKE_CONFIG_PATH, "w") as file:
|
||||
file.write(
|
||||
"""
|
||||
{
|
||||
description = "SelfPrivacy NixOS PoC modules/extensions/bundles/packages/etc";\n
|
||||
"""
|
||||
)
|
||||
|
||||
for key, value in self.services.items():
|
||||
file.write(
|
||||
f"""
|
||||
inputs.{key}.url = {value};
|
||||
"""
|
||||
)
|
||||
|
||||
file.write(
|
||||
"""
|
||||
outputs = _: { };
|
||||
}
|
||||
"""
|
||||
)
|
|
@ -7,31 +7,34 @@ from selfprivacy_api.utils import get_domain
|
|||
|
||||
from selfprivacy_api.utils.systemd import get_service_status
|
||||
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||
from selfprivacy_api.services.gitea.icon import GITEA_ICON
|
||||
from selfprivacy_api.services.forgejo.icon import FORGEJO_ICON
|
||||
|
||||
|
||||
class Gitea(Service):
|
||||
"""Class representing Gitea service"""
|
||||
class Forgejo(Service):
|
||||
"""Class representing Forgejo service.
|
||||
|
||||
Previously was Gitea, so some IDs are still called gitea for compatibility.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def get_id() -> str:
|
||||
"""Return service id."""
|
||||
"""Return service id. For compatibility keep in gitea."""
|
||||
return "gitea"
|
||||
|
||||
@staticmethod
|
||||
def get_display_name() -> str:
|
||||
"""Return service display name."""
|
||||
return "Gitea"
|
||||
return "Forgejo"
|
||||
|
||||
@staticmethod
|
||||
def get_description() -> str:
|
||||
"""Return service description."""
|
||||
return "Gitea is a Git forge."
|
||||
return "Forgejo is a Git forge."
|
||||
|
||||
@staticmethod
|
||||
def get_svg_icon() -> str:
|
||||
"""Read SVG icon from file and return it as base64 encoded string."""
|
||||
return base64.b64encode(GITEA_ICON.encode("utf-8")).decode("utf-8")
|
||||
return base64.b64encode(FORGEJO_ICON.encode("utf-8")).decode("utf-8")
|
||||
|
||||
@staticmethod
|
||||
def get_url() -> Optional[str]:
|
||||
|
@ -65,19 +68,19 @@ class Gitea(Service):
|
|||
Return code 3 means service is stopped.
|
||||
Return code 4 means service is off.
|
||||
"""
|
||||
return get_service_status("gitea.service")
|
||||
return get_service_status("forgejo.service")
|
||||
|
||||
@staticmethod
|
||||
def stop():
|
||||
subprocess.run(["systemctl", "stop", "gitea.service"])
|
||||
subprocess.run(["systemctl", "stop", "forgejo.service"])
|
||||
|
||||
@staticmethod
|
||||
def start():
|
||||
subprocess.run(["systemctl", "start", "gitea.service"])
|
||||
subprocess.run(["systemctl", "start", "forgejo.service"])
|
||||
|
||||
@staticmethod
|
||||
def restart():
|
||||
subprocess.run(["systemctl", "restart", "gitea.service"])
|
||||
subprocess.run(["systemctl", "restart", "forgejo.service"])
|
||||
|
||||
@staticmethod
|
||||
def get_configuration():
|
||||
|
@ -93,4 +96,5 @@ class Gitea(Service):
|
|||
|
||||
@staticmethod
|
||||
def get_folders() -> List[str]:
|
||||
"""The data folder is still called gitea for compatibility."""
|
||||
return ["/var/lib/gitea"]
|
Before Width: | Height: | Size: 1.3 KiB After Width: | Height: | Size: 1.3 KiB |
|
@ -1,4 +1,4 @@
|
|||
GITEA_ICON = """
|
||||
FORGEJO_ICON = """
|
||||
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path d="M2.60007 10.5899L8.38007 4.79995L10.0701 6.49995C9.83007 7.34995 10.2201 8.27995 11.0001 8.72995V14.2699C10.4001 14.6099 10.0001 15.2599 10.0001 15.9999C10.0001 16.5304 10.2108 17.0391 10.5859 17.4142C10.9609 17.7892 11.4696 17.9999 12.0001 17.9999C12.5305 17.9999 13.0392 17.7892 13.4143 17.4142C13.7894 17.0391 14.0001 16.5304 14.0001 15.9999C14.0001 15.2599 13.6001 14.6099 13.0001 14.2699V9.40995L15.0701 11.4999C15.0001 11.6499 15.0001 11.8199 15.0001 11.9999C15.0001 12.5304 15.2108 13.0391 15.5859 13.4142C15.9609 13.7892 16.4696 13.9999 17.0001 13.9999C17.5305 13.9999 18.0392 13.7892 18.4143 13.4142C18.7894 13.0391 19.0001 12.5304 19.0001 11.9999C19.0001 11.4695 18.7894 10.9608 18.4143 10.5857C18.0392 10.2107 17.5305 9.99995 17.0001 9.99995C16.8201 9.99995 16.6501 9.99995 16.5001 10.0699L13.9301 7.49995C14.1901 6.56995 13.7101 5.54995 12.7801 5.15995C12.3501 4.99995 11.9001 4.95995 11.5001 5.06995L9.80007 3.37995L10.5901 2.59995C11.3701 1.80995 12.6301 1.80995 13.4101 2.59995L21.4001 10.5899C22.1901 11.3699 22.1901 12.6299 21.4001 13.4099L13.4101 21.3999C12.6301 22.1899 11.3701 22.1899 10.5901 21.3999L2.60007 13.4099C1.81007 12.6299 1.81007 11.3699 2.60007 10.5899Z" fill="black"/>
|
||||
</svg>
|
|
@ -90,6 +90,14 @@ class BlockDevice:
|
|||
def __hash__(self):
|
||||
return hash(self.name)
|
||||
|
||||
def get_display_name(self) -> str:
|
||||
if self.is_root():
|
||||
return "System disk"
|
||||
elif self.model == "Volume":
|
||||
return "Expandable volume"
|
||||
else:
|
||||
return self.name
|
||||
|
||||
def is_root(self) -> bool:
|
||||
"""
|
||||
Return True if the block device is the root device.
|
||||
|
|
|
@ -29,8 +29,6 @@ class RedisPool:
|
|||
url,
|
||||
decode_responses=True,
|
||||
)
|
||||
# TODO: inefficient, this is probably done each time we connect
|
||||
self.get_connection().config_set("notify-keyspace-events", "KEA")
|
||||
|
||||
@staticmethod
|
||||
def connection_url(dbnumber: int) -> str:
|
||||
|
|
2
setup.py
2
setup.py
|
@ -2,7 +2,7 @@ from setuptools import setup, find_packages
|
|||
|
||||
setup(
|
||||
name="selfprivacy_api",
|
||||
version="3.2.1",
|
||||
version="3.2.2",
|
||||
packages=find_packages(),
|
||||
scripts=[
|
||||
"selfprivacy_api/app.py",
|
||||
|
|
127
tests/test_flake_services_manager.py
Normal file
127
tests/test_flake_services_manager.py
Normal file
|
@ -0,0 +1,127 @@
|
|||
import pytest
|
||||
|
||||
from selfprivacy_api.services.flake_service_manager import FlakeServiceManager
|
||||
|
||||
all_services_file = """
|
||||
{
|
||||
description = "SelfPrivacy NixOS PoC modules/extensions/bundles/packages/etc";
|
||||
|
||||
|
||||
inputs.bitwarden.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden;
|
||||
|
||||
inputs.gitea.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea;
|
||||
|
||||
inputs.jitsi-meet.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet;
|
||||
|
||||
inputs.nextcloud.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/nextcloud;
|
||||
|
||||
inputs.ocserv.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/ocserv;
|
||||
|
||||
inputs.pleroma.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/pleroma;
|
||||
|
||||
inputs.simple-nixos-mailserver.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/simple-nixos-mailserver;
|
||||
|
||||
outputs = _: { };
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
some_services_file = """
|
||||
{
|
||||
description = "SelfPrivacy NixOS PoC modules/extensions/bundles/packages/etc";
|
||||
|
||||
|
||||
inputs.bitwarden.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden;
|
||||
|
||||
inputs.gitea.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea;
|
||||
|
||||
inputs.jitsi-meet.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet;
|
||||
|
||||
outputs = _: { };
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def some_services_flake_mock(mocker, datadir):
|
||||
flake_config_path = datadir / "some_services.nix"
|
||||
mocker.patch(
|
||||
"selfprivacy_api.services.flake_service_manager.FLAKE_CONFIG_PATH",
|
||||
new=flake_config_path,
|
||||
)
|
||||
return flake_config_path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def no_services_flake_mock(mocker, datadir):
|
||||
flake_config_path = datadir / "no_services.nix"
|
||||
mocker.patch(
|
||||
"selfprivacy_api.services.flake_service_manager.FLAKE_CONFIG_PATH",
|
||||
new=flake_config_path,
|
||||
)
|
||||
return flake_config_path
|
||||
|
||||
|
||||
# ---
|
||||
|
||||
|
||||
def test_read_services_list(some_services_flake_mock):
|
||||
with FlakeServiceManager() as manager:
|
||||
services = {
|
||||
"bitwarden": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden",
|
||||
"gitea": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea",
|
||||
"jitsi-meet": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet",
|
||||
}
|
||||
assert manager.services == services
|
||||
|
||||
|
||||
def test_change_services_list(some_services_flake_mock):
|
||||
services = {
|
||||
"bitwarden": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden",
|
||||
"gitea": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea",
|
||||
"jitsi-meet": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet",
|
||||
"nextcloud": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/nextcloud",
|
||||
"ocserv": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/ocserv",
|
||||
"pleroma": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/pleroma",
|
||||
"simple-nixos-mailserver": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/simple-nixos-mailserver",
|
||||
}
|
||||
|
||||
with FlakeServiceManager() as manager:
|
||||
manager.services = services
|
||||
|
||||
with FlakeServiceManager() as manager:
|
||||
assert manager.services == services
|
||||
|
||||
with open(some_services_flake_mock, "r", encoding="utf-8") as file:
|
||||
file_content = file.read().strip()
|
||||
|
||||
assert all_services_file.strip() == file_content
|
||||
|
||||
|
||||
def test_read_empty_services_list(no_services_flake_mock):
|
||||
with FlakeServiceManager() as manager:
|
||||
services = {}
|
||||
assert manager.services == services
|
||||
|
||||
|
||||
def test_change_empty_services_list(no_services_flake_mock):
|
||||
services = {
|
||||
"bitwarden": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden",
|
||||
"gitea": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea",
|
||||
"jitsi-meet": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet",
|
||||
"nextcloud": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/nextcloud",
|
||||
"ocserv": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/ocserv",
|
||||
"pleroma": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/pleroma",
|
||||
"simple-nixos-mailserver": "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/simple-nixos-mailserver",
|
||||
}
|
||||
|
||||
with FlakeServiceManager() as manager:
|
||||
manager.services = services
|
||||
|
||||
with FlakeServiceManager() as manager:
|
||||
assert manager.services == services
|
||||
|
||||
with open(no_services_flake_mock, "r", encoding="utf-8") as file:
|
||||
file_content = file.read().strip()
|
||||
|
||||
assert all_services_file.strip() == file_content
|
4
tests/test_flake_services_manager/no_services.nix
Normal file
4
tests/test_flake_services_manager/no_services.nix
Normal file
|
@ -0,0 +1,4 @@
|
|||
{
|
||||
description = "SelfPrivacy NixOS PoC modules/extensions/bundles/packages/etc";
|
||||
outputs = _: { };
|
||||
}
|
12
tests/test_flake_services_manager/some_services.nix
Normal file
12
tests/test_flake_services_manager/some_services.nix
Normal file
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
description = "SelfPrivacy NixOS PoC modules/extensions/bundles/packages/etc";
|
||||
|
||||
|
||||
inputs.bitwarden.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/bitwarden;
|
||||
|
||||
inputs.gitea.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/gitea;
|
||||
|
||||
inputs.jitsi-meet.url = git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/jitsi-meet;
|
||||
|
||||
outputs = _: { };
|
||||
}
|
|
@ -2,7 +2,7 @@ import pytest
|
|||
|
||||
from selfprivacy_api.services.service import ServiceStatus
|
||||
from selfprivacy_api.services.bitwarden import Bitwarden
|
||||
from selfprivacy_api.services.gitea import Gitea
|
||||
from selfprivacy_api.services.forgejo import Forgejo
|
||||
from selfprivacy_api.services.mailserver import MailServer
|
||||
from selfprivacy_api.services.nextcloud import Nextcloud
|
||||
from selfprivacy_api.services.ocserv import Ocserv
|
||||
|
@ -22,7 +22,7 @@ def call_args_asserts(mocked_object):
|
|||
"dovecot2.service",
|
||||
"postfix.service",
|
||||
"vaultwarden.service",
|
||||
"gitea.service",
|
||||
"forgejo.service",
|
||||
"phpfpm-nextcloud.service",
|
||||
"ocserv.service",
|
||||
"pleroma.service",
|
||||
|
@ -77,7 +77,7 @@ def mock_popen_systemctl_service_not_ok(mocker):
|
|||
def test_systemctl_ok(mock_popen_systemctl_service_ok):
|
||||
assert MailServer.get_status() == ServiceStatus.ACTIVE
|
||||
assert Bitwarden.get_status() == ServiceStatus.ACTIVE
|
||||
assert Gitea.get_status() == ServiceStatus.ACTIVE
|
||||
assert Forgejo.get_status() == ServiceStatus.ACTIVE
|
||||
assert Nextcloud.get_status() == ServiceStatus.ACTIVE
|
||||
assert Ocserv.get_status() == ServiceStatus.ACTIVE
|
||||
assert Pleroma.get_status() == ServiceStatus.ACTIVE
|
||||
|
@ -87,7 +87,7 @@ def test_systemctl_ok(mock_popen_systemctl_service_ok):
|
|||
def test_systemctl_failed_service(mock_popen_systemctl_service_not_ok):
|
||||
assert MailServer.get_status() == ServiceStatus.FAILED
|
||||
assert Bitwarden.get_status() == ServiceStatus.FAILED
|
||||
assert Gitea.get_status() == ServiceStatus.FAILED
|
||||
assert Forgejo.get_status() == ServiceStatus.FAILED
|
||||
assert Nextcloud.get_status() == ServiceStatus.FAILED
|
||||
assert Ocserv.get_status() == ServiceStatus.FAILED
|
||||
assert Pleroma.get_status() == ServiceStatus.FAILED
|
||||
|
|
39
tests/test_websocket_uvicorn_standalone.py
Normal file
39
tests/test_websocket_uvicorn_standalone.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
import pytest
|
||||
from fastapi import FastAPI, WebSocket
|
||||
import uvicorn
|
||||
|
||||
# import subprocess
|
||||
from multiprocessing import Process
|
||||
import asyncio
|
||||
from time import sleep
|
||||
from websockets import client
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.websocket("/")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
while True:
|
||||
data = await websocket.receive_text()
|
||||
await websocket.send_text(f"You sent: {data}")
|
||||
|
||||
|
||||
def run_uvicorn():
|
||||
uvicorn.run(app, port=5000)
|
||||
return True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_uvcorn_ws_works_in_prod():
|
||||
proc = Process(target=run_uvicorn)
|
||||
proc.start()
|
||||
sleep(2)
|
||||
|
||||
ws = await client.connect("ws://127.0.0.1:5000")
|
||||
|
||||
await ws.send("hohoho")
|
||||
message = await ws.read_message()
|
||||
assert message == "You sent: hohoho"
|
||||
await ws.close()
|
||||
proc.kill()
|
Loading…
Reference in a new issue