Add kind cluster reuse and list command

- Add get_kind_cluster() to detect existing kind clusters
- Modify create_cluster() to reuse existing clusters automatically
- Add 'laconic-so deploy k8s list cluster' command
- Skip --stack requirement for k8s subcommand

This allows multiple deployments to share the same kind cluster,
simplifying local development workflows.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
afd-caddy-ingress
A. F. Dudley 2026-01-20 04:08:16 -05:00
parent 3606b5dd90
commit 8426d99ed9
4 changed files with 96 additions and 2 deletions

View File

@ -42,6 +42,7 @@ from stack_orchestrator.deploy.deployment_context import DeploymentContext
from stack_orchestrator.deploy.deployment_create import create as deployment_create from stack_orchestrator.deploy.deployment_create import create as deployment_create
from stack_orchestrator.deploy.deployment_create import init as deployment_init from stack_orchestrator.deploy.deployment_create import init as deployment_init
from stack_orchestrator.deploy.deployment_create import setup as deployment_setup from stack_orchestrator.deploy.deployment_create import setup as deployment_setup
from stack_orchestrator.deploy.k8s import k8s_command
@click.group() @click.group()
@ -54,6 +55,10 @@ from stack_orchestrator.deploy.deployment_create import setup as deployment_setu
def command(ctx, include, exclude, env_file, cluster, deploy_to): def command(ctx, include, exclude, env_file, cluster, deploy_to):
'''deploy a stack''' '''deploy a stack'''
# k8s subcommand doesn't require stack
if ctx.invoked_subcommand == "k8s":
return
# Although in theory for some subcommands (e.g. deploy create) the stack can be inferred, # Although in theory for some subcommands (e.g. deploy create) the stack can be inferred,
# Click doesn't allow us to know that here, so we make providing the stack mandatory # Click doesn't allow us to know that here, so we make providing the stack mandatory
stack = global_options2(ctx).stack stack = global_options2(ctx).stack
@ -460,3 +465,4 @@ def _orchestrate_cluster_config(ctx, cluster_config, deployer, container_exec_en
command.add_command(deployment_init) command.add_command(deployment_init)
command.add_command(deployment_create) command.add_command(deployment_create)
command.add_command(deployment_setup) command.add_command(deployment_setup)
command.add_command(k8s_command.command, "k8s")

View File

@ -210,8 +210,12 @@ class K8sDeployer(Deployer):
self.skip_cluster_management = skip_cluster_management self.skip_cluster_management = skip_cluster_management
if not opts.o.dry_run: if not opts.o.dry_run:
if self.is_kind() and not self.skip_cluster_management: if self.is_kind() and not self.skip_cluster_management:
# Create the kind cluster # Create the kind cluster (or reuse existing one)
create_cluster(self.kind_cluster_name, self.deployment_dir.joinpath(constants.kind_config_filename)) kind_config = self.deployment_dir.joinpath(constants.kind_config_filename)
actual_cluster = create_cluster(self.kind_cluster_name, kind_config)
if actual_cluster != self.kind_cluster_name:
# An existing cluster was found, use it instead
self.kind_cluster_name = actual_cluster
# Ensure the referenced containers are copied into kind # Ensure the referenced containers are copied into kind
load_images_into_kind(self.kind_cluster_name, self.cluster_info.image_set) load_images_into_kind(self.kind_cluster_name, self.cluster_info.image_set)
self.connect_api() self.connect_api()

View File

@ -35,10 +35,51 @@ def _run_command(command: str):
return result return result
def get_kind_cluster():
"""Get an existing kind cluster, if any.
Uses `kind get clusters` to find existing clusters.
Returns the cluster name or None if no cluster exists.
"""
result = subprocess.run(
"kind get clusters",
shell=True,
capture_output=True,
text=True
)
if result.returncode != 0:
return None
clusters = result.stdout.strip().splitlines()
if clusters:
return clusters[0] # Return the first cluster found
return None
def create_cluster(name: str, config_file: str): def create_cluster(name: str, config_file: str):
"""Create a kind cluster, or reuse an existing one.
Checks if any kind cluster already exists. If so, uses that cluster
instead of creating a new one. This allows multiple deployments to
share the same kind cluster.
Args:
name: The desired cluster name (used only if creating new)
config_file: Path to kind config file (used only if creating new)
Returns:
The name of the cluster being used (either existing or newly created)
"""
existing = get_kind_cluster()
if existing:
print(f"Using existing cluster: {existing}")
return existing
print(f"Creating new cluster: {name}")
result = _run_command(f"kind create cluster --name {name} --config {config_file}") result = _run_command(f"kind create cluster --name {name} --config {config_file}")
if result.returncode != 0: if result.returncode != 0:
raise DeployerException(f"kind create cluster failed: {result}") raise DeployerException(f"kind create cluster failed: {result}")
return name
def destroy_cluster(name: str): def destroy_cluster(name: str):

View File

@ -0,0 +1,43 @@
# Copyright © 2024 Vulcanize
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http:#www.gnu.org/licenses/>.
import click
from stack_orchestrator.deploy.k8s.helpers import get_kind_cluster
@click.group()
@click.pass_context
def command(ctx):
'''k8s cluster management commands'''
pass
@command.group()
@click.pass_context
def list(ctx):
'''list k8s resources'''
pass
@list.command()
@click.pass_context
def cluster(ctx):
'''Show the existing kind cluster'''
existing_cluster = get_kind_cluster()
if existing_cluster:
print(existing_cluster)
else:
print("No cluster found")