Source code for MAIA.dashboard_utils

from __future__ import annotations

import asyncio
import email
import os
import smtplib
import ssl
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

import requests
import yaml
from bs4 import BeautifulSoup
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from kubernetes import config
from loguru import logger
from minio import Minio
from pyhelm3 import Client

from MAIA.keycloak_utils import get_groups_in_keycloak
from MAIA.kubernetes_utils import generate_kubeconfig, get_namespaces, get_minio_shareable_link
from MAIA_scripts.MAIA_install_project_toolkit import verify_installed_maia_toolkit


[docs] def upload_env_file_to_minio(env_file, namespace, settings): client = Minio( settings.MINIO_URL, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) if env_file.name.endswith(".zip"): with open(f"/tmp/{namespace}_env.zip", "wb+") as destination: for chunk in env_file.chunks(): destination.write(chunk) logger.info(f"Storing {namespace}_env.zip in MinIO, in bucket {settings.BUCKET_NAME}") client.fput_object(settings.BUCKET_NAME, f"{namespace}_env.zip", f"/tmp/{namespace}_env.zip") logger.info(get_minio_shareable_link(f"{namespace}_env.zip", settings.BUCKET_NAME, settings)) filename = f"{namespace}_env.zip" elif env_file.name.endswith(".yaml") or env_file.name.endswith(".yml"): with open(f"/tmp/{namespace}_env.yaml", "wb+") as destination: for chunk in env_file.chunks(): destination.write(chunk) logger.info(f"Storing {namespace}_env.yaml in MinIO, in bucket {settings.BUCKET_NAME}") client.fput_object(settings.BUCKET_NAME, f"{namespace}_env.yaml", f"/tmp/{namespace}_env.yaml") logger.info(get_minio_shareable_link(f"{namespace}_env.yaml", settings.BUCKET_NAME, settings)) filename = f"{namespace}_env.yaml" elif env_file.name.endswith(".txt"): with open(f"/tmp/{namespace}_env.txt", "wb+") as destination: for chunk in env_file.chunks(): destination.write(chunk) logger.info(f"Storing {namespace}_env.txt in MinIO, in bucket {settings.BUCKET_NAME}") client.fput_object(settings.BUCKET_NAME, f"{namespace}_env.txt", f"/tmp/{namespace}_env.txt") logger.info(get_minio_shareable_link(f"{namespace}_env.txt", settings.BUCKET_NAME, settings)) filename = f"{namespace}_env.txt" else: msg = "Environment file must be a zip file, yaml file, or txt file" success = False return msg, success return filename, True
[docs] def verify_gpu_availability(global_existing_bookings, new_booking, gpu_specs): """ Verify GPU availability for a new booking. Parameters ---------- global_existing_bookings : list of dict A list of existing bookings where each booking is represented as a dictionary with keys "gpu", "start_date", and "end_date". new_booking : dict A dictionary representing the new booking with keys "gpu", "start_date", and "end_date". gpu_specs : list of dict A list of GPU specifications where each specification is represented as a dictionary with keys "name", "replicas", and "count". Returns ------- overlapping_time_points : list of datetime A list of time points where bookings overlap. gpu_availability_per_slot : list of int A list of available GPU counts for each overlapping time slot. total_gpus : int The total number of GPUs available for the specified GPU type. """ gpu_name = new_booking["gpu"] gpu_count = 0 gpu_replicas = 0 for gpu_spec in gpu_specs: if gpu_spec["name"] == gpu_name: gpu_replicas = gpu_spec["replicas"] gpu_count = gpu_spec["count"] overlapping_allocations = [] for existing_booking in global_existing_bookings: if existing_booking["gpu"] == gpu_name: if isinstance(existing_booking["start_date"], str): existing_booking_start = datetime.strptime(existing_booking["start_date"], "%Y-%m-%d %H:%M:%S") else: existing_booking_start = existing_booking["start_date"] if isinstance(existing_booking["end_date"], str): existing_booking_end = datetime.strptime(existing_booking["end_date"], "%Y-%m-%d %H:%M:%S") else: existing_booking_end = existing_booking["end_date"] new_booking_start = datetime.strptime(new_booking["starting_time"], "%Y-%m-%d %H:%M:%S").replace( tzinfo=existing_booking_start.tzinfo ) new_booking_end = datetime.strptime(new_booking["ending_time"], "%Y-%m-%d %H:%M:%S").replace( tzinfo=existing_booking_end.tzinfo ) if new_booking_start >= existing_booking_end or new_booking_end <= existing_booking_start: continue if new_booking_start <= existing_booking_start and new_booking_end >= existing_booking_start: overlapping_allocations.append([existing_booking_start, existing_booking_end]) elif new_booking_start <= existing_booking_start and new_booking_end <= existing_booking_end: overlapping_allocations.append([existing_booking_start, new_booking_end]) elif new_booking_start >= existing_booking_start and new_booking_end >= existing_booking_end: overlapping_allocations.append([new_booking_start, existing_booking_end]) elif new_booking_start >= existing_booking_start and new_booking_end <= existing_booking_end: overlapping_allocations.append([new_booking_start, new_booking_end]) overlapping_time_points = [] gpu_availability_per_slot = [] for overlapping_allocation in overlapping_allocations: overlapping_time_points.append(overlapping_allocation[0]) overlapping_time_points.append(overlapping_allocation[1]) if len(global_existing_bookings) == 0: overlapping_time_points.append(datetime.strptime(new_booking["starting_time"], "%Y-%m-%d %H:%M:%S")) overlapping_time_points.append(datetime.strptime(new_booking["ending_time"], "%Y-%m-%d %H:%M:%S")) else: overlapping_time_points.append( datetime.strptime(new_booking["starting_time"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=new_booking_start.tzinfo) ) overlapping_time_points.append( datetime.strptime(new_booking["ending_time"], "%Y-%m-%d %H:%M:%S").replace(tzinfo=new_booking_end.tzinfo) ) overlapping_time_points = sorted(set(overlapping_time_points)) for overlapping_time_point in overlapping_time_points[:-1]: overlapping_window = [ overlapping_time_point, overlapping_time_points[overlapping_time_points.index(overlapping_time_point) + 1], ] overlapping_window_start = overlapping_window[0] overlapping_window_end = overlapping_window[1] available_gpus = gpu_replicas * gpu_count gpu_availability_per_slot.append(available_gpus) for existing_booking in global_existing_bookings: if isinstance(existing_booking["start_date"], str): existing_booking_start = datetime.strptime(existing_booking["start_date"], "%Y-%m-%d %H:%M:%S").replace( tzinfo=new_booking_start.tzinfo ) else: existing_booking_start = existing_booking["start_date"] if isinstance(existing_booking["end_date"], str): existing_booking_end = datetime.strptime(existing_booking["end_date"], "%Y-%m-%d %H:%M:%S").replace( tzinfo=new_booking_end.tzinfo ) else: existing_booking_end = existing_booking["end_date"] if existing_booking_start < overlapping_window_end and existing_booking_end > overlapping_window_start: available_gpus -= 1 gpu_availability_per_slot[-1] = available_gpus return overlapping_time_points, gpu_availability_per_slot, gpu_replicas * gpu_count
[docs] def verify_gpu_booking_policy(existing_bookings, new_booking, global_existing_bookings, gpu_specs): """ Verify GPU booking policy to ensure the new booking does not exceed the allowed days and GPU availability. Parameters ---------- existing_bookings : list A list of existing booking objects with `start_date` and `end_date` attributes. new_booking : dict A dictionary containing the `starting_time` and `ending_time` of the new booking in "%Y-%m-%d %H:%M:%S" format. global_existing_bookings : list A list of all existing bookings globally. gpu_specs : dict A dictionary containing the specifications of the GPUs. Returns ------- bool True if the booking policy is verified, False otherwise. str or None An error message if the booking policy is not verified, None otherwise. """ ending_time = datetime.strptime(new_booking["ending_time"], "%Y-%m-%d %H:%M:%S") starting_time = datetime.strptime(new_booking["starting_time"], "%Y-%m-%d %H:%M:%S") for booking in existing_bookings: if booking.start_date <= datetime.now(tz=booking.start_date.tzinfo) and booking.end_date >= datetime.now( tz=booking.end_date.tzinfo ): return False, "There is an active booking, you cannot book a new one while another is active." if booking.start_date <= datetime.now(tz=booking.start_date.tzinfo) and booking.end_date <= datetime.now( tz=booking.end_date.tzinfo ): # Ensure starting_time is timezone-aware to match booking.end_date if booking.end_date.tzinfo is not None and starting_time.tzinfo is None: starting_time_tz = starting_time.replace(tzinfo=booking.end_date.tzinfo) else: starting_time_tz = starting_time if (starting_time_tz - booking.end_date).days < 14: return ( False, "The time between your old booking and the new booking must be at least 14 days. You can start a new booking on {}.".format( booking.end_date + timedelta(days=14) ), ) if booking.start_date >= datetime.now(tz=booking.start_date.tzinfo) and booking.end_date >= datetime.now( tz=booking.end_date.tzinfo ): return False, "You already have a planned booking [{} - {}], you cannot book a new one.".format( booking.start_date, booking.end_date ) new_booking_days = (ending_time - starting_time).days if new_booking_days <= 0: return False, "The booking must be at least one day long." if new_booking_days > 14: return False, "The booking cannot exceed 14 days." # Verify that the sum of existing bookings and the new booking does not exceed 60 days # if total_days + new_booking_days > 60: # return False, "The total number of days for all bookings cannot exceed 60 days." overlapping_time_slots, gpu_availability_per_slot, total_replicas = verify_gpu_availability( global_existing_bookings=global_existing_bookings, new_booking=new_booking, gpu_specs=gpu_specs ) for idx, gpu_availability in enumerate(gpu_availability_per_slot): if gpu_availability == 0: error_msg = "GPU not available between the selected time slots: {} - {}".format( overlapping_time_slots[idx], overlapping_time_slots[idx + 1] ) return False, error_msg return True, None
[docs] def send_maia_info_email(receiver_email, register_project_url, register_user_url, discord_support_link): """ Send an email with registration information for the MAIA platform. Parameters ---------- receiver_email : str The email address of the recipient. register_project_url : str The URL for project registration. register_user_url : str The URL for user registration. discord_support_link : str The URL for the MAIA support Discord. Returns ------- None """ sender_email = os.environ["email_account"] message = MIMEMultipart() message["Subject"] = "Registration Information for the MAIA Platform" message["From"] = f"MAIA Admin Team <{sender_email}>" message["To"] = receiver_email html = """\ <html> <head></head> <body> <p>Thank you for your interest in the MAIA platform. Below are the steps to register:</p> <p><b>Project Registration:</b><br> If you are starting a research work and you want to have it hosted in MAIA, please first register your project here:<br> <a href="{}">MAIA Project Registration</a></p> <p><b>User Registration:</b><br> To create a user account, an active project must be available to select. Once a project is registered, you can sign up for an account linked to that project here:<br> <a href="{}">MAIA User Registration</a></p> <p>If you have any questions or need further assistance, feel free to join our Discord community:<br> <a href="{}">MAIA Support Discord</a></p> <br> <p>Best regards,</p> <p>The MAIA Admin Team</p> </body> </html> """.format(register_project_url, register_user_url, discord_support_link) # noqa: B950 # Turn these into plain/html MIMEText objects part1 = MIMEText(html, "html") message.attach(part1) port = 587 # For SSL password = os.environ["email_password"] # Create a secure SSL context # context = ssl.create_default_context() with smtplib.SMTP(os.environ["email_smtp_server"], port) as server: server.ehlo() # identify ourselves to SMTP server server.starttls() # encrypt the session server.login(sender_email, password) server.sendmail(sender_email, receiver_email, message.as_string())
[docs] def verify_minio_availability(settings): """ Verifies the availability of a MinIO server. Parameters ---------- settings : object An object containing the MinIO configuration settings. MINIO_URL : str The URL of the MinIO server. MINIO_ACCESS_KEY : str The access key for the MinIO server. MINIO_SECRET_KEY : str The secret key for the MinIO server. BUCKET_NAME : str The name of the bucket to check for existence. Returns ------- bool True if the MinIO server is available and the bucket exists, False otherwise. """ try: client = Minio( settings.MINIO_URL, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) client.bucket_exists(settings.BUCKET_NAME) minio_available = True except Exception as e: logger.error(f"MinIO error: {e}") minio_available = False return minio_available
[docs] def send_approved_registration_email(receiver_email, login_url, temp_password): """ Sends an email to notify the user that their MAIA account registration has been approved. Parameters ---------- receiver_email : str The email address of the recipient. login_url : str The URL where the user can log in to MAIA. temp_password : str The temporary password assigned to the user. Raises ------ KeyError If the environment variables 'email_account' or 'email_password' are not set. smtplib.SMTPException If there is an error sending the email. """ sender_email = os.environ["email_account"] message = MIMEMultipart() message["Subject"] = "Account Registration Approved" message["From"] = f"MAIA Registration <{sender_email}>" message["To"] = receiver_email html = """\ <html> <head></head> <body> <p>Your MAIA Account has been approved.</p> <p>Log in to MAIA at the following link: <a href="{}">MAIA</a></p> <p>Your temporary password is: {}</p> <p>Please change your password after logging in.</p> <br> <p>Best Regards,</p> <p>MAIA Admin Team</p> </body> </html> """.format(login_url, temp_password) # Turn these into plain/html MIMEText objects part1 = MIMEText(html, "html") message.attach(part1) port = 587 # For SSL password = os.environ["email_password"] # Create a secure SSL context # context = ssl.create_default_context() with smtplib.SMTP(os.environ["email_smtp_server"], port) as server: server.ehlo() # identify ourselves to SMTP server server.starttls() # encrypt the session server.login(sender_email, password) server.sendmail(sender_email, receiver_email, message.as_string())
[docs] def send_discord_message(username, namespace, url, project_registration=False): """ Sends a message to a Discord webhook to request a MAIA account. Parameters ---------- username : str The username of the person requesting the account. namespace : str The project namespace for which the account is being requested. url : str The Discord webhook URL to which the message will be sent. project_registration : bool, optional If True, indicates that a project registration is also being requested (default is False). Raises ------ requests.exceptions.HTTPError If the HTTP request returned an unsuccessful status code. Prints ------ str Success message with the HTTP status code if the payload is delivered successfully. str Error message if the HTTP request fails. """ data = {"content": f"{username} is requesting a MAIA account for the project {namespace}.", "username": "MAIA-Bot"} data["embeds"] = [{"description": "MAIA User Registration Request", "title": "MAIA Account Request"}] if project_registration: data["embeds"][0]["description"] = "MAIA Project Registration Request" data["embeds"][0]["title"] = "MAIA Project Registration Request" data["content"] = f"{username} is requesting a MAIA account and a new project registration for {namespace}." result = requests.post(url, json=data) try: result.raise_for_status() except requests.exceptions.HTTPError as err: logger.error(f"Discord webhook error: {err}") else: logger.info(f"Payload delivered successfully, code {result.status_code}")
[docs] def get_pending_projects(settings, maia_project_model): """ Retrieve a list of pending projects that are not in the active groups. Parameters ---------- settings : dict Configuration settings required to access Keycloak. maia_project_model : Django model The Django model representing the MAIA projects. Returns ------- list A list of namespaces of pending projects. """ pending_projects = [] try: active_groups = get_groups_in_keycloak(settings) for project in maia_project_model.objects.all(): if project.namespace not in active_groups.values(): pending_projects.append(project.namespace) except Exception: ... return pending_projects
[docs] def get_user_table(settings, maia_user_model, maia_project_model): """ Retrieve user and project information from Keycloak and Minio, and organize it into a dictionary. Parameters ---------- settings : object An object containing configuration settings such as OIDC and Minio credentials. maia_user_model : Django model The Django model representing MAIA users. maia_project_model : Django model The Django model representing MAIA projects. Returns ------- tuple A tuple containing: - users_to_register_in_group (dict): Users to be registered in Keycloak groups. - users_to_register_in_keycloak (list): Users to be registered in Keycloak. - maia_group_dict (dict): Dictionary containing group information including users, conda environments, and project details. - users_to_remove_from_group (dict): Users to be removed from Keycloak groups. """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password="", realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=getattr(settings, "OIDC_CA_BUNDLE", True), ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) users_to_register_in_group = {} groups = keycloak_admin.get_groups() maia_groups = {group["id"]: group["name"][len("MAIA:") :] for group in groups if group["name"].startswith("MAIA:")} pending_projects = get_pending_projects(settings=settings, maia_project_model=maia_project_model) maia_group_dict = {} try: client = Minio( settings.MINIO_URL, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) minio_env_files = [env.object_name for env in list(client.list_objects(settings.BUCKET_NAME))] except Exception: minio_env_files = [] for maia_group in maia_groups: if maia_groups[maia_group] in ( getattr(settings, "USERS_GROUP", "users"), getattr(settings, "ADMIN_GROUP", "admin"), ): continue users = keycloak_admin.get_group_members(group_id=maia_group) admin_users = [] cpu_limit = None memory_limit = None date = None cluster = None gpu = None project_tier = None env_files = [] if maia_project_model.objects.filter(namespace=maia_groups[maia_group]).exists(): project = maia_project_model.objects.filter(namespace=maia_groups[maia_group]).first() if project.email: admin_users = project.email.split(",") if "," in project.email else [project.email] else: admin_users = [] if project.supervisor: for email in project.supervisor.split(","): admin_users.append(email) cpu_limit = project.cpu_limit memory_limit = project.memory_limit date = project.date cluster = project.cluster gpu = project.gpu project_tier = project.project_tier env_file = project.env_file for env_file in minio_env_files: if env_file.startswith(maia_groups[maia_group] + "_env"): env_files.append(env_file) if len(env_files) == 0: env_files.append("N/A") group_users = [] for user in users: # admin_users are email addresses (project.email, supervisor); compare with user email if user.get("email") in admin_users: group_users.append(user["email"] + " [Project Admin]") else: group_users.append(user["email"]) maia_group_dict[maia_groups[maia_group]] = { "users": group_users, "env_file": env_files, "admin_users": admin_users, "cpu_limit": cpu_limit, "memory_limit": memory_limit, "date": date, "cluster": cluster, "gpu": gpu, "project_tier": project_tier, } for pending_project in pending_projects: env_files = [] for env_file in minio_env_files: if env_file.startswith(pending_project + "_env"): env_files.append(env_file) if len(env_files) == 0: env_files.append("N/A") users = [] for user in maia_user_model.objects.all(): if user.namespace and pending_project in user.namespace.split(","): users.append(user.email) project = maia_project_model.objects.filter(namespace=pending_project).first() maia_group_dict[pending_project] = { "users": users, "pending": True, "env_file": env_files, "admin_users": [], "cpu_limit": project.cpu_limit, "memory_limit": project.memory_limit, "date": project.date, "cluster": "N/A", "gpu": project.gpu, "project_tier": project.project_tier, } users_to_register_in_keycloak = [] users_to_remove_from_group = {} for user in maia_user_model.objects.all(): user_groups = [] user_in_keycloak = False for keycloak_user in keycloak_admin.get_users(): if "email" in keycloak_user: if keycloak_user["email"] == user.email: user_in_keycloak = True user_keycloak_groups = keycloak_admin.get_user_groups(user_id=keycloak_user["id"]) for user_keycloak_group in user_keycloak_groups: if user_keycloak_group["name"].startswith("MAIA:"): user_groups.append(user_keycloak_group["name"][len("MAIA:") :]) if not user_in_keycloak: users_to_register_in_keycloak.append(user.email) requested_namespaces = user.namespace.split(",") if user.namespace else [] for requested_namespace in requested_namespaces: if requested_namespace not in user_groups and requested_namespace != "N/A": if user.email not in users_to_register_in_group: users_to_register_in_group[user.email] = [requested_namespace] else: users_to_register_in_group[user.email].append(requested_namespace) for user_group in user_groups: if user_group not in requested_namespaces: if user.email not in users_to_remove_from_group: users_to_remove_from_group[user.email] = [user_group] else: users_to_remove_from_group[user.email].append(user_group) return users_to_register_in_group, users_to_register_in_keycloak, maia_group_dict, users_to_remove_from_group
[docs] def register_cluster_for_project_in_db(project_model, settings, namespace, cluster): """ Registers a cluster for a project in the database. This function connects to Keycloak to retrieve group information and associates a cluster with a project based on the provided namespace. If the project already exists, it updates the cluster information; otherwise, it creates a new project entry with the specified cluster. Parameters ---------- project_model : Django model The Django model representing the project. settings : object An object containing configuration settings. namespace : str The namespace associated with the project. cluster : str The cluster to be registered for the project. Returns ------- None """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password="", realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=getattr(settings, "OIDC_CA_BUNDLE", True), ) group_id = namespace keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = {group["id"]: group["name"][len("MAIA:") :] for group in groups if group["name"].startswith("MAIA:")} for maia_group in maia_groups: if maia_groups[maia_group].lower().replace("_", "-") == namespace: group_id = maia_groups[maia_group] logger.info(f"Registering Existing Cluster for Group: {group_id}") if project_model.objects.filter(namespace=group_id).exists(): project = project_model.objects.filter(namespace=group_id).first() if project: project.cluster = cluster project.save() else: project_model.objects.create(namespace=group_id, cluster=cluster, memory_limit="2 Gi", cpu_limit="2")
[docs] def update_user_table(form, user_model, maia_user_model, project_model): """ Updates user and project information based on the cleaned data from a form. Parameters ---------- form : Form The form containing cleaned data to update the user and project models. user_model : Model The user model to query and update user information. maia_user_model : Model The MAIA user model to query and update namespace information. project_model : Model The project model to query and update project information. Notes ----- - The function processes entries in the form's cleaned data to update user namespaces and project details. - User namespaces are updated or created in the `maia_user_model` based on the user's email. - Project details are updated or created in the `project_model` based on the namespace. """ project_entries = ["memory_limit", "cpu_limit", "date", "cluster", "gpu", "project_tier"] namespace_list = [] for entry in form.cleaned_data: if entry.startswith("namespace_"): user = user_model.objects.filter(email=entry.replace("namespace_", "")).first() if user: user_id = user.id if maia_user_model.objects.filter(id=user_id).exists(): namespace = form.cleaned_data[entry] # namespaces = [] # for namespace in namespace_list: # if namespace.endswith(" (Pending)"): # namespaces.append(namespace[:-len(" (Pending)")]) # else: # namespaces.append(namespace) maia_user_model.objects.filter(id=user_id).update(namespace=namespace) else: if user_id is not None: if user_model.objects.filter(id=user_id).exists(): namespace = form.cleaned_data[entry] # namespaces = [] # for namespace in namespace_list: # if namespace.endswith(" (Pending)"): # namespaces.append(namespace[:-len(" (Pending)")]) # else: # namespaces.append(namespace) maia_user_model.objects.create(id=user_id, namespace=namespace) else: namespace = form.cleaned_data[entry] # namespaces = [] # for namespace in namespace_list: # if namespace.endswith(" (Pending)"): # namespaces.append(namespace[:-len(" (Pending)")]) # else: # namespaces.append(namespace) maia_user_model.objects.create(id=user_id, namespace=namespace) for project_entry in project_entries: if entry.startswith(project_entry + "_"): namespace = entry[len(project_entry + "_") :] namespace_list.append(namespace) for namespace in namespace_list: # namespaced_entries = [entry for entry in form.cleaned_data if entry.endswith(namespace)] if project_model.objects.filter(namespace=namespace).exists(): project_model.objects.filter(namespace=namespace).update( memory_limit=form.cleaned_data["memory_limit_" + namespace], cpu_limit=form.cleaned_data["cpu_limit_" + namespace], date=form.cleaned_data["date_" + namespace], cluster=form.cleaned_data["cluster_" + namespace], gpu=form.cleaned_data["gpu_" + namespace], project_tier=form.cleaned_data["project_tier_" + namespace], ) else: project_model.objects.create( namespace=namespace, memory_limit=form.cleaned_data["memory_limit_" + namespace], cpu_limit=form.cleaned_data["cpu_limit_" + namespace], date=form.cleaned_data["date_" + namespace], cluster=form.cleaned_data["cluster_" + namespace], gpu=form.cleaned_data["gpu_" + namespace], project_tier=form.cleaned_data["project_tier_" + namespace], )
[docs] def get_project(group_id, settings, maia_project_model, is_namespace_style=False): """ Retrieve project details and associated cluster ID based on the group ID. Parameters ---------- group_id : str The ID of the group to search for. settings : object The settings object containing OIDC configuration. maia_project_model : Django model The Django model representing MAIA projects. is_namespace_style : bool, optional Flag indicating whether the group ID is in namespace style (default is False). Returns ------- tuple A tuple containing: - namespace_form (dict): A dictionary with project details and resource limits. - cluster_id (str or None): The ID of the associated cluster, or None if not applicable. """ cluster_id = None for project in maia_project_model.objects.all(): if is_namespace_style: if str(project.namespace).lower().replace("_", "-") == group_id: keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password="", realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=getattr(settings, "OIDC_CA_BUNDLE", True), ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = { group["id"]: group["name"][len("MAIA:") :] for group in groups if group["name"].startswith("MAIA:") } group_users = [] for maia_group in maia_groups: if maia_groups[maia_group] == group_id: users = keycloak_admin.get_group_members(group_id=maia_group) for user in users: group_users.append(user["email"]) namespace_form = { "group_ID": group_id, "group_subdomain": group_id.lower().replace("_", "-"), "users": group_users, "resources_limits": { "memory": [str(int(int(project.memory_limit[: -len(" Gi")]) / 2)) + " Gi", project.memory_limit], "cpu": [str(int(int(project.cpu_limit) / 2)), project.cpu_limit], }, "project_tier": project.project_tier, } if project.gpu != "N/A" and project.gpu != "NO": namespace_form["gpu"] = "1" if project.env_file != "N/A" and project.env_file is not None: namespace_form["minio_env_name"] = project.env_file cluster_id = project.cluster if cluster_id == "N/A": cluster_id = None return namespace_form, cluster_id else: if project.namespace == group_id: keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password="", realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=getattr(settings, "OIDC_CA_BUNDLE", True), ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = { group["id"]: group["name"][len("MAIA:") :] for group in groups if group["name"].startswith("MAIA:") } group_users = [] for maia_group in maia_groups: if maia_groups[maia_group] == group_id: users = keycloak_admin.get_group_members(group_id=maia_group) for user in users: group_users.append(user["email"]) namespace_form = { "group_ID": group_id, "group_subdomain": group_id.lower().replace("_", "-"), "users": group_users, "resources_limits": { "memory": [str(int(int(project.memory_limit[: -len(" Gi")]) / 2)) + " Gi", project.memory_limit], "cpu": [str(int(int(project.cpu_limit) / 2)), project.cpu_limit], }, "project_tier": project.project_tier, } if project.gpu != "N/A" and project.gpu != "NO": namespace_form["gpu_request"] = "1" if project.env_file != "N/A" and project.env_file is not None: namespace_form["minio_env_name"] = project.env_file cluster_id = project.cluster if cluster_id == "N/A": cluster_id = None return namespace_form, cluster_id return None, None
[docs] def get_argocd_project_status(argocd_namespace, project_id): return verify_installed_maia_toolkit(project_id=project_id, namespace=argocd_namespace, get_chart_metadata=False)
[docs] def get_allocation_date_for_project(maia_project_model, group_id, is_namespace_style=False): """ Retrieves the allocation date for a project based on the given group ID. Parameters ---------- maia_project_model : Model The Django model representing the MAIA project. group_id : str The group ID to match against the project's namespace. is_namespace_style : bool, optional If True, the group ID comparison will be done in a namespace style (lowercase and underscores replaced with hyphens). Default is False. Returns ------- date or None The allocation date of the project if a match is found, otherwise None. """ for project in maia_project_model.objects.all(): if is_namespace_style: if str(project.namespace).lower().replace("_", "-") == group_id: return project.date else: if project.namespace == group_id: return project.date return None
[docs] async def get_list_of_deployed_projects(): """ Asynchronously retrieves a list of deployed projects from the Argo CD namespace. This function uses a Kubernetes client to list all releases in the "argocd" namespace and returns the names of these releases. Returns ------- list of str A list containing the names of the deployed projects. Raises ------ KeyError If the "KUBECONFIG" environment variable is not set. kubernetes.client.exceptions.ApiException If there is an error communicating with the Kubernetes API. """ if "BACKEND" in os.environ and os.environ["BACKEND"] == "compose": return [os.environ["PROJECT_NAME"]] client = Client(kubeconfig=os.environ["KUBECONFIG"]) releases = await client.list_releases(namespace="argocd") return [release.name for release in releases]
[docs] def get_project_argo_status_and_user_table(request, settings, maia_user_model, maia_project_model): """ Retrieves the Argo CD project status and user table information. Parameters ---------- request : HttpRequest The HTTP request object containing session and user information. settings : Settings The settings object containing configuration values. maia_user_model : Model The Django model representing MAIA users. maia_project_model : Model The Django model representing MAIA projects. Returns ------- tuple A tuple containing: - user_table (dict): The user table information. - to_register_in_groups (list): List of users to register in groups. - to_register_in_keycloak (list): List of users to register in Keycloak. - maia_groups_dict (dict): Dictionary of MAIA groups. - project_argo_status (dict): Dictionary containing the Argo CD project status for each project. """ argocd_cluster_id = settings.ARGOCD_CLUSTER id_token = request.session.get("oidc_id_token") if argocd_cluster_id is None or argocd_cluster_id == "N/A": ... else: kubeconfig_dict = generate_kubeconfig(id_token, request.user.username, "default", argocd_cluster_id, settings=settings) config.load_kube_config_from_dict(kubeconfig_dict) with open(Path("/tmp").joinpath("kubeconfig-argo"), "w") as f: yaml.dump(kubeconfig_dict, f) os.environ["KUBECONFIG"] = str(Path("/tmp").joinpath("kubeconfig-argo")) to_register_in_groups, to_register_in_keycloak, maia_groups_dict, users_to_remove_from_group = get_user_table( settings=settings, maia_user_model=maia_user_model, maia_project_model=maia_project_model ) project_argo_status = {} namespaces = get_namespaces(id_token, api_urls=settings.API_URL, private_clusters=settings.PRIVATE_CLUSTERS) deployed_projects = asyncio.run(get_list_of_deployed_projects()) for project_id in maia_groups_dict: if project_id.lower().replace("_", "-") in deployed_projects: project_argo_status[project_id] = 1 else: project_argo_status[project_id] = -1 if "ARGOCD_DISABLED" in os.environ and os.environ["ARGOCD_DISABLED"] == "True": if project_id.lower().replace("_", "-") in namespaces: project_argo_status[project_id] = 1 else: project_argo_status[project_id] = -1 # project_argo_status[project_id] = asyncio.run(get_argocd_project_status(argocd_namespace="argocd", project_id=project_id.lower().replace("_", "-"))) # noqa: B950 return to_register_in_groups, to_register_in_keycloak, maia_groups_dict, project_argo_status, users_to_remove_from_group
[docs] def send_maia_message_email(receiver_emails, subject, message_body): """ Send an email with a custom message to multiple recipients with improved deliverability. """ try: sender_email = os.environ["email_account"] message = MIMEMultipart("alternative") # Changed to alternative for better compatibility # Add proper headers to improve deliverability message["Subject"] = subject message["From"] = f"MAIA Team <{sender_email}>" # Use proper From format message["To"] = ", ".join(receiver_emails) message["Reply-To"] = sender_email message["Date"] = email.utils.formatdate(localtime=True) message["Message-ID"] = email.utils.make_msgid(domain=sender_email.split("@")[1]) html = f"""\ <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> </head> <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333333;"> {message_body} <br> <p>Best regards,</p> <p>The MAIA Admin Team</p> <hr> <p style="font-size: 12px; color: #666666;"> This is an automated message from the MAIA Platform. If you believe you received this in error, please contact support. </p> </body> </html> """ # Create plain text version text = BeautifulSoup(html, "html.parser").get_text() # Attach both plain text and HTML versions part1 = MIMEText(text, "plain") part2 = MIMEText(html, "html") message.attach(part1) message.attach(part2) port = 465 # For SSL password = os.environ["email_password"] smtp_server = os.environ["email_smtp_server"] context = ssl.create_default_context() with smtplib.SMTP_SSL(smtp_server, port, context=context) as server: server.login(sender_email, password) server.send_message(message) # Using send_message instead of sendmail return True except Exception as e: logger.error(f"Error sending email: {str(e)}") return False
[docs] def generate_encryption_keys(folder_path): """ Generate RSA encryption keys and save them to files. Parameters ---------- folder_path : str The path to the folder where the keys will be saved. Returns ------- None """ # Generate RSA key pair private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) # Extract public key public_key = private_key.public_key() # Save private key to a file with open(Path(folder_path).joinpath("private_key.pem"), "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) # Save public key to a file with open(Path(folder_path).joinpath("public_key.pem"), "wb") as f: f.write( public_key.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) ) logger.info("Keys generated successfully!")
[docs] def encrypt_string(public_key, string): """ Encrypts a given string using the provided public key. Parameters ---------- public_key : str The file path to the public key in PEM format. string : str The string to be encrypted. Returns ------- str The encrypted string in hexadecimal format. Raises ------ ValueError If the public key file cannot be read or is invalid. """ # Load public key with open(public_key, "rb") as f: public_key = serialization.load_pem_public_key(f.read()) def encrypt_message(message, public_key): encrypted = public_key.encrypt( message.encode(), padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) ) return encrypted encrypted_message = encrypt_message(string, public_key) return encrypted_message.hex()
[docs] def decrypt_string(private_key, string): """ Decrypts an encrypted string using a given private key. Parameters ---------- private_key : str Path to the private key file in PEM format. string : bytes The encrypted string to be decrypted. Returns ------- str The decrypted string. Raises ------ ValueError If the decryption process fails. """ with open(private_key, "rb") as f: private_key = serialization.load_pem_private_key(f.read(), password=None) def decrypt_message(encrypted, private_key): decrypted = private_key.decrypt( encrypted, padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) ) return decrypted.decode() decrypted_message = decrypt_message(string, private_key) return decrypted_message