fix: switch ramdisk from /dev/ram0 to tmpfs, refactor snapshot-download.py

The /dev/ram0 + XFS + format-ramdisk.service approach was needless
complexity left over from a migration misunderstanding — there was no actual
tmpfs bug with io_uring. tmpfs is simpler (no format-on-boot service),
resizable on the fly, and what every other Solana operator uses.

Changes:
- prepare-agave: remove format-ramdisk.service and ramdisk-accounts.service,
  use tmpfs fstab entry with size=1024G (was 600G /dev/ram0, too small)
- recover: remove ramdisk_device var (no longer needed)
- redeploy: wipe accounts by rm -rf instead of umount+mkfs
- snapshot-download.py: extract download_best_snapshot() public API for
  use by the new container entrypoint.py (in agave-stack)
- CLAUDE.md: update ramdisk docs, fix /srv/solana → /srv/kind/solana paths
- health-check: fix ramdisk path references

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/kind-mount-propagation
A. F. Dudley 2026-03-08 18:43:41 +00:00
parent 591d158e1f
commit b2342bc539
8 changed files with 314 additions and 302 deletions

.gitignore (vendored, 1 line changed)

@ -1,3 +1,4 @@
.venv/
sessions.duckdb
sessions.duckdb.wal
.worktrees

CLAUDE.md

@ -14,9 +14,8 @@ below it are correct. Playbooks belong to exactly one layer.
| 5. Deploy agave | Deploy agave-stack into kind, snapshot download, scale up | `biscayne-redeploy.yml` (snapshot/verify tags), `biscayne-recover.yml` |
**Layer 4 invariants** (asserted by `biscayne-prepare-agave.yml`):
- `/srv/solana` is XFS on a zvol — agave uses io_uring which deadlocks on ZFS
- `/srv/solana/ramdisk` is XFS on `/dev/ram0` — accounts must be on ramdisk
- `/srv/kind/solana` is an rbind of `/srv/solana` — makes the zvol visible to kind at `/mnt/solana`
- `/srv/kind/solana` is XFS on a zvol — agave uses io_uring which deadlocks on ZFS. `/srv/solana` is NOT the zvol (it's a ZFS dataset directory); never use it for data paths
- `/srv/kind/solana/ramdisk` is tmpfs (1TB) — accounts must be in RAM
These invariants are checked at runtime and persisted to fstab/systemd so they
survive reboot. They are agave's requirements reaching into the boot sequence,
@ -58,18 +57,13 @@ Correct shutdown sequence:
### Ramdisk
The accounts directory must be on a ramdisk for performance. `/dev/ram0` loses its
filesystem on reboot and must be reformatted before mounting.
The accounts directory must be in RAM for performance. tmpfs is used instead of
`/dev/ram0` — simpler (no format-on-boot service needed), resizable on the fly
with `mount -o remount,size=<new>`, and what most Solana operators use.
**Boot ordering is handled by systemd units** (installed by `biscayne-prepare-agave.yml`):
- `format-ramdisk.service`: runs `mkfs.xfs -f /dev/ram0` before `local-fs.target`
- fstab entry: mounts `/dev/ram0` at `/srv/solana/ramdisk` with
`x-systemd.requires=format-ramdisk.service`
- `ramdisk-accounts.service`: creates `/srv/solana/ramdisk/accounts` and sets
ownership after the mount
These units run before docker, so the kind node's bind mounts always see the
ramdisk. **No manual intervention is needed after reboot.**
**Boot ordering**: fstab entry mounts tmpfs at `/srv/kind/solana/ramdisk` with
`x-systemd.requires=srv-kind-solana.mount`. tmpfs mounts natively via fstab —
no systemd format service needed. **No manual intervention after reboot.**
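
The resulting fstab entry looks like this (the authoritative line is managed by
`biscayne-prepare-agave.yml`; shown here for reference):

```
# /etc/fstab — tmpfs ramdisk for accounts, ordered after the zvol mount
tmpfs /srv/kind/solana/ramdisk tmpfs nodev,nosuid,noexec,nodiratime,size=1024G,nofail,x-systemd.requires=srv-kind-solana.mount 0 0
```

The size can be changed live without unmounting, e.g.
`mount -o remount,size=1200G /srv/kind/solana/ramdisk`.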
**Mount propagation**: The kind node bind-mounts `/srv/kind` -> `/mnt` at container
start. laconic-so sets `propagation: HostToContainer` on all kind extraMounts
@ -139,10 +133,11 @@ kind node via a single bind mount.
- Deployment: `laconic-70ce4c4b47e23b85-deployment`
- Kind node container: `laconic-70ce4c4b47e23b85-control-plane`
- Deployment dir: `/srv/deployments/agave`
- Snapshot dir: `/srv/solana/snapshots`
- Ledger dir: `/srv/solana/ledger`
- Accounts dir: `/srv/solana/ramdisk/accounts`
- Log dir: `/srv/solana/log`
- Snapshot dir: `/srv/kind/solana/snapshots` (on zvol, visible to kind at `/mnt/validator-snapshots`)
- Ledger dir: `/srv/kind/solana/ledger` (on zvol, visible to kind at `/mnt/validator-ledger`)
- Accounts dir: `/srv/kind/solana/ramdisk/accounts` (on ramdisk `/dev/ram0`, visible to kind at `/mnt/validator-accounts`)
- Log dir: `/srv/kind/solana/log` (on zvol, visible to kind at `/mnt/validator-log`)
- **WARNING**: `/srv/solana` is a ZFS dataset directory, NOT the zvol. Never use it for data paths.
- Host bind mount root: `/srv/kind` -> kind node `/mnt`
- laconic-so: `/home/rix/.local/bin/laconic-so` (editable install)
@ -150,10 +145,10 @@ kind node via a single bind mount.
| PV Name | hostPath |
|----------------------|-------------------------------|
| validator-snapshots | /mnt/solana/snapshots |
| validator-ledger | /mnt/solana/ledger |
| validator-accounts | /mnt/solana/ramdisk/accounts |
| validator-log | /mnt/solana/log |
| validator-snapshots | /mnt/validator-snapshots |
| validator-ledger | /mnt/validator-ledger |
| validator-accounts | /mnt/validator-accounts |
| validator-log | /mnt/validator-log |
### Snapshot Freshness
@ -164,7 +159,7 @@ try to catch up from an old snapshot — it will take too long and may never con
Check with:
```
# Snapshot slot (from filename)
ls /srv/solana/snapshots/snapshot-*.tar.*
ls /srv/kind/solana/snapshots/snapshot-*.tar.*
# Current mainnet slot
curl -s -X POST -H "Content-Type: application/json" \

playbooks/biscayne-prepare-agave.yml

@ -10,26 +10,18 @@
#
# Agave requires three things from the host that kind doesn't provide:
#
# Invariant 1: /srv/solana is XFS on a zvol (not ZFS)
# Invariant 1: /srv/kind/solana is XFS on a zvol (not ZFS)
# Why: agave uses io_uring for async I/O. io_uring workers deadlock on
# ZFS datasets (D-state in dsl_dir_tempreserve_space). XFS on a zvol
# (block device) works fine. This is why the data lives on a zvol, not
# a ZFS dataset.
# Persisted as: fstab entry mounting /dev/zvol/.../solana at /srv/solana
# (block device) works fine. /srv/solana is NOT the zvol — it's a
# directory on the ZFS dataset biscayne/DATA/srv. All data paths must
# use /srv/kind/solana which is the actual zvol mount.
# Persisted as: fstab entry mounting /dev/zvol/.../solana at /srv/kind/solana
#
# Invariant 2: /srv/solana/ramdisk is XFS on /dev/ram0 (600G ramdisk)
# Why: agave accounts must be on ramdisk for performance. /dev/ram0
# loses its filesystem on reboot, so it must be reformatted before
# mounting each boot.
# Persisted as: format-ramdisk.service (mkfs before mount) + fstab entry
#
# Invariant 3: /srv/kind/solana is XFS (zvol) and /srv/kind/solana/ramdisk is XFS (ram0)
# Why: kind mounts /srv/kind → /mnt inside the kind node. PVs reference
# /mnt/solana/*. An rbind of /srv/solana does NOT work because ZFS's
# shared propagation (shared:75 on /srv) overlays ZFS on top of the bind.
# Direct device mounts bypass propagation entirely.
# Persisted as: two fstab entries — zvol at /srv/kind/solana, ram0 at
# /srv/kind/solana/ramdisk, both with x-systemd.requires ordering
# Invariant 2: /srv/kind/solana/ramdisk is tmpfs (1TB)
# Why: agave accounts must be in RAM for performance. tmpfs survives
# process restarts but not host reboots (same as /dev/ram0 but simpler).
# Persisted as: fstab entry (no format service needed)
#
# This playbook checks each invariant and only acts if it's not met.
# Idempotent — safe to run multiple times.
@ -42,132 +34,76 @@
gather_facts: false
become: true
vars:
ramdisk_device: /dev/ram0
zvol_device: /dev/zvol/biscayne/DATA/volumes/solana
solana_dir: /srv/solana
ramdisk_mount: /srv/solana/ramdisk
kind_solana_dir: /srv/kind/solana
accounts_dir: /srv/solana/ramdisk/accounts
ramdisk_mount: /srv/kind/solana/ramdisk
ramdisk_size: 1024G
accounts_dir: /srv/kind/solana/ramdisk/accounts
deployment_dir: /srv/deployments/agave
kind_ramdisk_opts: "noatime,nodiratime,nofail,x-systemd.requires=format-ramdisk.service,x-systemd.requires=srv-kind-solana.mount"
tasks:
# ---- systemd units ----------------------------------------------------------
- name: Install ramdisk format service
ansible.builtin.copy:
dest: /etc/systemd/system/format-ramdisk.service
mode: "0644"
content: |
[Unit]
Description=Format /dev/ram0 as XFS for Solana accounts
DefaultDependencies=no
Before=local-fs.target
After=systemd-modules-load.service
ConditionPathExists={{ ramdisk_device }}
# ---- cleanup legacy ramdisk services -----------------------------------------
- name: Stop and disable legacy ramdisk services
ansible.builtin.systemd:
name: "{{ item }}"
state: stopped
enabled: false
loop:
- format-ramdisk.service
- ramdisk-accounts.service
failed_when: false
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/sbin/mkfs.xfs -f {{ ramdisk_device }}
[Install]
WantedBy=local-fs.target
register: unit_file
- name: Install ramdisk post-mount service
ansible.builtin.copy:
dest: /etc/systemd/system/ramdisk-accounts.service
mode: "0644"
content: |
[Unit]
Description=Create Solana accounts directory on ramdisk
After=srv-solana-ramdisk.mount
Requires=srv-solana-ramdisk.mount
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/bin/bash -c 'mkdir -p {{ accounts_dir }} && chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }}'
[Install]
WantedBy=multi-user.target
register: accounts_unit
- name: Remove legacy ramdisk service files
ansible.builtin.file:
path: "/etc/systemd/system/{{ item }}"
state: absent
loop:
- format-ramdisk.service
- ramdisk-accounts.service
register: legacy_units_removed
# ---- fstab entries ----------------------------------------------------------
- name: Ensure zvol fstab entry
# /srv/solana is NOT the zvol — it's a directory on the ZFS dataset.
# All data paths use /srv/kind/solana (the actual zvol mount).
- name: Remove stale /srv/solana zvol fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ solana_dir }}\s'
line: '{{ zvol_device }} {{ solana_dir }} xfs defaults 0 2'
register: fstab_zvol
regexp: '^\S+\s+/srv/solana\s+xfs'
state: absent
- name: Ensure ramdisk fstab entry
- name: Remove stale /srv/solana/ramdisk fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^{{ ramdisk_device }}\s+{{ ramdisk_mount }}\s'
line: '{{ ramdisk_device }} {{ ramdisk_mount }} xfs noatime,nodiratime,nofail,x-systemd.requires=format-ramdisk.service 0 0'
register: fstab_ramdisk
regexp: '^/dev/ram0\s+'
state: absent
# Direct device mounts at /srv/kind/solana — bypasses ZFS shared propagation.
# An rbind of /srv/solana fails because ZFS's shared:75 on /srv overlays
# ZFS on top of any bind mount under /srv. Direct device mounts avoid this.
- name: Ensure kind zvol fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ kind_solana_dir }}\s'
line: '{{ zvol_device }} {{ kind_solana_dir }} xfs defaults,nofail,x-systemd.requires=zfs-mount.service 0 0'
register: fstab_kind
- name: Ensure kind ramdisk fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ kind_solana_dir }}/ramdisk\s'
line: "{{ ramdisk_device }} {{ kind_solana_dir }}/ramdisk xfs {{ kind_ramdisk_opts }} 0 0"
register: fstab_kind_ramdisk
# Remove stale rbind fstab entry from previous approach
- name: Remove stale kind rbind fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ kind_solana_dir }}\s+none\s+rbind'
state: absent
register: fstab_stale_rbind
# ---- reload and enable ------------------------------------------------------
- name: Ensure zvol fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ kind_solana_dir }}\s'
line: '{{ zvol_device }} {{ kind_solana_dir }} xfs defaults,nofail,x-systemd.requires=zfs-mount.service 0 0'
register: fstab_zvol
- name: Ensure tmpfs ramdisk fstab entry
ansible.builtin.lineinfile:
path: /etc/fstab
regexp: '^\S+\s+{{ ramdisk_mount }}\s'
line: "tmpfs {{ ramdisk_mount }} tmpfs nodev,nosuid,noexec,nodiratime,size={{ ramdisk_size }},nofail,x-systemd.requires=srv-kind-solana.mount 0 0"
register: fstab_ramdisk
# ---- reload systemd if anything changed --------------------------------------
- name: Reload systemd
ansible.builtin.systemd:
daemon_reload: true
when: >-
unit_file.changed or accounts_unit.changed or
fstab_zvol.changed or fstab_ramdisk.changed or
fstab_kind.changed or fstab_kind_ramdisk.changed or
fstab_stale_rbind.changed
when: legacy_units_removed.changed or fstab_zvol.changed or fstab_ramdisk.changed
- name: Enable ramdisk services
ansible.builtin.systemd:
name: "{{ item }}"
enabled: true
loop:
- format-ramdisk.service
- ramdisk-accounts.service
# ---- apply now if ramdisk not mounted --------------------------------------
- name: Check if ramdisk is mounted
ansible.builtin.command: mountpoint -q {{ ramdisk_mount }}
register: ramdisk_mounted
failed_when: false
changed_when: false
- name: Format and mount ramdisk now
ansible.builtin.shell: |
mkfs.xfs -f {{ ramdisk_device }}
mount {{ ramdisk_mount }}
mkdir -p {{ accounts_dir }}
chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }}
changed_when: ramdisk_mounted.rc != 0
when: ramdisk_mounted.rc != 0
# ---- apply kind device mounts now if not correct ----------------------------
# ---- apply device mounts now if not correct ----------------------------------
- name: Check kind zvol mount is XFS
ansible.builtin.shell:
cmd: >
@ -178,16 +114,16 @@
failed_when: false
changed_when: false
- name: Unmount stale kind mounts
- name: Unmount stale mounts
ansible.builtin.shell:
cmd: |
umount {{ kind_solana_dir }}/ramdisk 2>/dev/null || true
umount {{ ramdisk_mount }} 2>/dev/null || true
umount {{ kind_solana_dir }} 2>/dev/null || true
executable: /bin/bash
changed_when: kind_zvol_check.rc != 0
when: kind_zvol_check.rc != 0
- name: Mount zvol at kind solana dir
- name: Mount zvol
ansible.posix.mount:
path: "{{ kind_solana_dir }}"
src: "{{ zvol_device }}"
@ -195,24 +131,32 @@
state: mounted
when: kind_zvol_check.rc != 0
- name: Check kind ramdisk mount is XFS
- name: Check ramdisk mount is tmpfs
ansible.builtin.shell:
cmd: >
set -o pipefail &&
findmnt -n -o FSTYPE {{ kind_solana_dir }}/ramdisk | grep -q xfs
findmnt -n -o FSTYPE {{ ramdisk_mount }} | grep -q tmpfs
executable: /bin/bash
register: kind_ramdisk_check
register: ramdisk_check
failed_when: false
changed_when: false
- name: Mount ramdisk at kind solana ramdisk dir
- name: Mount tmpfs ramdisk
ansible.posix.mount:
path: "{{ kind_solana_dir }}/ramdisk"
src: "{{ ramdisk_device }}"
fstype: xfs
opts: noatime,nodiratime
path: "{{ ramdisk_mount }}"
src: tmpfs
fstype: tmpfs
opts: "nodev,nosuid,noexec,nodiratime,size={{ ramdisk_size }}"
state: mounted
when: kind_ramdisk_check.rc != 0
when: ramdisk_check.rc != 0
- name: Create accounts directory
ansible.builtin.file:
path: "{{ accounts_dir }}"
state: directory
owner: solana
group: solana
mode: "0755"
# Docker requires shared propagation on mounts it bind-mounts into
# containers. Without this, `docker start` fails with "not a shared
@ -227,36 +171,24 @@
changed_when: false
# ---- verify -----------------------------------------------------------------
- name: Verify ramdisk is XFS
ansible.builtin.shell:
cmd: set -o pipefail && df -T {{ ramdisk_mount }} | grep -q xfs
executable: /bin/bash
changed_when: false
- name: Verify zvol is XFS
ansible.builtin.shell:
cmd: set -o pipefail && df -T {{ solana_dir }} | grep -q xfs
executable: /bin/bash
changed_when: false
- name: Verify kind zvol is XFS
ansible.builtin.shell:
cmd: set -o pipefail && df -T {{ kind_solana_dir }} | grep -q xfs
executable: /bin/bash
changed_when: false
- name: Verify kind ramdisk is XFS
- name: Verify ramdisk is tmpfs
ansible.builtin.shell:
cmd: set -o pipefail && df -T {{ kind_solana_dir }}/ramdisk | grep -q xfs
cmd: set -o pipefail && df -T {{ ramdisk_mount }} | grep -q tmpfs
executable: /bin/bash
changed_when: false
- name: Verify kind mount contents
- name: Verify mount contents
ansible.builtin.shell:
cmd: >
set -o pipefail &&
ls {{ kind_solana_dir }}/ledger {{ kind_solana_dir }}/snapshots
{{ kind_solana_dir }}/ramdisk/accounts 2>&1 | head -5
{{ ramdisk_mount }}/accounts 2>&1 | head -5
executable: /bin/bash
register: kind_mount_verify
changed_when: false
@ -273,13 +205,12 @@
register: cluster_id_result
changed_when: false
- name: Check kind node XFS visibility
- name: Check kind node filesystem visibility
ansible.builtin.shell:
cmd: >
set -o pipefail &&
docker exec {{ cluster_id_result.stdout }}-control-plane
df -T /mnt/validator-ledger /mnt/validator-accounts
| grep -c xfs
executable: /bin/bash
register: kind_fstype
changed_when: false
@ -289,7 +220,7 @@
ansible.builtin.debug:
msg:
kind_mount: "{{ kind_mount_verify.stdout_lines }}"
kind_fstype: "{{ 'xfs (correct)' if kind_fstype.stdout | default('0') | int >= 2 else 'NOT XFS — kind restart required' }}"
kind_fstype: "{{ kind_fstype.stdout_lines | default([]) }}"
- name: Configure Ashburn validator relay
ansible.builtin.import_playbook: ashburn-relay-biscayne.yml

playbooks/biscayne-recover.yml

@ -33,10 +33,9 @@
kind_cluster: laconic-70ce4c4b47e23b85
k8s_namespace: "laconic-{{ kind_cluster }}"
deployment_name: "{{ kind_cluster }}-deployment"
snapshot_dir: /srv/solana/snapshots
accounts_dir: /srv/solana/ramdisk/accounts
ramdisk_mount: /srv/solana/ramdisk
ramdisk_device: /dev/ram0
snapshot_dir: /srv/kind/solana/snapshots
accounts_dir: /srv/kind/solana/ramdisk/accounts
ramdisk_mount: /srv/kind/solana/ramdisk
snapshot_script_local: "{{ playbook_dir }}/../scripts/snapshot-download.py"
snapshot_script: /tmp/snapshot-download.py
snapshot_args: ""

playbooks/biscayne-redeploy.yml

@ -57,11 +57,11 @@
kind_cluster: laconic-70ce4c4b47e23b85
k8s_namespace: "laconic-{{ kind_cluster }}"
deployment_name: "{{ kind_cluster }}-deployment"
snapshot_dir: /srv/solana/snapshots
ledger_dir: /srv/solana/ledger
accounts_dir: /srv/solana/ramdisk/accounts
ramdisk_mount: /srv/solana/ramdisk
ramdisk_device: /dev/ram0
snapshot_dir: /srv/kind/solana/snapshots
ledger_dir: /srv/kind/solana/ledger
accounts_dir: /srv/kind/solana/ramdisk/accounts
ramdisk_mount: /srv/kind/solana/ramdisk
ramdisk_size: 1024G
snapshot_script_local: "{{ playbook_dir }}/../scripts/snapshot-download.py"
snapshot_script: /tmp/snapshot-download.py
# Flags — non-destructive by default
@ -139,12 +139,9 @@
when: wipe_ledger | bool
tags: [wipe]
- name: Wipe accounts ramdisk (umount + mkfs.xfs + mount)
- name: Wipe accounts ramdisk
ansible.builtin.shell: |
set -o pipefail
mountpoint -q {{ ramdisk_mount }} && umount {{ ramdisk_mount }} || true
mkfs.xfs -f {{ ramdisk_device }}
mount {{ ramdisk_mount }}
rm -rf {{ accounts_dir }}/*
mkdir -p {{ accounts_dir }}
chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }}
become: true

playbooks/biscayne-start.yml

@ -6,7 +6,7 @@
#
# Prerequisites:
# - biscayne-prepare-agave.yml has been run (fstab entries, systemd units)
# - A snapshot exists in /srv/solana/snapshots (or use biscayne-recover.yml)
# - A snapshot exists in /srv/kind/solana/snapshots (or use biscayne-recover.yml)
#
# Usage:
# ansible-playbook playbooks/biscayne-start.yml

health-check playbook

@ -211,7 +211,7 @@
# ------------------------------------------------------------------
- name: Check ramdisk usage
ansible.builtin.command:
cmd: df -h /srv/solana/ramdisk
cmd: df -h /srv/kind/solana/ramdisk
register: ramdisk_df
changed_when: false
failed_when: false
@ -238,7 +238,7 @@
cmd: >
set -o pipefail &&
findmnt -n -o TARGET,SOURCE,FSTYPE,PROPAGATION
/srv/solana /srv/solana/ramdisk /srv/kind/solana 2>&1
/srv/kind/solana /srv/kind/solana/ramdisk 2>&1
executable: /bin/bash
register: host_mounts
changed_when: false

scripts/snapshot-download.py

@ -9,8 +9,8 @@ Based on the discovery approach from etcusr/solana-snapshot-finder but replaces
the single-connection wget download with aria2c parallel chunked downloads.
Usage:
# Download to /srv/solana/snapshots (mainnet, 16 connections)
./snapshot-download.py -o /srv/solana/snapshots
# Download to /srv/kind/solana/snapshots (mainnet, 16 connections)
./snapshot-download.py -o /srv/kind/solana/snapshots
# Dry run — find best source, print URL
./snapshot-download.py --dry-run
@ -43,7 +43,6 @@ import urllib.request
from dataclasses import dataclass, field
from http.client import HTTPResponse
from pathlib import Path
from typing import NoReturn
from urllib.request import Request
log: logging.Logger = logging.getLogger("snapshot-download")
@ -192,16 +191,12 @@ def _parse_snapshot_filename(location: str) -> tuple[str, str | None]:
def probe_rpc_snapshot(
rpc_address: str,
current_slot: int,
max_age_slots: int,
max_latency_ms: float,
) -> SnapshotSource | None:
"""Probe a single RPC node for available snapshots.
Probes for full snapshot first (required), then incremental. Records all
available files. Which files to actually download is decided at download
time, based on what already exists locally, not here.
Based on the discovery approach from etcusr/solana-snapshot-finder.
Discovery only, no filtering. Returns a SnapshotSource with all available
info so the caller can decide what to keep. Filtering happens after all
probes complete, so rejected sources are still visible for debugging.
"""
full_url: str = f"http://{rpc_address}/snapshot.tar.bz2"
@ -211,8 +206,6 @@ def probe_rpc_snapshot(
return None
latency_ms: float = full_latency * 1000
if latency_ms > max_latency_ms:
return None
full_filename, full_path = _parse_snapshot_filename(full_location)
fm: re.Match[str] | None = FULL_SNAP_RE.match(full_filename)
@ -222,9 +215,6 @@ def probe_rpc_snapshot(
full_snap_slot: int = int(fm.group(1))
slots_diff: int = current_slot - full_snap_slot
if slots_diff > max_age_slots or slots_diff < -100:
return None
file_paths: list[str] = [full_path]
# Also check for incremental snapshot
@ -255,7 +245,11 @@ def discover_sources(
threads: int,
version_filter: str | None,
) -> list[SnapshotSource]:
"""Discover all snapshot sources from the cluster."""
"""Discover all snapshot sources, then filter.
Probing and filtering are separate: all reachable sources are collected
first so we can report what exists even if filters reject everything.
"""
rpc_nodes: list[str] = get_cluster_rpc_nodes(rpc_url, version_filter)
if not rpc_nodes:
log.error("No RPC nodes found via getClusterNodes")
@ -263,31 +257,59 @@ def discover_sources(
log.info("Found %d RPC nodes, probing for snapshots...", len(rpc_nodes))
sources: list[SnapshotSource] = []
all_sources: list[SnapshotSource] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool:
futures: dict[concurrent.futures.Future[SnapshotSource | None], str] = {
pool.submit(
probe_rpc_snapshot, addr, current_slot,
max_age_slots, max_latency_ms,
): addr
pool.submit(probe_rpc_snapshot, addr, current_slot): addr
for addr in rpc_nodes
}
done: int = 0
for future in concurrent.futures.as_completed(futures):
done += 1
if done % 200 == 0:
log.info(" probed %d/%d nodes, %d sources found",
done, len(rpc_nodes), len(sources))
log.info(" probed %d/%d nodes, %d reachable",
done, len(rpc_nodes), len(all_sources))
try:
result: SnapshotSource | None = future.result()
except (urllib.error.URLError, OSError, TimeoutError) as e:
log.debug("Probe failed for %s: %s", futures[future], e)
continue
if result:
sources.append(result)
all_sources.append(result)
log.info("Found %d RPC nodes with suitable snapshots", len(sources))
return sources
log.info("Discovered %d reachable sources", len(all_sources))
# Apply filters
filtered: list[SnapshotSource] = []
rejected_age: int = 0
rejected_latency: int = 0
for src in all_sources:
if src.slots_diff > max_age_slots or src.slots_diff < -100:
rejected_age += 1
continue
if src.latency_ms > max_latency_ms:
rejected_latency += 1
continue
filtered.append(src)
if rejected_age or rejected_latency:
log.info("Filtered: %d rejected by age (>%d slots), %d by latency (>%.0fms)",
rejected_age, max_age_slots, rejected_latency, max_latency_ms)
if not filtered and all_sources:
# Show what was available so the user can adjust filters
all_sources.sort(key=lambda s: s.slots_diff)
best = all_sources[0]
log.warning("All %d sources rejected by filters. Best available: "
"%s (age=%d slots, latency=%.0fms). "
"Try --max-snapshot-age %d --max-latency %.0f",
len(all_sources), best.rpc_address,
best.slots_diff, best.latency_ms,
best.slots_diff + 500,
max(best.latency_ms * 1.5, 500))
log.info("Found %d sources after filtering", len(filtered))
return filtered
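
The age/latency gate above can be reduced to a standalone predicate — an
illustrative sketch, not part of the change (field names mirror
`SnapshotSource.slots_diff` / `latency_ms`):

```python
def passes_filters(slots_diff: int, latency_ms: float,
                   max_age_slots: int, max_latency_ms: float) -> bool:
    """Keep a source only if its snapshot is fresh enough and its probe was fast.

    A slots_diff below -100 means the snapshot claims to be well ahead of the
    cluster tip, which indicates a bogus or misconfigured node.
    """
    if slots_diff > max_age_slots or slots_diff < -100:
        return False  # too old, or implausibly ahead of the tip
    return latency_ms <= max_latency_ms
```

With the CLI defaults (`--max-snapshot-age 10000`, `--max-latency 500`), a
source 500 slots behind at 50 ms passes; one 20000 slots behind does not.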
# -- Speed benchmark -----------------------------------------------------------
@ -336,7 +358,7 @@ def download_aria2c(
cmd: list[str] = [
"aria2c",
"--file-allocation=none",
"--continue=true",
"--continue=false",
f"--max-connection-per-server={connections}",
f"--split={total_splits}",
"--min-split-size=50M",
@ -380,97 +402,74 @@ def download_aria2c(
return True
# -- Main ----------------------------------------------------------------------
# -- Public API ----------------------------------------------------------------
def main() -> int:
p: argparse.ArgumentParser = argparse.ArgumentParser(
description="Download Solana snapshots with aria2c parallel downloads",
)
p.add_argument("-o", "--output", default="/srv/solana/snapshots",
help="Snapshot output directory (default: /srv/solana/snapshots)")
p.add_argument("-c", "--cluster", default="mainnet-beta",
choices=list(CLUSTER_RPC),
help="Solana cluster (default: mainnet-beta)")
p.add_argument("-r", "--rpc", default=None,
help="RPC URL for cluster discovery (default: public RPC)")
p.add_argument("-n", "--connections", type=int, default=16,
help="aria2c connections per download (default: 16)")
p.add_argument("-t", "--threads", type=int, default=500,
help="Threads for parallel RPC probing (default: 500)")
p.add_argument("--max-snapshot-age", type=int, default=1300,
help="Max snapshot age in slots (default: 1300)")
p.add_argument("--max-latency", type=float, default=100,
help="Max RPC probe latency in ms (default: 100)")
p.add_argument("--min-download-speed", type=int, default=20,
help="Min download speed in MiB/s (default: 20)")
p.add_argument("--measurement-time", type=int, default=7,
help="Speed measurement duration in seconds (default: 7)")
p.add_argument("--max-speed-checks", type=int, default=15,
help="Max nodes to benchmark before giving up (default: 15)")
p.add_argument("--version", default=None,
help="Filter nodes by version prefix (e.g. '2.2')")
p.add_argument("--full-only", action="store_true",
help="Download only full snapshot, skip incremental")
p.add_argument("--dry-run", action="store_true",
help="Find best source and print URL, don't download")
p.add_argument("-v", "--verbose", action="store_true")
args: argparse.Namespace = p.parse_args()
def download_best_snapshot(
output_dir: str,
*,
cluster: str = "mainnet-beta",
rpc_url: str | None = None,
connections: int = 16,
threads: int = 500,
max_snapshot_age: int = 10000,
max_latency: float = 500,
min_download_speed: int = 20,
measurement_time: int = 7,
max_speed_checks: int = 15,
version_filter: str | None = None,
full_only: bool = False,
) -> bool:
"""Download the best available snapshot to output_dir.
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
Programmatic API for use by entrypoint.py or other callers.
Returns True on success, False on failure.
"""
resolved_rpc: str = rpc_url or CLUSTER_RPC[cluster]
rpc_url: str = args.rpc or CLUSTER_RPC[args.cluster]
# aria2c is required for actual downloads (not dry-run)
if not args.dry_run and not shutil.which("aria2c"):
if not shutil.which("aria2c"):
log.error("aria2c not found. Install with: apt install aria2")
return 1
return False
# Get current slot
log.info("Cluster: %s | RPC: %s", args.cluster, rpc_url)
current_slot: int | None = get_current_slot(rpc_url)
log.info("Cluster: %s | RPC: %s", cluster, resolved_rpc)
current_slot: int | None = get_current_slot(resolved_rpc)
if current_slot is None:
log.error("Cannot get current slot from %s", rpc_url)
return 1
log.error("Cannot get current slot from %s", resolved_rpc)
return False
log.info("Current slot: %d", current_slot)
# Discover sources
sources: list[SnapshotSource] = discover_sources(
rpc_url, current_slot,
max_age_slots=args.max_snapshot_age,
max_latency_ms=args.max_latency,
threads=args.threads,
version_filter=args.version,
resolved_rpc, current_slot,
max_age_slots=max_snapshot_age,
max_latency_ms=max_latency,
threads=threads,
version_filter=version_filter,
)
if not sources:
log.error("No snapshot sources found")
return 1
return False
# Sort by latency (lowest first) for speed benchmarking
sources.sort(key=lambda s: s.latency_ms)
# Benchmark top candidates — all speeds in MiB/s (binary, 1 MiB = 1048576 bytes)
log.info("Benchmarking download speed on top %d sources...", args.max_speed_checks)
# Benchmark top candidates
log.info("Benchmarking download speed on top %d sources...", max_speed_checks)
fast_sources: list[SnapshotSource] = []
checked: int = 0
min_speed_bytes: int = args.min_download_speed * 1024 * 1024 # MiB to bytes
min_speed_bytes: int = min_download_speed * 1024 * 1024
for source in sources:
if checked >= args.max_speed_checks:
if checked >= max_speed_checks:
break
checked += 1
speed: float = measure_speed(source.rpc_address, args.measurement_time)
speed: float = measure_speed(source.rpc_address, measurement_time)
source.download_speed = speed
speed_mib: float = speed / (1024 ** 2)
if speed < min_speed_bytes:
log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)",
source.rpc_address, speed_mib, args.min_download_speed)
source.rpc_address, speed_mib, min_download_speed)
continue
log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)",
@ -480,19 +479,17 @@ def main() -> int:
if not fast_sources:
log.error("No source met minimum speed requirement (%d MiB/s)",
args.min_download_speed)
log.info("Try: --min-download-speed 10")
return 1
min_download_speed)
return False
# Use the fastest source as primary, collect mirrors for each file
best: SnapshotSource = fast_sources[0]
file_paths: list[str] = best.file_paths
if args.full_only:
if full_only:
file_paths = [fp for fp in file_paths
if fp.rsplit("/", 1)[-1].startswith("snapshot-")]
# Build mirror URL lists: for each file, collect URLs from all fast sources
# that serve the same filename
# Build mirror URL lists
download_plan: list[tuple[str, list[str]]] = []
for fp in file_paths:
filename: str = fp.rsplit("/", 1)[-1]
@ -509,38 +506,130 @@ def main() -> int:
best.rpc_address, speed_mib, len(fast_sources))
for filename, mirror_urls in download_plan:
log.info(" %s (%d mirrors)", filename, len(mirror_urls))
for url in mirror_urls:
log.info(" %s", url)
if args.dry_run:
for _, mirror_urls in download_plan:
for url in mirror_urls:
print(url)
return 0
# Download — skip files that already exist locally
os.makedirs(args.output, exist_ok=True)
# Download
os.makedirs(output_dir, exist_ok=True)
total_start: float = time.monotonic()
for filename, mirror_urls in download_plan:
filepath: Path = Path(args.output) / filename
filepath: Path = Path(output_dir) / filename
if filepath.exists() and filepath.stat().st_size > 0:
log.info("Skipping %s (already exists: %.1f GB)",
filename, filepath.stat().st_size / (1024 ** 3))
continue
if not download_aria2c(mirror_urls, args.output, filename, args.connections):
if not download_aria2c(mirror_urls, output_dir, filename, connections):
log.error("Failed to download %s", filename)
return 1
return False
total_elapsed: float = time.monotonic() - total_start
log.info("All downloads complete in %.0fs", total_elapsed)
for filename, _ in download_plan:
fp: Path = Path(args.output) / filename
if fp.exists():
log.info(" %s (%.1f GB)", fp.name, fp.stat().st_size / (1024 ** 3))
fp_path: Path = Path(output_dir) / filename
if fp_path.exists():
log.info(" %s (%.1f GB)", fp_path.name, fp_path.stat().st_size / (1024 ** 3))
return True
# -- Main (CLI) ----------------------------------------------------------------
def main() -> int:
p: argparse.ArgumentParser = argparse.ArgumentParser(
description="Download Solana snapshots with aria2c parallel downloads",
)
p.add_argument("-o", "--output", default="/srv/kind/solana/snapshots",
help="Snapshot output directory (default: /srv/kind/solana/snapshots)")
p.add_argument("-c", "--cluster", default="mainnet-beta",
choices=list(CLUSTER_RPC),
help="Solana cluster (default: mainnet-beta)")
p.add_argument("-r", "--rpc", default=None,
help="RPC URL for cluster discovery (default: public RPC)")
p.add_argument("-n", "--connections", type=int, default=16,
help="aria2c connections per download (default: 16)")
p.add_argument("-t", "--threads", type=int, default=500,
help="Threads for parallel RPC probing (default: 500)")
p.add_argument("--max-snapshot-age", type=int, default=10000,
help="Max snapshot age in slots (default: 10000)")
p.add_argument("--max-latency", type=float, default=500,
help="Max RPC probe latency in ms (default: 500)")
p.add_argument("--min-download-speed", type=int, default=20,
help="Min download speed in MiB/s (default: 20)")
p.add_argument("--measurement-time", type=int, default=7,
help="Speed measurement duration in seconds (default: 7)")
p.add_argument("--max-speed-checks", type=int, default=15,
help="Max nodes to benchmark before giving up (default: 15)")
p.add_argument("--version", default=None,
help="Filter nodes by version prefix (e.g. '2.2')")
p.add_argument("--full-only", action="store_true",
help="Download only full snapshot, skip incremental")
p.add_argument("--dry-run", action="store_true",
help="Find best source and print URL, don't download")
p.add_argument("--post-cmd",
help="Shell command to run after successful download "
"(e.g. 'kubectl scale deployment ... --replicas=1')")
p.add_argument("-v", "--verbose", action="store_true")
args: argparse.Namespace = p.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
# Dry-run uses inline flow (needs access to sources for URL printing)
if args.dry_run:
rpc_url: str = args.rpc or CLUSTER_RPC[args.cluster]
current_slot: int | None = get_current_slot(rpc_url)
if current_slot is None:
log.error("Cannot get current slot from %s", rpc_url)
return 1
sources: list[SnapshotSource] = discover_sources(
rpc_url, current_slot,
max_age_slots=args.max_snapshot_age,
max_latency_ms=args.max_latency,
threads=args.threads,
version_filter=args.version,
)
if not sources:
log.error("No snapshot sources found")
return 1
sources.sort(key=lambda s: s.latency_ms)
best = sources[0]
for fp in best.file_paths:
print(f"http://{best.rpc_address}{fp}")
return 0
ok: bool = download_best_snapshot(
args.output,
cluster=args.cluster,
rpc_url=args.rpc,
connections=args.connections,
threads=args.threads,
max_snapshot_age=args.max_snapshot_age,
max_latency=args.max_latency,
min_download_speed=args.min_download_speed,
measurement_time=args.measurement_time,
max_speed_checks=args.max_speed_checks,
version_filter=args.version,
full_only=args.full_only,
)
if ok and args.post_cmd:
log.info("Running post-download command: %s", args.post_cmd)
result: subprocess.CompletedProcess[bytes] = subprocess.run(
args.post_cmd, shell=True,
)
if result.returncode != 0:
log.error("Post-download command failed with exit code %d",
result.returncode)
return 1
log.info("Post-download command completed successfully")
return 0 if ok else 1
if __name__ == "__main__":
sys.exit(main())