diff --git a/stack_orchestrator/constants.py b/stack_orchestrator/constants.py
index 07fc68f4..322b57eb 100644
--- a/stack_orchestrator/constants.py
+++ b/stack_orchestrator/constants.py
@@ -39,3 +39,5 @@ node_affinities_key = "node-affinities"
 node_tolerations_key = "node-tolerations"
 kind_config_filename = "kind-config.yml"
 kube_config_filename = "kubeconfig.yml"
+cri_base_filename = "cri-base.json"
+unlimited_memlock_key = "unlimited-memlock"
diff --git a/stack_orchestrator/deploy/k8s/deploy_k8s.py b/stack_orchestrator/deploy/k8s/deploy_k8s.py
index fdc29f51..cd765317 100644
--- a/stack_orchestrator/deploy/k8s/deploy_k8s.py
+++ b/stack_orchestrator/deploy/k8s/deploy_k8s.py
@@ -20,10 +20,24 @@ from typing import List
 from stack_orchestrator import constants
 from stack_orchestrator.deploy.deployer import Deployer, DeployerConfigGenerator
-from stack_orchestrator.deploy.k8s.helpers import create_cluster, destroy_cluster, load_images_into_kind
-from stack_orchestrator.deploy.k8s.helpers import install_ingress_for_kind, wait_for_ingress_in_kind
-from stack_orchestrator.deploy.k8s.helpers import pods_in_deployment, containers_in_pod, log_stream_from_string
-from stack_orchestrator.deploy.k8s.helpers import generate_kind_config
+from stack_orchestrator.deploy.k8s.helpers import (
+    create_cluster,
+    destroy_cluster,
+    load_images_into_kind,
+)
+from stack_orchestrator.deploy.k8s.helpers import (
+    install_ingress_for_kind,
+    wait_for_ingress_in_kind,
+)
+from stack_orchestrator.deploy.k8s.helpers import (
+    pods_in_deployment,
+    containers_in_pod,
+    log_stream_from_string,
+)
+from stack_orchestrator.deploy.k8s.helpers import (
+    generate_kind_config,
+    generate_cri_base_json,
+)
 from stack_orchestrator.deploy.k8s.cluster_info import ClusterInfo
 from stack_orchestrator.opts import opts
 from stack_orchestrator.deploy.deployment_context import DeploymentContext
@@ -57,18 +71,31 @@ class K8sDeployer(Deployer):
     deployment_dir: Path
     deployment_context: DeploymentContext
 
-    def __init__(self, type, deployment_context: DeploymentContext, compose_files, compose_project_name, compose_env_file) -> None:
+    def __init__(
+        self,
+        type,
+        deployment_context: DeploymentContext,
+        compose_files,
+        compose_project_name,
+        compose_env_file,
+    ) -> None:
         self.type = type
         self.skip_cluster_management = False
-        # TODO: workaround pending refactoring above to cope with being created with a null deployment_context
+        # TODO: workaround pending refactoring above to cope with being
+        # created with a null deployment_context
         if deployment_context is None:
             return
         self.deployment_dir = deployment_context.deployment_dir
         self.deployment_context = deployment_context
         self.kind_cluster_name = compose_project_name
         self.cluster_info = ClusterInfo()
-        self.cluster_info.int(compose_files, compose_env_file, compose_project_name, deployment_context.spec)
-        if (opts.o.debug):
+        self.cluster_info.int(
+            compose_files,
+            compose_env_file,
+            compose_project_name,
+            deployment_context.spec,
+        )
+        if opts.o.debug:
             print(f"Deployment dir: {deployment_context.deployment_dir}")
             print(f"Compose files: {compose_files}")
             print(f"Project name: {compose_project_name}")
@@ -80,7 +107,11 @@ class K8sDeployer(Deployer):
             config.load_kube_config(context=f"kind-{self.kind_cluster_name}")
         else:
             # Get the config file and pass to load_kube_config()
-            config.load_kube_config(config_file=self.deployment_dir.joinpath(constants.kube_config_filename).as_posix())
+            config.load_kube_config(
+                config_file=self.deployment_dir.joinpath(
+                    constants.kube_config_filename
+                ).as_posix()
+            )
         self.core_api = client.CoreV1Api()
         self.networking_api = client.NetworkingV1Api()
         self.apps_api = client.AppsV1Api()
@@ -94,7 +125,9 @@ class K8sDeployer(Deployer):
                 print(f"Sending this pv: {pv}")
             if not opts.o.dry_run:
                 try:
-                    pv_resp = self.core_api.read_persistent_volume(name=pv.metadata.name)
+                    pv_resp = self.core_api.read_persistent_volume(
+                        name=pv.metadata.name
+                    )
                     if pv_resp:
                         if opts.o.debug:
                             print("PVs already present:")
@@ -117,7 +150,8 @@ class K8sDeployer(Deployer):
             if not opts.o.dry_run:
                 try:
                     pvc_resp = self.core_api.read_namespaced_persistent_volume_claim(
-                        name=pvc.metadata.name, namespace=self.k8s_namespace)
+                        name=pvc.metadata.name, namespace=self.k8s_namespace
+                    )
                     if pvc_resp:
                         if opts.o.debug:
                             print("PVCs already present:")
@@ -126,7 +160,9 @@
                 except:  # noqa: E722
                     pass
 
-            pvc_resp = self.core_api.create_namespaced_persistent_volume_claim(body=pvc, namespace=self.k8s_namespace)
+            pvc_resp = self.core_api.create_namespaced_persistent_volume_claim(
+                body=pvc, namespace=self.k8s_namespace
+            )
             if opts.o.debug:
                 print("PVCs created:")
                 print(f"{pvc_resp}")
@@ -138,8 +174,7 @@ class K8sDeployer(Deployer):
             print(f"Sending this ConfigMap: {cfg_map}")
             if not opts.o.dry_run:
                 cfg_rsp = self.core_api.create_namespaced_config_map(
-                    body=cfg_map,
-                    namespace=self.k8s_namespace
+                    body=cfg_map, namespace=self.k8s_namespace
                 )
                 if opts.o.debug:
                     print("ConfigMap created:")
@@ -147,7 +182,9 @@
 
     def _create_deployment(self):
         # Process compose files into a Deployment
-        deployment = self.cluster_info.get_deployment(image_pull_policy=None if self.is_kind() else "Always")
+        deployment = self.cluster_info.get_deployment(
+            image_pull_policy=None if self.is_kind() else "Always"
+        )
         # Create the k8s objects
         if opts.o.debug:
             print(f"Sending this deployment: {deployment}")
@@ -157,16 +194,18 @@ class K8sDeployer(Deployer):
             )
             if opts.o.debug:
                 print("Deployment created:")
-                print(f"{deployment_resp.metadata.namespace} {deployment_resp.metadata.name} \
-                    {deployment_resp.metadata.generation} {deployment_resp.spec.template.spec.containers[0].image}")
+                ns = deployment_resp.metadata.namespace
+                name = deployment_resp.metadata.name
+                gen = deployment_resp.metadata.generation
+                img = deployment_resp.spec.template.spec.containers[0].image
+                print(f"{ns} {name} {gen} {img}")
 
         service: client.V1Service = self.cluster_info.get_service()
         if opts.o.debug:
             print(f"Sending this service: {service}")
         if not opts.o.dry_run:
             service_resp = self.core_api.create_namespaced_service(
-                namespace=self.k8s_namespace,
-                body=service
+                namespace=self.k8s_namespace, body=service
             )
             if opts.o.debug:
                 print("Service created:")
@@ -177,7 +216,7 @@ class K8sDeployer(Deployer):
             group="cert-manager.io",
             version="v1",
             namespace=self.k8s_namespace,
-            plural="certificates"
+            plural="certificates",
         )
 
         host_parts = host_name.split(".", 1)
@@ -202,7 +241,9 @@ class K8sDeployer(Deployer):
                 if before < now < after:
                     # Check the status is Ready
                     for condition in status.get("conditions", []):
-                        if "True" == condition.get("status") and "Ready" == condition.get("type"):
+                        if "True" == condition.get(
+                            "status"
+                        ) and "Ready" == condition.get("type"):
                             return cert
         return None
 
@@ -211,14 +252,20 @@ class K8sDeployer(Deployer):
         if not opts.o.dry_run:
             if self.is_kind() and not self.skip_cluster_management:
                 # Create the kind cluster
-                create_cluster(self.kind_cluster_name, self.deployment_dir.joinpath(constants.kind_config_filename))
+                create_cluster(
+                    self.kind_cluster_name,
+                    self.deployment_dir.joinpath(constants.kind_config_filename),
+                )
                 # Ensure the referenced containers are copied into kind
-                load_images_into_kind(self.kind_cluster_name, self.cluster_info.image_set)
+                load_images_into_kind(
+                    self.kind_cluster_name, self.cluster_info.image_set
+                )
             self.connect_api()
             if self.is_kind() and not self.skip_cluster_management:
-                # Now configure an ingress controller (not installed by default in kind)
+                # Configure ingress controller (not installed by default in kind)
                 install_ingress_for_kind()
-                # Wait for ingress to start (deployment provisioning will fail unless this is done)
+                # Wait for ingress to start
+                # (deployment provisioning will fail unless this is done)
                 wait_for_ingress_in_kind()
         else:
@@ -228,21 +275,26 @@ class K8sDeployer(Deployer):
             self._create_deployment()
 
            http_proxy_info = self.cluster_info.spec.get_http_proxy()
-            # Note: at present we don't support tls for kind (and enabling tls causes errors)
+            # Note: we don't support tls for kind (enabling tls causes errors)
             use_tls = http_proxy_info and not self.is_kind()
-            certificate = self._find_certificate_for_host_name(http_proxy_info[0]["host-name"]) if use_tls else None
+            certificate = (
+                self._find_certificate_for_host_name(http_proxy_info[0]["host-name"])
+                if use_tls
+                else None
+            )
             if opts.o.debug:
                 if certificate:
                     print(f"Using existing certificate: {certificate}")
 
-            ingress: client.V1Ingress = self.cluster_info.get_ingress(use_tls=use_tls, certificate=certificate)
+            ingress: client.V1Ingress = self.cluster_info.get_ingress(
+                use_tls=use_tls, certificate=certificate
+            )
             if ingress:
                 if opts.o.debug:
                     print(f"Sending this ingress: {ingress}")
                 if not opts.o.dry_run:
                     ingress_resp = self.networking_api.create_namespaced_ingress(
-                        namespace=self.k8s_namespace,
-                        body=ingress
+                        namespace=self.k8s_namespace, body=ingress
                     )
                     if opts.o.debug:
                         print("Ingress created:")
@@ -257,8 +309,7 @@ class K8sDeployer(Deployer):
                     print(f"Sending this nodeport: {nodeport}")
                 if not opts.o.dry_run:
                     nodeport_resp = self.core_api.create_namespaced_service(
-                        namespace=self.k8s_namespace,
-                        body=nodeport
+                        namespace=self.k8s_namespace, body=nodeport
                     )
                     if opts.o.debug:
                         print("NodePort created:")
@@ -276,7 +327,9 @@ class K8sDeployer(Deployer):
             if opts.o.debug:
                 print(f"Deleting this pv: {pv}")
             try:
-                pv_resp = self.core_api.delete_persistent_volume(name=pv.metadata.name)
+                pv_resp = self.core_api.delete_persistent_volume(
+                    name=pv.metadata.name
+                )
                 if opts.o.debug:
                     print("PV deleted:")
                     print(f"{pv_resp}")
@@ -328,13 +381,14 @@ class K8sDeployer(Deployer):
             print(f"Deleting service: {service}")
         try:
             self.core_api.delete_namespaced_service(
-                namespace=self.k8s_namespace,
-                name=service.metadata.name
+                namespace=self.k8s_namespace, name=service.metadata.name
             )
         except client.exceptions.ApiException as e:
             _check_delete_exception(e)
 
-        ingress: client.V1Ingress = self.cluster_info.get_ingress(use_tls=not self.is_kind())
+        ingress: client.V1Ingress = self.cluster_info.get_ingress(
+            use_tls=not self.is_kind()
+        )
         if ingress:
             if opts.o.debug:
                 print(f"Deleting this ingress: {ingress}")
@@ -354,8 +408,7 @@ class K8sDeployer(Deployer):
                 print(f"Deleting this nodeport: {nodeport}")
             try:
                 self.core_api.delete_namespaced_service(
-                    namespace=self.k8s_namespace,
-                    name=nodeport.metadata.name
+                    namespace=self.k8s_namespace, name=nodeport.metadata.name
                 )
             except client.exceptions.ApiException as e:
                 _check_delete_exception(e)
@@ -385,21 +438,25 @@ class K8sDeployer(Deployer):
             ip = "?"
             tls = "?"
             try:
-                ingress = self.networking_api.read_namespaced_ingress(namespace=self.k8s_namespace,
-                                                                      name=self.cluster_info.get_ingress().metadata.name)
+                ingress = self.networking_api.read_namespaced_ingress(
+                    namespace=self.k8s_namespace,
+                    name=self.cluster_info.get_ingress().metadata.name,
+                )
                 cert = self.custom_obj_api.get_namespaced_custom_object(
                     group="cert-manager.io",
                     version="v1",
                     namespace=self.k8s_namespace,
                     plural="certificates",
-                    name=ingress.spec.tls[0].secret_name
+                    name=ingress.spec.tls[0].secret_name,
                 )
                 hostname = ingress.spec.rules[0].host
                 ip = ingress.status.load_balancer.ingress[0].ip
                 tls = "notBefore: %s; notAfter: %s; names: %s" % (
-                    cert["status"]["notBefore"], cert["status"]["notAfter"], ingress.spec.tls[0].hosts
+                    cert["status"]["notBefore"],
+                    cert["status"]["notAfter"],
+                    ingress.spec.tls[0].hosts,
                 )
             except:  # noqa: E722
                 pass
@@ -412,10 +469,14 @@ class K8sDeployer(Deployer):
             print("Pods:")
 
             for p in pods:
+                ns = p.metadata.namespace
+                name = p.metadata.name
                 if p.metadata.deletion_timestamp:
-                    print(f"\t{p.metadata.namespace}/{p.metadata.name}: Terminating ({p.metadata.deletion_timestamp})")
+                    ts = p.metadata.deletion_timestamp
+                    print(f"\t{ns}/{name}: Terminating ({ts})")
                 else:
-                    print(f"\t{p.metadata.namespace}/{p.metadata.name}: Running ({p.metadata.creation_timestamp})")
+                    ts = p.metadata.creation_timestamp
+                    print(f"\t{ns}/{name}: Running ({ts})")
 
     def ps(self):
         self.connect_api()
@@ -430,19 +491,22 @@ class K8sDeployer(Deployer):
             for c in p.spec.containers:
                 if c.ports:
                     for prt in c.ports:
-                        ports[str(prt.container_port)] = [AttrDict({
-                            "HostIp": pod_ip,
-                            "HostPort": prt.container_port
-                        })]
+                        ports[str(prt.container_port)] = [
+                            AttrDict(
+                                {"HostIp": pod_ip, "HostPort": prt.container_port}
+                            )
+                        ]
 
-            ret.append(AttrDict({
-                "id": f"{p.metadata.namespace}/{p.metadata.name}",
-                "name": p.metadata.name,
-                "namespace": p.metadata.namespace,
-                "network_settings": AttrDict({
-                    "ports": ports
-                })
-            }))
+            ret.append(
+                AttrDict(
+                    {
+                        "id": f"{p.metadata.namespace}/{p.metadata.name}",
+                        "name": p.metadata.name,
+                        "namespace": p.metadata.namespace,
+                        "network_settings": AttrDict({"ports": ports}),
+                    }
+                )
+            )
 
         return ret
 
@@ -465,11 +529,13 @@ class K8sDeployer(Deployer):
         else:
             k8s_pod_name = pods[0]
         containers = containers_in_pod(self.core_api, k8s_pod_name)
-        # If the pod is not yet started, the logs request below will throw an exception
+        # If pod not started, logs request below will throw an exception
         try:
            log_data = ""
             for container in containers:
-                container_log = self.core_api.read_namespaced_pod_log(k8s_pod_name, namespace="default", container=container)
+                container_log = self.core_api.read_namespaced_pod_log(
+                    k8s_pod_name, namespace="default", container=container
+                )
                 container_log_lines = container_log.splitlines()
                 for line in container_log_lines:
                     log_data += f"{container}: {line}\n"
@@ -484,8 +550,7 @@ class K8sDeployer(Deployer):
 
         ref_deployment = self.cluster_info.get_deployment()
         deployment = self.apps_api.read_namespaced_deployment(
-            name=ref_deployment.metadata.name,
-            namespace=self.k8s_namespace
+            name=ref_deployment.metadata.name, namespace=self.k8s_namespace
         )
 
         new_env = ref_deployment.spec.template.spec.containers[0].env
@@ -503,10 +568,20 @@ class K8sDeployer(Deployer):
         self.apps_api.patch_namespaced_deployment(
             name=ref_deployment.metadata.name,
             namespace=self.k8s_namespace,
-            body=deployment
+            body=deployment,
         )
 
-    def run(self, image: str, command=None, user=None, volumes=None, entrypoint=None, env={}, ports=[], detach=False):
+    def run(
+        self,
+        image: str,
+        command=None,
+        user=None,
+        volumes=None,
+        entrypoint=None,
+        env={},
+        ports=[],
+        detach=False,
+    ):
         # We need to figure out how to do this -- check why we're being called first
         pass
 
@@ -518,7 +593,10 @@ class K8sDeployer(Deployer):
         chart_dir = self.deployment_dir / "chart"
         if not chart_dir.exists():
             # TODO: Implement job support for compose-based K8s deployments
-            raise Exception(f"Job support is only available for helm-based deployments. Chart directory not found: {chart_dir}")
+            raise Exception(
+                f"Job support is only available for helm-based "
+                f"deployments. Chart directory not found: {chart_dir}"
+            )
 
         # Run the job using the helm job runner
         run_helm_job(
@@ -527,7 +605,7 @@ class K8sDeployer(Deployer):
             release=helm_release,
             namespace=self.k8s_namespace,
             timeout=600,
-            verbose=opts.o.verbose
+            verbose=opts.o.verbose,
         )
 
     def is_kind(self):
@@ -545,6 +623,18 @@ class K8sDeployerConfigGenerator(DeployerConfigGenerator):
     def generate(self, deployment_dir: Path):
         # No need to do this for the remote k8s case
         if self.type == "k8s-kind":
+            # Generate cri-base.json if unlimited_memlock is enabled.
+            # Must be done before generate_kind_config() which references it.
+            if self.deployment_context.spec.get_unlimited_memlock():
+                cri_base_content = generate_cri_base_json()
+                cri_base_file = deployment_dir.joinpath(constants.cri_base_filename)
+                if opts.o.debug:
+                    print(
+                        f"Creating cri-base.json for unlimited memlock: {cri_base_file}"
+                    )
+                with open(cri_base_file, "w") as output_file:
+                    output_file.write(cri_base_content)
+
             # Check the file isn't already there
             # Get the config file contents
             content = generate_kind_config(deployment_dir, self.deployment_context)
diff --git a/stack_orchestrator/deploy/k8s/helpers.py b/stack_orchestrator/deploy/k8s/helpers.py
index de2dea7f..010b656a 100644
--- a/stack_orchestrator/deploy/k8s/helpers.py
+++ b/stack_orchestrator/deploy/k8s/helpers.py
@@ -24,6 +24,7 @@ from stack_orchestrator.util import get_k8s_dir, error_exit
 from stack_orchestrator.opts import opts
 from stack_orchestrator.deploy.deploy_util import parsed_pod_files_map_from_file_names
 from stack_orchestrator.deploy.deployer import DeployerException
+from stack_orchestrator import constants
 
 
 def get_kind_cluster():
@@ -33,10 +34,7 @@
     Returns the cluster name or None if no cluster exists.
""" result = subprocess.run( - "kind get clusters", - shell=True, - capture_output=True, - text=True + "kind get clusters", shell=True, capture_output=True, text=True ) if result.returncode != 0: return None @@ -71,12 +69,14 @@ def wait_for_ingress_in_kind(): for i in range(20): warned_waiting = False w = watch.Watch() - for event in w.stream(func=core_v1.list_namespaced_pod, - namespace="ingress-nginx", - label_selector="app.kubernetes.io/component=controller", - timeout_seconds=30): - if event['object'].status.container_statuses: - if event['object'].status.container_statuses[0].ready is True: + for event in w.stream( + func=core_v1.list_namespaced_pod, + namespace="ingress-nginx", + label_selector="app.kubernetes.io/component=controller", + timeout_seconds=30, + ): + if event["object"].status.container_statuses: + if event["object"].status.container_statuses[0].ready is True: if warned_waiting: print("Ingress controller is ready") return @@ -87,7 +87,11 @@ def wait_for_ingress_in_kind(): def install_ingress_for_kind(): api_client = client.ApiClient() - ingress_install = os.path.abspath(get_k8s_dir().joinpath("components", "ingress", "ingress-nginx-kind-deploy.yaml")) + ingress_install = os.path.abspath( + get_k8s_dir().joinpath( + "components", "ingress", "ingress-nginx-kind-deploy.yaml" + ) + ) if opts.o.debug: print("Installing nginx ingress controller in kind cluster") utils.create_from_yaml(api_client, yaml_file=ingress_install) @@ -95,14 +99,18 @@ def install_ingress_for_kind(): def load_images_into_kind(kind_cluster_name: str, image_set: Set[str]): for image in image_set: - result = _run_command(f"kind load docker-image {image} --name {kind_cluster_name}") + result = _run_command( + f"kind load docker-image {image} --name {kind_cluster_name}" + ) if result.returncode != 0: raise DeployerException(f"kind load docker-image failed: {result}") def pods_in_deployment(core_api: client.CoreV1Api, deployment_name: str): pods = [] - pod_response = core_api.list_namespaced_pod(namespace="default", label_selector=f"app={deployment_name}") + pod_response = core_api.list_namespaced_pod( + namespace="default", label_selector=f"app={deployment_name}" + ) if opts.o.debug: print(f"pod_response: {pod_response}") for pod_info in pod_response.items: @@ -158,13 +166,16 @@ def volume_mounts_for_service(parsed_pod_files, service): if "volumes" in service_obj: volumes = service_obj["volumes"] for mount_string in volumes: - # Looks like: test-data:/data or test-data:/data:ro or test-data:/data:rw + # Looks like: test-data:/data + # or test-data:/data:ro or test-data:/data:rw if opts.o.debug: print(f"mount_string: {mount_string}") mount_split = mount_string.split(":") volume_name = mount_split[0] mount_path = mount_split[1] - mount_options = mount_split[2] if len(mount_split) == 3 else None + mount_options = ( + mount_split[2] if len(mount_split) == 3 else None + ) if opts.o.debug: print(f"volume_name: {volume_name}") print(f"mount path: {mount_path}") @@ -172,7 +183,7 @@ def volume_mounts_for_service(parsed_pod_files, service): volume_device = client.V1VolumeMount( mount_path=mount_path, name=volume_name, - read_only="ro" == mount_options + read_only="ro" == mount_options, ) result.append(volume_device) return result @@ -187,12 +198,18 @@ def volumes_for_pod_files(parsed_pod_files, spec, app_name): for volume_name in volumes.keys(): if volume_name in spec.get_configmaps(): # Set defaultMode=0o755 to make scripts executable - config_map = client.V1ConfigMapVolumeSource(name=f"{app_name}-{volume_name}", 
-                                                                default_mode=0o755)
+                    config_map = client.V1ConfigMapVolumeSource(
+                        name=f"{app_name}-{volume_name}", default_mode=0o755
+                    )
                     volume = client.V1Volume(name=volume_name, config_map=config_map)
                     result.append(volume)
                 else:
-                    claim = client.V1PersistentVolumeClaimVolumeSource(claim_name=f"{app_name}-{volume_name}")
-                    volume = client.V1Volume(name=volume_name, persistent_volume_claim=claim)
+                    claim = client.V1PersistentVolumeClaimVolumeSource(
+                        claim_name=f"{app_name}-{volume_name}"
+                    )
+                    volume = client.V1Volume(
+                        name=volume_name, persistent_volume_claim=claim
+                    )
                     result.append(volume)
     return result
@@ -224,7 +241,8 @@ def _generate_kind_mounts(parsed_pod_files, deployment_dir, deployment_context):
             if "volumes" in service_obj:
                 volumes = service_obj["volumes"]
                 for mount_string in volumes:
-                    # Looks like: test-data:/data or test-data:/data:ro or test-data:/data:rw
+                    # Looks like: test-data:/data
+                    # or test-data:/data:ro or test-data:/data:rw
                     if opts.o.debug:
                         print(f"mount_string: {mount_string}")
                     mount_split = mount_string.split(":")
@@ -236,15 +254,21 @@ def _generate_kind_mounts(parsed_pod_files, deployment_dir, deployment_context):
                         print(f"mount path: {mount_path}")
                     if volume_name not in deployment_context.spec.get_configmaps():
                         if volume_host_path_map[volume_name]:
+                            host_path = _make_absolute_host_path(
+                                volume_host_path_map[volume_name],
+                                deployment_dir,
+                            )
+                            container_path = get_kind_pv_bind_mount_path(
+                                volume_name
+                            )
                             volume_definitions.append(
-                                f"  - hostPath: {_make_absolute_host_path(volume_host_path_map[volume_name], deployment_dir)}\n"
-                                f"    containerPath: {get_kind_pv_bind_mount_path(volume_name)}\n"
+                                f"  - hostPath: {host_path}\n"
+                                f"    containerPath: {container_path}\n"
                             )
     return (
-        "" if len(volume_definitions) == 0 else (
-            "  extraMounts:\n"
-            f"{''.join(volume_definitions)}"
-        )
+        ""
+        if len(volume_definitions) == 0
+        else ("  extraMounts:\n" f"{''.join(volume_definitions)}")
     )
@@ -262,12 +286,14 @@ def _generate_kind_port_mappings_from_services(parsed_pod_files):
                 for port_string in ports:
                     # TODO handle the complex cases
                     # Looks like: 80 or something more complicated
-                    port_definitions.append(f"  - containerPort: {port_string}\n    hostPort: {port_string}\n")
+                    port_definitions.append(
+                        f"  - containerPort: {port_string}\n"
+                        f"    hostPort: {port_string}\n"
+                    )
     return (
-        "" if len(port_definitions) == 0 else (
-            "  extraPortMappings:\n"
-            f"{''.join(port_definitions)}"
-        )
+        ""
+        if len(port_definitions) == 0
+        else ("  extraPortMappings:\n" f"{''.join(port_definitions)}")
     )
@@ -275,13 +301,48 @@ def _generate_kind_port_mappings(parsed_pod_files):
     port_definitions = []
     # For now we just map port 80 for the nginx ingress controller we install in kind
     port_string = "80"
-    port_definitions.append(f"  - containerPort: {port_string}\n    hostPort: {port_string}\n")
-    return (
-        "" if len(port_definitions) == 0 else (
-            "  extraPortMappings:\n"
-            f"{''.join(port_definitions)}"
-        )
+    port_definitions.append(
+        f"  - containerPort: {port_string}\n    hostPort: {port_string}\n"
     )
+    return (
+        ""
+        if len(port_definitions) == 0
+        else ("  extraPortMappings:\n" f"{''.join(port_definitions)}")
+    )
+
+
+def _generate_cri_base_mount(deployment_dir: Path):
+    """Generate the extraMount entry for cri-base.json to set RLIMIT_MEMLOCK."""
+    cri_base_path = deployment_dir.joinpath(constants.cri_base_filename).resolve()
+    return (
+        f"  - hostPath: {cri_base_path}\n"
+        f"    containerPath: /etc/containerd/cri-base.json\n"
+    )
+
+
+def generate_cri_base_json():
+    """Generate cri-base.json content with unlimited
+    RLIMIT_MEMLOCK.
+
+    This is needed for workloads like Solana validators that require large
+    amounts of locked memory for memory-mapped files during snapshot decompression.
+
+    The IPC_LOCK capability alone doesn't raise the RLIMIT_MEMLOCK limit - it only
+    allows mlock() calls. We need to set the rlimit in the OCI runtime spec.
+    """
+    import json
+
+    # Use maximum 64-bit signed integer value for unlimited
+    max_rlimit = 9223372036854775807
+    cri_base = {
+        "ociVersion": "1.0.2-dev",
+        "process": {
+            "rlimits": [
+                {"type": "RLIMIT_MEMLOCK", "hard": max_rlimit, "soft": max_rlimit},
+                {"type": "RLIMIT_NOFILE", "hard": 1048576, "soft": 1048576},
+            ]
+        },
+    }
+    return json.dumps(cri_base, indent=2)
 
 
 # Note: this makes any duplicate definition in b overwrite a
@@ -314,7 +375,9 @@ def _expand_shell_vars(raw_val: str, env_map: Mapping[str, str] = None) -> str:
     return raw_val
 
 
-def envs_from_compose_file(compose_file_envs: Mapping[str, str], env_map: Mapping[str, str] = None) -> Mapping[str, str]:
+def envs_from_compose_file(
+    compose_file_envs: Mapping[str, str], env_map: Mapping[str, str] = None
+) -> Mapping[str, str]:
     result = {}
     for env_var, env_val in compose_file_envs.items():
         expanded_env_val = _expand_shell_vars(env_val, env_map)
@@ -322,7 +385,9 @@ def envs_from_compose_file(compose_file_envs: Mapping[str, str], env_map: Mappin
     return result
 
 
-def envs_from_environment_variables_map(map: Mapping[str, str]) -> List[client.V1EnvVar]:
+def envs_from_environment_variables_map(
+    map: Mapping[str, str]
+) -> List[client.V1EnvVar]:
     result = []
     for env_var, env_val in map.items():
         result.append(client.V1EnvVar(env_var, env_val))
@@ -353,7 +418,20 @@ def generate_kind_config(deployment_dir: Path, deployment_context):
     pod_files = [p for p in compose_file_dir.iterdir() if p.is_file()]
     parsed_pod_files_map = parsed_pod_files_map_from_file_names(pod_files)
     port_mappings_yml = _generate_kind_port_mappings(parsed_pod_files_map)
-    mounts_yml = _generate_kind_mounts(parsed_pod_files_map, deployment_dir, deployment_context)
+    mounts_yml = _generate_kind_mounts(
+        parsed_pod_files_map, deployment_dir, deployment_context
+    )
+
+    # Check if unlimited_memlock is enabled and add cri-base.json mount
+    unlimited_memlock = deployment_context.spec.get_unlimited_memlock()
+    if unlimited_memlock:
+        cri_base_mount = _generate_cri_base_mount(deployment_dir)
+        if mounts_yml:
+            # Append to existing mounts
+            mounts_yml = mounts_yml.rstrip() + "\n" + cri_base_mount
+        else:
+            mounts_yml = f"  extraMounts:\n{cri_base_mount}"
+
     return (
         "kind: Cluster\n"
         "apiVersion: kind.x-k8s.io/v1alpha4\n"
@@ -364,7 +442,7 @@ def generate_kind_config(deployment_dir: Path, deployment_context):
         "  kind: InitConfiguration\n"
         "  nodeRegistration:\n"
         "    kubeletExtraArgs:\n"
-        "      node-labels: \"ingress-ready=true\"\n"
+        '      node-labels: "ingress-ready=true"\n'
         f"{port_mappings_yml}\n"
         f"{mounts_yml}\n"
     )
diff --git a/stack_orchestrator/deploy/spec.py b/stack_orchestrator/deploy/spec.py
index bc1247eb..09a99d41 100644
--- a/stack_orchestrator/deploy/spec.py
+++ b/stack_orchestrator/deploy/spec.py
@@ -72,7 +72,6 @@ class Resources:
 
 
 class Spec:
-
     obj: typing.Any
     file_path: Path
@@ -105,10 +104,14 @@ class Spec:
         return self.obj.get(constants.configmaps_key, {})
 
     def get_container_resources(self):
-        return Resources(self.obj.get(constants.resources_key, {}).get("containers", {}))
+        return Resources(
+            self.obj.get(constants.resources_key, {}).get("containers", {})
+        )
 
     def get_volume_resources(self):
-        return Resources(self.obj.get(constants.resources_key,
-                                      {}).get(constants.volumes_key, {}))
+        return Resources(
+            self.obj.get(constants.resources_key, {}).get(constants.volumes_key, {})
+        )
 
     def get_http_proxy(self):
         return self.obj.get(constants.network_key, {}).get(constants.http_proxy_key, [])
@@ -129,17 +132,34 @@ class Spec:
         return self.obj.get(constants.labels_key, {})
 
     def get_privileged(self):
-        return "true" == str(self.obj.get(constants.security_key, {}).get("privileged", "false")).lower()
+        return (
+            "true"
+            == str(
+                self.obj.get(constants.security_key, {}).get("privileged", "false")
+            ).lower()
+        )
 
     def get_capabilities(self):
         return self.obj.get(constants.security_key, {}).get("capabilities", [])
 
+    def get_unlimited_memlock(self):
+        return (
+            "true"
+            == str(
+                self.obj.get(constants.security_key, {}).get(
+                    constants.unlimited_memlock_key, "false"
+                )
+            ).lower()
+        )
+
     def get_deployment_type(self):
         return self.obj.get(constants.deploy_to_key)
 
     def is_kubernetes_deployment(self):
-        return self.get_deployment_type() in [constants.k8s_kind_deploy_type,
-                                              constants.k8s_deploy_type]
+        return self.get_deployment_type() in [
+            constants.k8s_kind_deploy_type,
+            constants.k8s_deploy_type,
+        ]
 
     def is_kind_deployment(self):
         return self.get_deployment_type() in [constants.k8s_kind_deploy_type]
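
Note (editorial, not part of the patch): Spec.get_unlimited_memlock() reads the new "unlimited-memlock" key from the same "security" block of the deployment spec that already holds the "privileged" and "capabilities" keys, so a spec enabling the feature plausibly contains a security section with unlimited-memlock set to true; treat that shape as an assumption inferred from the accessors rather than documented syntax. Below is a minimal sketch that exercises only functions added by this patch to show the two artifacts the feature generates; it assumes stack_orchestrator from this branch is importable, and the deployment directory path is hypothetical.

    # Sketch: print the artifacts generated on the unlimited-memlock path.
    # /srv/my-deployment is a hypothetical directory used only for illustration.
    from pathlib import Path

    from stack_orchestrator.deploy.k8s.helpers import (
        _generate_cri_base_mount,
        generate_cri_base_json,
    )

    # OCI base-spec fragment that K8sDeployerConfigGenerator.generate() writes
    # to <deployment_dir>/cri-base.json: RLIMIT_MEMLOCK raised to the 64-bit
    # signed maximum, plus a generous RLIMIT_NOFILE.
    print(generate_cri_base_json())

    # extraMounts entry that generate_kind_config() appends so the kind node
    # sees the generated file at /etc/containerd/cri-base.json.
    print(_generate_cri_base_mount(Path("/srv/my-deployment")))

The first call prints JSON with both RLIMIT_MEMLOCK limits set to 9223372036854775807; the second prints the two-line extraMounts YAML fragment that bind-mounts the generated file into the kind node, which is why generate() writes cri-base.json before calling generate_kind_config().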