Add support for running jobs from a stack (#975)
Lint Checks / Run linter (push) Failing after 5s Details

Part of https://plan.wireit.in/deepstack/browse/VUL-265/

Reviewed-on: https://git.vdb.to/cerc-io/stack-orchestrator/pulls/975
Co-authored-by: Prathamesh Musale <prathamesh.musale0@gmail.com>
Co-committed-by: Prathamesh Musale <prathamesh.musale0@gmail.com>
helm-charts-support
Prathamesh Musale 2025-12-04 06:13:28 +00:00 committed by Prathamesh Musale
parent 7acabb0743
commit 8afae1904b
9 changed files with 416 additions and 57 deletions

View File

@ -94,6 +94,40 @@ class DockerDeployer(Deployer):
except DockerException as e:
raise DeployerException(e)
def run_job(self, job_name: str, release_name: str = None):
# release_name is ignored for Docker deployments (only used for K8s/Helm)
if not opts.o.dry_run:
try:
# Find job compose file in compose-jobs directory
# The deployment should have compose-jobs/docker-compose-<job_name>.yml
if not self.docker.compose_files:
raise DeployerException("No compose files configured")
# Deployment directory is parent of compose directory
compose_dir = Path(self.docker.compose_files[0]).parent
deployment_dir = compose_dir.parent
job_compose_file = deployment_dir / "compose-jobs" / f"docker-compose-{job_name}.yml"
if not job_compose_file.exists():
raise DeployerException(f"Job compose file not found: {job_compose_file}")
if opts.o.verbose:
print(f"Running job from: {job_compose_file}")
# Create a DockerClient for the job compose file with same project name and env file
# This allows the job to access volumes from the main deployment
job_docker = DockerClient(
compose_files=[job_compose_file],
compose_project_name=self.docker.compose_project_name,
compose_env_file=self.docker.compose_env_file
)
# Run the job with --rm flag to remove container after completion
return job_docker.compose.run(service=job_name, remove=True, tty=True)
except DockerException as e:
raise DeployerException(e)
class DockerDeployerConfigGenerator(DeployerConfigGenerator):

View File

@ -84,7 +84,22 @@ def create_deploy_context(
# Extract the cluster name from the deployment, if we have one
if deployment_context and cluster is None:
cluster = deployment_context.get_cluster_id()
cluster_context = _make_cluster_context(global_context, stack, include, exclude, cluster, env_file)
# Check if this is a helm chart deployment (has chart/ but no compose/)
# TODO: Add a new deployment type for helm chart deployments
# To avoid relying on chart existence in such cases
is_helm_chart_deployment = False
if deployment_context:
chart_dir = deployment_context.deployment_dir / "chart"
compose_dir = deployment_context.deployment_dir / "compose"
is_helm_chart_deployment = chart_dir.exists() and not compose_dir.exists()
# For helm chart deployments, skip compose file loading
if is_helm_chart_deployment:
cluster_context = ClusterContext(global_context, cluster, [], [], [], None, env_file)
else:
cluster_context = _make_cluster_context(global_context, stack, include, exclude, cluster, env_file)
deployer = getDeployer(deploy_to, deployment_context, compose_files=cluster_context.compose_files,
compose_project_name=cluster_context.cluster,
compose_env_file=cluster_context.env_file)
@ -188,6 +203,17 @@ def logs_operation(ctx, tail: int, follow: bool, extra_args: str):
print(stream_content.decode("utf-8"), end="")
def run_job_operation(ctx, job_name: str, helm_release: str = None):
global_context = ctx.parent.parent.obj
if not global_context.dry_run:
print(f"Running job: {job_name}")
try:
ctx.obj.deployer.run_job(job_name, helm_release)
except Exception as e:
print(f"Error running job {job_name}: {e}")
sys.exit(1)
@command.command()
@click.argument('extra_args', nargs=-1) # help: command: up <service1> <service2>
@click.pass_context

View File

@ -55,6 +55,10 @@ class Deployer(ABC):
def run(self, image: str, command=None, user=None, volumes=None, entrypoint=None, env={}, ports=[], detach=False):
pass
@abstractmethod
def run_job(self, job_name: str, release_name: str = None):
pass
class DeployerException(Exception):
def __init__(self, *args: object) -> None:

View File

@ -167,3 +167,14 @@ def status(ctx):
def update(ctx):
ctx.obj = make_deploy_context(ctx)
update_operation(ctx)
@command.command()
@click.argument('job_name')
@click.option('--helm-release', help='Helm release name (only for k8s helm chart deployments, defaults to chart name)')
@click.pass_context
def run_job(ctx, job_name, helm_release):
'''run a one-time job from the stack'''
from stack_orchestrator.deploy.deploy import run_job_operation
ctx.obj = make_deploy_context(ctx)
run_job_operation(ctx, job_name, helm_release)

View File

@ -27,7 +27,7 @@ from stack_orchestrator.opts import opts
from stack_orchestrator.util import (get_stack_path, get_parsed_deployment_spec, get_parsed_stack_config,
global_options, get_yaml, get_pod_list, get_pod_file_path, pod_has_scripts,
get_pod_script_paths, get_plugin_code_paths, error_exit, env_var_map_from_file,
resolve_config_dir)
resolve_config_dir, get_job_list, get_job_file_path)
from stack_orchestrator.deploy.spec import Spec
from stack_orchestrator.deploy.deploy_types import LaconicStackSetupCommand
from stack_orchestrator.deploy.deployer_factory import getDeployerConfigGenerator
@ -461,13 +461,6 @@ def create_operation(deployment_command_context, spec_file, deployment_dir, helm
stack_name = parsed_spec["stack"]
deployment_type = parsed_spec[constants.deploy_to_key]
# Branch to Helm chart generation flow early if --helm-chart flag is set
if deployment_type == "k8s" and helm_chart:
from stack_orchestrator.deploy.k8s.helm.chart_generator import generate_helm_chart
generate_helm_chart(stack_name, spec_file, deployment_dir)
return # Exit early, completely separate from existing k8s deployment flow
# Existing deployment flow continues unchanged
stack_file = get_stack_path(stack_name).joinpath(constants.stack_file_name)
parsed_stack = get_parsed_stack_config(stack_name)
if opts.o.debug:
@ -482,7 +475,17 @@ def create_operation(deployment_command_context, spec_file, deployment_dir, helm
# Copy spec file and the stack file into the deployment dir
copyfile(spec_file, deployment_dir_path.joinpath(constants.spec_file_name))
copyfile(stack_file, deployment_dir_path.joinpath(constants.stack_file_name))
# Create deployment.yml with cluster-id
_create_deployment_file(deployment_dir_path)
# Branch to Helm chart generation flow if --helm-chart flag is set
if deployment_type == "k8s" and helm_chart:
from stack_orchestrator.deploy.k8s.helm.chart_generator import generate_helm_chart
generate_helm_chart(stack_name, spec_file, deployment_dir_path)
return # Exit early for helm chart generation
# Existing deployment flow continues unchanged
# Copy any config varibles from the spec file into an env file suitable for compose
_write_config_file(spec_file, deployment_dir_path.joinpath(constants.config_file_name))
# Copy any k8s config file into the deployment dir
@ -540,6 +543,21 @@ def create_operation(deployment_command_context, spec_file, deployment_dir, helm
if os.path.exists(destination_config_dir) and not os.listdir(destination_config_dir):
copytree(source_config_dir, destination_config_dir, dirs_exist_ok=True)
# Copy the job files into the deployment dir (for Docker deployments)
jobs = get_job_list(parsed_stack)
if jobs and not parsed_spec.is_kubernetes_deployment():
destination_compose_jobs_dir = deployment_dir_path.joinpath("compose-jobs")
os.mkdir(destination_compose_jobs_dir)
for job in jobs:
job_file_path = get_job_file_path(stack_name, parsed_stack, job)
if job_file_path and job_file_path.exists():
parsed_job_file = yaml.load(open(job_file_path, "r"))
_fixup_pod_file(parsed_job_file, parsed_spec, destination_compose_dir)
with open(destination_compose_jobs_dir.joinpath("docker-compose-%s.yml" % job), "w") as output_file:
yaml.dump(parsed_job_file, output_file)
if opts.o.debug:
print(f"Copied job compose file: {job}")
# Delegate to the stack's Python code
# The deploy create command doesn't require a --stack argument so we need to insert the
# stack member here.

View File

@ -510,6 +510,26 @@ class K8sDeployer(Deployer):
# We need to figure out how to do this -- check why we're being called first
pass
def run_job(self, job_name: str, helm_release: str = None):
if not opts.o.dry_run:
from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job
# Check if this is a helm-based deployment
chart_dir = self.deployment_dir / "chart"
if not chart_dir.exists():
# TODO: Implement job support for compose-based K8s deployments
raise Exception(f"Job support is only available for helm-based deployments. Chart directory not found: {chart_dir}")
# Run the job using the helm job runner
run_helm_job(
chart_dir=chart_dir,
job_name=job_name,
release=helm_release,
namespace=self.k8s_namespace,
timeout=600,
verbose=opts.o.verbose
)
def is_kind(self):
return self.type == "k8s-kind"

View File

@ -13,16 +13,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http:#www.gnu.org/licenses/>.
import shutil
from pathlib import Path
from stack_orchestrator import constants
from stack_orchestrator.opts import opts
from stack_orchestrator.util import (
get_stack_path,
get_parsed_stack_config,
get_pod_list,
get_pod_file_path,
get_job_list,
get_job_file_path,
error_exit
)
from stack_orchestrator.deploy.k8s.helm.kompose_wrapper import (
@ -33,12 +33,52 @@ from stack_orchestrator.deploy.k8s.helm.kompose_wrapper import (
from stack_orchestrator.util import get_yaml
def _post_process_chart(chart_dir: Path, chart_name: str) -> None:
def _wrap_job_templates_with_conditionals(chart_dir: Path, jobs: list) -> None:
"""
Wrap job templates with conditional checks so they are not created by default.
Jobs will only be created when explicitly enabled via --set jobs.<name>.enabled=true
"""
templates_dir = chart_dir / "templates"
if not templates_dir.exists():
return
for job_name in jobs:
# Find job template file (kompose generates <service-name>-job.yaml)
job_template_file = templates_dir / f"{job_name}-job.yaml"
if not job_template_file.exists():
if opts.o.debug:
print(f"Warning: Job template not found: {job_template_file}")
continue
# Read the template content
content = job_template_file.read_text()
# Wrap with conditional (default false)
# Use 'index' function to handle job names with dashes
# Provide default dict for .Values.jobs to handle case where it doesn't exist
condition = (
f"{{{{- if (index (.Values.jobs | default dict) "
f'"{job_name}" | default dict).enabled | default false }}}}'
)
wrapped_content = f"""{condition}
{content}{{{{- end }}}}
"""
# Write back
job_template_file.write_text(wrapped_content)
if opts.o.debug:
print(f"Wrapped job template with conditional: {job_template_file.name}")
def _post_process_chart(chart_dir: Path, chart_name: str, jobs: list) -> None:
"""
Post-process Kompose-generated chart to fix common issues.
Fixes:
1. Chart.yaml name, description and keywords
2. Add conditional wrappers to job templates (default: disabled)
TODO:
- Add defaultMode: 0755 to ConfigMap volumes containing scripts (.sh files)
@ -63,35 +103,34 @@ def _post_process_chart(chart_dir: Path, chart_name: str) -> None:
with open(chart_yaml_path, "w") as f:
yaml.dump(chart_yaml, f)
# Process job templates: wrap with conditionals (default disabled)
if jobs:
_wrap_job_templates_with_conditionals(chart_dir, jobs)
def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = None) -> None:
def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir_path: Path) -> None:
"""
Generate a self-sufficient Helm chart from stack compose files using Kompose.
Args:
stack_path: Path to the stack directory
spec_file: Path to the deployment spec file
deployment_dir: Optional directory for deployment output
deployment_dir_path: Deployment directory path (already created with deployment.yml)
Output structure:
deployment-dir/
spec.yml # Reference
stack.yml # Reference
chart/ # Self-sufficient Helm chart
deployment.yml # Contains cluster-id
spec.yml # Reference
stack.yml # Reference
chart/ # Self-sufficient Helm chart
Chart.yaml
README.md
templates/
*.yaml
TODO: Enhancements:
- Parse generated templates and extract values to values.yaml
- Replace hardcoded image tags with {{ .Values.image.tag }}
- Replace hardcoded PVC sizes with {{ .Values.persistence.size }}
- Convert Deployments to StatefulSets for stateful services (zenithd, postgres)
- Add _helpers.tpl with common label/selector functions
- Embed config files (scripts, templates) into ConfigMap templates
- Generate Secret templates for validator keys with placeholders
- Add init containers for genesis/config setup
- Enhance Chart.yaml with proper metadata (version, description, etc.)
"""
@ -102,46 +141,43 @@ def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = N
if not check_kompose_available():
error_exit("kompose not found in PATH.\n")
# 2. Setup deployment directory
if deployment_dir:
deployment_dir_path = Path(deployment_dir)
else:
deployment_dir_path = Path(f"{stack_name}-deployment")
# 2. Read cluster-id from deployment.yml
deployment_file = deployment_dir_path / constants.deployment_file_name
if not deployment_file.exists():
error_exit(f"Deployment file not found: {deployment_file}")
if deployment_dir_path.exists():
error_exit(f"Deployment directory already exists: {deployment_dir_path}")
yaml = get_yaml()
deployment_config = yaml.load(open(deployment_file, "r"))
cluster_id = deployment_config.get(constants.cluster_id_key)
if not cluster_id:
error_exit(f"cluster-id not found in {deployment_file}")
# 3. Derive chart name from stack name + cluster-id suffix
# Sanitize stack name for use in chart name
sanitized_stack_name = stack_name.replace("_", "-").replace(" ", "-")
# Extract hex suffix from cluster-id (after the prefix)
# cluster-id format: "laconic-<hex>" -> extract the hex part
cluster_id_suffix = cluster_id.split("-", 1)[1] if "-" in cluster_id else cluster_id
# Combine to create human-readable + unique chart name
chart_name = f"{sanitized_stack_name}-{cluster_id_suffix}"
if opts.o.debug:
print(f"Creating deployment directory: {deployment_dir_path}")
print(f"Cluster ID: {cluster_id}")
print(f"Chart name: {chart_name}")
deployment_dir_path.mkdir(parents=True)
# 3. Copy spec and stack files to deployment directory (for reference)
spec_path = Path(spec_file).resolve()
if not spec_path.exists():
error_exit(f"Spec file not found: {spec_file}")
stack_file_path = get_stack_path(stack_path).joinpath(constants.stack_file_name)
if not stack_file_path.exists():
error_exit(f"Stack file not found: {stack_file_path}")
shutil.copy(spec_path, deployment_dir_path / constants.spec_file_name)
shutil.copy(stack_file_path, deployment_dir_path / constants.stack_file_name)
if opts.o.debug:
print(f"Copied spec file: {spec_path}")
print(f"Copied stack file: {stack_file_path}")
# 4. Get compose files from stack
# 4. Get compose files from stack (pods + jobs)
pods = get_pod_list(parsed_stack)
if not pods:
error_exit(f"No pods found in stack: {stack_path}")
# Get clean stack name from stack.yml
chart_name = stack_name.replace("_", "-").replace(" ", "-")
jobs = get_job_list(parsed_stack)
if opts.o.debug:
print(f"Found {len(pods)} pod(s) in stack: {pods}")
if jobs:
print(f"Found {len(jobs)} job(s) in stack: {jobs}")
compose_files = []
for pod in pods:
@ -152,6 +188,17 @@ def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = N
if opts.o.debug:
print(f"Found compose file: {pod_file.name}")
# Add job compose files
job_files = []
for job in jobs:
job_file = get_job_file_path(stack_path, parsed_stack, job)
if not job_file.exists():
error_exit(f"Job file not found: {job_file}")
compose_files.append(job_file)
job_files.append(job_file)
if opts.o.debug:
print(f"Found job compose file: {job_file.name}")
try:
version = get_kompose_version()
print(f"Using kompose version: {version}")
@ -175,12 +222,12 @@ def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = N
error_exit(f"Helm chart generation failed: {e}")
# 6. Post-process generated chart
_post_process_chart(chart_dir, chart_name)
_post_process_chart(chart_dir, chart_name, jobs)
# 7. Generate README.md with basic installation instructions
readme_content = f"""# {chart_name} Helm Chart
Generated by laconic-so from stack: `{stack_path}
Generated by laconic-so from stack: `{stack_path}`
## Prerequisites
@ -194,6 +241,9 @@ Generated by laconic-so from stack: `{stack_path}
# Install the chart
helm install {chart_name} {chart_dir}
# Alternatively, install with your own release name
# helm install <your-release-name> {chart_dir}
# Check deployment status
kubectl get pods
```
@ -246,9 +296,10 @@ Edit the generated template files in `templates/` to customize:
print("\nDeployment directory structure:")
print(f" {deployment_dir_path}/")
print(" ├── spec.yml (reference)")
print(" ├── stack.yml (reference)")
print(" └── chart/ (self-sufficient Helm chart)")
print(" ├── deployment.yml (cluster-id)")
print(" ├── spec.yml (reference)")
print(" ├── stack.yml (reference)")
print(" └── chart/ (self-sufficient Helm chart)")
print("\nNext steps:")
print(" 1. Review the chart:")
@ -261,6 +312,9 @@ Edit the generated template files in `templates/` to customize:
print(" 3. Install to Kubernetes:")
print(f" helm install {chart_name} {chart_dir}")
print("")
print(" # Or use your own release name")
print(f" helm install <your-release-name> {chart_dir}")
print("")
print(" 4. Check deployment:")
print(" kubectl get pods")
print("")

View File

@ -0,0 +1,149 @@
# Copyright © 2025 Vulcanize
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http:#www.gnu.org/licenses/>.
import subprocess
import tempfile
import os
import json
from pathlib import Path
from stack_orchestrator.util import get_yaml
def get_release_name_from_chart(chart_dir: Path) -> str:
"""
Read the chart name from Chart.yaml to use as the release name.
Args:
chart_dir: Path to the Helm chart directory
Returns:
Chart name from Chart.yaml
Raises:
Exception if Chart.yaml not found or name is missing
"""
chart_yaml_path = chart_dir / "Chart.yaml"
if not chart_yaml_path.exists():
raise Exception(f"Chart.yaml not found: {chart_yaml_path}")
yaml = get_yaml()
chart_yaml = yaml.load(open(chart_yaml_path, "r"))
if "name" not in chart_yaml:
raise Exception(f"Chart name not found in {chart_yaml_path}")
return chart_yaml["name"]
def run_helm_job(
chart_dir: Path,
job_name: str,
release: str = None,
namespace: str = "default",
timeout: int = 600,
verbose: bool = False
) -> None:
"""
Run a one-time job from a Helm chart.
This function:
1. Uses provided release name, or reads it from Chart.yaml if not provided
2. Uses helm template to render the job manifest with the job enabled
3. Applies the job manifest to the cluster
4. Waits for the job to complete
Args:
chart_dir: Path to the Helm chart directory
job_name: Name of the job to run (without -job suffix)
release: Optional Helm release name (defaults to chart name from Chart.yaml)
namespace: Kubernetes namespace
timeout: Timeout in seconds for job completion (default: 600)
verbose: Enable verbose output
Raises:
Exception if the job fails or times out
"""
if not chart_dir.exists():
raise Exception(f"Chart directory not found: {chart_dir}")
# Use provided release name, or get it from Chart.yaml
if release is None:
release = get_release_name_from_chart(chart_dir)
if verbose:
print(f"Using release name from Chart.yaml: {release}")
else:
if verbose:
print(f"Using provided release name: {release}")
job_template_file = f"templates/{job_name}-job.yaml"
if verbose:
print(f"Running job '{job_name}' from helm chart: {chart_dir}")
# Use helm template to render the job manifest
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as tmp_file:
try:
# Render job template with job enabled
# Use --set-json to properly handle job names with dashes
jobs_dict = {job_name: {"enabled": True}}
values_json = json.dumps(jobs_dict)
helm_cmd = [
"helm", "template", release, str(chart_dir),
"--show-only", job_template_file,
"--set-json", f"jobs={values_json}"
]
if verbose:
print(f"Running: {' '.join(helm_cmd)}")
result = subprocess.run(helm_cmd, check=True, capture_output=True, text=True)
tmp_file.write(result.stdout)
tmp_file.flush()
if verbose:
print(f"Generated job manifest:\n{result.stdout}")
# Parse the manifest to get the actual job name
yaml = get_yaml()
manifest = yaml.load(result.stdout)
actual_job_name = manifest.get("metadata", {}).get("name", job_name)
# Apply the job manifest
kubectl_apply_cmd = ["kubectl", "apply", "-f", tmp_file.name, "-n", namespace]
subprocess.run(kubectl_apply_cmd, check=True, capture_output=True, text=True)
if verbose:
print(f"Job {actual_job_name} created, waiting for completion...")
# Wait for job completion
wait_cmd = [
"kubectl", "wait", "--for=condition=complete",
f"job/{actual_job_name}",
f"--timeout={timeout}s",
"-n", namespace
]
subprocess.run(wait_cmd, check=True, capture_output=True, text=True)
if verbose:
print(f"Job {job_name} completed successfully")
except subprocess.CalledProcessError as e:
error_msg = e.stderr if e.stderr else str(e)
raise Exception(f"Job failed: {error_msg}")
finally:
# Clean up temp file
if os.path.exists(tmp_file.name):
os.unlink(tmp_file.name)

View File

@ -78,6 +78,22 @@ def get_pod_list(parsed_stack):
return result
def get_job_list(parsed_stack):
# Return list of jobs from stack config, or empty list if no jobs defined
if "jobs" not in parsed_stack:
return []
jobs = parsed_stack["jobs"]
if not jobs:
return []
if type(jobs[0]) is str:
result = jobs
else:
result = []
for job in jobs:
result.append(job["name"])
return result
def get_plugin_code_paths(stack) -> List[Path]:
parsed_stack = get_parsed_stack_config(stack)
pods = parsed_stack["pods"]
@ -119,6 +135,21 @@ def resolve_compose_file(stack, pod_name: str):
return compose_base.joinpath(f"docker-compose-{pod_name}.yml")
# Find a job compose file in compose-jobs directory
def resolve_job_compose_file(stack, job_name: str):
if stack_is_external(stack):
# First try looking in the external stack for the job compose file
compose_jobs_base = Path(stack).parent.parent.joinpath("compose-jobs")
proposed_file = compose_jobs_base.joinpath(f"docker-compose-{job_name}.yml")
if proposed_file.exists():
return proposed_file
# If we don't find it fall through to the internal case
# TODO: Add internal compose-jobs directory support if needed
# For now, jobs are expected to be in external stacks only
compose_jobs_base = Path(stack).parent.parent.joinpath("compose-jobs")
return compose_jobs_base.joinpath(f"docker-compose-{job_name}.yml")
def get_pod_file_path(stack, parsed_stack, pod_name: str):
pods = parsed_stack["pods"]
if type(pods[0]) is str:
@ -131,6 +162,18 @@ def get_pod_file_path(stack, parsed_stack, pod_name: str):
return result
def get_job_file_path(stack, parsed_stack, job_name: str):
if "jobs" not in parsed_stack or not parsed_stack["jobs"]:
return None
jobs = parsed_stack["jobs"]
if type(jobs[0]) is str:
result = resolve_job_compose_file(stack, job_name)
else:
# TODO: Support complex job definitions if needed
result = resolve_job_compose_file(stack, job_name)
return result
def get_pod_script_paths(parsed_stack, pod_name: str):
pods = parsed_stack["pods"]
result = []