From b2342bc5391d4a7839cfbc2662aad877cd8cff42 Mon Sep 17 00:00:00 2001 From: "A. F. Dudley" Date: Sun, 8 Mar 2026 18:43:41 +0000 Subject: [PATCH] fix: switch ramdisk from /dev/ram0 to tmpfs, refactor snapshot-download.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /dev/ram0 + XFS + format-ramdisk.service approach was unnecessary complexity from a migration confusion — there was no actual tmpfs bug with io_uring. tmpfs is simpler (no format-on-boot), resizable on the fly, and what every other Solana operator uses. Changes: - prepare-agave: remove format-ramdisk.service and ramdisk-accounts.service, use tmpfs fstab entry with size=1024G (was 600G /dev/ram0, too small) - recover: remove ramdisk_device var (no longer needed) - redeploy: wipe accounts by rm -rf instead of umount+mkfs - snapshot-download.py: extract download_best_snapshot() public API for use by the new container entrypoint.py (in agave-stack) - CLAUDE.md: update ramdisk docs, fix /srv/solana → /srv/kind/solana paths - health-check: fix ramdisk path references Co-Authored-By: Claude Opus 4.6 --- .gitignore | 1 + CLAUDE.md | 41 ++-- playbooks/biscayne-prepare-agave.yml | 231 +++++++------------- playbooks/biscayne-recover.yml | 7 +- playbooks/biscayne-redeploy.yml | 17 +- playbooks/biscayne-start.yml | 2 +- playbooks/health-check.yml | 4 +- scripts/snapshot-download.py | 313 +++++++++++++++++---------- 8 files changed, 314 insertions(+), 302 deletions(-) diff --git a/.gitignore b/.gitignore index 06aea24a..220c6a36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .venv/ sessions.duckdb sessions.duckdb.wal +.worktrees diff --git a/CLAUDE.md b/CLAUDE.md index 49fb6be9..6fb2164c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -14,9 +14,8 @@ below it are correct. Playbooks belong to exactly one layer. | 5. 
Deploy agave | Deploy agave-stack into kind, snapshot download, scale up | `biscayne-redeploy.yml` (snapshot/verify tags), `biscayne-recover.yml` | **Layer 4 invariants** (asserted by `biscayne-prepare-agave.yml`): -- `/srv/solana` is XFS on a zvol — agave uses io_uring which deadlocks on ZFS -- `/srv/solana/ramdisk` is XFS on `/dev/ram0` — accounts must be on ramdisk -- `/srv/kind/solana` is an rbind of `/srv/solana` — makes the zvol visible to kind at `/mnt/solana` +- `/srv/kind/solana` is XFS on a zvol — agave uses io_uring which deadlocks on ZFS. `/srv/solana` is NOT the zvol (it's a ZFS dataset directory); never use it for data paths +- `/srv/kind/solana/ramdisk` is tmpfs (1TB) — accounts must be in RAM These invariants are checked at runtime and persisted to fstab/systemd so they survive reboot. They are agave's requirements reaching into the boot sequence, @@ -58,18 +57,13 @@ Correct shutdown sequence: ### Ramdisk -The accounts directory must be on a ramdisk for performance. `/dev/ram0` loses its -filesystem on reboot and must be reformatted before mounting. +The accounts directory must be in RAM for performance. tmpfs is used instead of +`/dev/ram0` — simpler (no format-on-boot service needed), resizable on the fly +with `mount -o remount,size=`, and what most Solana operators use. -**Boot ordering is handled by systemd units** (installed by `biscayne-prepare-agave.yml`): -- `format-ramdisk.service`: runs `mkfs.xfs -f /dev/ram0` before `local-fs.target` -- fstab entry: mounts `/dev/ram0` at `/srv/solana/ramdisk` with - `x-systemd.requires=format-ramdisk.service` -- `ramdisk-accounts.service`: creates `/srv/solana/ramdisk/accounts` and sets - ownership after the mount - -These units run before docker, so the kind node's bind mounts always see the -ramdisk. **No manual intervention is needed after reboot.** +**Boot ordering**: fstab entry mounts tmpfs at `/srv/kind/solana/ramdisk` with +`x-systemd.requires=srv-kind-solana.mount`. 
tmpfs mounts natively via fstab — +no systemd format service needed. **No manual intervention after reboot.** **Mount propagation**: The kind node bind-mounts `/srv/kind` → `/mnt` at container start. laconic-so sets `propagation: HostToContainer` on all kind extraMounts @@ -139,10 +133,11 @@ kind node via a single bind mount. - Deployment: `laconic-70ce4c4b47e23b85-deployment` - Kind node container: `laconic-70ce4c4b47e23b85-control-plane` - Deployment dir: `/srv/deployments/agave` -- Snapshot dir: `/srv/solana/snapshots` -- Ledger dir: `/srv/solana/ledger` -- Accounts dir: `/srv/solana/ramdisk/accounts` -- Log dir: `/srv/solana/log` +- Snapshot dir: `/srv/kind/solana/snapshots` (on zvol, visible to kind at `/mnt/validator-snapshots`) +- Ledger dir: `/srv/kind/solana/ledger` (on zvol, visible to kind at `/mnt/validator-ledger`) +- Accounts dir: `/srv/kind/solana/ramdisk/accounts` (on tmpfs ramdisk, visible to kind at `/mnt/validator-accounts`) +- Log dir: `/srv/kind/solana/log` (on zvol, visible to kind at `/mnt/validator-log`) +- **WARNING**: `/srv/solana` is a ZFS dataset directory, NOT the zvol. Never use it for data paths. - Host bind mount root: `/srv/kind` -> kind node `/mnt` - laconic-so: `/home/rix/.local/bin/laconic-so` (editable install) @@ -150,10 +145,10 @@ kind node via a single bind mount. 
| PV Name | hostPath | |----------------------|-------------------------------| -| validator-snapshots | /mnt/solana/snapshots | -| validator-ledger | /mnt/solana/ledger | -| validator-accounts | /mnt/solana/ramdisk/accounts | -| validator-log | /mnt/solana/log | +| validator-snapshots | /mnt/validator-snapshots | +| validator-ledger | /mnt/validator-ledger | +| validator-accounts | /mnt/validator-accounts | +| validator-log | /mnt/validator-log | ### Snapshot Freshness @@ -164,7 +159,7 @@ try to catch up from an old snapshot — it will take too long and may never con Check with: ``` # Snapshot slot (from filename) -ls /srv/solana/snapshots/snapshot-*.tar.* +ls /srv/kind/solana/snapshots/snapshot-*.tar.* # Current mainnet slot curl -s -X POST -H "Content-Type: application/json" \ diff --git a/playbooks/biscayne-prepare-agave.yml b/playbooks/biscayne-prepare-agave.yml index a817f946..8f5944c4 100644 --- a/playbooks/biscayne-prepare-agave.yml +++ b/playbooks/biscayne-prepare-agave.yml @@ -10,26 +10,18 @@ # # Agave requires three things from the host that kind doesn't provide: # -# Invariant 1: /srv/solana is XFS on a zvol (not ZFS) +# Invariant 1: /srv/kind/solana is XFS on a zvol (not ZFS) # Why: agave uses io_uring for async I/O. io_uring workers deadlock on # ZFS datasets (D-state in dsl_dir_tempreserve_space). XFS on a zvol -# (block device) works fine. This is why the data lives on a zvol, not -# a ZFS dataset. -# Persisted as: fstab entry mounting /dev/zvol/.../solana at /srv/solana +# (block device) works fine. /srv/solana is NOT the zvol — it's a +# directory on the ZFS dataset biscayne/DATA/srv. All data paths must +# use /srv/kind/solana which is the actual zvol mount. +# Persisted as: fstab entry mounting /dev/zvol/.../solana at /srv/kind/solana # -# Invariant 2: /srv/solana/ramdisk is XFS on /dev/ram0 (600G ramdisk) -# Why: agave accounts must be on ramdisk for performance. 
/dev/ram0 -# loses its filesystem on reboot, so it must be reformatted before -# mounting each boot. -# Persisted as: format-ramdisk.service (mkfs before mount) + fstab entry -# -# Invariant 3: /srv/kind/solana is XFS (zvol) and /srv/kind/solana/ramdisk is XFS (ram0) -# Why: kind mounts /srv/kind → /mnt inside the kind node. PVs reference -# /mnt/solana/*. An rbind of /srv/solana does NOT work because ZFS's -# shared propagation (shared:75 on /srv) overlays ZFS on top of the bind. -# Direct device mounts bypass propagation entirely. -# Persisted as: two fstab entries — zvol at /srv/kind/solana, ram0 at -# /srv/kind/solana/ramdisk, both with x-systemd.requires ordering +# Invariant 2: /srv/kind/solana/ramdisk is tmpfs (1TB) +# Why: agave accounts must be in RAM for performance. tmpfs survives +# process restarts but not host reboots (same as /dev/ram0 but simpler). +# Persisted as: fstab entry (no format service needed) # # This playbook checks each invariant and only acts if it's not met. # Idempotent — safe to run multiple times. 
@@ -42,132 +34,76 @@ gather_facts: false become: true vars: - ramdisk_device: /dev/ram0 zvol_device: /dev/zvol/biscayne/DATA/volumes/solana - solana_dir: /srv/solana - ramdisk_mount: /srv/solana/ramdisk kind_solana_dir: /srv/kind/solana - accounts_dir: /srv/solana/ramdisk/accounts + ramdisk_mount: /srv/kind/solana/ramdisk + ramdisk_size: 1024G + accounts_dir: /srv/kind/solana/ramdisk/accounts deployment_dir: /srv/deployments/agave - kind_ramdisk_opts: "noatime,nodiratime,nofail,x-systemd.requires=format-ramdisk.service,x-systemd.requires=srv-kind-solana.mount" tasks: - # ---- systemd units ---------------------------------------------------------- - - name: Install ramdisk format service - ansible.builtin.copy: - dest: /etc/systemd/system/format-ramdisk.service - mode: "0644" - content: | - [Unit] - Description=Format /dev/ram0 as XFS for Solana accounts - DefaultDependencies=no - Before=local-fs.target - After=systemd-modules-load.service - ConditionPathExists={{ ramdisk_device }} + # ---- cleanup legacy ramdisk services ----------------------------------------- + - name: Stop and disable legacy ramdisk services + ansible.builtin.systemd: + name: "{{ item }}" + state: stopped + enabled: false + loop: + - format-ramdisk.service + - ramdisk-accounts.service + failed_when: false - [Service] - Type=oneshot - RemainAfterExit=yes - ExecStart=/sbin/mkfs.xfs -f {{ ramdisk_device }} - - [Install] - WantedBy=local-fs.target - register: unit_file - - - name: Install ramdisk post-mount service - ansible.builtin.copy: - dest: /etc/systemd/system/ramdisk-accounts.service - mode: "0644" - content: | - [Unit] - Description=Create Solana accounts directory on ramdisk - After=srv-solana-ramdisk.mount - Requires=srv-solana-ramdisk.mount - - [Service] - Type=oneshot - RemainAfterExit=yes - ExecStart=/bin/bash -c 'mkdir -p {{ accounts_dir }} && chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }}' - - [Install] - WantedBy=multi-user.target - register: accounts_unit + - name: 
Remove legacy ramdisk service files + ansible.builtin.file: + path: "/etc/systemd/system/{{ item }}" + state: absent + loop: + - format-ramdisk.service + - ramdisk-accounts.service + register: legacy_units_removed # ---- fstab entries ---------------------------------------------------------- - - name: Ensure zvol fstab entry + # /srv/solana is NOT the zvol — it's a directory on the ZFS dataset. + # All data paths use /srv/kind/solana (the actual zvol mount). + - name: Remove stale /srv/solana zvol fstab entry ansible.builtin.lineinfile: path: /etc/fstab - regexp: '^\S+\s+{{ solana_dir }}\s' - line: '{{ zvol_device }} {{ solana_dir }} xfs defaults 0 2' - register: fstab_zvol + regexp: '^\S+\s+/srv/solana\s+xfs' + state: absent - - name: Ensure ramdisk fstab entry + - name: Remove stale /srv/solana/ramdisk fstab entry ansible.builtin.lineinfile: path: /etc/fstab - regexp: '^{{ ramdisk_device }}\s+{{ ramdisk_mount }}\s' - line: '{{ ramdisk_device }} {{ ramdisk_mount }} xfs noatime,nodiratime,nofail,x-systemd.requires=format-ramdisk.service 0 0' - register: fstab_ramdisk + regexp: '^/dev/ram0\s+' + state: absent - # Direct device mounts at /srv/kind/solana — bypasses ZFS shared propagation. - # An rbind of /srv/solana fails because ZFS's shared:75 on /srv overlays - # ZFS on top of any bind mount under /srv. Direct device mounts avoid this. 
- - name: Ensure kind zvol fstab entry - ansible.builtin.lineinfile: - path: /etc/fstab - regexp: '^\S+\s+{{ kind_solana_dir }}\s' - line: '{{ zvol_device }} {{ kind_solana_dir }} xfs defaults,nofail,x-systemd.requires=zfs-mount.service 0 0' - register: fstab_kind - - - name: Ensure kind ramdisk fstab entry - ansible.builtin.lineinfile: - path: /etc/fstab - regexp: '^\S+\s+{{ kind_solana_dir }}/ramdisk\s' - line: "{{ ramdisk_device }} {{ kind_solana_dir }}/ramdisk xfs {{ kind_ramdisk_opts }} 0 0" - register: fstab_kind_ramdisk - - # Remove stale rbind fstab entry from previous approach - name: Remove stale kind rbind fstab entry ansible.builtin.lineinfile: path: /etc/fstab regexp: '^\S+\s+{{ kind_solana_dir }}\s+none\s+rbind' state: absent - register: fstab_stale_rbind - # ---- reload and enable ------------------------------------------------------ + - name: Ensure zvol fstab entry + ansible.builtin.lineinfile: + path: /etc/fstab + regexp: '^\S+\s+{{ kind_solana_dir }}\s' + line: '{{ zvol_device }} {{ kind_solana_dir }} xfs defaults,nofail,x-systemd.requires=zfs-mount.service 0 0' + register: fstab_zvol + + - name: Ensure tmpfs ramdisk fstab entry + ansible.builtin.lineinfile: + path: /etc/fstab + regexp: '^\S+\s+{{ ramdisk_mount }}\s' + line: "tmpfs {{ ramdisk_mount }} tmpfs nodev,nosuid,noexec,nodiratime,size={{ ramdisk_size }},nofail,x-systemd.requires=srv-kind-solana.mount 0 0" + register: fstab_ramdisk + + # ---- reload systemd if anything changed -------------------------------------- - name: Reload systemd ansible.builtin.systemd: daemon_reload: true - when: >- - unit_file.changed or accounts_unit.changed or - fstab_zvol.changed or fstab_ramdisk.changed or - fstab_kind.changed or fstab_kind_ramdisk.changed or - fstab_stale_rbind.changed + when: legacy_units_removed.changed or fstab_zvol.changed or fstab_ramdisk.changed - - name: Enable ramdisk services - ansible.builtin.systemd: - name: "{{ item }}" - enabled: true - loop: - - format-ramdisk.service - - 
ramdisk-accounts.service - - # ---- apply now if ramdisk not mounted -------------------------------------- - - name: Check if ramdisk is mounted - ansible.builtin.command: mountpoint -q {{ ramdisk_mount }} - register: ramdisk_mounted - failed_when: false - changed_when: false - - - name: Format and mount ramdisk now - ansible.builtin.shell: | - mkfs.xfs -f {{ ramdisk_device }} - mount {{ ramdisk_mount }} - mkdir -p {{ accounts_dir }} - chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }} - changed_when: ramdisk_mounted.rc != 0 - when: ramdisk_mounted.rc != 0 - - # ---- apply kind device mounts now if not correct ---------------------------- + # ---- apply device mounts now if not correct ---------------------------------- - name: Check kind zvol mount is XFS ansible.builtin.shell: cmd: > @@ -178,16 +114,16 @@ failed_when: false changed_when: false - - name: Unmount stale kind mounts + - name: Unmount stale mounts ansible.builtin.shell: cmd: | - umount {{ kind_solana_dir }}/ramdisk 2>/dev/null || true + umount {{ ramdisk_mount }} 2>/dev/null || true umount {{ kind_solana_dir }} 2>/dev/null || true executable: /bin/bash changed_when: kind_zvol_check.rc != 0 when: kind_zvol_check.rc != 0 - - name: Mount zvol at kind solana dir + - name: Mount zvol ansible.posix.mount: path: "{{ kind_solana_dir }}" src: "{{ zvol_device }}" @@ -195,24 +131,32 @@ state: mounted when: kind_zvol_check.rc != 0 - - name: Check kind ramdisk mount is XFS + - name: Check ramdisk mount is tmpfs ansible.builtin.shell: cmd: > set -o pipefail && - findmnt -n -o FSTYPE {{ kind_solana_dir }}/ramdisk | grep -q xfs + findmnt -n -o FSTYPE {{ ramdisk_mount }} | grep -q tmpfs executable: /bin/bash - register: kind_ramdisk_check + register: ramdisk_check failed_when: false changed_when: false - - name: Mount ramdisk at kind solana ramdisk dir + - name: Mount tmpfs ramdisk ansible.posix.mount: - path: "{{ kind_solana_dir }}/ramdisk" - src: "{{ ramdisk_device }}" - fstype: xfs - opts: 
noatime,nodiratime + path: "{{ ramdisk_mount }}" + src: tmpfs + fstype: tmpfs + opts: "nodev,nosuid,noexec,nodiratime,size={{ ramdisk_size }}" state: mounted - when: kind_ramdisk_check.rc != 0 + when: ramdisk_check.rc != 0 + + - name: Create accounts directory + ansible.builtin.file: + path: "{{ accounts_dir }}" + state: directory + owner: solana + group: solana + mode: "0755" # Docker requires shared propagation on mounts it bind-mounts into # containers. Without this, `docker start` fails with "not a shared @@ -227,36 +171,24 @@ changed_when: false # ---- verify ----------------------------------------------------------------- - - name: Verify ramdisk is XFS - ansible.builtin.shell: - cmd: set -o pipefail && df -T {{ ramdisk_mount }} | grep -q xfs - executable: /bin/bash - changed_when: false - - name: Verify zvol is XFS - ansible.builtin.shell: - cmd: set -o pipefail && df -T {{ solana_dir }} | grep -q xfs - executable: /bin/bash - changed_when: false - - - name: Verify kind zvol is XFS ansible.builtin.shell: cmd: set -o pipefail && df -T {{ kind_solana_dir }} | grep -q xfs executable: /bin/bash changed_when: false - - name: Verify kind ramdisk is XFS + - name: Verify ramdisk is tmpfs ansible.builtin.shell: - cmd: set -o pipefail && df -T {{ kind_solana_dir }}/ramdisk | grep -q xfs + cmd: set -o pipefail && df -T {{ ramdisk_mount }} | grep -q tmpfs executable: /bin/bash changed_when: false - - name: Verify kind mount contents + - name: Verify mount contents ansible.builtin.shell: cmd: > set -o pipefail && ls {{ kind_solana_dir }}/ledger {{ kind_solana_dir }}/snapshots - {{ kind_solana_dir }}/ramdisk/accounts 2>&1 | head -5 + {{ ramdisk_mount }}/accounts 2>&1 | head -5 executable: /bin/bash register: kind_mount_verify changed_when: false @@ -273,13 +205,12 @@ register: cluster_id_result changed_when: false - - name: Check kind node XFS visibility + - name: Check kind node filesystem visibility ansible.builtin.shell: cmd: > set -o pipefail && docker exec {{ 
cluster_id_result.stdout }}-control-plane df -T /mnt/validator-ledger /mnt/validator-accounts - | grep -c xfs executable: /bin/bash register: kind_fstype changed_when: false @@ -289,7 +220,7 @@ ansible.builtin.debug: msg: kind_mount: "{{ kind_mount_verify.stdout_lines }}" - kind_fstype: "{{ 'xfs (correct)' if kind_fstype.stdout | default('0') | int >= 2 else 'NOT XFS — kind restart required' }}" + kind_fstype: "{{ kind_fstype.stdout_lines | default([]) }}" - name: Configure Ashburn validator relay ansible.builtin.import_playbook: ashburn-relay-biscayne.yml diff --git a/playbooks/biscayne-recover.yml b/playbooks/biscayne-recover.yml index f8b9a89e..07388207 100644 --- a/playbooks/biscayne-recover.yml +++ b/playbooks/biscayne-recover.yml @@ -33,10 +33,9 @@ kind_cluster: laconic-70ce4c4b47e23b85 k8s_namespace: "laconic-{{ kind_cluster }}" deployment_name: "{{ kind_cluster }}-deployment" - snapshot_dir: /srv/solana/snapshots - accounts_dir: /srv/solana/ramdisk/accounts - ramdisk_mount: /srv/solana/ramdisk - ramdisk_device: /dev/ram0 + snapshot_dir: /srv/kind/solana/snapshots + accounts_dir: /srv/kind/solana/ramdisk/accounts + ramdisk_mount: /srv/kind/solana/ramdisk snapshot_script_local: "{{ playbook_dir }}/../scripts/snapshot-download.py" snapshot_script: /tmp/snapshot-download.py snapshot_args: "" diff --git a/playbooks/biscayne-redeploy.yml b/playbooks/biscayne-redeploy.yml index b6f263cd..608ec328 100644 --- a/playbooks/biscayne-redeploy.yml +++ b/playbooks/biscayne-redeploy.yml @@ -57,11 +57,11 @@ kind_cluster: laconic-70ce4c4b47e23b85 k8s_namespace: "laconic-{{ kind_cluster }}" deployment_name: "{{ kind_cluster }}-deployment" - snapshot_dir: /srv/solana/snapshots - ledger_dir: /srv/solana/ledger - accounts_dir: /srv/solana/ramdisk/accounts - ramdisk_mount: /srv/solana/ramdisk - ramdisk_device: /dev/ram0 + snapshot_dir: /srv/kind/solana/snapshots + ledger_dir: /srv/kind/solana/ledger + accounts_dir: /srv/kind/solana/ramdisk/accounts + ramdisk_mount: 
/srv/kind/solana/ramdisk + ramdisk_size: 1024G snapshot_script_local: "{{ playbook_dir }}/../scripts/snapshot-download.py" snapshot_script: /tmp/snapshot-download.py # Flags — non-destructive by default @@ -139,12 +139,9 @@ when: wipe_ledger | bool tags: [wipe] - - name: Wipe accounts ramdisk (umount + mkfs.xfs + mount) + - name: Wipe accounts ramdisk ansible.builtin.shell: | - set -o pipefail - mountpoint -q {{ ramdisk_mount }} && umount {{ ramdisk_mount }} || true - mkfs.xfs -f {{ ramdisk_device }} - mount {{ ramdisk_mount }} + rm -rf {{ accounts_dir }}/* mkdir -p {{ accounts_dir }} chown solana:solana {{ ramdisk_mount }} {{ accounts_dir }} become: true diff --git a/playbooks/biscayne-start.yml b/playbooks/biscayne-start.yml index 36220f4f..6c85699d 100644 --- a/playbooks/biscayne-start.yml +++ b/playbooks/biscayne-start.yml @@ -6,7 +6,7 @@ # # Prerequisites: # - biscayne-prepare-agave.yml has been run (fstab entries, systemd units) -# - A snapshot exists in /srv/solana/snapshots (or use biscayne-recover.yml) +# - A snapshot exists in /srv/kind/solana/snapshots (or use biscayne-recover.yml) # # Usage: # ansible-playbook playbooks/biscayne-start.yml diff --git a/playbooks/health-check.yml b/playbooks/health-check.yml index 55a5db34..cba75c2e 100644 --- a/playbooks/health-check.yml +++ b/playbooks/health-check.yml @@ -211,7 +211,7 @@ # ------------------------------------------------------------------ - name: Check ramdisk usage ansible.builtin.command: - cmd: df -h /srv/solana/ramdisk + cmd: df -h /srv/kind/solana/ramdisk register: ramdisk_df changed_when: false failed_when: false @@ -238,7 +238,7 @@ cmd: > set -o pipefail && findmnt -n -o TARGET,SOURCE,FSTYPE,PROPAGATION - /srv/solana /srv/solana/ramdisk /srv/kind/solana 2>&1 + /srv/kind/solana /srv/kind/solana/ramdisk 2>&1 executable: /bin/bash register: host_mounts changed_when: false diff --git a/scripts/snapshot-download.py b/scripts/snapshot-download.py index a8caddfc..c19830fe 100755 --- 
a/scripts/snapshot-download.py +++ b/scripts/snapshot-download.py @@ -9,8 +9,8 @@ Based on the discovery approach from etcusr/solana-snapshot-finder but replaces the single-connection wget download with aria2c parallel chunked downloads. Usage: - # Download to /srv/solana/snapshots (mainnet, 16 connections) - ./snapshot-download.py -o /srv/solana/snapshots + # Download to /srv/kind/solana/snapshots (mainnet, 16 connections) + ./snapshot-download.py -o /srv/kind/solana/snapshots # Dry run — find best source, print URL ./snapshot-download.py --dry-run @@ -43,7 +43,6 @@ import urllib.request from dataclasses import dataclass, field from http.client import HTTPResponse from pathlib import Path -from typing import NoReturn from urllib.request import Request log: logging.Logger = logging.getLogger("snapshot-download") @@ -192,16 +191,12 @@ def _parse_snapshot_filename(location: str) -> tuple[str, str | None]: def probe_rpc_snapshot( rpc_address: str, current_slot: int, - max_age_slots: int, - max_latency_ms: float, ) -> SnapshotSource | None: """Probe a single RPC node for available snapshots. - Probes for full snapshot first (required), then incremental. Records all - available files. Which files to actually download is decided at download - time based on what already exists locally — not here. - - Based on the discovery approach from etcusr/solana-snapshot-finder. + Discovery only — no filtering. Returns a SnapshotSource with all available + info so the caller can decide what to keep. Filtering happens after all + probes complete, so rejected sources are still visible for debugging. 
""" full_url: str = f"http://{rpc_address}/snapshot.tar.bz2" @@ -211,8 +206,6 @@ def probe_rpc_snapshot( return None latency_ms: float = full_latency * 1000 - if latency_ms > max_latency_ms: - return None full_filename, full_path = _parse_snapshot_filename(full_location) fm: re.Match[str] | None = FULL_SNAP_RE.match(full_filename) @@ -222,9 +215,6 @@ def probe_rpc_snapshot( full_snap_slot: int = int(fm.group(1)) slots_diff: int = current_slot - full_snap_slot - if slots_diff > max_age_slots or slots_diff < -100: - return None - file_paths: list[str] = [full_path] # Also check for incremental snapshot @@ -255,7 +245,11 @@ def discover_sources( threads: int, version_filter: str | None, ) -> list[SnapshotSource]: - """Discover all snapshot sources from the cluster.""" + """Discover all snapshot sources, then filter. + + Probing and filtering are separate: all reachable sources are collected + first so we can report what exists even if filters reject everything. + """ rpc_nodes: list[str] = get_cluster_rpc_nodes(rpc_url, version_filter) if not rpc_nodes: log.error("No RPC nodes found via getClusterNodes") @@ -263,31 +257,59 @@ def discover_sources( log.info("Found %d RPC nodes, probing for snapshots...", len(rpc_nodes)) - sources: list[SnapshotSource] = [] + all_sources: list[SnapshotSource] = [] with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as pool: futures: dict[concurrent.futures.Future[SnapshotSource | None], str] = { - pool.submit( - probe_rpc_snapshot, addr, current_slot, - max_age_slots, max_latency_ms, - ): addr + pool.submit(probe_rpc_snapshot, addr, current_slot): addr for addr in rpc_nodes } done: int = 0 for future in concurrent.futures.as_completed(futures): done += 1 if done % 200 == 0: - log.info(" probed %d/%d nodes, %d sources found", - done, len(rpc_nodes), len(sources)) + log.info(" probed %d/%d nodes, %d reachable", + done, len(rpc_nodes), len(all_sources)) try: result: SnapshotSource | None = future.result() except 
(urllib.error.URLError, OSError, TimeoutError) as e: log.debug("Probe failed for %s: %s", futures[future], e) continue if result: - sources.append(result) + all_sources.append(result) - log.info("Found %d RPC nodes with suitable snapshots", len(sources)) - return sources + log.info("Discovered %d reachable sources", len(all_sources)) + + # Apply filters + filtered: list[SnapshotSource] = [] + rejected_age: int = 0 + rejected_latency: int = 0 + for src in all_sources: + if src.slots_diff > max_age_slots or src.slots_diff < -100: + rejected_age += 1 + continue + if src.latency_ms > max_latency_ms: + rejected_latency += 1 + continue + filtered.append(src) + + if rejected_age or rejected_latency: + log.info("Filtered: %d rejected by age (>%d slots), %d by latency (>%.0fms)", + rejected_age, max_age_slots, rejected_latency, max_latency_ms) + + if not filtered and all_sources: + # Show what was available so the user can adjust filters + all_sources.sort(key=lambda s: s.slots_diff) + best = all_sources[0] + log.warning("All %d sources rejected by filters. Best available: " + "%s (age=%d slots, latency=%.0fms). 
" + "Try --max-snapshot-age %d --max-latency %.0f", + len(all_sources), best.rpc_address, + best.slots_diff, best.latency_ms, + best.slots_diff + 500, + max(best.latency_ms * 1.5, 500)) + + log.info("Found %d sources after filtering", len(filtered)) + return filtered # -- Speed benchmark ----------------------------------------------------------- @@ -336,7 +358,7 @@ def download_aria2c( cmd: list[str] = [ "aria2c", "--file-allocation=none", - "--continue=true", + "--continue=false", f"--max-connection-per-server={connections}", f"--split={total_splits}", "--min-split-size=50M", @@ -380,97 +402,74 @@ def download_aria2c( return True -# -- Main ---------------------------------------------------------------------- +# -- Public API ---------------------------------------------------------------- -def main() -> int: - p: argparse.ArgumentParser = argparse.ArgumentParser( - description="Download Solana snapshots with aria2c parallel downloads", - ) - p.add_argument("-o", "--output", default="/srv/solana/snapshots", - help="Snapshot output directory (default: /srv/solana/snapshots)") - p.add_argument("-c", "--cluster", default="mainnet-beta", - choices=list(CLUSTER_RPC), - help="Solana cluster (default: mainnet-beta)") - p.add_argument("-r", "--rpc", default=None, - help="RPC URL for cluster discovery (default: public RPC)") - p.add_argument("-n", "--connections", type=int, default=16, - help="aria2c connections per download (default: 16)") - p.add_argument("-t", "--threads", type=int, default=500, - help="Threads for parallel RPC probing (default: 500)") - p.add_argument("--max-snapshot-age", type=int, default=1300, - help="Max snapshot age in slots (default: 1300)") - p.add_argument("--max-latency", type=float, default=100, - help="Max RPC probe latency in ms (default: 100)") - p.add_argument("--min-download-speed", type=int, default=20, - help="Min download speed in MiB/s (default: 20)") - p.add_argument("--measurement-time", type=int, default=7, - help="Speed 
measurement duration in seconds (default: 7)") - p.add_argument("--max-speed-checks", type=int, default=15, - help="Max nodes to benchmark before giving up (default: 15)") - p.add_argument("--version", default=None, - help="Filter nodes by version prefix (e.g. '2.2')") - p.add_argument("--full-only", action="store_true", - help="Download only full snapshot, skip incremental") - p.add_argument("--dry-run", action="store_true", - help="Find best source and print URL, don't download") - p.add_argument("-v", "--verbose", action="store_true") - args: argparse.Namespace = p.parse_args() +def download_best_snapshot( + output_dir: str, + *, + cluster: str = "mainnet-beta", + rpc_url: str | None = None, + connections: int = 16, + threads: int = 500, + max_snapshot_age: int = 10000, + max_latency: float = 500, + min_download_speed: int = 20, + measurement_time: int = 7, + max_speed_checks: int = 15, + version_filter: str | None = None, + full_only: bool = False, +) -> bool: + """Download the best available snapshot to output_dir. - logging.basicConfig( - level=logging.DEBUG if args.verbose else logging.INFO, - format="%(asctime)s %(levelname)s %(message)s", - datefmt="%H:%M:%S", - ) + Programmatic API for use by entrypoint.py or other callers. + Returns True on success, False on failure. + """ + resolved_rpc: str = rpc_url or CLUSTER_RPC[cluster] - rpc_url: str = args.rpc or CLUSTER_RPC[args.cluster] - - # aria2c is required for actual downloads (not dry-run) - if not args.dry_run and not shutil.which("aria2c"): + if not shutil.which("aria2c"): log.error("aria2c not found. 
Install with: apt install aria2") - return 1 + return False - # Get current slot - log.info("Cluster: %s | RPC: %s", args.cluster, rpc_url) - current_slot: int | None = get_current_slot(rpc_url) + log.info("Cluster: %s | RPC: %s", cluster, resolved_rpc) + current_slot: int | None = get_current_slot(resolved_rpc) if current_slot is None: - log.error("Cannot get current slot from %s", rpc_url) - return 1 + log.error("Cannot get current slot from %s", resolved_rpc) + return False log.info("Current slot: %d", current_slot) - # Discover sources sources: list[SnapshotSource] = discover_sources( - rpc_url, current_slot, - max_age_slots=args.max_snapshot_age, - max_latency_ms=args.max_latency, - threads=args.threads, - version_filter=args.version, + resolved_rpc, current_slot, + max_age_slots=max_snapshot_age, + max_latency_ms=max_latency, + threads=threads, + version_filter=version_filter, ) if not sources: log.error("No snapshot sources found") - return 1 + return False # Sort by latency (lowest first) for speed benchmarking sources.sort(key=lambda s: s.latency_ms) - # Benchmark top candidates — all speeds in MiB/s (binary, 1 MiB = 1048576 bytes) - log.info("Benchmarking download speed on top %d sources...", args.max_speed_checks) + # Benchmark top candidates + log.info("Benchmarking download speed on top %d sources...", max_speed_checks) fast_sources: list[SnapshotSource] = [] checked: int = 0 - min_speed_bytes: int = args.min_download_speed * 1024 * 1024 # MiB to bytes + min_speed_bytes: int = min_download_speed * 1024 * 1024 for source in sources: - if checked >= args.max_speed_checks: + if checked >= max_speed_checks: break checked += 1 - speed: float = measure_speed(source.rpc_address, args.measurement_time) + speed: float = measure_speed(source.rpc_address, measurement_time) source.download_speed = speed speed_mib: float = speed / (1024 ** 2) if speed < min_speed_bytes: log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)", - source.rpc_address, speed_mib, 
args.min_download_speed) + source.rpc_address, speed_mib, min_download_speed) continue log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)", @@ -480,19 +479,17 @@ def main() -> int: if not fast_sources: log.error("No source met minimum speed requirement (%d MiB/s)", - args.min_download_speed) - log.info("Try: --min-download-speed 10") - return 1 + min_download_speed) + return False # Use the fastest source as primary, collect mirrors for each file best: SnapshotSource = fast_sources[0] file_paths: list[str] = best.file_paths - if args.full_only: + if full_only: file_paths = [fp for fp in file_paths if fp.rsplit("/", 1)[-1].startswith("snapshot-")] - # Build mirror URL lists: for each file, collect URLs from all fast sources - # that serve the same filename + # Build mirror URL lists download_plan: list[tuple[str, list[str]]] = [] for fp in file_paths: filename: str = fp.rsplit("/", 1)[-1] @@ -509,37 +506,129 @@ def main() -> int: best.rpc_address, speed_mib, len(fast_sources)) for filename, mirror_urls in download_plan: log.info(" %s (%d mirrors)", filename, len(mirror_urls)) - for url in mirror_urls: - log.info(" %s", url) - if args.dry_run: - for _, mirror_urls in download_plan: - for url in mirror_urls: - print(url) - return 0 - - # Download — skip files that already exist locally - os.makedirs(args.output, exist_ok=True) + # Download + os.makedirs(output_dir, exist_ok=True) total_start: float = time.monotonic() for filename, mirror_urls in download_plan: - filepath: Path = Path(args.output) / filename + filepath: Path = Path(output_dir) / filename if filepath.exists() and filepath.stat().st_size > 0: log.info("Skipping %s (already exists: %.1f GB)", filename, filepath.stat().st_size / (1024 ** 3)) continue - if not download_aria2c(mirror_urls, args.output, filename, args.connections): + if not download_aria2c(mirror_urls, output_dir, filename, connections): log.error("Failed to download %s", filename) - return 1 + return False total_elapsed: float = 
time.monotonic() - total_start log.info("All downloads complete in %.0fs", total_elapsed) for filename, _ in download_plan: - fp: Path = Path(args.output) / filename - if fp.exists(): - log.info(" %s (%.1f GB)", fp.name, fp.stat().st_size / (1024 ** 3)) + fp_path: Path = Path(output_dir) / filename + if fp_path.exists(): + log.info(" %s (%.1f GB)", fp_path.name, fp_path.stat().st_size / (1024 ** 3)) - return 0 + return True + + +# -- Main (CLI) ---------------------------------------------------------------- + + +def main() -> int: + p: argparse.ArgumentParser = argparse.ArgumentParser( + description="Download Solana snapshots with aria2c parallel downloads", + ) + p.add_argument("-o", "--output", default="/srv/kind/solana/snapshots", + help="Snapshot output directory (default: /srv/kind/solana/snapshots)") + p.add_argument("-c", "--cluster", default="mainnet-beta", + choices=list(CLUSTER_RPC), + help="Solana cluster (default: mainnet-beta)") + p.add_argument("-r", "--rpc", default=None, + help="RPC URL for cluster discovery (default: public RPC)") + p.add_argument("-n", "--connections", type=int, default=16, + help="aria2c connections per download (default: 16)") + p.add_argument("-t", "--threads", type=int, default=500, + help="Threads for parallel RPC probing (default: 500)") + p.add_argument("--max-snapshot-age", type=int, default=10000, + help="Max snapshot age in slots (default: 10000)") + p.add_argument("--max-latency", type=float, default=500, + help="Max RPC probe latency in ms (default: 500)") + p.add_argument("--min-download-speed", type=int, default=20, + help="Min download speed in MiB/s (default: 20)") + p.add_argument("--measurement-time", type=int, default=7, + help="Speed measurement duration in seconds (default: 7)") + p.add_argument("--max-speed-checks", type=int, default=15, + help="Max nodes to benchmark before giving up (default: 15)") + p.add_argument("--version", default=None, + help="Filter nodes by version prefix (e.g. 
'2.2')") + p.add_argument("--full-only", action="store_true", + help="Download only full snapshot, skip incremental") + p.add_argument("--dry-run", action="store_true", + help="Find best source and print URL, don't download") + p.add_argument("--post-cmd", + help="Shell command to run after successful download " + "(e.g. 'kubectl scale deployment ... --replicas=1')") + p.add_argument("-v", "--verbose", action="store_true") + args: argparse.Namespace = p.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", + ) + + # Dry-run uses inline flow (needs access to sources for URL printing) + if args.dry_run: + rpc_url: str = args.rpc or CLUSTER_RPC[args.cluster] + current_slot: int | None = get_current_slot(rpc_url) + if current_slot is None: + log.error("Cannot get current slot from %s", rpc_url) + return 1 + + sources: list[SnapshotSource] = discover_sources( + rpc_url, current_slot, + max_age_slots=args.max_snapshot_age, + max_latency_ms=args.max_latency, + threads=args.threads, + version_filter=args.version, + ) + if not sources: + log.error("No snapshot sources found") + return 1 + + sources.sort(key=lambda s: s.latency_ms) + best = sources[0] + for fp in best.file_paths: + print(f"http://{best.rpc_address}{fp}") + return 0 + + ok: bool = download_best_snapshot( + args.output, + cluster=args.cluster, + rpc_url=args.rpc, + connections=args.connections, + threads=args.threads, + max_snapshot_age=args.max_snapshot_age, + max_latency=args.max_latency, + min_download_speed=args.min_download_speed, + measurement_time=args.measurement_time, + max_speed_checks=args.max_speed_checks, + version_filter=args.version, + full_only=args.full_only, + ) + + if ok and args.post_cmd: + log.info("Running post-download command: %s", args.post_cmd) + result: subprocess.CompletedProcess[bytes] = subprocess.run( + args.post_cmd, shell=True, + ) + if result.returncode != 0: + 
log.error("Post-download command failed with exit code %d", + result.returncode) + return 1 + log.info("Post-download command completed successfully") + + return 0 if ok else 1 if __name__ == "__main__":