fix(k8s): make deploy_k8s.py idempotent with create-or-replace semantics

All K8s resource creation in deploy_k8s.py now uses try-create, catch
ApiException(409), then replace — matching the pattern already used for
secrets in deployment_create.py. This allows `deployment start` to be
safely re-run without 409 Conflict errors.

Resources made idempotent:
- Deployment (create_namespaced_deployment → replace on 409)
- Service (create_namespaced_service → replace on 409)
- Ingress (create_namespaced_ingress → replace on 409)
- NodePort services (same as Service)
- ConfigMap (create_namespaced_config_map → replace on 409)
- PV/PVC: bare `except: pass` replaced with explicit ApiException
  catch for 404

Extracted _ensure_deployment(), _ensure_service(), _ensure_ingress(),
and _ensure_config_map() helpers to keep cyclomatic complexity in check.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/kind-mount-propagation
A. F. Dudley 2026-03-08 04:15:03 +00:00
parent cc6acd5f09
commit 1da69cf739
1 changed files with 104 additions and 45 deletions

View File

@ -192,6 +192,99 @@ class K8sDeployer(Deployer):
else: else:
raise raise
def _ensure_config_map(self, cfg_map):
"""Create or replace a ConfigMap (idempotent)."""
try:
resp = self.core_api.create_namespaced_config_map(
body=cfg_map, namespace=self.k8s_namespace
)
if opts.o.debug:
print(f"ConfigMap created: {resp}")
except ApiException as e:
if e.status == 409:
resp = self.core_api.replace_namespaced_config_map(
name=cfg_map.metadata.name,
namespace=self.k8s_namespace,
body=cfg_map,
)
if opts.o.debug:
print(f"ConfigMap updated: {resp}")
else:
raise
def _ensure_deployment(self, deployment):
"""Create or replace a Deployment (idempotent)."""
try:
resp = cast(
client.V1Deployment,
self.apps_api.create_namespaced_deployment(
body=deployment, namespace=self.k8s_namespace
),
)
if opts.o.debug:
print("Deployment created:")
except ApiException as e:
if e.status == 409:
resp = cast(
client.V1Deployment,
self.apps_api.replace_namespaced_deployment(
name=deployment.metadata.name,
namespace=self.k8s_namespace,
body=deployment,
),
)
if opts.o.debug:
print("Deployment updated:")
else:
raise
if opts.o.debug:
meta = resp.metadata
spec = resp.spec
if meta and spec and spec.template.spec:
containers = spec.template.spec.containers
img = containers[0].image if containers else None
print(f"{meta.namespace} {meta.name} {meta.generation} {img}")
def _ensure_service(self, service, kind: str = "Service"):
"""Create or replace a Service (idempotent)."""
try:
resp = self.core_api.create_namespaced_service(
namespace=self.k8s_namespace, body=service
)
if opts.o.debug:
print(f"{kind} created: {resp}")
except ApiException as e:
if e.status == 409:
resp = self.core_api.replace_namespaced_service(
name=service.metadata.name,
namespace=self.k8s_namespace,
body=service,
)
if opts.o.debug:
print(f"{kind} updated: {resp}")
else:
raise
def _ensure_ingress(self, ingress):
"""Create or replace an Ingress (idempotent)."""
try:
resp = self.networking_api.create_namespaced_ingress(
namespace=self.k8s_namespace, body=ingress
)
if opts.o.debug:
print(f"Ingress created: {resp}")
except ApiException as e:
if e.status == 409:
resp = self.networking_api.replace_namespaced_ingress(
name=ingress.metadata.name,
namespace=self.k8s_namespace,
body=ingress,
)
if opts.o.debug:
print(f"Ingress updated: {resp}")
else:
raise
def _create_volume_data(self): def _create_volume_data(self):
# Create the host-path-mounted PVs for this deployment # Create the host-path-mounted PVs for this deployment
pvs = self.cluster_info.get_pvs() pvs = self.cluster_info.get_pvs()
@ -208,8 +301,9 @@ class K8sDeployer(Deployer):
print("PVs already present:") print("PVs already present:")
print(f"{pv_resp}") print(f"{pv_resp}")
continue continue
except: # noqa: E722 except ApiException as e:
pass if e.status != 404:
raise
pv_resp = self.core_api.create_persistent_volume(body=pv) pv_resp = self.core_api.create_persistent_volume(body=pv)
if opts.o.debug: if opts.o.debug:
@ -232,8 +326,9 @@ class K8sDeployer(Deployer):
print("PVCs already present:") print("PVCs already present:")
print(f"{pvc_resp}") print(f"{pvc_resp}")
continue continue
except: # noqa: E722 except ApiException as e:
pass if e.status != 404:
raise
pvc_resp = self.core_api.create_namespaced_persistent_volume_claim( pvc_resp = self.core_api.create_namespaced_persistent_volume_claim(
body=pvc, namespace=self.k8s_namespace body=pvc, namespace=self.k8s_namespace
@ -248,12 +343,7 @@ class K8sDeployer(Deployer):
if opts.o.debug: if opts.o.debug:
print(f"Sending this ConfigMap: {cfg_map}") print(f"Sending this ConfigMap: {cfg_map}")
if not opts.o.dry_run: if not opts.o.dry_run:
cfg_rsp = self.core_api.create_namespaced_config_map( self._ensure_config_map(cfg_map)
body=cfg_map, namespace=self.k8s_namespace
)
if opts.o.debug:
print("ConfigMap created:")
print(f"{cfg_rsp}")
def _create_deployment(self): def _create_deployment(self):
# Process compose files into a Deployment # Process compose files into a Deployment
@ -264,34 +354,13 @@ class K8sDeployer(Deployer):
if opts.o.debug: if opts.o.debug:
print(f"Sending this deployment: {deployment}") print(f"Sending this deployment: {deployment}")
if not opts.o.dry_run: if not opts.o.dry_run:
deployment_resp = cast( self._ensure_deployment(deployment)
client.V1Deployment,
self.apps_api.create_namespaced_deployment(
body=deployment, namespace=self.k8s_namespace
),
)
if opts.o.debug:
print("Deployment created:")
meta = deployment_resp.metadata
spec = deployment_resp.spec
if meta and spec and spec.template.spec:
ns = meta.namespace
name = meta.name
gen = meta.generation
containers = spec.template.spec.containers
img = containers[0].image if containers else None
print(f"{ns} {name} {gen} {img}")
service = self.cluster_info.get_service() service = self.cluster_info.get_service()
if opts.o.debug: if opts.o.debug:
print(f"Sending this service: {service}") print(f"Sending this service: {service}")
if service and not opts.o.dry_run: if service and not opts.o.dry_run:
service_resp = self.core_api.create_namespaced_service( self._ensure_service(service)
namespace=self.k8s_namespace, body=service
)
if opts.o.debug:
print("Service created:")
print(f"{service_resp}")
def _find_certificate_for_host_name(self, host_name): def _find_certificate_for_host_name(self, host_name):
all_certificates = self.custom_obj_api.list_namespaced_custom_object( all_certificates = self.custom_obj_api.list_namespaced_custom_object(
@ -404,12 +473,7 @@ class K8sDeployer(Deployer):
if opts.o.debug: if opts.o.debug:
print(f"Sending this ingress: {ingress}") print(f"Sending this ingress: {ingress}")
if not opts.o.dry_run: if not opts.o.dry_run:
ingress_resp = self.networking_api.create_namespaced_ingress( self._ensure_ingress(ingress)
namespace=self.k8s_namespace, body=ingress
)
if opts.o.debug:
print("Ingress created:")
print(f"{ingress_resp}")
else: else:
if opts.o.debug: if opts.o.debug:
print("No ingress configured") print("No ingress configured")
@ -419,12 +483,7 @@ class K8sDeployer(Deployer):
if opts.o.debug: if opts.o.debug:
print(f"Sending this nodeport: {nodeport}") print(f"Sending this nodeport: {nodeport}")
if not opts.o.dry_run: if not opts.o.dry_run:
nodeport_resp = self.core_api.create_namespaced_service( self._ensure_service(nodeport, kind="NodePort")
namespace=self.k8s_namespace, body=nodeport
)
if opts.o.debug:
print("NodePort created:")
print(f"{nodeport_resp}")
def down(self, timeout, volumes, skip_cluster_management): def down(self, timeout, volumes, skip_cluster_management):
self.skip_cluster_management = skip_cluster_management self.skip_cluster_management = skip_cluster_management