Source code for MAIA.maia_fn

import base64
import json
import os
import subprocess
from pathlib import Path
from pprint import pprint
from secrets import token_urlsafe
from typing import Dict
from omegaconf import OmegaConf
import kubernetes
import toml
import yaml
from MAIA.helm_values import read_config_dict_and_generate_helm_values_dict
from kubernetes import client, config
from kubernetes.client.rest import ApiException


def create_config_map_from_data(
    data: str, config_map_name: str, namespace: str, kubeconfig_dict: Dict, data_key: str = "values.yaml"
):
    """
    Create a ConfigMap on a Kubernetes Cluster.

    Parameters
    ----------
    data : str or list of str
        String containing the content of the ConfigMap to dump. When both ``data`` and
        ``data_key`` are lists, ``data[i]`` is stored under ``data_key[i]``.
    config_map_name : str
        ConfigMap name.
    namespace : str
        Namespace where to create the ConfigMap.
    kubeconfig_dict : dict
        Kube Configuration dictionary for Kubernetes cluster authentication.
    data_key : str or list of str, optional
        Value to use as the filename for the content in the ConfigMap.

    Notes
    -----
    An ``ApiException`` raised by the create call is printed, not re-raised.
    """
    config.load_kube_config_from_dict(kubeconfig_dict)

    metadata = kubernetes.client.V1ObjectMeta(
        name=config_map_name,
        namespace=namespace,
    )

    if isinstance(data_key, list) and isinstance(data, list):
        # One ConfigMap entry per element of ``data``; keys are matched by position.
        config_map_data = {data_key[index]: content for index, content in enumerate(data)}
    else:
        config_map_data = {data_key: data}

    configmap = kubernetes.client.V1ConfigMap(
        api_version="v1",
        kind="ConfigMap",
        data=config_map_data,
        metadata=metadata,
    )

    with kubernetes.client.ApiClient() as api_client:
        api_instance = kubernetes.client.CoreV1Api(api_client)
        try:
            api_response = api_instance.create_namespaced_config_map(namespace, configmap, pretty="true")
        except ApiException as e:
            # The message names the call that was actually made (create, not delete).
            print("Exception when calling CoreV1Api->create_namespaced_config_map: %s\n" % e)
        else:
            pprint(api_response)
def get_ssh_ports(n_requested_ports, port_type, ip_range, maia_metallb_ip=None):
    """
    Retrieve a list of available SSH ports based on the specified criteria.

    The cluster is reached through the kubeconfig file named by the ``KUBECONFIG``
    environment variable.

    Parameters
    ----------
    n_requested_ports : int
        The number of SSH ports requested.
    port_type : str
        The type of port to search for ('LoadBalancer' or 'NodePort').
    ip_range : tuple
        Despite the name, the two values bound the candidate *port* numbers:
        ``ip_range[0]`` is inclusive and ``ip_range[1]`` is exclusive.
    maia_metallb_ip : str, optional
        The specific IP address to match for 'LoadBalancer' type. Defaults to None.

    Returns
    -------
    list
        The lowest free ports in the range, in ascending order. The list is shorter than
        ``n_requested_ports`` when the range does not contain enough free ports.
    None
        If an error occurs while querying the cluster or computing the ports.
    """
    kubeconfig = yaml.safe_load(Path(os.environ["KUBECONFIG"]).read_text())
    config.load_kube_config_from_dict(kubeconfig)
    v1 = client.CoreV1Api()

    try:
        used_ports = set()
        services = v1.list_service_for_all_namespaces(watch=False)
        for svc in services.items:
            if port_type == "LoadBalancer":
                ingress = svc.status.load_balancer.ingress
                if ingress is None:
                    continue
                if svc.spec.type == "LoadBalancer" and ingress[0].ip == maia_metallb_ip:
                    # TODO: or port.name == 'pt-orthanc'
                    used_ports.update(int(port.port) for port in svc.spec.ports if port.name == "ssh")
            elif port_type == "NodePort":
                if svc.spec.type == "NodePort":
                    # NOTE(review): this records the service port (``port.port``), not
                    # ``port.node_port`` -- confirm this is the intended value to reserve.
                    used_ports.update(int(port.port) for port in svc.spec.ports)

        # A single ascending pass yields the same ports as rescanning the range per request.
        ports = []
        for candidate in range(ip_range[0], ip_range[1]):
            if len(ports) >= n_requested_ports:
                break
            if candidate not in used_ports:
                ports.append(candidate)
        return ports
    except Exception:
        # Keep the documented "None on error" contract, but let KeyboardInterrupt and
        # SystemExit propagate instead of being swallowed by a bare ``except``.
        return None
def convert_username_to_jupyterhub_username(username):
    """
    Convert a username to a JupyterHub-compatible username.

    The characters ``-``, ``@`` and ``.`` are replaced by ``-2d``, ``-40`` and ``-2e``
    respectively; every other character is left unchanged.

    Parameters
    ----------
    username : str
        The original username.

    Returns
    -------
    str
        The JupyterHub-compatible username.
    """
    # One pass is enough: no replacement text contains "@" or ".", and "-" is only
    # escaped when it comes from the input itself.
    escape_table = str.maketrans({"-": "-2d", "@": "-40", ".": "-2e"})
    return username.translate(escape_table)
def encode_docker_registry_secret(docker_server, docker_username, docker_password):
    """
    Encode Docker registry credentials into a base64-encoded string.

    The result is the base64 encoding of a JSON document of the form
    ``{"auths": {<server>: {"username", "password", "auth"}}}``, where ``auth`` is the
    base64 encoding of ``"<username>:<password>"``.

    Parameters
    ----------
    docker_server : str
        The Docker registry server.
    docker_username : str
        The Docker registry username.
    docker_password : str
        The Docker registry password.

    Returns
    -------
    str
        The base64-encoded Docker registry credentials.
    """
    credentials = f"{docker_username}:{docker_password}"
    auth_token = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")

    registry_entry = {
        "username": docker_username,
        "password": docker_password,
        "auth": auth_token,
    }
    docker_config_json = json.dumps({"auths": {docker_server: registry_entry}})

    return base64.b64encode(docker_config_json.encode("utf-8")).decode("utf-8")
def deploy_oauth2_proxy(cluster_config, user_config, config_folder=None):
    """
    Deploy an OAuth2 Proxy using the provided cluster and user configurations.

    The function builds the Helm values for the ``oauth2-proxy`` chart, writes them to
    ``<config_folder>/<group_ID>/oauth2_proxy_values/oauth2_proxy_values.yaml`` and
    returns the details of the release to install.

    Parameters
    ----------
    cluster_config : dict
        Configuration dictionary for the cluster. Expected keys include:
        - "keycloak": A dictionary with "issuer_url", "client_id", and "client_secret".
        - "domain": The domain name for the cluster.
        - "url_type": The type of URL, either "subpath" or other.
        - "storage_class": The storage class for Redis.
        - "nginx_cluster_issuer" (optional): The cluster issuer for NGINX.
        - "traefik_resolver" (optional): The resolver for Traefik.
    user_config : dict
        Configuration dictionary for the user. Expected keys include:
        - "group_ID": The group ID for the user.
        - "group_subdomain": The subdomain for the user's group.
    config_folder : str, optional
        The folder path where the configuration files will be saved. Although it defaults
        to None, the value is passed directly to ``pathlib.Path``, so a path must be
        given in practice.

    Returns
    -------
    dict
        A dictionary containing deployment details:
        - "namespace": The namespace for the deployment.
        - "release": The release name for the deployment.
        - "chart": The chart name for the deployment.
        - "repo": The repository URL for the chart.
        - "version": The chart version.
        - "values": The path to the generated values YAML file.
    """
    keycloak = cluster_config["keycloak"]
    group_id = user_config["group_ID"]
    subdomain = user_config["group_subdomain"]
    domain = cluster_config["domain"]
    group_host = "{}.{}".format(subdomain, domain)

    # "subpath" clusters serve every group from the bare domain under /oauth2-<subdomain>;
    # otherwise each group gets its own <subdomain>.<domain> host under /oauth2.
    if cluster_config["url_type"] == "subpath":
        proxy_prefix = "/oauth2-{}".format(subdomain)
        redirect_url = "https://{}/oauth2-{}/callback".format(domain, subdomain)
        ingress_host = domain
    else:
        proxy_prefix = "/oauth2"
        redirect_url = "https://{}/oauth2/callback".format(group_host)
        ingress_host = group_host

    config_file = {
        "oidc_issuer_url": keycloak["issuer_url"],
        "provider": "oidc",
        "upstreams": ["static://202"],
        "http_address": "0.0.0.0:4180",
        "oidc_groups_claim": "groups",
        "skip_jwt_bearer_tokens": True,
        "oidc_email_claim": "email",
        "allowed_groups": ["MAIA:" + group_id, "MAIA:admin"],
        "scope": "openid email profile",
        "redirect_url": redirect_url,
        "email_domains": ["*"],
        "proxy_prefix": proxy_prefix,
        "ssl_insecure_skip_verify": True,
        "insecure_oidc_skip_issuer_verification": True,
        "cookie_secure": True,
        "reverse_proxy": True,
        "pass_access_token": True,
        "pass_authorization_header": True,
        "set_authorization_header": True,
        "set_xauthrequest": True,
        "pass_user_headers": True,
        "whitelist_domains": ["*"],
    }

    ingress_annotations = {}
    if "nginx_cluster_issuer" in cluster_config:
        ingress_annotations["cert-manager.io/cluster-issuer"] = cluster_config["nginx_cluster_issuer"]
    if "traefik_resolver" in cluster_config:
        ingress_annotations["traefik.ingress.kubernetes.io/router.entrypoints"] = "websecure"
        ingress_annotations["traefik.ingress.kubernetes.io/router.tls"] = "true"
        ingress_annotations["traefik.ingress.kubernetes.io/router.tls.certresolver"] = cluster_config[
            "traefik_resolver"
        ]

    oauth2_proxy_config = {
        "config": {
            "clientID": keycloak["client_id"],
            "clientSecret": keycloak["client_secret"],
            "cookieSecret": token_urlsafe(16),
            "configFile": toml.dumps(config_file),
        },
        "redis": {
            "enabled": True,
            "global": {"storageClass": cluster_config["storage_class"]},
        },
        "sessionStorage": {
            "type": "redis",
        },
        "image": {
            "repository": "quay.io/oauth2-proxy/oauth2-proxy",
            "tag": "",
            "pullPolicy": "IfNotPresent",
        },
        "service": {
            "type": "ClusterIP",
            "portNumber": 80,
            "appProtocol": "https",
            "annotations": {},
        },
        "serviceAccount": {
            "enabled": True,
            "name": "",
            "automountServiceAccountToken": True,
            "annotations": {},
        },
        "ingress": {
            "enabled": True,
            "path": proxy_prefix,
            "pathType": "Prefix",
            # The TLS secret is always named after the group host, even in subpath mode.
            "tls": [{"secretName": "{}-tls".format(group_host), "hosts": [ingress_host]}],
            "hosts": [ingress_host],
            "annotations": ingress_annotations,
        },
        "chart_name": "oauth2-proxy",
        "chart_version": "7.7.8",
        "repo_url": "https://oauth2-proxy.github.io/manifests",
    }
    # TODO: Change this to updated values

    values_dir = Path(config_folder).joinpath(group_id, "oauth2_proxy_values")
    values_dir.mkdir(parents=True, exist_ok=True)
    values_file = values_dir.joinpath("oauth2_proxy_values.yaml")
    with open(values_file, "w") as f:
        f.write(OmegaConf.to_yaml(oauth2_proxy_config))

    namespace = group_id.lower().replace("_", "-")
    return {
        "namespace": namespace,
        "release": namespace + "-oauth2-proxy",
        "chart": oauth2_proxy_config["chart_name"],
        "repo": oauth2_proxy_config["repo_url"],
        "version": oauth2_proxy_config["chart_version"],
        "values": str(values_file),
    }
def deploy_mysql(cluster_config, user_config, config_folder, mysql_configs):
    """
    Deploy a MySQL instance on a Kubernetes cluster using Helm.

    The function generates the Helm values for a MySQL deployment, writes them to
    ``<config_folder>/<group_ID>/mysql_values/mysql_values.yaml`` and returns the details
    of the release to install. The kubeconfig is read from the file named by the
    ``KUBECONFIG`` environment variable.

    Parameters
    ----------
    cluster_config : dict
        Configuration dictionary for the cluster, including storage class.
    user_config : dict
        Configuration dictionary for the user, including group ID.
    config_folder : str
        Path to the folder where configuration files will be stored.
    mysql_configs : dict
        Configuration dictionary for MySQL. The keys "mysql_user" and "mysql_password"
        are read; each falls back to "root" when absent.

    Returns
    -------
    dict
        A dictionary containing deployment details such as namespace, release name, chart
        name, repository URL, version, and values file path.
    """
    group_id = user_config["group_ID"]
    namespace = group_id.lower().replace("_", "-")
    kubeconfig = yaml.safe_load(Path(os.environ["KUBECONFIG"]).read_text())

    # The same password is used for the root account and for the application user.
    db_password = mysql_configs.get("mysql_password", "root")
    db_user = mysql_configs.get("mysql_user", "root")

    data_volume = {
        "mountPath": "/var/lib/mysql",
        "size": "20Gi",
        "access_mode": "ReadWriteMany",
        "pvc_type": cluster_config["storage_class"],
    }

    mysql_config = {
        "namespace": namespace,
        "chart_name": "mysql-db-v1",
        "docker_image": "mysql",
        "tag": "8.0.28",
        "memory_request": "2Gi",
        "cpu_request": "500m",
        "deployment": True,
        "ports": {"mysql": [3306]},
        "persistent_volume": [data_volume],
        "env_variables": {
            "MYSQL_ROOT_PASSWORD": db_password,
            "MYSQL_USER": db_user,
            "MYSQL_PASSWORD": db_password,
            "MYSQL_DATABASE": "mysql",
        },
    }
    # TODO: Change this to updated values

    mysql_values = read_config_dict_and_generate_helm_values_dict(mysql_config, kubeconfig)
    mysql_values["chart_name"] = "mkg"
    mysql_values["chart_version"] = "1.0.2"
    mysql_values["repo_url"] = "https://kthcloud.github.io/MAIA/"
    # TODO: Change this to updated values

    values_dir = Path(config_folder).joinpath(group_id, "mysql_values")
    values_dir.mkdir(parents=True, exist_ok=True)
    values_file = values_dir.joinpath("mysql_values.yaml")
    with open(values_file, "w") as f:
        f.write(OmegaConf.to_yaml(mysql_values))

    return {
        "namespace": namespace,
        "release": namespace + "-mysql",
        "chart": mysql_values["chart_name"],
        "repo": mysql_values["repo_url"],
        "version": mysql_values["chart_version"],
        "values": str(values_file),
    }
def deploy_mlflow(cluster_config, user_config, config_folder, mysql_config=None, minio_config=None):
    """
    Deploy an MLflow instance on a Kubernetes cluster using Helm.

    The function generates the Helm values for an MLflow deployment, writes them to
    ``<config_folder>/<group_ID>/mlflow_values/mlflow_values.yaml`` and returns the
    details of the release to install. The kubeconfig is read from the file named by the
    ``KUBECONFIG`` environment variable.

    Parameters
    ----------
    cluster_config : dict
        Configuration dictionary for the Kubernetes cluster. When it contains
        "imagePullSecrets", that value is forwarded as the image pull secret.
    user_config : dict
        Configuration dictionary for the user, including group_ID.
    config_folder : str
        Path to the folder where configuration files will be stored.
    mysql_config : dict, optional
        Configuration dictionary for MySQL, including mysql_user and mysql_password.
        Defaults to None, in which case both values fall back to "root".
    minio_config : dict, optional
        Configuration dictionary for MinIO, including console_access_key and
        console_secret_key. Defaults to None, in which case both values fall back to
        "minio".

    Returns
    -------
    dict
        A dictionary containing deployment details such as namespace, release name, chart
        name, repository URL, chart version, and path to the values file.
    """
    # Both dictionaries are optional; treat None as "no overrides" so the documented
    # defaults work instead of failing with AttributeError on ``None.get``.
    if mysql_config is None:
        mysql_config = {}
    if minio_config is None:
        minio_config = {}

    group_id = user_config["group_ID"]
    namespace = group_id.lower().replace("_", "-")
    kubeconfig = yaml.safe_load(Path(os.environ["KUBECONFIG"]).read_text())
    config.load_kube_config_from_dict(kubeconfig)

    mlflow_config = {
        "namespace": namespace,
        "chart_name": "mlflow-v1",
        "docker_image": "kthcloud/mlflow",  # TODO: Make this configurable
        "tag": "1.1",
        "memory_request": "2Gi",
        "cpu_request": "500m",
        "allocationTime": "180d",
        "ports": {"mlflow": [5000]},
        "user_secret": [namespace],
        "user_secret_params": ["user", "password"],
        "env_variables": {
            "MYSQL_URL": "{}-mysql-mkg".format(namespace),
            "MYSQL_PASSWORD": mysql_config.get("mysql_password", "root"),
            "MYSQL_USER": mysql_config.get("mysql_user", "root"),
            "BUCKET_NAME": "mlflow",
            "BUCKET_PATH": "mlflow",
            "AWS_ACCESS_KEY_ID": minio_config.get("console_access_key", "minio"),
            "AWS_SECRET_ACCESS_KEY": minio_config.get("console_secret_key", "minio"),
            "MLFLOW_S3_ENDPOINT_URL": "http://minio:80",
        },
    }
    # TODO: Change this to updated values

    if "imagePullSecrets" in cluster_config:
        mlflow_config["image_pull_secret"] = cluster_config["imagePullSecrets"]

    mlflow_values = read_config_dict_and_generate_helm_values_dict(mlflow_config, kubeconfig)
    mlflow_values["chart_name"] = "mkg"
    mlflow_values["chart_version"] = "1.0.2"
    mlflow_values["repo_url"] = "https://kthcloud.github.io/MAIA/"
    # TODO: Change this to updated values

    values_dir = Path(config_folder).joinpath(group_id, "mlflow_values")
    values_dir.mkdir(parents=True, exist_ok=True)
    values_file = values_dir.joinpath("mlflow_values.yaml")
    with open(values_file, "w") as f:
        f.write(OmegaConf.to_yaml(mlflow_values))

    return {
        "namespace": namespace,
        "release": namespace + "-mlflow",
        "chart": mlflow_values["chart_name"],
        "repo": mlflow_values["repo_url"],
        "version": mlflow_values["chart_version"],
        "values": str(values_file),
    }