From 74deb3f8d64d4fa44fe0151bea69857efe0edce1 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Tue, 3 Mar 2026 13:42:04 +0000 Subject: [PATCH] feat(k8s): add Job support for non-Helm k8s-kind deployments Co-Authored-By: Claude Opus 4.6 --- stack_orchestrator/deploy/deploy.py | 21 +++- stack_orchestrator/deploy/deploy_types.py | 1 + stack_orchestrator/deploy/deployer_factory.py | 8 +- .../deploy/deployment_create.py | 4 +- stack_orchestrator/deploy/k8s/cluster_info.py | 105 ++++++++++++++++-- stack_orchestrator/deploy/k8s/deploy_k8s.py | 79 ++++++++++--- stack_orchestrator/util.py | 16 ++- 7 files changed, 200 insertions(+), 34 deletions(-) diff --git a/stack_orchestrator/deploy/deploy.py b/stack_orchestrator/deploy/deploy.py index 86c1856c..30f88fa2 100644 --- a/stack_orchestrator/deploy/deploy.py +++ b/stack_orchestrator/deploy/deploy.py @@ -35,6 +35,7 @@ from stack_orchestrator.util import ( get_dev_root_path, stack_is_in_deployment, resolve_compose_file, + get_job_list, ) from stack_orchestrator.deploy.deployer import DeployerException from stack_orchestrator.deploy.deployer_factory import getDeployer @@ -130,6 +131,7 @@ def create_deploy_context( compose_files=cluster_context.compose_files, compose_project_name=cluster_context.cluster, compose_env_file=cluster_context.env_file, + job_compose_files=cluster_context.job_compose_files, ) return DeployCommandContext(stack, cluster_context, deployer) @@ -403,7 +405,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file): stack_config = get_parsed_stack_config(stack) if stack_config is not None: # TODO: syntax check the input here - pods_in_scope = stack_config["pods"] + pods_in_scope = stack_config.get("pods") or [] cluster_config = ( stack_config["config"] if "config" in stack_config else None ) @@ -477,6 +479,22 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file): if ctx.verbose: print(f"files: {compose_files}") + # Gather job compose files (from compose-jobs/ 
directory in deployment) + job_compose_files = [] + if deployment and stack: + stack_config = get_parsed_stack_config(stack) + if stack_config: + jobs = get_job_list(stack_config) + compose_jobs_dir = stack.joinpath("compose-jobs") + for job in jobs: + job_file_name = os.path.join( + compose_jobs_dir, f"docker-compose-{job}.yml" + ) + if os.path.exists(job_file_name): + job_compose_files.append(job_file_name) + if ctx.verbose: + print(f"job files: {job_compose_files}") + return ClusterContext( ctx, cluster, @@ -485,6 +503,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file): post_start_commands, cluster_config, env_file, + job_compose_files=job_compose_files if job_compose_files else None, ) diff --git a/stack_orchestrator/deploy/deploy_types.py b/stack_orchestrator/deploy/deploy_types.py index 202e0fa5..68a5f903 100644 --- a/stack_orchestrator/deploy/deploy_types.py +++ b/stack_orchestrator/deploy/deploy_types.py @@ -29,6 +29,7 @@ class ClusterContext: post_start_commands: List[str] config: Optional[str] env_file: Optional[str] + job_compose_files: Optional[List[str]] = None @dataclass diff --git a/stack_orchestrator/deploy/deployer_factory.py b/stack_orchestrator/deploy/deployer_factory.py index 1de14cc5..3bbae74c 100644 --- a/stack_orchestrator/deploy/deployer_factory.py +++ b/stack_orchestrator/deploy/deployer_factory.py @@ -34,7 +34,12 @@ def getDeployerConfigGenerator(type: str, deployment_context): def getDeployer( - type: str, deployment_context, compose_files, compose_project_name, compose_env_file + type: str, + deployment_context, + compose_files, + compose_project_name, + compose_env_file, + job_compose_files=None, ): if type == "compose" or type is None: return DockerDeployer( @@ -54,6 +59,7 @@ def getDeployer( compose_files, compose_project_name, compose_env_file, + job_compose_files=job_compose_files, ) else: print(f"ERROR: deploy-to {type} is not valid") diff --git a/stack_orchestrator/deploy/deployment_create.py 
b/stack_orchestrator/deploy/deployment_create.py index ffbc2872..0546f370 100644 --- a/stack_orchestrator/deploy/deployment_create.py +++ b/stack_orchestrator/deploy/deployment_create.py @@ -1017,9 +1017,9 @@ def _write_deployment_files( dirs_exist_ok=True, ) - # Copy the job files into the target dir (for Docker deployments) + # Copy the job files into the target dir jobs = get_job_list(parsed_stack) - if jobs and not parsed_spec.is_kubernetes_deployment(): + if jobs: destination_compose_jobs_dir = target_dir.joinpath("compose-jobs") os.makedirs(destination_compose_jobs_dir, exist_ok=True) for job in jobs: diff --git a/stack_orchestrator/deploy/k8s/cluster_info.py b/stack_orchestrator/deploy/k8s/cluster_info.py index 088292ca..fe816cb5 100644 --- a/stack_orchestrator/deploy/k8s/cluster_info.py +++ b/stack_orchestrator/deploy/k8s/cluster_info.py @@ -72,13 +72,14 @@ def to_k8s_resource_requirements(resources: Resources) -> client.V1ResourceRequi class ClusterInfo: parsed_pod_yaml_map: Any + parsed_job_yaml_map: Any image_set: Set[str] = set() app_name: str environment_variables: DeployEnvVars spec: Spec def __init__(self) -> None: - pass + self.parsed_job_yaml_map = {} def int(self, pod_files: List[str], compose_env_file, deployment_name, spec: Spec): self.parsed_pod_yaml_map = parsed_pod_files_map_from_file_names(pod_files) @@ -94,6 +95,12 @@ class ClusterInfo: if opts.o.debug: print(f"Env vars: {self.environment_variables.map}") + def init_jobs(self, job_files: List[str]): + """Initialize parsed job YAML map from job compose files.""" + self.parsed_job_yaml_map = parsed_pod_files_map_from_file_names(job_files) + if opts.o.debug: + print(f"Parsed job yaml map: {self.parsed_job_yaml_map}") + def get_nodeports(self): nodeports = [] for pod_name in self.parsed_pod_yaml_map: @@ -424,15 +431,25 @@ class ClusterInfo: # 3. 
Fall back to spec.yml global (already resolved with DEFAULT fallback) return global_resources - # TODO: put things like image pull policy into an object-scope struct - def get_deployment(self, image_pull_policy: Optional[str] = None): + def _build_containers( + self, + parsed_yaml_map: Any, + image_pull_policy: Optional[str] = None, + ) -> tuple: + """Build k8s container specs from parsed compose YAML. + + Returns a tuple of (containers, services, volumes) where: + - containers: list of V1Container objects + - services: the last services dict processed (used for annotations/labels) + - volumes: list of V1Volume objects + """ containers = [] services = {} global_resources = self.spec.get_container_resources() if not global_resources: global_resources = DEFAULT_CONTAINER_RESOURCES - for pod_name in self.parsed_pod_yaml_map: - pod = self.parsed_pod_yaml_map[pod_name] + for pod_name in parsed_yaml_map: + pod = parsed_yaml_map[pod_name] services = pod["services"] for service_name in services: container_name = service_name @@ -489,7 +506,7 @@ class ClusterInfo: else image ) volume_mounts = volume_mounts_for_service( - self.parsed_pod_yaml_map, service_name + parsed_yaml_map, service_name ) # Handle command/entrypoint from compose file # In docker-compose: entrypoint -> k8s command, command -> k8s args @@ -548,7 +565,14 @@ class ClusterInfo: ) containers.append(container) volumes = volumes_for_pod_files( - self.parsed_pod_yaml_map, self.spec, self.app_name + parsed_yaml_map, self.spec, self.app_name + ) + return containers, services, volumes + + # TODO: put things like image pull policy into an object-scope struct + def get_deployment(self, image_pull_policy: Optional[str] = None): + containers, services, volumes = self._build_containers( + self.parsed_pod_yaml_map, image_pull_policy ) registry_config = self.spec.get_image_registry_config() if registry_config: @@ -638,3 +662,70 @@ class ClusterInfo: spec=spec, ) return deployment + + def get_jobs(self, image_pull_policy: 
Optional[str] = None) -> List[client.V1Job]: + """Build k8s Job objects from parsed job compose files. + + Each job compose file produces a V1Job with: + - restartPolicy: Never + - backoffLimit: 0 + - Name: {app_name}-job-{job_name} + """ + if not self.parsed_job_yaml_map: + return [] + + jobs = [] + registry_config = self.spec.get_image_registry_config() + if registry_config: + secret_name = f"{self.app_name}-registry" + image_pull_secrets = [client.V1LocalObjectReference(name=secret_name)] + else: + image_pull_secrets = [] + + for job_file in self.parsed_job_yaml_map: + # Build containers for this single job file + single_job_map = {job_file: self.parsed_job_yaml_map[job_file]} + containers, _services, volumes = self._build_containers( + single_job_map, image_pull_policy + ) + + # Derive job name from file path: docker-compose-<name>.yml -> <name> + import os + + base = os.path.basename(job_file) + # Strip docker-compose- prefix and .yml suffix + job_name = base + if job_name.startswith("docker-compose-"): + job_name = job_name[len("docker-compose-"):] + if job_name.endswith(".yml"): + job_name = job_name[: -len(".yml")] + elif job_name.endswith(".yaml"): + job_name = job_name[: -len(".yaml")] + + template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta( + labels={"app": self.app_name, "job-name": job_name} + ), + spec=client.V1PodSpec( + containers=containers, + image_pull_secrets=image_pull_secrets, + volumes=volumes, + restart_policy="Never", + ), + ) + job_spec = client.V1JobSpec( + template=template, + backoff_limit=0, + ) + job = client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta( + name=f"{self.app_name}-job-{job_name}", + labels={"app": self.app_name}, + ), + spec=job_spec, + ) + jobs.append(job) + + return jobs diff --git a/stack_orchestrator/deploy/k8s/deploy_k8s.py b/stack_orchestrator/deploy/k8s/deploy_k8s.py index f7f8ad43..915b2cd7 100644 --- a/stack_orchestrator/deploy/k8s/deploy_k8s.py +++ 
b/stack_orchestrator/deploy/k8s/deploy_k8s.py @@ -95,6 +95,7 @@ class K8sDeployer(Deployer): type: str core_api: client.CoreV1Api apps_api: client.AppsV1Api + batch_api: client.BatchV1Api networking_api: client.NetworkingV1Api k8s_namespace: str kind_cluster_name: str @@ -110,6 +111,7 @@ class K8sDeployer(Deployer): compose_files, compose_project_name, compose_env_file, + job_compose_files=None, ) -> None: self.type = type self.skip_cluster_management = False @@ -130,9 +132,13 @@ class K8sDeployer(Deployer): compose_project_name, deployment_context.spec, ) + # Initialize job compose files if provided + if job_compose_files: + self.cluster_info.init_jobs(job_compose_files) if opts.o.debug: print(f"Deployment dir: {deployment_context.deployment_dir}") print(f"Compose files: {compose_files}") + print(f"Job compose files: {job_compose_files}") print(f"Project name: {compose_project_name}") print(f"Env file: {compose_env_file}") print(f"Type: {type}") @@ -150,6 +156,7 @@ class K8sDeployer(Deployer): self.core_api = client.CoreV1Api() self.networking_api = client.NetworkingV1Api() self.apps_api = client.AppsV1Api() + self.batch_api = client.BatchV1Api() self.custom_obj_api = client.CustomObjectsApi() def _ensure_namespace(self): @@ -293,6 +300,26 @@ class K8sDeployer(Deployer): print("Service created:") print(f"{service_resp}") + def _create_jobs(self): + # Process job compose files into k8s Jobs + jobs = self.cluster_info.get_jobs( + image_pull_policy=None if self.is_kind() else "Always" + ) + for job in jobs: + if opts.o.debug: + print(f"Sending this job: {job}") + if not opts.o.dry_run: + job_resp = self.batch_api.create_namespaced_job( + body=job, namespace=self.k8s_namespace + ) + if opts.o.debug: + print("Job created:") + if job_resp.metadata: + print( + f" {job_resp.metadata.namespace} " + f"{job_resp.metadata.name}" + ) + def _find_certificate_for_host_name(self, host_name): all_certificates = self.custom_obj_api.list_namespaced_custom_object( 
group="cert-manager.io", @@ -384,6 +411,7 @@ class K8sDeployer(Deployer): self._create_volume_data() self._create_deployment() + self._create_jobs() http_proxy_info = self.cluster_info.spec.get_http_proxy() # Note: we don't support tls for kind (enabling tls causes errors) @@ -659,26 +687,43 @@ class K8sDeployer(Deployer): def run_job(self, job_name: str, helm_release: Optional[str] = None): if not opts.o.dry_run: - from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job - # Check if this is a helm-based deployment chart_dir = self.deployment_dir / "chart" - if not chart_dir.exists(): - # TODO: Implement job support for compose-based K8s deployments - raise Exception( - f"Job support is only available for helm-based " - f"deployments. Chart directory not found: {chart_dir}" - ) + if chart_dir.exists(): + from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job - # Run the job using the helm job runner - run_helm_job( - chart_dir=chart_dir, - job_name=job_name, - release=helm_release, - namespace=self.k8s_namespace, - timeout=600, - verbose=opts.o.verbose, - ) + # Run the job using the helm job runner + run_helm_job( + chart_dir=chart_dir, + job_name=job_name, + release=helm_release, + namespace=self.k8s_namespace, + timeout=600, + verbose=opts.o.verbose, + ) + else: + # Non-Helm path: create job from ClusterInfo + self.connect_api() + jobs = self.cluster_info.get_jobs( + image_pull_policy=None if self.is_kind() else "Always" + ) + # Find the matching job by name + target_name = f"{self.cluster_info.app_name}-job-{job_name}" + matched_job = None + for job in jobs: + if job.metadata and job.metadata.name == target_name: + matched_job = job + break + if matched_job is None: + raise Exception( + f"Job '{job_name}' not found. 
Available jobs: " + f"{[j.metadata.name for j in jobs if j.metadata]}" + ) + if opts.o.debug: + print(f"Creating job: {target_name}") + self.batch_api.create_namespaced_job( + body=matched_job, namespace=self.k8s_namespace + ) def is_kind(self): return self.type == "k8s-kind" diff --git a/stack_orchestrator/util.py b/stack_orchestrator/util.py index fc8437ca..7e1e442c 100644 --- a/stack_orchestrator/util.py +++ b/stack_orchestrator/util.py @@ -75,6 +75,8 @@ def get_parsed_stack_config(stack): def get_pod_list(parsed_stack): # Handle both old and new format + if "pods" not in parsed_stack or not parsed_stack["pods"]: + return [] pods = parsed_stack["pods"] if type(pods[0]) is str: result = pods @@ -103,7 +105,7 @@ def get_job_list(parsed_stack): def get_plugin_code_paths(stack) -> List[Path]: parsed_stack = get_parsed_stack_config(stack) - pods = parsed_stack["pods"] + pods = parsed_stack.get("pods") or [] result: Set[Path] = set() for pod in pods: if type(pod) is str: @@ -160,8 +162,10 @@ def resolve_job_compose_file(stack, job_name: str): def get_pod_file_path(stack, parsed_stack, pod_name: str): - pods = parsed_stack["pods"] + pods = parsed_stack.get("pods") or [] result = None + if not pods: + return result if type(pods[0]) is str: result = resolve_compose_file(stack, pod_name) else: @@ -189,9 +193,9 @@ def get_job_file_path(stack, parsed_stack, job_name: str): def get_pod_script_paths(parsed_stack, pod_name: str): - pods = parsed_stack["pods"] + pods = parsed_stack.get("pods") or [] result = [] - if not type(pods[0]) is str: + if not pods or not type(pods[0]) is str: for pod in pods: if pod["name"] == pod_name: pod_root_dir = os.path.join( @@ -207,9 +211,9 @@ def get_pod_script_paths(parsed_stack, pod_name: str): def pod_has_scripts(parsed_stack, pod_name: str): - pods = parsed_stack["pods"] + pods = parsed_stack.get("pods") or [] result = False - if type(pods[0]) is str: + if not pods or type(pods[0]) is str: result = False else: for pod in pods: