feat(k8s): add Job support for non-Helm k8s-kind deployments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
feature/k8s-jobs
Prathamesh Musale 2026-03-03 13:42:04 +00:00
parent be70df3a12
commit f997378c04
7 changed files with 200 additions and 34 deletions

View File

@ -35,6 +35,7 @@ from stack_orchestrator.util import (
get_dev_root_path, get_dev_root_path,
stack_is_in_deployment, stack_is_in_deployment,
resolve_compose_file, resolve_compose_file,
get_job_list,
) )
from stack_orchestrator.deploy.deployer import DeployerException from stack_orchestrator.deploy.deployer import DeployerException
from stack_orchestrator.deploy.deployer_factory import getDeployer from stack_orchestrator.deploy.deployer_factory import getDeployer
@ -130,6 +131,7 @@ def create_deploy_context(
compose_files=cluster_context.compose_files, compose_files=cluster_context.compose_files,
compose_project_name=cluster_context.cluster, compose_project_name=cluster_context.cluster,
compose_env_file=cluster_context.env_file, compose_env_file=cluster_context.env_file,
job_compose_files=cluster_context.job_compose_files,
) )
return DeployCommandContext(stack, cluster_context, deployer) return DeployCommandContext(stack, cluster_context, deployer)
@ -403,7 +405,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
stack_config = get_parsed_stack_config(stack) stack_config = get_parsed_stack_config(stack)
if stack_config is not None: if stack_config is not None:
# TODO: syntax check the input here # TODO: syntax check the input here
pods_in_scope = stack_config["pods"] pods_in_scope = stack_config.get("pods") or []
cluster_config = ( cluster_config = (
stack_config["config"] if "config" in stack_config else None stack_config["config"] if "config" in stack_config else None
) )
@ -477,6 +479,22 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
if ctx.verbose: if ctx.verbose:
print(f"files: {compose_files}") print(f"files: {compose_files}")
# Gather job compose files (from compose-jobs/ directory in deployment)
job_compose_files = []
if deployment and stack:
stack_config = get_parsed_stack_config(stack)
if stack_config:
jobs = get_job_list(stack_config)
compose_jobs_dir = stack.joinpath("compose-jobs")
for job in jobs:
job_file_name = os.path.join(
compose_jobs_dir, f"docker-compose-{job}.yml"
)
if os.path.exists(job_file_name):
job_compose_files.append(job_file_name)
if ctx.verbose:
print(f"job files: {job_compose_files}")
return ClusterContext( return ClusterContext(
ctx, ctx,
cluster, cluster,
@ -485,6 +503,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
post_start_commands, post_start_commands,
cluster_config, cluster_config,
env_file, env_file,
job_compose_files=job_compose_files if job_compose_files else None,
) )

View File

@ -29,6 +29,7 @@ class ClusterContext:
post_start_commands: List[str] post_start_commands: List[str]
config: Optional[str] config: Optional[str]
env_file: Optional[str] env_file: Optional[str]
job_compose_files: Optional[List[str]] = None
@dataclass @dataclass

View File

@ -34,7 +34,12 @@ def getDeployerConfigGenerator(type: str, deployment_context):
def getDeployer( def getDeployer(
type: str, deployment_context, compose_files, compose_project_name, compose_env_file type: str,
deployment_context,
compose_files,
compose_project_name,
compose_env_file,
job_compose_files=None,
): ):
if type == "compose" or type is None: if type == "compose" or type is None:
return DockerDeployer( return DockerDeployer(
@ -54,6 +59,7 @@ def getDeployer(
compose_files, compose_files,
compose_project_name, compose_project_name,
compose_env_file, compose_env_file,
job_compose_files=job_compose_files,
) )
else: else:
print(f"ERROR: deploy-to {type} is not valid") print(f"ERROR: deploy-to {type} is not valid")

View File

@ -1017,9 +1017,9 @@ def _write_deployment_files(
dirs_exist_ok=True, dirs_exist_ok=True,
) )
# Copy the job files into the target dir (for Docker deployments) # Copy the job files into the target dir
jobs = get_job_list(parsed_stack) jobs = get_job_list(parsed_stack)
if jobs and not parsed_spec.is_kubernetes_deployment(): if jobs:
destination_compose_jobs_dir = target_dir.joinpath("compose-jobs") destination_compose_jobs_dir = target_dir.joinpath("compose-jobs")
os.makedirs(destination_compose_jobs_dir, exist_ok=True) os.makedirs(destination_compose_jobs_dir, exist_ok=True)
for job in jobs: for job in jobs:

View File

@ -72,13 +72,14 @@ def to_k8s_resource_requirements(resources: Resources) -> client.V1ResourceRequi
class ClusterInfo: class ClusterInfo:
parsed_pod_yaml_map: Any parsed_pod_yaml_map: Any
parsed_job_yaml_map: Any
image_set: Set[str] = set() image_set: Set[str] = set()
app_name: str app_name: str
environment_variables: DeployEnvVars environment_variables: DeployEnvVars
spec: Spec spec: Spec
def __init__(self) -> None: def __init__(self) -> None:
pass self.parsed_job_yaml_map = {}
def int(self, pod_files: List[str], compose_env_file, deployment_name, spec: Spec): def int(self, pod_files: List[str], compose_env_file, deployment_name, spec: Spec):
self.parsed_pod_yaml_map = parsed_pod_files_map_from_file_names(pod_files) self.parsed_pod_yaml_map = parsed_pod_files_map_from_file_names(pod_files)
@ -94,6 +95,12 @@ class ClusterInfo:
if opts.o.debug: if opts.o.debug:
print(f"Env vars: {self.environment_variables.map}") print(f"Env vars: {self.environment_variables.map}")
def init_jobs(self, job_files: List[str]):
"""Initialize parsed job YAML map from job compose files."""
self.parsed_job_yaml_map = parsed_pod_files_map_from_file_names(job_files)
if opts.o.debug:
print(f"Parsed job yaml map: {self.parsed_job_yaml_map}")
def get_nodeports(self): def get_nodeports(self):
nodeports = [] nodeports = []
for pod_name in self.parsed_pod_yaml_map: for pod_name in self.parsed_pod_yaml_map:
@ -394,15 +401,25 @@ class ClusterInfo:
result.append(pv) result.append(pv)
return result return result
# TODO: put things like image pull policy into an object-scope struct def _build_containers(
def get_deployment(self, image_pull_policy: Optional[str] = None): self,
parsed_yaml_map: Any,
image_pull_policy: Optional[str] = None,
) -> tuple:
"""Build k8s container specs from parsed compose YAML.
Returns a tuple of (containers, services, volumes) where:
- containers: list of V1Container objects
- services: the last services dict processed (used for annotations/labels)
- volumes: list of V1Volume objects
"""
containers = [] containers = []
services = {} services = {}
resources = self.spec.get_container_resources() resources = self.spec.get_container_resources()
if not resources: if not resources:
resources = DEFAULT_CONTAINER_RESOURCES resources = DEFAULT_CONTAINER_RESOURCES
for pod_name in self.parsed_pod_yaml_map: for pod_name in parsed_yaml_map:
pod = self.parsed_pod_yaml_map[pod_name] pod = parsed_yaml_map[pod_name]
services = pod["services"] services = pod["services"]
for service_name in services: for service_name in services:
container_name = service_name container_name = service_name
@ -459,7 +476,7 @@ class ClusterInfo:
else image else image
) )
volume_mounts = volume_mounts_for_service( volume_mounts = volume_mounts_for_service(
self.parsed_pod_yaml_map, service_name parsed_yaml_map, service_name
) )
# Handle command/entrypoint from compose file # Handle command/entrypoint from compose file
# In docker-compose: entrypoint -> k8s command, command -> k8s args # In docker-compose: entrypoint -> k8s command, command -> k8s args
@ -515,7 +532,14 @@ class ClusterInfo:
) )
containers.append(container) containers.append(container)
volumes = volumes_for_pod_files( volumes = volumes_for_pod_files(
self.parsed_pod_yaml_map, self.spec, self.app_name parsed_yaml_map, self.spec, self.app_name
)
return containers, services, volumes
# TODO: put things like image pull policy into an object-scope struct
def get_deployment(self, image_pull_policy: Optional[str] = None):
containers, services, volumes = self._build_containers(
self.parsed_pod_yaml_map, image_pull_policy
) )
registry_config = self.spec.get_image_registry_config() registry_config = self.spec.get_image_registry_config()
if registry_config: if registry_config:
@ -602,3 +626,70 @@ class ClusterInfo:
spec=spec, spec=spec,
) )
return deployment return deployment
def get_jobs(self, image_pull_policy: Optional[str] = None) -> List[client.V1Job]:
"""Build k8s Job objects from parsed job compose files.
Each job compose file produces a V1Job with:
- restartPolicy: Never
- backoffLimit: 0
- Name: {app_name}-job-{job_name}
"""
if not self.parsed_job_yaml_map:
return []
jobs = []
registry_config = self.spec.get_image_registry_config()
if registry_config:
secret_name = f"{self.app_name}-registry"
image_pull_secrets = [client.V1LocalObjectReference(name=secret_name)]
else:
image_pull_secrets = []
for job_file in self.parsed_job_yaml_map:
# Build containers for this single job file
single_job_map = {job_file: self.parsed_job_yaml_map[job_file]}
containers, _services, volumes = self._build_containers(
single_job_map, image_pull_policy
)
# Derive job name from file path: docker-compose-<name>.yml -> <name>
import os
base = os.path.basename(job_file)
# Strip docker-compose- prefix and .yml suffix
job_name = base
if job_name.startswith("docker-compose-"):
job_name = job_name[len("docker-compose-"):]
if job_name.endswith(".yml"):
job_name = job_name[: -len(".yml")]
elif job_name.endswith(".yaml"):
job_name = job_name[: -len(".yaml")]
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": self.app_name, "job-name": job_name}
),
spec=client.V1PodSpec(
containers=containers,
image_pull_secrets=image_pull_secrets,
volumes=volumes,
restart_policy="Never",
),
)
job_spec = client.V1JobSpec(
template=template,
backoff_limit=0,
)
job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(
name=f"{self.app_name}-job-{job_name}",
labels={"app": self.app_name},
),
spec=job_spec,
)
jobs.append(job)
return jobs

View File

@ -95,6 +95,7 @@ class K8sDeployer(Deployer):
type: str type: str
core_api: client.CoreV1Api core_api: client.CoreV1Api
apps_api: client.AppsV1Api apps_api: client.AppsV1Api
batch_api: client.BatchV1Api
networking_api: client.NetworkingV1Api networking_api: client.NetworkingV1Api
k8s_namespace: str k8s_namespace: str
kind_cluster_name: str kind_cluster_name: str
@ -110,6 +111,7 @@ class K8sDeployer(Deployer):
compose_files, compose_files,
compose_project_name, compose_project_name,
compose_env_file, compose_env_file,
job_compose_files=None,
) -> None: ) -> None:
self.type = type self.type = type
self.skip_cluster_management = False self.skip_cluster_management = False
@ -130,9 +132,13 @@ class K8sDeployer(Deployer):
compose_project_name, compose_project_name,
deployment_context.spec, deployment_context.spec,
) )
# Initialize job compose files if provided
if job_compose_files:
self.cluster_info.init_jobs(job_compose_files)
if opts.o.debug: if opts.o.debug:
print(f"Deployment dir: {deployment_context.deployment_dir}") print(f"Deployment dir: {deployment_context.deployment_dir}")
print(f"Compose files: {compose_files}") print(f"Compose files: {compose_files}")
print(f"Job compose files: {job_compose_files}")
print(f"Project name: {compose_project_name}") print(f"Project name: {compose_project_name}")
print(f"Env file: {compose_env_file}") print(f"Env file: {compose_env_file}")
print(f"Type: {type}") print(f"Type: {type}")
@ -150,6 +156,7 @@ class K8sDeployer(Deployer):
self.core_api = client.CoreV1Api() self.core_api = client.CoreV1Api()
self.networking_api = client.NetworkingV1Api() self.networking_api = client.NetworkingV1Api()
self.apps_api = client.AppsV1Api() self.apps_api = client.AppsV1Api()
self.batch_api = client.BatchV1Api()
self.custom_obj_api = client.CustomObjectsApi() self.custom_obj_api = client.CustomObjectsApi()
def _ensure_namespace(self): def _ensure_namespace(self):
@ -293,6 +300,26 @@ class K8sDeployer(Deployer):
print("Service created:") print("Service created:")
print(f"{service_resp}") print(f"{service_resp}")
def _create_jobs(self):
# Process job compose files into k8s Jobs
jobs = self.cluster_info.get_jobs(
image_pull_policy=None if self.is_kind() else "Always"
)
for job in jobs:
if opts.o.debug:
print(f"Sending this job: {job}")
if not opts.o.dry_run:
job_resp = self.batch_api.create_namespaced_job(
body=job, namespace=self.k8s_namespace
)
if opts.o.debug:
print("Job created:")
if job_resp.metadata:
print(
f" {job_resp.metadata.namespace} "
f"{job_resp.metadata.name}"
)
def _find_certificate_for_host_name(self, host_name): def _find_certificate_for_host_name(self, host_name):
all_certificates = self.custom_obj_api.list_namespaced_custom_object( all_certificates = self.custom_obj_api.list_namespaced_custom_object(
group="cert-manager.io", group="cert-manager.io",
@ -384,6 +411,7 @@ class K8sDeployer(Deployer):
self._create_volume_data() self._create_volume_data()
self._create_deployment() self._create_deployment()
self._create_jobs()
http_proxy_info = self.cluster_info.spec.get_http_proxy() http_proxy_info = self.cluster_info.spec.get_http_proxy()
# Note: we don't support tls for kind (enabling tls causes errors) # Note: we don't support tls for kind (enabling tls causes errors)
@ -659,26 +687,43 @@ class K8sDeployer(Deployer):
def run_job(self, job_name: str, helm_release: Optional[str] = None): def run_job(self, job_name: str, helm_release: Optional[str] = None):
if not opts.o.dry_run: if not opts.o.dry_run:
from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job
# Check if this is a helm-based deployment # Check if this is a helm-based deployment
chart_dir = self.deployment_dir / "chart" chart_dir = self.deployment_dir / "chart"
if not chart_dir.exists(): if chart_dir.exists():
# TODO: Implement job support for compose-based K8s deployments from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job
raise Exception(
f"Job support is only available for helm-based "
f"deployments. Chart directory not found: {chart_dir}"
)
# Run the job using the helm job runner # Run the job using the helm job runner
run_helm_job( run_helm_job(
chart_dir=chart_dir, chart_dir=chart_dir,
job_name=job_name, job_name=job_name,
release=helm_release, release=helm_release,
namespace=self.k8s_namespace, namespace=self.k8s_namespace,
timeout=600, timeout=600,
verbose=opts.o.verbose, verbose=opts.o.verbose,
) )
else:
# Non-Helm path: create job from ClusterInfo
self.connect_api()
jobs = self.cluster_info.get_jobs(
image_pull_policy=None if self.is_kind() else "Always"
)
# Find the matching job by name
target_name = f"{self.cluster_info.app_name}-job-{job_name}"
matched_job = None
for job in jobs:
if job.metadata and job.metadata.name == target_name:
matched_job = job
break
if matched_job is None:
raise Exception(
f"Job '{job_name}' not found. Available jobs: "
f"{[j.metadata.name for j in jobs if j.metadata]}"
)
if opts.o.debug:
print(f"Creating job: {target_name}")
self.batch_api.create_namespaced_job(
body=matched_job, namespace=self.k8s_namespace
)
def is_kind(self):
        """Return True when this deployer targets a kind cluster."""
        return "k8s-kind" == self.type

View File

@ -75,6 +75,8 @@ def get_parsed_stack_config(stack):
def get_pod_list(parsed_stack): def get_pod_list(parsed_stack):
# Handle both old and new format # Handle both old and new format
if "pods" not in parsed_stack or not parsed_stack["pods"]:
return []
pods = parsed_stack["pods"] pods = parsed_stack["pods"]
if type(pods[0]) is str: if type(pods[0]) is str:
result = pods result = pods
@ -103,7 +105,7 @@ def get_job_list(parsed_stack):
def get_plugin_code_paths(stack) -> List[Path]: def get_plugin_code_paths(stack) -> List[Path]:
parsed_stack = get_parsed_stack_config(stack) parsed_stack = get_parsed_stack_config(stack)
pods = parsed_stack["pods"] pods = parsed_stack.get("pods") or []
result: Set[Path] = set() result: Set[Path] = set()
for pod in pods: for pod in pods:
if type(pod) is str: if type(pod) is str:
@ -160,8 +162,10 @@ def resolve_job_compose_file(stack, job_name: str):
def get_pod_file_path(stack, parsed_stack, pod_name: str): def get_pod_file_path(stack, parsed_stack, pod_name: str):
pods = parsed_stack["pods"] pods = parsed_stack.get("pods") or []
result = None result = None
if not pods:
return result
if type(pods[0]) is str: if type(pods[0]) is str:
result = resolve_compose_file(stack, pod_name) result = resolve_compose_file(stack, pod_name)
else: else:
@ -189,9 +193,9 @@ def get_job_file_path(stack, parsed_stack, job_name: str):
def get_pod_script_paths(parsed_stack, pod_name: str): def get_pod_script_paths(parsed_stack, pod_name: str):
pods = parsed_stack["pods"] pods = parsed_stack.get("pods") or []
result = [] result = []
if not type(pods[0]) is str: if not pods or not type(pods[0]) is str:
for pod in pods: for pod in pods:
if pod["name"] == pod_name: if pod["name"] == pod_name:
pod_root_dir = os.path.join( pod_root_dir = os.path.join(
@ -207,9 +211,9 @@ def get_pod_script_paths(parsed_stack, pod_name: str):
def pod_has_scripts(parsed_stack, pod_name: str): def pod_has_scripts(parsed_stack, pod_name: str):
pods = parsed_stack["pods"] pods = parsed_stack.get("pods") or []
result = False result = False
if type(pods[0]) is str: if not pods or type(pods[0]) is str:
result = False result = False
else: else:
for pod in pods: for pod in pods: