chore: Clen-up

This commit is contained in:
Inex Code 2025-03-21 06:39:01 +03:00
parent 4d7660d87a
commit d5eaf399d2
No known key found for this signature in database
23 changed files with 174 additions and 144 deletions

View file

@ -1,6 +1,6 @@
{ pythonPackages, rev ? "local" }:
pythonPackages.buildPythonPackage rec {
pythonPackages.buildPythonPackage {
pname = "selfprivacy-graphql-api";
version = rev;
src = builtins.filterSource (p: t: p != ".git" && t != "symlink") ./.;

View file

@ -1,17 +1,23 @@
from typing_extensions import Optional
from selfprivacy_api.models.email_password_metadata import EmailPasswordData
from typing import Optional
from uuid import UUID, uuid4
from datetime import datetime, timezone
from selfprivacy_api.models.email_password_metadata import EmailPasswordData
from selfprivacy_api.models.tokens.time import ensure_timezone
from selfprivacy_api.repositories.email_password import ACTIVE_EMAIL_PASSWORD_PROVIDER
from passlib.hash import argon2
from selfprivacy_api.utils.argon2 import generate_password_hash
def get_email_credentials_metadata(username: str) -> list[EmailPasswordData]:
"""
Retrieve all email password metadata for a given username.
Args:
username (str): The username to retrieve email password metadata for.
Returns:
list[EmailPasswordData]: A list of EmailPasswordData objects containing the metadata.
"""
return ACTIVE_EMAIL_PASSWORD_PROVIDER.get_all_email_passwords_metadata(
username=username,
)

View file

@ -1,4 +1,5 @@
from typing import Optional
from datetime import datetime
import strawberry
@ -11,26 +12,26 @@ from selfprivacy_api.actions.email_passwords import (
class EmailPasswordMetadata:
uuid: str
display_name: str
created_at: Optional[str] = None
expires_at: Optional[str] = None
last_used: Optional[str] = None
created_at: Optional[datetime] = None
expires_at: Optional[datetime] = None
last_used: Optional[datetime] = None
def get_email_credentials_metadata(username: str) -> list[EmailPasswordMetadata]:
email_credintials_metadata_list = action_get_email_credentials_metadata(
email_credentials_metadata_list = action_get_email_credentials_metadata(
username=username
)
if not email_credintials_metadata_list:
if not email_credentials_metadata_list:
return []
return [
EmailPasswordMetadata(
uuid=email_credintial_metadata.uuid,
display_name=email_credintial_metadata.display_name,
created_at=email_credintial_metadata.created_at,
expires_at=email_credintial_metadata.expires_at,
last_used=email_credintial_metadata.last_used,
uuid=email_credential_metadata.uuid,
display_name=email_credential_metadata.display_name,
created_at=email_credential_metadata.created_at,
expires_at=email_credential_metadata.expires_at,
last_used=email_credential_metadata.last_used,
)
for email_credintial_metadata in email_credintials_metadata_list
for email_credential_metadata in email_credentials_metadata_list
]

View file

@ -1,8 +1,5 @@
"""Email passwords metadata management module"""
# pylint: disable=too-few-public-methods
from uuid import uuid4
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
@ -10,7 +7,6 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
)
from selfprivacy_api.actions.email_passwords import (
add_email_password as action_add_email_password,
delete_email_password_hash as action_delete_email_password,
)
@ -19,25 +15,6 @@ from selfprivacy_api.actions.email_passwords import (
class EmailPasswordsMetadataMutations:
"""Mutations change email passwords metadata records"""
@strawberry.mutation(permission_classes=[IsAuthenticated])
def add_email_password(
self,
username: str,
) -> GenericMutationReturn:
try:
action_add_email_password(username=username, password=str(uuid4()))
except Exception as error:
return GenericMutationReturn(
success=False,
message=str(error), # TODO
code=409,
)
return GenericMutationReturn(
success=True,
message="Password added successfully",
code=201,
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def delete_email_password(self, username: str, uuid: str) -> GenericMutationReturn:
try:

View file

@ -52,7 +52,7 @@ from selfprivacy_api.repositories.users.exceptions_kanidm import (
from selfprivacy_api.utils.strings import PLEASE_UPDATE_APP_TEXT
FAILED_TO_SETUP_SSO_PASSWORD_TEXT = "Password has been changed only for email, to change it in other services, please update the SelfPrivacy app."
FAILED_TO_SETUP_SSO_PASSWORD_TEXT = "New password for applied to email, to use Single Sign On, please update the SelfPrivacy app."
def return_failed_mutation_return(

View file

@ -6,7 +6,7 @@ from selfprivacy_api.models.email_password_metadata import EmailPasswordData
class AbstractEmailPasswordManager(ABC):
@staticmethod
@abstractmethod
def get_all_email_passwords_matadata(
def get_all_email_passwords_metadata(
username: str,
with_passwords_hashes: bool = False,
) -> list[EmailPasswordData]:
@ -17,7 +17,7 @@ class AbstractEmailPasswordManager(ABC):
username (str)
Returns:
List[EmailPasswordMetadata]:
List[EmailPasswordData]:
A list of metadata objects containing details
about stored passwords. Without hashed password.
"""
@ -33,7 +33,7 @@ class AbstractEmailPasswordManager(ABC):
Args:
username (str)
password_hash (str): The hashed password value.
credential_metadata (EmailPasswordMetadata):
credential_metadata (EmailPasswordData):
Metadata associated with the password,
including display name and timestamps.
"""
@ -51,6 +51,7 @@ class AbstractEmailPasswordManager(ABC):
"""
@staticmethod
@abstractmethod
def delete_all_email_passwords_hashes(username: str) -> None:
"""
Remove all stored email passwords along with their metadata

View file

@ -1,9 +1,10 @@
from datetime import datetime, timezone
from selfprivacy_api.models.email_password_metadata import EmailPasswordData
from selfprivacy_api.repositories.email_password.abstract_email_password_repository import (
AbstractEmailPasswordManager,
)
from selfprivacy_api.utils.redis_pool import RedisPool
from datetime import datetime, timezone
redis = RedisPool().get_userpanel_connection()
@ -67,10 +68,14 @@ class EmailPasswordManager(AbstractEmailPasswordManager):
if credential_metadata.expires_at is not None:
password_data["expires_at"] = credential_metadata.expires_at.isoformat()
redis.hmset(key, password_data)
redis.hset(key, mapping=password_data)
if credential_metadata.expires_at:
redis.expireat(key, int(credential_metadata.expires_at.timestamp()))
try:
redis.expireat(key, int(credential_metadata.expires_at.timestamp()))
except Exception as e:
# Handle the exception (e.g., log it)
print(f"Failed to set expiration time for {key}: {e}")
@staticmethod
def update_email_password_hash_last_used(username: str, uuid: str) -> None:

View file

@ -31,13 +31,16 @@ from selfprivacy_api.repositories.users.abstract_user_repository import (
REDIS_TOKEN_KEY = "kanidm:token"
KANIDM_URL = "https://127.0.0.1:3013"
SP_ADMIN_GROUPS = ["sp.admins"]
SP_DEFAULT_GROUPS = ["sp.full_users"]
logger = logging.getLogger(__name__)
def get_kanidm_url():
return f"https://auth.{get_domain()}"
class KanidmAdminToken:
"""
Manages the administrative token for Kanidm.
@ -62,11 +65,11 @@ class KanidmAdminToken:
@staticmethod
def get() -> str:
redis = RedisPool().get_connection()
kanidm_admin_token = redis.get(REDIS_TOKEN_KEY)
kanidm_admin_token: str = redis.get(REDIS_TOKEN_KEY) # type: ignore
if kanidm_admin_token:
if KanidmAdminToken._is_token_valid(kanidm_admin_token): # type: ignore
return kanidm_admin_token # type: ignore
if KanidmAdminToken._is_token_valid(kanidm_admin_token):
return kanidm_admin_token
logging.warning("Kanidm admin token is missing or invalid. Regenerating.")
@ -133,7 +136,7 @@ class KanidmAdminToken:
@staticmethod
def _is_token_valid(token: str) -> bool:
endpoint = f"{KANIDM_URL}/v1/person/root"
endpoint = f"{get_kanidm_url()}/v1/person/root"
try:
response = requests.get(
endpoint,
@ -142,7 +145,6 @@ class KanidmAdminToken:
"Content-Type": "application/json",
},
timeout=1,
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
)
except (
@ -257,7 +259,7 @@ class KanidmUserRepository(AbstractUserRepository):
logger.error(f"HTTP method '{method}' is not supported.")
raise ValueError(f"Unsupported HTTP method: {method}")
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
full_endpoint = f"{get_kanidm_url()}/v1/{endpoint}"
try:
response = request_method(
@ -268,7 +270,6 @@ class KanidmUserRepository(AbstractUserRepository):
"Content-Type": "application/json",
},
timeout=1,
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
)
response_data = response.json()
@ -352,7 +353,7 @@ class KanidmUserRepository(AbstractUserRepository):
"name": [username],
"displayname": [displayname if displayname else username],
"mail": [f"{username}@{get_domain()}"],
"class": ["user"], # TODO read more about it
"class": ["user"],
}
}

View file

@ -487,7 +487,8 @@ class Service(ABC):
return job
report_progress(5, job, f"Stopping {service_name}...")
assert self is not None
if self is not None:
raise AssertionError
with StoppedService(self):
report_progress(9, job, "Stopped service, starting the move...")
self.do_move_to_volume(volume, job)

View file

@ -1,4 +1,5 @@
from authlib.integrations.starlette_client import OAuth
from selfprivacy_api.utils.oauth_secrets import (
load_oauth_client_secret,
OAUTH_CLIENT_ID,
@ -7,21 +8,19 @@ from selfprivacy_api.utils import get_domain
oauth = OAuth()
idm_domain = f"https://auth.{get_domain()}"
idm_domain_url = f"https://auth.{get_domain()}"
oauth.register(
name="kanidm",
client_id=OAUTH_CLIENT_ID,
client_secret=load_oauth_client_secret(),
server_metadata_url=f"{idm_domain}/oauth2/openid/{OAUTH_CLIENT_ID}/.well-known/openid-configuration",
# access_token_url=f"{idm_domain}/oauth2/token",
# access_token_params=None,
# authorize_url=f"{idm_domain}/ui/oauth2",
# authorize_params=None,
server_metadata_url=(
f"{idm_domain_url}/oauth2/openid/{OAUTH_CLIENT_ID}/.well-known/openid-configuration"
),
client_kwargs={
"scope": "openid profile email groups",
"code_challenge_method": "S256",
"token_endpoint_auth_method": "client_secret_post",
},
userinfo_endpoint=f"{idm_domain}/oauth2/openid/{OAUTH_CLIENT_ID}/userinfo",
userinfo_endpoint=f"{idm_domain_url}/oauth2/openid/{OAUTH_CLIENT_ID}/userinfo",
)

View file

@ -1,10 +1,12 @@
from fastapi import Response
from selfprivacy_api.utils.redis_pool import RedisPool
import secrets
import base64
from hashlib import sha256
from datetime import datetime, timedelta, timezone
import secrets
import json
from datetime import datetime, timedelta, timezone
from fastapi import Response
from hashlib import sha256
from selfprivacy_api.utils.redis_pool import RedisPool
def generate_password() -> str:
@ -52,6 +54,9 @@ async def create_session(token: str, user_id: str) -> Session:
async def validate_session_token(token: str) -> Session | None:
redis_conn = RedisPool().get_userpanel_connection_async()
if redis_conn is None:
raise Exception("Redis storage is not available")
session_id = sha256(token.encode()).hexdigest()
item = await redis_conn.get(f"session:{session_id}")
if item is None:
@ -87,10 +92,21 @@ async def validate_session_token(token: str) -> Session | None:
async def invalidate_session(session_id: str) -> None:
redis_conn = RedisPool().get_userpanel_connection_async()
if redis_conn is None:
raise Exception("Redis storage is not available")
await redis_conn.delete(f"session:{session_id}")
def set_session_token_cookie(response: Response, token: str, expires_at: datetime):
"""
Set a session token cookie in the response.
Args:
response (Response): The response object to set the cookie on.
token (str): The session token to be set in the cookie.
expires_at (datetime): The expiration time of the cookie.
"""
response.set_cookie(
"session_token",
token,
@ -102,4 +118,10 @@ def set_session_token_cookie(response: Response, token: str, expires_at: datetim
def delete_session_token_cookie(response: Response):
"""
Delete the session token cookie from the response.
Args:
response (Response): The response object to delete the cookie from.
"""
response.delete_cookie("session_token")

View file

@ -1,33 +1,35 @@
from fastapi import Depends, HTTPException, Request, Response
from fastapi.responses import RedirectResponse
from fastapi import HTTPException, Request, Response
from fastapi.security import OAuth2PasswordBearer
from selfprivacy_api.userpanel.auth.session import (
Session,
validate_session_token,
set_session_token_cookie,
delete_session_token_cookie,
)
import logging
logger = logging.getLogger(__name__)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(request: Request, response: Response) -> Session:
"""
Retrieve the current user session based on the session token in the request cookies.
Args:
request (Request): The request object containing cookies.
response (Response): The response object to set the session token cookie.
Returns:
Session: The user session if a valid session token is found.
Raises:
HTTPException: If no session token is found or if the session token is invalid.
"""
session_token = request.cookies.get("session_token")
if not session_token:
logger.warning("No session token found, redirecting to login.")
raise HTTPException(status_code=307, headers={"Location": "/login"})
raise HTTPException(status_code=303, headers={"Location": "/login"})
session = await validate_session_token(session_token)
if session is None:
logger.warning(
"Invalid session token, deleting cookie and redirecting to login."
)
raise HTTPException(status_code=307, headers={"Location": "/user/logout"})
raise HTTPException(status_code=303, headers={"Location": "/user/logout"})
else:
logger.info("Valid session token found, setting cookie.")
set_session_token_cookie(response, session_token, session.expires_at)
return session

View file

@ -1,11 +1,11 @@
from fastapi import APIRouter, Request, Form
import logging
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from selfprivacy_api.utils.self_service_portal_utils import validate_email_password
from selfprivacy_api.utils import get_domain
from typing import Annotated
import logging
logger = logging.getLogger(__name__)
@ -19,33 +19,23 @@ class EmailPasswordCheckInput(BaseModel):
@router.post("/check-email-password")
async def check_email_password(request: Request, input_data: EmailPasswordCheckInput):
headers = request.headers
logger.info("Headers:")
for key, value in headers.items():
logger.info(f"{key}: {value}")
username = input_data.username
password = input_data.password
if not username or not password:
logger.error("Invalid request")
return JSONResponse({"isValid": False}, status_code=400)
if "@" in username:
parsed_user, domain = username.split("@")
if not (username and password):
parsed_user, domain = username.rsplit("@", 1)
if domain != get_domain():
logger.error(f"Invalid domain for user: {username}")
return JSONResponse({"isValid": False}, status_code=400)
user = parsed_user
else:
logger.error("Invalid input: username must contain a domain")
return JSONResponse({"isValid": False}, status_code=400)
try:
is_valid = validate_email_password(user, password)
logger.info(
f"Password for user {username} is {'valid' if is_valid else 'invalid'}"
)
return JSONResponse({"isValid": is_valid})
except Exception as e:
logger.error(f"Error validating user: {e}")

View file

@ -1,20 +1,17 @@
import logging
from fastapi import APIRouter, HTTPException, Response
from fastapi.responses import RedirectResponse, HTMLResponse
from starlette.requests import Request
from authlib.integrations.starlette_client import OAuth
from authlib.integrations.base_client.errors import OAuthError
from selfprivacy_api.userpanel.templates import templates
from selfprivacy_api.utils.oauth_secrets import (
load_oauth_client_secret,
OAUTH_CLIENT_ID,
)
from selfprivacy_api.userpanel.auth.oauth import oauth
from selfprivacy_api.userpanel.auth.session import (
generate_session_token,
create_session,
set_session_token_cookie,
)
import logging
router = APIRouter()

View file

@ -1,8 +1,18 @@
import base64
import logging
from uuid import UUID
from typing import Annotated, Optional
from datetime import datetime, timezone
from urllib.parse import urlencode
from io import BytesIO
from fastapi import APIRouter, Depends, HTTPException, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from pydantic import BaseModel, Field, ValidationError
import qrcode
from qrcode.image.pil import PilImage
from qrcode.image.pure import PyPNGImage
from selfprivacy_api.models.user import UserDataUser
from selfprivacy_api.userpanel.templates import templates
from selfprivacy_api.userpanel.auth.session import Session, delete_session_token_cookie
@ -15,24 +25,11 @@ from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.icons import sanitize_svg
from selfprivacy_api.models.email_password_metadata import EmailPasswordData
from selfprivacy_api.actions.email_passwords import (
add_email_password,
delete_email_password_hash,
get_email_credentials_metadata,
)
from uuid import UUID
from typing import Annotated, Optional
from datetime import datetime, timezone, date
from selfprivacy_api.actions.system import get_timezone
import logging
from selfprivacy_api.utils.self_service_portal_utils import generate_new_email_password
from urllib.parse import urlencode
import qrcode
from io import BytesIO
import base64
logger = logging.getLogger(__name__)

View file

@ -1,11 +1,14 @@
import os
from typing import Optional
from datetime import datetime
from fastapi.templating import Jinja2Templates
from pytz import timezone
def format_datetime(
value: datetime | None, format="%Y-%m-%d %H:%M:%S", tz_str: str = None
value: Optional[datetime],
format="%Y-%m-%d %H:%M:%S",
tz_str: Optional[str] = None,
):
if value is None:
return ""

View file

@ -31,14 +31,14 @@
<img src="data:image/png;base64,{{ deltachat_qr_base64 }}" alt="Delta Chat QR Code" class="my-2">
<div class="mt-2 flex flex-wrap gap-2 w-full">
{% call() button.primary(deltachat_uri, cls='w-full') %}
Or click here to open Delta Chat on this device
Or tap here to open Delta Chat on this device
{% endcall %}
</div>
</div>
{% endif %}
<div class="max-w-xl">
<h2 class="text-l font-bold">How to use your email password</h2>
<h2 class="text-xl font-bold">How to use your email password</h2>
<p class="text-sm text-muted-foreground">Use the following settings to configure your email client:</p>
<ul class="list-disc list-inside my-2">
<li><strong>Login:</strong> {{ login }}</li>

View file

@ -17,9 +17,9 @@
</div>
<p class="text-sm text-muted-foreground">Tap on the button below to begin</p>
</div>
<div class="flex items-center gap-4 p-6">
<div class="flex items-center gap-4 p-6 w-full">
{% call() button.primary(href='/login/oauth') %}
Sign in with OAuth
Sign in with Single Sign On
{% endcall %}
</div>
</div>

View file

@ -11,10 +11,29 @@ def generate_urlsave_password() -> str:
def generate_password_hash(password: str) -> str:
"""
Generate a hash for the given password using Argon2.
Args:
password (str): The password to hash.
Returns:
str: The hashed password.
"""
return argon2.hash(unicodedata.normalize("NFKC", password))
def verify_password(password: str, password_hash: str) -> bool:
"""
Verify a password against a hash.
Args:
password (str): The password to verify.
password_hash (str): The hash to verify against.
Returns:
bool: True if the password is correct, False otherwise
"""
password = unicodedata.normalize("NFKC", password)
if "$argon2" in password_hash:

View file

@ -55,7 +55,19 @@ ALLOWED_ATTRIBUTES = {
}
def sanitize_svg(svg_content):
def sanitize_svg(svg_content: str):
"""
Sanitize the given SVG content by removing any potentially harmful elements and attributes.
Args:
svg_content (str): The SVG content to sanitize.
Returns:
str: The sanitized SVG content.
"""
return bleach.clean(
svg_content, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES, strip=True
svg_content,
tags=ALLOWED_TAGS,
attributes=ALLOWED_ATTRIBUTES,
strip=True,
)

View file

@ -1,21 +1,14 @@
import os
import logging
OAUTH_SECRET_PATH = "/run/keys/selfprivacy-api/kanidm-oauth-client-secret"
OAUTH_CLIENT_ID = "selfprivacy-api"
logger = logging.getLogger(__name__)
def load_oauth_client_secret():
logger.info("Loading oauth client secret")
secret_path = OAUTH_SECRET_PATH
if os.path.exists(secret_path):
logger.info(f"Secret file found at {secret_path}")
with open(secret_path, "r", encoding="utf-8") as secret_file:
secret = secret_file.read().strip()
logger.info(f"Loaded oauth client secret: {secret}")
return secret
else:
logger.error(f"Secret file not found at {secret_path}")
raise FileNotFoundError(f"Secret file not found at {secret_path}")

View file

@ -1,20 +1,17 @@
from datetime import datetime, timezone
import secrets
import base64
from typing import Optional
import unicodedata
from passlib.hash import argon2, sha512_crypt
import logging
from selfprivacy_api.repositories.email_password import ACTIVE_EMAIL_PASSWORD_PROVIDER
from selfprivacy_api.models.email_password_metadata import EmailPasswordData
from selfprivacy_api.utils.argon2 import (
verify_password,
generate_urlsave_password,
generate_password_hash,
)
from selfprivacy_api.actions.email_passwords import add_email_password
logger = logging.getLogger(__name__)
def get_email_credentials_metadata_with_passwords_hashes(
username: str,
@ -25,6 +22,10 @@ def get_email_credentials_metadata_with_passwords_hashes(
)
def is_expired(expires_at: Optional[datetime]) -> bool:
return expires_at is not None and expires_at < datetime.now(timezone.utc)
def validate_email_password(username: str, password: str) -> bool:
email_passwords_data = (
ACTIVE_EMAIL_PASSWORD_PROVIDER.get_all_email_passwords_metadata(
@ -38,13 +39,16 @@ def validate_email_password(username: str, password: str) -> bool:
for i in email_passwords_data:
if i.password is None:
continue
if i.expires_at is not None and i.expires_at < datetime.now(timezone.utc):
if is_expired(i.expires_at):
continue
if verify_password(password=password, password_hash=str(i.password)):
ACTIVE_EMAIL_PASSWORD_PROVIDER.update_email_password_hash_last_used(
username=username,
uuid=i.uuid,
)
try:
ACTIVE_EMAIL_PASSWORD_PROVIDER.update_email_password_hash_last_used(
username=username,
uuid=i.uuid,
)
except Exception as e:
logger.error(f"Failed to update email password hash last_used: {e}")
return True
return False

View file

@ -55,7 +55,7 @@ def test_all_jobs_when_some(authorized_client, jobs):
job = Jobs.add("bogus", "bogus.bogus", "fungus")
output = api_jobs(authorized_client)
len(output) == 1
assert len(output) == 1
api_job = output[0]
assert api_job["uid"] == str(job.uid)