Source code for MAIA.dashboard_utils

import os
import smtplib
import ssl
import requests
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import requests
import json
import numpy as np
import requests
import json
import os
from kubernetes.client.rest import ApiException
import asyncio
import sqlite3
import kubernetes
from sqlalchemy import create_engine
import pandas as pd
from keycloak import KeycloakAdmin
from keycloak import KeycloakOpenIDConnection
from pathlib import Path
import yaml
from kubernetes import config
from minio import Minio
from MAIA_scripts.MAIA_install_project_toolkit import verify_installed_maia_toolkit

[docs] def verify_minio_availability(settings): """ Verifies the availability of a MinIO server. Parameters ---------- settings : object An object containing the MinIO configuration settings. - settings.MINIO_URL : str The URL of the MinIO server. - settings.MINIO_ACCESS_KEY : str The access key for the MinIO server. - settings.MINIO_SECRET_KEY : str The secret key for the MinIO server. - settings.BUCKET_NAME : str The name of the bucket to check for existence. Returns ------- bool True if the MinIO server is available and the bucket exists, False otherwise. """ try: client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=True) client.bucket_exists(settings.BUCKET_NAME) minio_available = True except Exception as e: print(e) minio_available = False return minio_available
[docs] def send_approved_registration_email(receiver_email, login_url, temp_password): """ Sends an email to notify the user that their MAIA account registration has been approved. Parameters ---------- receiver_email : str The email address of the recipient. login_url : str The URL where the user can log in to MAIA. temp_password : str The temporary password assigned to the user. Raises ------ KeyError If the environment variables 'email_account' or 'email_password' are not set. smtplib.SMTPException If there is an error sending the email. """ sender_email = os.environ["email_account"] message = MIMEMultipart() message["Subject"] = "Account Registration Approved" message["From"] = "MAIA Registration" message["To"] = receiver_email html = """\ <html> <head></head> <body> <p>Your MAIA Account has been approved.</p> <p>Log in to MAIA at the following link: <a href="{}">MAIA</a></p> <p>Your temporary password is: {}</p> <p>Please change your password after logging in.</p> <br> <p>Best Regards,</p> <p>MAIA Admin Team</p> </body> </html> """.format(login_url, temp_password) # Turn these into plain/html MIMEText objects part1 = MIMEText(html, "html") message.attach(part1) port = 465 # For SSL password = os.environ["email_password"] # Create a secure SSL context context = ssl.create_default_context() with smtplib.SMTP_SSL(os.environ["email_smtp_server"], port, context=context) as server: server.login(sender_email, password) server.sendmail(sender_email, receiver_email, message.as_string())
[docs] def send_discord_message(username, namespace, url): """ Sends a message to a Discord webhook to request a MAIA account. Parameters ---------- username : str The username of the person requesting the account. namespace : str The project namespace for which the account is being requested. url : str The Discord webhook URL to which the message will be sent. Raises ------ requests.exceptions.HTTPError If the HTTP request returned an unsuccessful status code. Prints ------ str Success message with the HTTP status code if the payload is delivered successfully. str Error message if the HTTP request fails. """ data = { "content": f"{username} is requesting a MAIA account for the project {namespace}.", "username": "MAIA-Bot" } data["embeds"] = [ { "description": "MAIA User Registration Request", "title": "MAIA Account Request", } ] result = requests.post(url, json=data) try: result.raise_for_status() except requests.exceptions.HTTPError as err: print(err) else: print("Payload delivered successfully, code {}.".format(result.status_code))
[docs] def get_namespaces(id_token, api_urls, private_clusters = []): """ Retrieves a list of unique namespaces from multiple API URLs. Parameters ---------- id_token : str The ID token used for authorization when accessing public clusters. api_urls : list A list of API URLs to query for namespaces. private_clusters : dict, optional A dictionary where keys are API URLs of private clusters and values are their respective tokens. Defaults to an empty list. Returns ------- list A list of unique namespace names retrieved from the provided API URLs. """ namespace_list = [] for API_URL in api_urls: if API_URL in private_clusters: token = private_clusters[API_URL]
[docs] def get_cluster_status(id_token, api_urls, cluster_names, private_clusters = []): """ Retrieve the status of clusters and their nodes. Parameters ---------- id_token : str The ID token for authentication. api_urls : list A list of API URLs for the clusters. cluster_names : dict A dictionary mapping API URLs to cluster names. private_clusters : dict, optional A dictionary mapping private cluster API URLs to their tokens. Defaults to []. Returns ------- tuple A tuple containing: - node_status_dict (dict): A dictionary mapping node names to their status and schedulability. - cluster_dict (dict): A dictionary mapping cluster names to their node names. """ cluster_dict = {} node_status_dict = {} for API_URL in api_urls: if API_URL in private_clusters: token = private_clusters[API_URL] try: response = requests.get(API_URL + "/api/v1/nodes", headers={"Authorization": "Bearer {}".format(token)}, verify=False) except: cluster = cluster_names[API_URL] cluster_dict[cluster] = ['Cluster API Not Reachable'] node_status_dict['Cluster API Not Reachable'] = ['API'] continue else: if API_URL.endswith("None"): cluster = cluster_names[API_URL] cluster_dict[cluster] = ['Cluster API Not Reachable'] node_status_dict['Cluster API Not Reachable'] = ['API'] continue else: try: response = requests.get(API_URL + "/api/v1/nodes", headers={"Authorization": "Bearer {}".format(id_token)}, verify=False) except: cluster = cluster_names[API_URL] cluster_dict[cluster] = ['Cluster API Not Reachable'] node_status_dict['Cluster API Not Reachable'] = ['API'] continue nodes = json.loads(response.text) if 'items' not in nodes: cluster = cluster_names[API_URL] cluster_dict[cluster] = ['Cluster API Not Reachable'] node_status_dict['Cluster API Not Reachable'] = ['API'] continue for node in nodes['items']: node_name = node['metadata']['name'] node_status_dict[node_name] = [] cluster = cluster_names[API_URL] if cluster in cluster_dict: cluster_dict[cluster].append(node_name) else: cluster_dict[cluster] = [node_name] for condition in node['status']['conditions']: if condition["type"] == "Ready": node_status_dict[node_name].append(condition["status"]) if 'unschedulable' in node['spec']: node_status_dict[node_name].append(node['spec']['unschedulable']) else: node_status_dict[node_name].append(False) return node_status_dict, cluster_dict
[docs] def get_available_resources(id_token, api_urls, cluster_names, private_clusters = []): """ Retrieves available GPU, CPU, and RAM resources from multiple Kubernetes clusters. Parameters ---------- id_token : str The ID token for authentication. api_urls : list List of API URLs for the Kubernetes clusters. cluster_names : dict Dictionary mapping API URLs to cluster names. private_clusters : list, optional List of private clusters with their tokens. Defaults to []. Returns ------- tuple A tuple containing: - gpu_dict (dict): Dictionary with GPU availability information for each node. - cpu_dict (dict): Dictionary with CPU availability information for each node. - ram_dict (dict): Dictionary with RAM availability information for each node. - gpu_allocations (dict): Dictionary with GPU allocation details for each pod. """ gpu_dict = {} cpu_dict = {} ram_dict = {} gpu_allocations = {} for API_URL in api_urls: cluster_name = cluster_names[API_URL] if API_URL in private_clusters: token = private_clusters[API_URL] try: response = requests.get(API_URL + "/api/v1/pods", headers={"Authorization": "Bearer {}".format(token)}, verify=False) pods = json.loads(response.text) response = requests.get(API_URL + "/api/v1/nodes", headers={"Authorization": "Bearer {}".format(token)}, verify=False) nodes = json.loads(response.text) except: continue else: try: response = requests.get(API_URL + "/api/v1/pods", headers={"Authorization": "Bearer {}".format(id_token)}, verify=False) pods = json.loads(response.text) response = requests.get(API_URL + "/api/v1/nodes", headers={"Authorization": "Bearer {}".format(id_token)}, verify=False) nodes = json.loads(response.text) except: continue node_status_dict = {} for node in nodes['items']: node_name = "{}/{}".format(cluster_name,node['metadata']['name']) node_status_dict[node_name] = [] for condition in node['status']['conditions']: if condition["type"] == "Ready": node_status_dict[node_name].append(condition["status"]) if 'unschedulable' in node['spec']: node_status_dict[node_name].append(node['spec']['unschedulable']) else: node_status_dict[node_name].append(False) for node in nodes['items']: node_name = "{}/{}".format(cluster_name, node['metadata']['name']) if 'nvidia.com/gpu.product' in node['metadata']['labels']: gpu_name = node['metadata']['labels']['nvidia.com/gpu.product'] else: gpu_name = "N/A" if 'nvidia.com/gpu.memory' in node['metadata']['labels']: gpu_size = str(round(int(node['metadata']['labels']['nvidia.com/gpu.memory'])/1024))+" Gi" else: gpu_size = "N/A" if 'nvidia.com/gpu' in node['status']['allocatable']: n_gpu_allocatable = int(node['status']['allocatable']['nvidia.com/gpu']) else: n_gpu_allocatable = 0 n_cpu_allocatable = int(node['status']['allocatable']['cpu']) ram_allocatable = float(int(node['status']['allocatable']['memory'][:-2])/(1024.*1024.)) n_gpu_requested = 0 n_cpus_requested = 0 ram_requested = 0 for pod in pods['items']: if 'nodeName' not in pod['spec']: continue if pod['spec']['nodeName'] != node_name.split("/")[1]: continue containers = pod['spec']['containers'] for container in containers: resources = container['resources'] cpu = 0 ram = 0 if 'requests' in resources: req = resources['requests'] if 'nvidia.com/gpu' in req: pod_name = pod['metadata']['name'] if pod_name.startswith("jupyter"): pod_name = pod_name.replace("-2d", "-").replace("-40", "@").replace("-2e", ".")[len("jupyter-"):] gpu_allocations[pod_name] = { 'node': node_name.split("/")[1], 'cluster': cluster_name, 'namespace': pod['metadata']['namespace'], 'gpu': req['nvidia.com/gpu'], 'gpu_name': gpu_name, 'gpu_size': gpu_size } n_gpu_requested += int(req['nvidia.com/gpu']) if 'cpu' in req: container_cpu = req['cpu'] if container_cpu[-1] == 'm': container_cpu = container_cpu[:-1] container_cpu = int(container_cpu) / 1000. else: container_cpu = int(container_cpu) if container_cpu > cpu: cpu = container_cpu if 'memory' in req: if req['memory'][-2:] == "Mi": container_memory = int(req['memory'][:-2])/1024. elif req['memory'][-2:] == "Gi": container_memory = int(req['memory'][:-2]) elif req['memory'][-1:] == "M": container_memory = int(req['memory'][:-1])/1024. else: container_memory = int(req['memory'])/(1024*1024*1024) if container_memory > ram: ram = container_memory #if 'limits' in resources: # lim = resources['limits'] # if 'cpu' in lim: # container_cpu = lim['cpu'] # if container_cpu[-1] == 'm': # container_cpu = container_cpu[:-1] # container_cpu = int(container_cpu)/1000. # else: # container_cpu = int(container_cpu) # if container_cpu > cpu: # cpu = container_cpu # if 'memory' in lim: # if lim['memory'][-2:] == "Mi": # container_memory = int(lim['memory'][:-2]) / 1024. # elif lim['memory'][-2:] == "Gi": # container_memory = int(lim['memory'][:-2]) # if container_memory > ram: # ram = container_memory n_cpus_requested += cpu ram_requested += ram gpu_dict[node_name] = [] if node_status_dict[node_name][0] != "True": gpu_dict[node_name].append("NA") gpu_dict[node_name].append("NA") elif node_status_dict[node_name][1]: gpu_dict[node_name].append("NA") gpu_dict[node_name].append("NA") else: gpu_dict[node_name].append(n_gpu_allocatable-n_gpu_requested) gpu_dict[node_name].append(n_gpu_allocatable) gpu_dict[node_name].append("{}, {}".format(gpu_name, gpu_size)) cpu_dict[node_name] = [] cpu_dict[node_name].append(n_cpu_allocatable- n_cpus_requested) cpu_dict[node_name].append(n_cpu_allocatable) cpu_dict[node_name].append((n_cpu_allocatable- n_cpus_requested)*100/n_cpu_allocatable) ram_dict[node_name] = [] ram_dict[node_name].append(ram_allocatable - ram_requested) ram_dict[node_name].append(ram_allocatable) ram_dict[node_name].append((ram_allocatable - ram_requested) * 100 / ram_allocatable) return gpu_dict, cpu_dict, ram_dict, gpu_allocations
[docs] def get_filtered_available_nodes(gpu_dict, cpu_dict, ram_dict, gpu_request, cpu_request, memory_request): """ Filters and returns nodes that meet the specified GPU, CPU, and memory requirements. Parameters ---------- gpu_dict : dict A dictionary where keys are node names and values are lists containing GPU information. cpu_dict : dict A dictionary where keys are node names and values are lists containing CPU information. ram_dict : dict A dictionary where keys are node names and values are lists containing RAM information. gpu_request : int The minimum number of GPUs required. cpu_request : float The minimum amount of CPU required. memory_request : float The minimum amount of memory required. Returns ------- tuple Three dictionaries containing the filtered nodes and their respective GPU, CPU, and RAM information. """ filtered_nodes = [] for node in gpu_dict: if int(gpu_dict[node][0]) >= gpu_request and float(cpu_dict[node][0]) >= cpu_request and float(ram_dict[node][0]) >= memory_request: filtered_nodes.append(node) return {node: gpu_dict[node] for node in filtered_nodes},{node: cpu_dict[node] for node in filtered_nodes},{node: ram_dict[node] for node in filtered_nodes}
[docs] def get_groups_in_keycloak(settings): """ Retrieve groups from Keycloak that start with "MAIA:" and return them in a dictionary. Parameters ---------- settings : object An object containing the Keycloak connection settings. It should have the following attributes: - OIDC_SERVER_URL : str The URL of the Keycloak server. - OIDC_USERNAME : str The username for Keycloak authentication. - OIDC_REALM_NAME : str The name of the Keycloak realm. - OIDC_RP_CLIENT_ID : str The client ID for Keycloak. - OIDC_RP_CLIENT_SECRET : str The client secret for Keycloak. Returns ------- dict A dictionary where the keys are group IDs and the values are group names (with the "MAIA:" prefix removed) for groups that start with "MAIA:". """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = {group['id']:group['name'][len("MAIA:"):] for group in groups if group['name'].startswith("MAIA:")} return maia_groups
[docs] def get_pending_projects(settings): """ Retrieve a list of pending projects that are not in active groups. Parameters ---------- settings : object A settings object that contains configuration parameters. Returns ------- list A list of namespaces of pending projects. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH,"db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() pending_projects = [] try: authentication_maiaproject = pd.read_sql_query("SELECT * FROM authentication_maiaproject", con=cnx) active_groups = get_groups_in_keycloak(settings) for project in authentication_maiaproject.iterrows(): if project[1]['namespace'] not in active_groups.values(): pending_projects.append(project[1]['namespace']) except: ... cnx.close() return pending_projects
[docs] def get_user_table(settings): """ Retrieves and processes user data from various sources including a local SQLite database or a remote MySQL database, Keycloak, and Minio. Combines and returns user information along with group and project details. Parameters ---------- settings : object A settings object containing configuration parameters such as database paths, Keycloak server details, and Minio credentials. Returns ------- tuple A tuple containing: - table (pd.DataFrame): A DataFrame containing merged user data from the auth_user and authentication_maiauser tables. - users_to_register_in_group (dict): A dictionary mapping user emails to the groups they need to be registered in. - users_to_register_in_keycloak (list): A list of user emails that need to be registered in Keycloak. - maia_group_dict (dict): A dictionary containing detailed information about each MAIA group and pending projects. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH,"db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() auth_user = pd.read_sql_query("SELECT * FROM auth_user", con=cnx) authentication_maiauser = pd.read_sql_query("SELECT * FROM authentication_maiauser", con=cnx) authentication_maiaproject = pd.read_sql_query("SELECT * FROM authentication_maiaproject", con=cnx) authentication_maiauser_copy = pd.read_sql_query("SELECT * FROM authentication_maiauser", con=cnx) authentication_maiauser_copy.rename(columns={"user_ptr_id": "id"}, inplace=True) table = auth_user.merge(authentication_maiauser_copy,on="id",how="left") keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) users_to_register_in_group = {} groups = keycloak_admin.get_groups() maia_groups = {group['id']:group['name'][len("MAIA:"):] for group in groups if group['name'].startswith("MAIA:")} pending_projects = get_pending_projects(settings=settings) maia_group_dict = {} try: client = Minio(settings.MINIO_URL, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE) minio_envs = [env.object_name[:-len("_env")] for env in list(client.list_objects(settings.BUCKET_NAME))] except: minio_envs = [] for maia_group in maia_groups: if maia_groups[maia_group] == "users": continue users = keycloak_admin.get_group_members(group_id=maia_group) admin_users = [] cpu_limit = None memory_limit = None date = None cluster = None gpu = None environment = None for project in authentication_maiaproject.iterrows(): if project[1]['namespace'] == maia_groups[maia_group]: admin_users = [project[1]['email']] cpu_limit = project[1]['cpu_limit'] memory_limit = project[1]['memory_limit'] date = project[1]['date'] cluster = project[1]['cluster'] gpu = project[1]['gpu'] environment = project[1]['minimal_env'] conda_envs = [] if maia_groups[maia_group] in minio_envs: conda_envs.append(maia_groups[maia_group]) else: conda_envs.append("N/A") group_users = [] for user in users: if user['username'] in admin_users: group_users.append(user['email']+" [Project Admin]") else: group_users.append(user['email']) maia_group_dict[maia_groups[maia_group]] = { "users":group_users, "conda": conda_envs, "admin_users": admin_users, "cpu_limit": cpu_limit, "memory_limit": memory_limit, "date": date, "cluster": cluster, "gpu": gpu, "environment": environment } for pending_project in pending_projects: conda_envs = [] if pending_project in minio_envs: conda_envs.append(pending_project) else: conda_envs.append("N/A") users = [] for user in auth_user.iterrows(): uid = user[1]['id'] if uid in authentication_maiauser['user_ptr_id'].values: requested_namespaces = authentication_maiauser[authentication_maiauser['user_ptr_id'] == uid][ 'namespace'].values[0].split(",") if pending_project in requested_namespaces: users.append(user[1]['email']) maia_group_dict[pending_project] = { "users": users, "pending": True, "conda": conda_envs, "admin_users": [], "cpu_limit": authentication_maiaproject[authentication_maiaproject['namespace'] == pending_project]['cpu_limit'].values[0], "memory_limit": authentication_maiaproject[authentication_maiaproject['namespace'] == pending_project]['memory_limit'].values[0], "date": authentication_maiaproject[authentication_maiaproject['namespace'] == pending_project]['date'].values[0], "cluster": "N/A", "gpu": authentication_maiaproject[authentication_maiaproject['namespace'] == pending_project]['gpu'].values[0], "environment": "Minimal" } users_to_register_in_keycloak = [] for user in auth_user.iterrows(): uid = user[1]['id'] username = user[1]['username'] email = user[1]['email'] user_groups = [] user_in_keycloak = False for keycloak_user in keycloak_admin.get_users(): if 'email' in keycloak_user: if keycloak_user['email'] == email: user_in_keycloak = True user_keycloak_groups = keycloak_admin.get_user_groups(user_id=keycloak_user['id']) for user_keycloak_group in user_keycloak_groups: if user_keycloak_group['name'].startswith("MAIA:"): user_groups.append(user_keycloak_group['name'][len("MAIA:"):]) if not user_in_keycloak: users_to_register_in_keycloak.append(email) if uid in authentication_maiauser['user_ptr_id'].values: requested_namespaces = authentication_maiauser[authentication_maiauser['user_ptr_id'] == uid][ 'namespace'].values[0].split(",") for requested_namespace in requested_namespaces: if requested_namespace not in user_groups and requested_namespace != "N/A": if email not in users_to_register_in_group: users_to_register_in_group[email] = [requested_namespace] else: users_to_register_in_group[email].append(requested_namespace) table.fillna("N/A", inplace=True) return table, users_to_register_in_group, users_to_register_in_keycloak, maia_group_dict
[docs] def update_user_table(form, settings): """ Updates the user and project tables based on the provided form data. Parameters ---------- form : dict A dictionary containing form data with keys indicating the type of data (e.g., "namespace", "memory_limit", etc.) and values being the corresponding data. settings : object An object containing configuration settings. It should have a DEBUG attribute to determine the environment and a LOCAL_DB_PATH attribute for the local database path. Returns ------- None The function performs the following steps: 1. Connects to the appropriate database (SQLite for debug mode, MySQL for production). 2. Reads the current data from the `auth_user`, `authentication_maiauser`, and `authentication_maiaproject` tables. 3. Iterates over the form data and updates the `authentication_maiauser` and `authentication_maiaproject` tables accordingly. 4. Closes the database connection. 5. Writes the updated data back to the database. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH,"db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] #try: engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() auth_user = pd.read_sql_query("SELECT * FROM auth_user", con=cnx) authentication_maiauser = pd.read_sql_query("SELECT * FROM authentication_maiauser", con=cnx) authentication_maiaproject = pd.read_sql_query("SELECT * FROM authentication_maiaproject", con=cnx) for k in form: if k.startswith("namespace"): id = auth_user[auth_user["username"] == k[len("namespace_"):]]["id"].values[0] if len(authentication_maiauser[authentication_maiauser["user_ptr_id"] == id]) > 0: authentication_maiauser.loc[authentication_maiauser["user_ptr_id"] == id, "namespace"] = form[k] else: authentication_maiauser = authentication_maiauser.append({"user_ptr_id": id, "namespace": form[k]}, ignore_index=True) elif k.startswith("memory_limit"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("memory_limit_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "memory_limit"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "memory_limit": form[k],"namespace": k[len("memory_limit_"):]}, ignore_index=True) elif k.startswith("cpu_limit"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("cpu_limit_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "cpu_limit"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "cpu_limit": form[k],"namespace": k[len("cpu_limit_"):]}, ignore_index=True) elif k.startswith("date"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("date_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "date"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "date": form[k],"namespace": k[len("date_"):]}, ignore_index=True) elif k.startswith("cluster"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("cluster_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "cluster"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "cluster": form[k],"namespace": k[len("cluster_"):]}, ignore_index=True) elif k.startswith("gpu"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("gpu_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "gpu"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "gpu": form[k],"namespace": k[len("gpu_"):]}, ignore_index=True) elif k.startswith("minimal_environment"): try: id = authentication_maiaproject[authentication_maiaproject["namespace"] == k[len("minimal_environment_"):]]["id"].values[0] except: id = 0 if pd.isna(authentication_maiaproject["id"].max()) else authentication_maiaproject["id"].max() + 1 if len(authentication_maiaproject[authentication_maiaproject["id"] == id]) > 0: authentication_maiaproject.loc[authentication_maiaproject["id"] == id, "minimal_env"] = form[k] else: authentication_maiaproject = authentication_maiaproject.append({"id": id, "minimal_env": form[k],"namespace": k[len("minimal_environment_"):]}, ignore_index=True) #try: cnx.close() if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH,"db.sqlite3")) authentication_maiauser.to_sql("authentication_maiauser", con=cnx, if_exists="replace", index=False) authentication_maiaproject.to_sql("authentication_maiaproject", con=cnx, if_exists="replace", index=False) else: engine.dispose() engine_2 = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") authentication_maiauser.to_sql("authentication_maiauser", con=engine_2, if_exists="replace", index=False) authentication_maiaproject.to_sql("authentication_maiaproject", con=engine_2, if_exists="replace", index=False)
#stmt = text("ALTER TABLE authentication_maiauser-copy RENAME TO authentication_maiauser;") #engine.execute(stmt) #except: # ... #auth_user[auth_user["username"] == k[len("date_"):]]["date"] = form[k]
[docs] def generate_kubeconfig(id_token, user_id, namespace, cluster_id, settings): """ Generates a Kubernetes configuration dictionary for a given user and cluster. Parameters ---------- id_token : str The ID token for the user. user_id : str The user ID. namespace : str The Kubernetes namespace. cluster_id : str The cluster ID. settings : object An object containing various settings, including: - CLUSTER_NAMES (dict): A dictionary mapping cluster names to their IDs. - PRIVATE_CLUSTERS (dict): A dictionary of private clusters with their tokens. - OIDC_ISSUER_URL (str): The OIDC issuer URL. - OIDC_RP_CLIENT_ID (str): The OIDC client ID. - OIDC_RP_CLIENT_SECRET (str): The OIDC client secret. Returns ------- dict A dictionary representing the Kubernetes configuration. """ cluster_apis = {k: v for v, k in settings.CLUSTER_NAMES.items()} if cluster_apis[cluster_id] in settings.PRIVATE_CLUSTERS: kube_config = {'apiVersion': 'v1', 'kind': 'Config', 'preferences': {}, 'current-context': 'MAIA/{}'.format(user_id), 'contexts': [ {'name': 'MAIA/{}'.format(user_id), 'context': {'user': user_id, 'cluster': 'MAIA', 'namespace': namespace}}], 'clusters': [ {'name': 'MAIA', 'cluster': {'certificate-authority-data': "", # settings.CLUSTER_CA, 'server': cluster_apis[cluster_id], "insecure-skip-tls-verify": True}}], "users": [{'name': user_id, 'user': {'token': settings.PRIVATE_CLUSTERS[cluster_apis[cluster_id]]}}]} else: kube_config = {'apiVersion': 'v1', 'kind': 'Config', 'preferences': {}, 'current-context': 'MAIA/{}'.format(user_id), 'contexts': [ {'name': 'MAIA/{}'.format(user_id), 'context': {'user': user_id, 'cluster': 'MAIA', 'namespace': namespace}}], 'clusters': [{'name': 'MAIA', 'cluster': {'certificate-authority-data': "",#settings.CLUSTER_CA, 'server' : cluster_apis[cluster_id], "insecure-skip-tls-verify":True}}], "users" : [{'name': user_id, 'user': {'auth-provider': {'config': {'idp-issuer-url': settings.OIDC_ISSUER_URL, 'client-id': settings.OIDC_RP_CLIENT_ID, 'id-token':id_token, 'client-secret': settings.OIDC_RP_CLIENT_SECRET }, 'name': 'oidc'}}}]} return kube_config
[docs] def get_project(group_id, settings, is_namespace_style=False): """ Retrieve project information based on the provided group ID. Parameters ---------- group_id : str The group ID or namespace to search for. settings : module A settings module containing configuration values. is_namespace_style : bool, optional Flag to determine if the group ID should be treated as a namespace. Defaults to False. Returns ------- tuple A tuple containing the namespace form (dict) and the cluster ID (str or None). Returns None if no matching project is found. Raises ------ KeyError If required environment variables are not set. Exception If there is an error connecting to the database or querying the data. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH,"db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] #try: engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() authentication_maiaproject = pd.read_sql_query("SELECT * FROM authentication_maiaproject", con=cnx) cluster_id = None for project in authentication_maiaproject.iterrows(): if is_namespace_style: if str(project[1]['namespace']).lower().replace("_","-") == group_id: keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = {group['id']:group['name'][len("MAIA:"):] for group in groups if group['name'].startswith("MAIA:")} group_users = [] for maia_group in maia_groups: if maia_groups[maia_group] == group_id: users = keycloak_admin.get_group_members(group_id=maia_group) for user in users: group_users.append(user['email']) namespace_form = { "group_ID": group_id, "group_subdomain": group_id.lower().replace("_", "-"), "users": group_users, "resources_limits": { "memory": [str(int(int(project[1]['memory_limit'][:-len(" Gi")])/2))+" Gi", project[1]['memory_limit']], "cpu": [str(int(int(project[1]['cpu_limit'])/2)), project[1]['cpu_limit']], } } if project[1]['gpu'] != "N/A" and project[1]['gpu'] != "NO": namespace_form["gpu"] = "1" if project[1]['conda'] != "N/A" and project[1]['conda'] is not None: namespace_form["minio_env_name"] = group_id+"_env" cluster_id = project[1]['cluster'] if cluster_id == "N/A": cluster_id = None return namespace_form, cluster_id else: if project[1]['namespace'] == group_id: keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() maia_groups = {group['id']:group['name'][len("MAIA:"):] for group in groups if group['name'].startswith("MAIA:")} group_users = [] for maia_group in maia_groups: if maia_groups[maia_group] == group_id: users = keycloak_admin.get_group_members(group_id=maia_group) for user in users: group_users.append(user['email']) namespace_form = { "group_ID": group_id, "group_subdomain": group_id.lower().replace("_", "-"), "users": group_users, "resources_limits": { "memory": [str(int(int(project[1]['memory_limit'][:-len(" Gi")])/2))+" Gi", project[1]['memory_limit']], "cpu": [str(int(int(project[1]['cpu_limit'])/2)), project[1]['cpu_limit']], } } if project[1]['gpu'] != "N/A" and project[1]['gpu'] != "NO": namespace_form["gpu_request"] = "1" if project[1]['conda'] != "N/A" and project[1]['conda'] is not None: namespace_form["minio_env_name"] = group_id+"_env" cluster_id = project[1]['cluster'] if cluster_id == "N/A": cluster_id = None return namespace_form, cluster_id return None
[docs] def register_user_in_keycloak(email, settings): """ Registers a user in Keycloak and sends an approved registration email. Parameters ---------- email : str The email address of the user to be registered. settings : object An object containing the necessary settings for Keycloak connection and email sending. Settings Attributes ------------------- OIDC_SERVER_URL : str The URL of the Keycloak server. OIDC_USERNAME : str The username for Keycloak authentication. OIDC_REALM_NAME : str The name of the Keycloak realm. OIDC_RP_CLIENT_ID : str The client ID for Keycloak. OIDC_RP_CLIENT_SECRET : str The client secret for Keycloak. HOSTNAME : str The hostname for generating the MAIA login URL. Returns ------- None """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) temp_password = "MAIA" maia_login_url = "https://"+settings.HOSTNAME+"/maia/" keycloak_admin.create_user({'username':email, 'email':email, 'emailVerified':True, 'enabled':True, #'firstName':'Demo2', #'lastName':'Maia', 'requiredActions':['UPDATE_PASSWORD'], 'credentials':[{'type':'password', 'temporary':True, 'value': temp_password}], }) if "email_account" in os.environ and "email_password" in os.environ and "email_smtp_server" in os.environ: send_approved_registration_email(email, maia_login_url, temp_password)
[docs] def register_group_in_keycloak(group_id, settings): """ Registers a group in Keycloak with the specified group ID and settings. Parameters ---------- group_id : str The ID of the group to be registered. settings : object An object containing the Keycloak server settings, including: - OIDC_SERVER_URL : str The URL of the Keycloak server. - OIDC_USERNAME : str The username for Keycloak authentication. - OIDC_REALM_NAME : str The name of the Keycloak realm. - OIDC_RP_CLIENT_ID : str The client ID for Keycloak. - OIDC_RP_CLIENT_SECRET : str The client secret for Keycloak. Returns ------- None """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) payload = { "name": f"MAIA:{group_id}", "path": f"/MAIA:{group_id}", "attributes": {}, "realmRoles": [], "clientRoles": {}, "subGroups": [], "access": {"view": True, "manage": True, "manageMembership": True} } keycloak_admin.create_group(payload)
[docs] def register_users_in_group_in_keycloak(emails, group_id, settings): """ Registers users in a specified Keycloak group. Parameters ---------- emails : list A list of email addresses of users to be added to the group. group_id : str The ID of the group to which users should be added. settings : object An object containing Keycloak server settings, including: - OIDC_SERVER_URL : str The URL of the Keycloak server. - OIDC_USERNAME : str The username for Keycloak authentication. - OIDC_REALM_NAME : str The realm name in Keycloak. - OIDC_RP_CLIENT_ID : str The client ID for Keycloak. - OIDC_RP_CLIENT_SECRET : str The client secret for Keycloak. Returns ------- None """ keycloak_connection = KeycloakOpenIDConnection( server_url=settings.OIDC_SERVER_URL, username=settings.OIDC_USERNAME, password='', realm_name=settings.OIDC_REALM_NAME, client_id=settings.OIDC_RP_CLIENT_ID, client_secret_key=settings.OIDC_RP_CLIENT_SECRET, verify=False ) keycloak_admin = KeycloakAdmin(connection=keycloak_connection) groups = keycloak_admin.get_groups() users = keycloak_admin.get_users() for user in users: if "email" in user and user["email"] in emails: uid = user["id"] for group in groups: if group["name"] == "MAIA:"+group_id: gid = group["id"] keycloak_admin.group_user_add(uid, gid) elif group["name"] == "MAIA:users": try: gid = group["id"] keycloak_admin.group_user_add(uid, gid) except: ...
[docs] def get_list_of_groups_requesting_a_user(email, settings): """ Retrieves a list of groups (namespaces) that have requested a specific user based on their email. Parameters ---------- email : str The email address of the user to search for. settings : module The settings module containing configuration such as DEBUG and LOCAL_DB_PATH. Returns ------- list A list of namespaces that have requested the user. Returns an empty list if no groups are found. Raises ------ KeyError If environment variables 'DB_HOST', 'DB_USERNAME', or 'DB_PASS' are not set in non-debug mode. Exception If there is an issue connecting to the database or executing the SQL queries. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH, "db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() auth_user = pd.read_sql_query("SELECT * FROM auth_user", con=cnx) authentication_maiauser = pd.read_sql_query("SELECT * FROM authentication_maiauser", con=cnx) for user in auth_user.iterrows(): uid = user[1]['id'] if user[1]['email'] == email: if uid in authentication_maiauser['user_ptr_id'].values: requested_namespaces = authentication_maiauser[authentication_maiauser['user_ptr_id'] == uid][ 'namespace'].values[0].split(",") return requested_namespaces return []
[docs] def get_list_of_users_requesting_a_group(group_id, settings): """ Retrieves a list of email addresses of users who have requested access to a specific group. Parameters ---------- group_id : str The ID of the group to check for user requests. settings : object A settings object that contains configuration parameters, including DEBUG and LOCAL_DB_PATH. Returns ------- list A list of email addresses of users who have requested access to the specified group. Raises ------ KeyError If environment variables for database connection are not set when DEBUG is False. Exception If there is an issue with database connection or query execution. Notes ----- When settings.DEBUG is True, a local SQLite database is used. When settings.DEBUG is False, a MySQL database is used with connection parameters from environment variables. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH, "db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() auth_user = pd.read_sql_query("SELECT * FROM auth_user", con=cnx) authentication_maiauser = pd.read_sql_query("SELECT * FROM authentication_maiauser", con=cnx) users = [] for user in auth_user.iterrows(): uid = user[1]['id'] if uid in authentication_maiauser['user_ptr_id'].values: requested_namespaces = authentication_maiauser[authentication_maiauser['user_ptr_id'] == uid][ 'namespace'].values[0].split(",") if group_id in requested_namespaces: users.append(user[1]['email']) return users
[docs] def get_argocd_project_status(argocd_namespace, project_id): return verify_installed_maia_toolkit(project_id=project_id, namespace=argocd_namespace, get_chart_metadata=False)
[docs] def get_namespace_details(settings, id_token, namespace, user_id, is_admin=False): """ Retrieve details about the namespace including workspace applications, remote desktops, SSH ports, MONAI models, and Orthanc instances. Parameters ---------- settings : object Configuration settings containing API URLs and private cluster tokens. id_token : str Identity token for authentication. namespace : str The namespace to retrieve details for. user_id : str The user ID to filter resources. is_admin : bool, optional Flag indicating if the user has admin privileges. Defaults to False. Returns ------- tuple A tuple containing: - maia_workspace_apps (dict): Dictionary of workspace applications with their URLs. - remote_desktop_dict (dict): Dictionary of remote desktop URLs for users. - ssh_ports (dict): Dictionary of SSH ports for users. - monai_models (dict): Dictionary of MONAI models. - orthanc_list (dict): Dictionary of Orthanc instances. """ maia_workspace_apps = {} remote_desktop_dict = {} orthanc_list = {} monai_models = {} ssh_ports = {} for API_URL in settings.API_URL: if API_URL in settings.PRIVATE_CLUSTERS: token = settings.PRIVATE_CLUSTERS[API_URL] response = requests.get(API_URL + "/apis/networking.k8s.io/v1/namespaces/{}/ingresses".format(namespace), headers={"Authorization": "Bearer {}".format(token)}, verify=False) else: response = requests.get(API_URL + "/apis/networking.k8s.io/v1/namespaces/{}/ingresses".format(namespace), headers={"Authorization": "Bearer {}".format(id_token)}, verify=False) ingresses = json.loads(response.text) if API_URL in settings.PRIVATE_CLUSTERS: token = settings.PRIVATE_CLUSTERS[API_URL] try: response = requests.get(API_URL + "/api/v1/namespaces/{}/services".format(namespace), headers={"Authorization": "Bearer {}".format(token)}, verify=False) except: continue else: try: response = requests.get(API_URL + "/api/v1/namespaces/{}/services".format(namespace), headers={"Authorization": "Bearer {}".format(id_token)}, verify=False) except: continue services = json.loads(response.text) if 'code' in services: if services['code'] == 403: ... for ingress in ingresses['items']: for rule in ingress['spec']['rules']: if 'host' not in rule: rule['host'] = settings.DEFAULT_INGRESS_HOST for path in rule['http']['paths']: if path['backend']['service']['name'] == 'proxy-public': ## JupyterHub maia_workspace_apps['hub'] = "https://" + rule['host'] + path['path'] for service in services['items']: for port in service['spec']['ports']: if 'name' in port and port['name'] == 'remote-desktop-port': hub_url = maia_workspace_apps['hub'] user = service["metadata"]["name"][len("jupyter-"):].replace("-2d", "-").replace("-40", "@").replace("-2e", ".") url = f"{hub_url}/user/{user}/proxy/80/desktop/{user}/" if user_id == user or is_admin: remote_desktop_dict[user] = url if 'name' in port and port['name'] == 'ssh': user = service["metadata"]["name"][len("jupyter-"):].replace("-2d", "-").replace("-40", "@").replace("-2e", ".") if user_id == user or is_admin: ssh_ports[user] = port['port'] if "hub" not in maia_workspace_apps: maia_workspace_apps["hub"] = "N/A" if "orthanc" not in maia_workspace_apps: maia_workspace_apps["orthanc"] = "N/A" if "monai_label" not in maia_workspace_apps: maia_workspace_apps["monai_label"] = "N/A" if "label_studio" not in maia_workspace_apps: maia_workspace_apps["label_studio"] = "N/A" if "kubeflow" not in maia_workspace_apps: maia_workspace_apps["kubeflow"] = "N/A" if "mlflow" not in maia_workspace_apps: maia_workspace_apps["mlflow"] = "N/A" if "minio_console" not in maia_workspace_apps: maia_workspace_apps["minio_console"] = "N/A" if "xnat" not in maia_workspace_apps: maia_workspace_apps["xnat"] = "N/A" return maia_workspace_apps, remote_desktop_dict, ssh_ports, monai_models, orthanc_list
[docs] def get_allocation_date_for_project(settings, group_id, is_namespace_style=False): """ Retrieves the allocation date for a project based on the provided group ID. Parameters ---------- settings : object The settings object containing configuration details. group_id : str The group ID or namespace to search for. is_namespace_style : bool, optional Flag to indicate if the group ID should be treated as a namespace. Defaults to False. Returns ------- datetime or None The allocation date of the project if found, otherwise None. """ if settings.DEBUG: cnx = sqlite3.connect(os.path.join(settings.LOCAL_DB_PATH, "db.sqlite3")) else: db_host = os.environ["DB_HOST"] db_user = os.environ["DB_USERNAME"] dp_password = os.environ["DB_PASS"] engine = create_engine(f"mysql+pymysql://{db_user}:{dp_password}@{db_host}:3306/mysql") cnx = engine.raw_connection() authentication_maiaproject = pd.read_sql_query("SELECT * FROM authentication_maiaproject", con=cnx) for project in authentication_maiaproject.iterrows(): if is_namespace_style: if str(project[1]['namespace']).lower().replace("_", "-") == group_id: return project[1]['date'] else: if project[1]['namespace'] == group_id: return project[1]['date'] return None
[docs] def get_project_argo_status_and_user_table(request, settings): """ Retrieves the Argo CD project status and user table information. Parameters ---------- request : HttpRequest The HTTP request object containing session and user information. settings : Settings The settings object containing configuration values. Returns ------- tuple A tuple containing: - user_table (dict): The user table information. - to_register_in_groups (list): List of users to register in groups. - to_register_in_keycloak (list): List of users to register in Keycloak. - maia_groups_dict (dict): Dictionary of MAIA groups. - project_argo_status (dict): Dictionary containing the Argo CD project status for each project. """ argocd_cluster_id = settings.ARGOCD_CLUSTER id_token = request.session.get('oidc_id_token') kubeconfig_dict = generate_kubeconfig(id_token, request.user.username, "default", argocd_cluster_id, settings=settings) config.load_kube_config_from_dict(kubeconfig_dict) with open(Path("/tmp").joinpath("kubeconfig"), "w") as f: yaml.dump(kubeconfig_dict, f) os.environ["KUBECONFIG"] = str(Path("/tmp").joinpath("kubeconfig")) user_table, to_register_in_groups, to_register_in_keycloak, maia_groups_dict = get_user_table(settings=settings) project_argo_status = {} for project_id in maia_groups_dict: project_argo_status[project_id] = asyncio.run(get_argocd_project_status(argocd_namespace="argocd", project_id=project_id.lower().replace("_", "-"))) return user_table, to_register_in_groups, to_register_in_keycloak, maia_groups_dict, project_argo_status
[docs] def create_namespace(request, settings, namespace_id, cluster_id): """ Creates a Kubernetes namespace using the provided request, settings, namespace ID, and cluster ID. Parameters ---------- request : HttpRequest The HTTP request object containing session and user information. settings : Settings The settings object containing configuration details. namespace_id : str The ID of the namespace to be created. cluster_id : str The ID of the Kubernetes cluster where the namespace will be created. Returns ------- None Raises ------ ApiException If an error occurs while creating the namespace using the Kubernetes API. """ id_token = request.session.get('oidc_id_token') kubeconfig_dict = generate_kubeconfig(id_token, request.user.username, "default", cluster_id, settings=settings) config.load_kube_config_from_dict(kubeconfig_dict) with open(Path("/tmp").joinpath("kubeconfig"), "w") as f: yaml.dump(kubeconfig_dict, f) os.environ["KUBECONFIG"] = str(Path("/tmp").joinpath("kubeconfig")) with kubernetes.client.ApiClient() as api_client: api_instance = kubernetes.client.CoreV1Api(api_client) body = kubernetes.client.V1Namespace(metadata=kubernetes.client.V1ObjectMeta(name=namespace_id)) try: api_response = api_instance.create_namespace(body) print(api_response) except ApiException as e: print("Exception when calling CoreV1Api->create_namespace: %s\n" % e)