feat(k8s): add Job support for non-Helm k8s-kind deployments
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>feature/k8s-jobs
parent
589ed3cf69
commit
74deb3f8d6
|
|
@ -35,6 +35,7 @@ from stack_orchestrator.util import (
|
|||
get_dev_root_path,
|
||||
stack_is_in_deployment,
|
||||
resolve_compose_file,
|
||||
get_job_list,
|
||||
)
|
||||
from stack_orchestrator.deploy.deployer import DeployerException
|
||||
from stack_orchestrator.deploy.deployer_factory import getDeployer
|
||||
|
|
@ -130,6 +131,7 @@ def create_deploy_context(
|
|||
compose_files=cluster_context.compose_files,
|
||||
compose_project_name=cluster_context.cluster,
|
||||
compose_env_file=cluster_context.env_file,
|
||||
job_compose_files=cluster_context.job_compose_files,
|
||||
)
|
||||
return DeployCommandContext(stack, cluster_context, deployer)
|
||||
|
||||
|
|
@ -403,7 +405,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
|
|||
stack_config = get_parsed_stack_config(stack)
|
||||
if stack_config is not None:
|
||||
# TODO: syntax check the input here
|
||||
pods_in_scope = stack_config["pods"]
|
||||
pods_in_scope = stack_config.get("pods") or []
|
||||
cluster_config = (
|
||||
stack_config["config"] if "config" in stack_config else None
|
||||
)
|
||||
|
|
@ -477,6 +479,22 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
|
|||
if ctx.verbose:
|
||||
print(f"files: {compose_files}")
|
||||
|
||||
# Gather job compose files (from compose-jobs/ directory in deployment)
|
||||
job_compose_files = []
|
||||
if deployment and stack:
|
||||
stack_config = get_parsed_stack_config(stack)
|
||||
if stack_config:
|
||||
jobs = get_job_list(stack_config)
|
||||
compose_jobs_dir = stack.joinpath("compose-jobs")
|
||||
for job in jobs:
|
||||
job_file_name = os.path.join(
|
||||
compose_jobs_dir, f"docker-compose-{job}.yml"
|
||||
)
|
||||
if os.path.exists(job_file_name):
|
||||
job_compose_files.append(job_file_name)
|
||||
if ctx.verbose:
|
||||
print(f"job files: {job_compose_files}")
|
||||
|
||||
return ClusterContext(
|
||||
ctx,
|
||||
cluster,
|
||||
|
|
@ -485,6 +503,7 @@ def _make_cluster_context(ctx, stack, include, exclude, cluster, env_file):
|
|||
post_start_commands,
|
||||
cluster_config,
|
||||
env_file,
|
||||
job_compose_files=job_compose_files if job_compose_files else None,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ class ClusterContext:
|
|||
post_start_commands: List[str]
|
||||
config: Optional[str]
|
||||
env_file: Optional[str]
|
||||
job_compose_files: Optional[List[str]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
|
|||
|
|
@ -34,7 +34,12 @@ def getDeployerConfigGenerator(type: str, deployment_context):
|
|||
|
||||
|
||||
def getDeployer(
|
||||
type: str, deployment_context, compose_files, compose_project_name, compose_env_file
|
||||
type: str,
|
||||
deployment_context,
|
||||
compose_files,
|
||||
compose_project_name,
|
||||
compose_env_file,
|
||||
job_compose_files=None,
|
||||
):
|
||||
if type == "compose" or type is None:
|
||||
return DockerDeployer(
|
||||
|
|
@ -54,6 +59,7 @@ def getDeployer(
|
|||
compose_files,
|
||||
compose_project_name,
|
||||
compose_env_file,
|
||||
job_compose_files=job_compose_files,
|
||||
)
|
||||
else:
|
||||
print(f"ERROR: deploy-to {type} is not valid")
|
||||
|
|
|
|||
|
|
@ -1017,9 +1017,9 @@ def _write_deployment_files(
|
|||
dirs_exist_ok=True,
|
||||
)
|
||||
|
||||
# Copy the job files into the target dir (for Docker deployments)
|
||||
# Copy the job files into the target dir
|
||||
jobs = get_job_list(parsed_stack)
|
||||
if jobs and not parsed_spec.is_kubernetes_deployment():
|
||||
if jobs:
|
||||
destination_compose_jobs_dir = target_dir.joinpath("compose-jobs")
|
||||
os.makedirs(destination_compose_jobs_dir, exist_ok=True)
|
||||
for job in jobs:
|
||||
|
|
|
|||
|
|
@ -72,13 +72,14 @@ def to_k8s_resource_requirements(resources: Resources) -> client.V1ResourceRequi
|
|||
|
||||
class ClusterInfo:
|
||||
parsed_pod_yaml_map: Any
|
||||
parsed_job_yaml_map: Any
|
||||
image_set: Set[str] = set()
|
||||
app_name: str
|
||||
environment_variables: DeployEnvVars
|
||||
spec: Spec
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
self.parsed_job_yaml_map = {}
|
||||
|
||||
def int(self, pod_files: List[str], compose_env_file, deployment_name, spec: Spec):
|
||||
self.parsed_pod_yaml_map = parsed_pod_files_map_from_file_names(pod_files)
|
||||
|
|
@ -94,6 +95,12 @@ class ClusterInfo:
|
|||
if opts.o.debug:
|
||||
print(f"Env vars: {self.environment_variables.map}")
|
||||
|
||||
def init_jobs(self, job_files: List[str]):
|
||||
"""Initialize parsed job YAML map from job compose files."""
|
||||
self.parsed_job_yaml_map = parsed_pod_files_map_from_file_names(job_files)
|
||||
if opts.o.debug:
|
||||
print(f"Parsed job yaml map: {self.parsed_job_yaml_map}")
|
||||
|
||||
def get_nodeports(self):
|
||||
nodeports = []
|
||||
for pod_name in self.parsed_pod_yaml_map:
|
||||
|
|
@ -424,15 +431,25 @@ class ClusterInfo:
|
|||
# 3. Fall back to spec.yml global (already resolved with DEFAULT fallback)
|
||||
return global_resources
|
||||
|
||||
# TODO: put things like image pull policy into an object-scope struct
|
||||
def get_deployment(self, image_pull_policy: Optional[str] = None):
|
||||
def _build_containers(
|
||||
self,
|
||||
parsed_yaml_map: Any,
|
||||
image_pull_policy: Optional[str] = None,
|
||||
) -> tuple:
|
||||
"""Build k8s container specs from parsed compose YAML.
|
||||
|
||||
Returns a tuple of (containers, services, volumes) where:
|
||||
- containers: list of V1Container objects
|
||||
- services: the last services dict processed (used for annotations/labels)
|
||||
- volumes: list of V1Volume objects
|
||||
"""
|
||||
containers = []
|
||||
services = {}
|
||||
global_resources = self.spec.get_container_resources()
|
||||
if not global_resources:
|
||||
global_resources = DEFAULT_CONTAINER_RESOURCES
|
||||
for pod_name in self.parsed_pod_yaml_map:
|
||||
pod = self.parsed_pod_yaml_map[pod_name]
|
||||
for pod_name in parsed_yaml_map:
|
||||
pod = parsed_yaml_map[pod_name]
|
||||
services = pod["services"]
|
||||
for service_name in services:
|
||||
container_name = service_name
|
||||
|
|
@ -489,7 +506,7 @@ class ClusterInfo:
|
|||
else image
|
||||
)
|
||||
volume_mounts = volume_mounts_for_service(
|
||||
self.parsed_pod_yaml_map, service_name
|
||||
parsed_yaml_map, service_name
|
||||
)
|
||||
# Handle command/entrypoint from compose file
|
||||
# In docker-compose: entrypoint -> k8s command, command -> k8s args
|
||||
|
|
@ -548,7 +565,14 @@ class ClusterInfo:
|
|||
)
|
||||
containers.append(container)
|
||||
volumes = volumes_for_pod_files(
|
||||
self.parsed_pod_yaml_map, self.spec, self.app_name
|
||||
parsed_yaml_map, self.spec, self.app_name
|
||||
)
|
||||
return containers, services, volumes
|
||||
|
||||
# TODO: put things like image pull policy into an object-scope struct
|
||||
def get_deployment(self, image_pull_policy: Optional[str] = None):
|
||||
containers, services, volumes = self._build_containers(
|
||||
self.parsed_pod_yaml_map, image_pull_policy
|
||||
)
|
||||
registry_config = self.spec.get_image_registry_config()
|
||||
if registry_config:
|
||||
|
|
@ -638,3 +662,70 @@ class ClusterInfo:
|
|||
spec=spec,
|
||||
)
|
||||
return deployment
|
||||
|
||||
def get_jobs(self, image_pull_policy: Optional[str] = None) -> List[client.V1Job]:
|
||||
"""Build k8s Job objects from parsed job compose files.
|
||||
|
||||
Each job compose file produces a V1Job with:
|
||||
- restartPolicy: Never
|
||||
- backoffLimit: 0
|
||||
- Name: {app_name}-job-{job_name}
|
||||
"""
|
||||
if not self.parsed_job_yaml_map:
|
||||
return []
|
||||
|
||||
jobs = []
|
||||
registry_config = self.spec.get_image_registry_config()
|
||||
if registry_config:
|
||||
secret_name = f"{self.app_name}-registry"
|
||||
image_pull_secrets = [client.V1LocalObjectReference(name=secret_name)]
|
||||
else:
|
||||
image_pull_secrets = []
|
||||
|
||||
for job_file in self.parsed_job_yaml_map:
|
||||
# Build containers for this single job file
|
||||
single_job_map = {job_file: self.parsed_job_yaml_map[job_file]}
|
||||
containers, _services, volumes = self._build_containers(
|
||||
single_job_map, image_pull_policy
|
||||
)
|
||||
|
||||
# Derive job name from file path: docker-compose-<name>.yml -> <name>
|
||||
import os
|
||||
|
||||
base = os.path.basename(job_file)
|
||||
# Strip docker-compose- prefix and .yml suffix
|
||||
job_name = base
|
||||
if job_name.startswith("docker-compose-"):
|
||||
job_name = job_name[len("docker-compose-"):]
|
||||
if job_name.endswith(".yml"):
|
||||
job_name = job_name[: -len(".yml")]
|
||||
elif job_name.endswith(".yaml"):
|
||||
job_name = job_name[: -len(".yaml")]
|
||||
|
||||
template = client.V1PodTemplateSpec(
|
||||
metadata=client.V1ObjectMeta(
|
||||
labels={"app": self.app_name, "job-name": job_name}
|
||||
),
|
||||
spec=client.V1PodSpec(
|
||||
containers=containers,
|
||||
image_pull_secrets=image_pull_secrets,
|
||||
volumes=volumes,
|
||||
restart_policy="Never",
|
||||
),
|
||||
)
|
||||
job_spec = client.V1JobSpec(
|
||||
template=template,
|
||||
backoff_limit=0,
|
||||
)
|
||||
job = client.V1Job(
|
||||
api_version="batch/v1",
|
||||
kind="Job",
|
||||
metadata=client.V1ObjectMeta(
|
||||
name=f"{self.app_name}-job-{job_name}",
|
||||
labels={"app": self.app_name},
|
||||
),
|
||||
spec=job_spec,
|
||||
)
|
||||
jobs.append(job)
|
||||
|
||||
return jobs
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ class K8sDeployer(Deployer):
|
|||
type: str
|
||||
core_api: client.CoreV1Api
|
||||
apps_api: client.AppsV1Api
|
||||
batch_api: client.BatchV1Api
|
||||
networking_api: client.NetworkingV1Api
|
||||
k8s_namespace: str
|
||||
kind_cluster_name: str
|
||||
|
|
@ -110,6 +111,7 @@ class K8sDeployer(Deployer):
|
|||
compose_files,
|
||||
compose_project_name,
|
||||
compose_env_file,
|
||||
job_compose_files=None,
|
||||
) -> None:
|
||||
self.type = type
|
||||
self.skip_cluster_management = False
|
||||
|
|
@ -130,9 +132,13 @@ class K8sDeployer(Deployer):
|
|||
compose_project_name,
|
||||
deployment_context.spec,
|
||||
)
|
||||
# Initialize job compose files if provided
|
||||
if job_compose_files:
|
||||
self.cluster_info.init_jobs(job_compose_files)
|
||||
if opts.o.debug:
|
||||
print(f"Deployment dir: {deployment_context.deployment_dir}")
|
||||
print(f"Compose files: {compose_files}")
|
||||
print(f"Job compose files: {job_compose_files}")
|
||||
print(f"Project name: {compose_project_name}")
|
||||
print(f"Env file: {compose_env_file}")
|
||||
print(f"Type: {type}")
|
||||
|
|
@ -150,6 +156,7 @@ class K8sDeployer(Deployer):
|
|||
self.core_api = client.CoreV1Api()
|
||||
self.networking_api = client.NetworkingV1Api()
|
||||
self.apps_api = client.AppsV1Api()
|
||||
self.batch_api = client.BatchV1Api()
|
||||
self.custom_obj_api = client.CustomObjectsApi()
|
||||
|
||||
def _ensure_namespace(self):
|
||||
|
|
@ -293,6 +300,26 @@ class K8sDeployer(Deployer):
|
|||
print("Service created:")
|
||||
print(f"{service_resp}")
|
||||
|
||||
def _create_jobs(self):
|
||||
# Process job compose files into k8s Jobs
|
||||
jobs = self.cluster_info.get_jobs(
|
||||
image_pull_policy=None if self.is_kind() else "Always"
|
||||
)
|
||||
for job in jobs:
|
||||
if opts.o.debug:
|
||||
print(f"Sending this job: {job}")
|
||||
if not opts.o.dry_run:
|
||||
job_resp = self.batch_api.create_namespaced_job(
|
||||
body=job, namespace=self.k8s_namespace
|
||||
)
|
||||
if opts.o.debug:
|
||||
print("Job created:")
|
||||
if job_resp.metadata:
|
||||
print(
|
||||
f" {job_resp.metadata.namespace} "
|
||||
f"{job_resp.metadata.name}"
|
||||
)
|
||||
|
||||
def _find_certificate_for_host_name(self, host_name):
|
||||
all_certificates = self.custom_obj_api.list_namespaced_custom_object(
|
||||
group="cert-manager.io",
|
||||
|
|
@ -384,6 +411,7 @@ class K8sDeployer(Deployer):
|
|||
|
||||
self._create_volume_data()
|
||||
self._create_deployment()
|
||||
self._create_jobs()
|
||||
|
||||
http_proxy_info = self.cluster_info.spec.get_http_proxy()
|
||||
# Note: we don't support tls for kind (enabling tls causes errors)
|
||||
|
|
@ -659,26 +687,43 @@ class K8sDeployer(Deployer):
|
|||
|
||||
def run_job(self, job_name: str, helm_release: Optional[str] = None):
|
||||
if not opts.o.dry_run:
|
||||
from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job
|
||||
|
||||
# Check if this is a helm-based deployment
|
||||
chart_dir = self.deployment_dir / "chart"
|
||||
if not chart_dir.exists():
|
||||
# TODO: Implement job support for compose-based K8s deployments
|
||||
raise Exception(
|
||||
f"Job support is only available for helm-based "
|
||||
f"deployments. Chart directory not found: {chart_dir}"
|
||||
)
|
||||
if chart_dir.exists():
|
||||
from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job
|
||||
|
||||
# Run the job using the helm job runner
|
||||
run_helm_job(
|
||||
chart_dir=chart_dir,
|
||||
job_name=job_name,
|
||||
release=helm_release,
|
||||
namespace=self.k8s_namespace,
|
||||
timeout=600,
|
||||
verbose=opts.o.verbose,
|
||||
)
|
||||
# Run the job using the helm job runner
|
||||
run_helm_job(
|
||||
chart_dir=chart_dir,
|
||||
job_name=job_name,
|
||||
release=helm_release,
|
||||
namespace=self.k8s_namespace,
|
||||
timeout=600,
|
||||
verbose=opts.o.verbose,
|
||||
)
|
||||
else:
|
||||
# Non-Helm path: create job from ClusterInfo
|
||||
self.connect_api()
|
||||
jobs = self.cluster_info.get_jobs(
|
||||
image_pull_policy=None if self.is_kind() else "Always"
|
||||
)
|
||||
# Find the matching job by name
|
||||
target_name = f"{self.cluster_info.app_name}-job-{job_name}"
|
||||
matched_job = None
|
||||
for job in jobs:
|
||||
if job.metadata and job.metadata.name == target_name:
|
||||
matched_job = job
|
||||
break
|
||||
if matched_job is None:
|
||||
raise Exception(
|
||||
f"Job '{job_name}' not found. Available jobs: "
|
||||
f"{[j.metadata.name for j in jobs if j.metadata]}"
|
||||
)
|
||||
if opts.o.debug:
|
||||
print(f"Creating job: {target_name}")
|
||||
self.batch_api.create_namespaced_job(
|
||||
body=matched_job, namespace=self.k8s_namespace
|
||||
)
|
||||
|
||||
def is_kind(self):
|
||||
return self.type == "k8s-kind"
|
||||
|
|
|
|||
|
|
@ -75,6 +75,8 @@ def get_parsed_stack_config(stack):
|
|||
|
||||
def get_pod_list(parsed_stack):
|
||||
# Handle both old and new format
|
||||
if "pods" not in parsed_stack or not parsed_stack["pods"]:
|
||||
return []
|
||||
pods = parsed_stack["pods"]
|
||||
if type(pods[0]) is str:
|
||||
result = pods
|
||||
|
|
@ -103,7 +105,7 @@ def get_job_list(parsed_stack):
|
|||
|
||||
def get_plugin_code_paths(stack) -> List[Path]:
|
||||
parsed_stack = get_parsed_stack_config(stack)
|
||||
pods = parsed_stack["pods"]
|
||||
pods = parsed_stack.get("pods") or []
|
||||
result: Set[Path] = set()
|
||||
for pod in pods:
|
||||
if type(pod) is str:
|
||||
|
|
@ -160,8 +162,10 @@ def resolve_job_compose_file(stack, job_name: str):
|
|||
|
||||
|
||||
def get_pod_file_path(stack, parsed_stack, pod_name: str):
|
||||
pods = parsed_stack["pods"]
|
||||
pods = parsed_stack.get("pods") or []
|
||||
result = None
|
||||
if not pods:
|
||||
return result
|
||||
if type(pods[0]) is str:
|
||||
result = resolve_compose_file(stack, pod_name)
|
||||
else:
|
||||
|
|
@ -189,9 +193,9 @@ def get_job_file_path(stack, parsed_stack, job_name: str):
|
|||
|
||||
|
||||
def get_pod_script_paths(parsed_stack, pod_name: str):
|
||||
pods = parsed_stack["pods"]
|
||||
pods = parsed_stack.get("pods") or []
|
||||
result = []
|
||||
if not type(pods[0]) is str:
|
||||
if not pods or not type(pods[0]) is str:
|
||||
for pod in pods:
|
||||
if pod["name"] == pod_name:
|
||||
pod_root_dir = os.path.join(
|
||||
|
|
@ -207,9 +211,9 @@ def get_pod_script_paths(parsed_stack, pod_name: str):
|
|||
|
||||
|
||||
def pod_has_scripts(parsed_stack, pod_name: str):
|
||||
pods = parsed_stack["pods"]
|
||||
pods = parsed_stack.get("pods") or []
|
||||
result = False
|
||||
if type(pods[0]) is str:
|
||||
if not pods or type(pods[0]) is str:
|
||||
result = False
|
||||
else:
|
||||
for pod in pods:
|
||||
|
|
|
|||
Loading…
Reference in New Issue