feat: snapshot leapfrog — auto-recovery when validator falls behind

Entrypoint changes:
- Always require full + incremental before starting (retry until found)
- Check incremental freshness against convergence threshold (500 slots)
- Gap monitor thread: if validator falls >5000 slots behind for 3
  consecutive checks, graceful stop + restart with fresh incremental
- cmd_serve is now a loop: download → run → monitor → leapfrog → repeat
- --no-snapshot-fetch moved to common args (both RPC and validator modes)
- --maximum-full-snapshots-to-retain default 1 (validator deletes
  downloaded full after generating its own)
- SNAPSHOT_MAX_AGE_SLOTS default 100000 (one full snapshot generation)

snapshot_download.py refactoring:
- Extract _discover_and_benchmark() and _rolling_incremental_download()
  as shared helpers
- Restore download_incremental_for_slot() using shared helpers (downloads
  only an incremental for an existing full snapshot)
- download_best_snapshot() uses shared helpers, downloads full then
  incremental as separate operations

The leapfrog cycle: validator generates full snapshots at standard 100k
block height intervals (same slots as the rest of the network). When the
gap monitor triggers, the entrypoint loops back to maybe_download_snapshot
which finds the validator's local full, downloads a fresh network
incremental (generated every ~40s, converges within the ~11hr full
generation window), and restarts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix/kind-mount-propagation
A. F. Dudley 2026-03-10 05:53:56 +00:00
parent cd36bfe5ee
commit 3bf87a2e9b
2 changed files with 415 additions and 212 deletions

View File

@ -43,10 +43,13 @@ SNAPSHOTS_DIR = "/data/snapshots"
LOG_DIR = "/data/log" LOG_DIR = "/data/log"
IDENTITY_FILE = f"{CONFIG_DIR}/validator-identity.json" IDENTITY_FILE = f"{CONFIG_DIR}/validator-identity.json"
# Snapshot filename pattern # Snapshot filename patterns
FULL_SNAP_RE: re.Pattern[str] = re.compile( FULL_SNAP_RE: re.Pattern[str] = re.compile(
r"^snapshot-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$" r"^snapshot-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$"
) )
# Matches "incremental-snapshot-<base>-<slot>-<hash>.tar.zst|.tar.bz2";
# group(1) = base full-snapshot slot, group(2) = incremental slot.
INCR_SNAP_RE: re.Pattern[str] = re.compile(
    r"^incremental-snapshot-(\d+)-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$"
)
MAINNET_RPC = "https://api.mainnet-beta.solana.com" MAINNET_RPC = "https://api.mainnet-beta.solana.com"
@ -124,51 +127,89 @@ def clean_snapshots(snapshots_dir: str) -> None:
entry.unlink(missing_ok=True) entry.unlink(missing_ok=True)
def get_incremental_slot(snapshots_dir: str, full_slot: int | None) -> int | None:
    """Return the newest incremental snapshot slot based on *full_slot*.

    Scans *snapshots_dir* for filenames matching INCR_SNAP_RE, keeps only
    those whose base slot (group 1) equals *full_slot*, and returns the
    maximum incremental slot (group 2).

    Returns None when *full_slot* is None, when the directory does not
    exist, or when no matching incremental is present.
    """
    if full_slot is None:
        return None
    snap_dir = Path(snapshots_dir)
    if not snap_dir.is_dir():
        return None
    matches = (INCR_SNAP_RE.match(entry.name) for entry in snap_dir.iterdir())
    candidate_slots = [
        int(m.group(2))
        for m in matches
        if m is not None and int(m.group(1)) == full_slot
    ]
    return max(candidate_slots, default=None)
def maybe_download_snapshot(snapshots_dir: str) -> None: def maybe_download_snapshot(snapshots_dir: str) -> None:
"""Check snapshot freshness and download if needed. """Ensure full + incremental snapshots exist before starting.
The validator should always start from a full + incremental pair to
minimize replay time. If either is missing or the full is too old,
download fresh ones via download_best_snapshot (which does rolling
incremental convergence after downloading the full).
Controlled by env vars: Controlled by env vars:
SNAPSHOT_AUTO_DOWNLOAD (default: true) enable/disable SNAPSHOT_AUTO_DOWNLOAD (default: true) enable/disable
SNAPSHOT_MAX_AGE_SLOTS (default: 20000) staleness threshold SNAPSHOT_MAX_AGE_SLOTS (default: 100000) full snapshot staleness threshold
(one full snapshot generation, ~11 hours)
""" """
if not env_bool("SNAPSHOT_AUTO_DOWNLOAD", default=True): if not env_bool("SNAPSHOT_AUTO_DOWNLOAD", default=True):
log.info("Snapshot auto-download disabled") log.info("Snapshot auto-download disabled")
return return
max_age = int(env("SNAPSHOT_MAX_AGE_SLOTS", "20000")) max_age = int(env("SNAPSHOT_MAX_AGE_SLOTS", "100000"))
# Get mainnet current slot
mainnet_slot = rpc_get_slot(MAINNET_RPC) mainnet_slot = rpc_get_slot(MAINNET_RPC)
if mainnet_slot is None: if mainnet_slot is None:
log.warning("Cannot reach mainnet RPC — skipping snapshot check") log.warning("Cannot reach mainnet RPC — skipping snapshot check")
return return
# Check local snapshot
local_slot = get_local_snapshot_slot(snapshots_dir)
if local_slot is not None:
age = mainnet_slot - local_slot
log.info("Local snapshot at slot %d (mainnet: %d, age: %d slots)",
local_slot, mainnet_slot, age)
if age <= max_age:
log.info("Snapshot is fresh enough (age %d <= %d), skipping download", age, max_age)
return
log.info("Snapshot is stale (age %d > %d), downloading fresh", age, max_age)
else:
log.info("No local snapshot found, downloading")
# Clean old snapshots before downloading
clean_snapshots(snapshots_dir)
# Import and call snapshot download
# snapshot_download.py is installed alongside this file in /usr/local/bin/
script_dir = Path(__file__).resolve().parent script_dir = Path(__file__).resolve().parent
sys.path.insert(0, str(script_dir)) sys.path.insert(0, str(script_dir))
from snapshot_download import download_best_snapshot from snapshot_download import download_best_snapshot, download_incremental_for_slot
convergence = int(env("SNAPSHOT_CONVERGENCE_SLOTS", "500")) convergence = int(env("SNAPSHOT_CONVERGENCE_SLOTS", "500"))
ok = download_best_snapshot(snapshots_dir, convergence_slots=convergence) retry_delay = int(env("SNAPSHOT_RETRY_DELAY", "60"))
if not ok:
log.error("Snapshot download failed — starting without fresh snapshot") # Check local full snapshot
local_slot = get_local_snapshot_slot(snapshots_dir)
have_fresh_full = (local_slot is not None
and (mainnet_slot - local_slot) <= max_age)
if have_fresh_full:
assert local_slot is not None
inc_slot = get_incremental_slot(snapshots_dir, local_slot)
if inc_slot is not None:
inc_gap = mainnet_slot - inc_slot
if inc_gap <= convergence:
log.info("Full (slot %d) + incremental (slot %d, gap %d) "
"within convergence, starting",
local_slot, inc_slot, inc_gap)
return
log.info("Incremental too stale (slot %d, gap %d > %d)",
inc_slot, inc_gap, convergence)
# Fresh full, need a fresh incremental
log.info("Downloading incremental for full at slot %d", local_slot)
while True:
if download_incremental_for_slot(snapshots_dir, local_slot,
convergence_slots=convergence):
return
log.warning("Incremental download failed — retrying in %ds",
retry_delay)
time.sleep(retry_delay)
# No full or full too old — download both
log.info("Downloading full + incremental")
clean_snapshots(snapshots_dir)
while True:
if download_best_snapshot(snapshots_dir, convergence_slots=convergence):
return
log.warning("Snapshot download failed — retrying in %ds", retry_delay)
time.sleep(retry_delay)
# -- Directory and identity setup ---------------------------------------------- # -- Directory and identity setup ----------------------------------------------
@ -230,6 +271,7 @@ def build_common_args() -> list[str]:
"--no-os-network-limits-test", "--no-os-network-limits-test",
"--wal-recovery-mode", "skip_any_corrupted_record", "--wal-recovery-mode", "skip_any_corrupted_record",
"--limit-ledger-size", env("LIMIT_LEDGER_SIZE", "50000000"), "--limit-ledger-size", env("LIMIT_LEDGER_SIZE", "50000000"),
"--no-snapshot-fetch", # entrypoint handles snapshot download
] ]
# Snapshot generation # Snapshot generation
@ -238,7 +280,7 @@ def build_common_args() -> list[str]:
else: else:
args += [ args += [
"--full-snapshot-interval-slots", env("SNAPSHOT_INTERVAL_SLOTS", "100000"), "--full-snapshot-interval-slots", env("SNAPSHOT_INTERVAL_SLOTS", "100000"),
"--maximum-full-snapshots-to-retain", env("MAXIMUM_SNAPSHOTS_TO_RETAIN", "5"), "--maximum-full-snapshots-to-retain", env("MAXIMUM_SNAPSHOTS_TO_RETAIN", "1"),
] ]
if env("NO_INCREMENTAL_SNAPSHOTS") != "true": if env("NO_INCREMENTAL_SNAPSHOTS") != "true":
args += ["--maximum-incremental-snapshots-to-retain", "2"] args += ["--maximum-incremental-snapshots-to-retain", "2"]
@ -309,7 +351,6 @@ def build_rpc_args() -> list[str]:
"--rpc-pubsub-enable-block-subscription", "--rpc-pubsub-enable-block-subscription",
"--enable-extended-tx-metadata-storage", "--enable-extended-tx-metadata-storage",
"--no-wait-for-vote-to-start-leader", "--no-wait-for-vote-to-start-leader",
"--no-snapshot-fetch",
] ]
# Public vs private RPC # Public vs private RPC
@ -379,7 +420,7 @@ def append_extra_args(args: list[str]) -> list[str]:
GRACEFUL_EXIT_TIMEOUT = 270 GRACEFUL_EXIT_TIMEOUT = 270
def graceful_exit(child: subprocess.Popen[bytes]) -> None: def graceful_exit(child: subprocess.Popen[bytes], reason: str = "SIGTERM") -> None:
"""Request graceful shutdown via the admin RPC Unix socket. """Request graceful shutdown via the admin RPC Unix socket.
Runs ``agave-validator exit --force --ledger /data/ledger`` which connects Runs ``agave-validator exit --force --ledger /data/ledger`` which connects
@ -390,7 +431,7 @@ def graceful_exit(child: subprocess.Popen[bytes]) -> None:
If the admin RPC exit fails or the child doesn't exit within the timeout, If the admin RPC exit fails or the child doesn't exit within the timeout,
falls back to SIGTERM then SIGKILL. falls back to SIGTERM then SIGKILL.
""" """
log.info("SIGTERM received — requesting graceful exit via admin RPC") log.info("%s — requesting graceful exit via admin RPC", reason)
try: try:
result = subprocess.run( result = subprocess.run(
["agave-validator", "exit", "--force", "--ledger", LEDGER_DIR], ["agave-validator", "exit", "--force", "--ledger", LEDGER_DIR],
@ -437,16 +478,69 @@ def graceful_exit(child: subprocess.Popen[bytes]) -> None:
# -- Serve subcommand --------------------------------------------------------- # -- Serve subcommand ---------------------------------------------------------
def cmd_serve() -> None: def _gap_monitor(
"""Main serve flow: snapshot check, setup, run agave-validator as child. child: subprocess.Popen[bytes],
leapfrog: threading.Event,
shutting_down: threading.Event,
) -> None:
"""Background thread: poll slot gap and trigger leapfrog if too far behind.
Python stays as PID 1 and traps SIGTERM to perform graceful shutdown Waits for a grace period (SNAPSHOT_MONITOR_GRACE, default 600s) before
via the admin RPC Unix socket. monitoring the validator needs time to extract snapshots and catch up.
Then polls every SNAPSHOT_MONITOR_INTERVAL (default 30s). If the gap
exceeds SNAPSHOT_LEAPFROG_SLOTS (default 5000) for SNAPSHOT_LEAPFROG_CHECKS
(default 3) consecutive checks, triggers graceful shutdown and sets the
leapfrog event so cmd_serve loops back to download a fresh incremental.
"""
threshold = int(env("SNAPSHOT_LEAPFROG_SLOTS", "5000"))
required_checks = int(env("SNAPSHOT_LEAPFROG_CHECKS", "3"))
interval = int(env("SNAPSHOT_MONITOR_INTERVAL", "30"))
grace = int(env("SNAPSHOT_MONITOR_GRACE", "600"))
rpc_port = env("RPC_PORT", "8899")
local_url = f"http://127.0.0.1:{rpc_port}"
# Grace period — don't monitor during initial catch-up
if shutting_down.wait(grace):
return
consecutive = 0
while not shutting_down.is_set():
local_slot = rpc_get_slot(local_url, timeout=5)
mainnet_slot = rpc_get_slot(MAINNET_RPC, timeout=10)
if local_slot is not None and mainnet_slot is not None:
gap = mainnet_slot - local_slot
if gap > threshold:
consecutive += 1
log.warning("Gap %d > %d (%d/%d consecutive)",
gap, threshold, consecutive, required_checks)
if consecutive >= required_checks:
log.warning("Leapfrog triggered: gap %d", gap)
leapfrog.set()
graceful_exit(child, reason="Leapfrog")
return
else:
if consecutive > 0:
log.info("Gap %d within threshold, resetting counter", gap)
consecutive = 0
shutting_down.wait(interval)
def cmd_serve() -> None:
"""Main serve flow: snapshot download, run validator, monitor gap, leapfrog.
Python stays as PID 1. On each iteration:
1. Download full + incremental snapshots (if needed)
2. Start agave-validator as child process
3. Monitor slot gap in background thread
4. If gap exceeds threshold → graceful stop → loop back to step 1 5. If SIGTERM → graceful stop → exit
6. If validator crashes → exit with its return code
"""
""" """
mode = env("AGAVE_MODE", "test") mode = env("AGAVE_MODE", "test")
log.info("AGAVE_MODE=%s", mode) log.info("AGAVE_MODE=%s", mode)
# Test mode dispatches to start-test.sh
if mode == "test": if mode == "test":
os.execvp("start-test.sh", ["start-test.sh"]) os.execvp("start-test.sh", ["start-test.sh"])
@ -454,47 +548,66 @@ def cmd_serve() -> None:
log.error("Unknown AGAVE_MODE: %s (valid: test, rpc, validator)", mode) log.error("Unknown AGAVE_MODE: %s (valid: test, rpc, validator)", mode)
sys.exit(1) sys.exit(1)
# Ensure directories # One-time setup
dirs = [CONFIG_DIR, LEDGER_DIR, ACCOUNTS_DIR, SNAPSHOTS_DIR] dirs = [CONFIG_DIR, LEDGER_DIR, ACCOUNTS_DIR, SNAPSHOTS_DIR]
if mode == "rpc": if mode == "rpc":
dirs.append(LOG_DIR) dirs.append(LOG_DIR)
ensure_dirs(*dirs) ensure_dirs(*dirs)
# Snapshot freshness check and auto-download if not env_bool("SKIP_IP_ECHO_PREFLIGHT"):
maybe_download_snapshot(SNAPSHOTS_DIR) script_dir = Path(__file__).resolve().parent
sys.path.insert(0, str(script_dir))
from ip_echo_preflight import main as ip_echo_main
if ip_echo_main() != 0:
sys.exit(1)
# Identity setup
if mode == "rpc": if mode == "rpc":
ensure_identity_rpc() ensure_identity_rpc()
print_identity() print_identity()
# Build args
if mode == "rpc": if mode == "rpc":
args = build_rpc_args() args = build_rpc_args()
else: else:
args = build_validator_args() args = build_validator_args()
args = append_extra_args(args) args = append_extra_args(args)
# Write startup timestamp for probe grace period # Main loop: download → run → monitor → leapfrog if needed
Path("/tmp/entrypoint-start").write_text(str(time.time())) while True:
maybe_download_snapshot(SNAPSHOTS_DIR)
log.info("Starting agave-validator with %d arguments", len(args)) Path("/tmp/entrypoint-start").write_text(str(time.time()))
child = subprocess.Popen(["agave-validator"] + args) log.info("Starting agave-validator with %d arguments", len(args))
child = subprocess.Popen(["agave-validator"] + args)
# Forward SIGUSR1 to child (log rotation) shutting_down = threading.Event()
signal.signal(signal.SIGUSR1, lambda _sig, _frame: child.send_signal(signal.SIGUSR1)) leapfrog = threading.Event()
# Trap SIGTERM — run graceful_exit in a thread so the signal handler returns signal.signal(signal.SIGUSR1,
# immediately and child.wait() in the main thread can observe the exit. lambda _sig, _frame: child.send_signal(signal.SIGUSR1))
def _on_sigterm(_sig: int, _frame: object) -> None:
threading.Thread(target=graceful_exit, args=(child,), daemon=True).start()
signal.signal(signal.SIGTERM, _on_sigterm) def _on_sigterm(_sig: int, _frame: object) -> None:
shutting_down.set()
threading.Thread(
target=graceful_exit, args=(child,), daemon=True,
).start()
# Wait for child — if it exits on its own (crash, normal exit), propagate code signal.signal(signal.SIGTERM, _on_sigterm)
child.wait()
sys.exit(child.returncode) # Start gap monitor
monitor = threading.Thread(
target=_gap_monitor,
args=(child, leapfrog, shutting_down),
daemon=True,
)
monitor.start()
child.wait()
if leapfrog.is_set():
log.info("Leapfrog: restarting with fresh incremental")
continue
sys.exit(child.returncode)
# -- Probe subcommand --------------------------------------------------------- # -- Probe subcommand ---------------------------------------------------------

View File

@ -461,9 +461,214 @@ def download_aria2c(
return True return True
# -- Shared helpers ------------------------------------------------------------
def _discover_and_benchmark(
    rpc_url: str,
    current_slot: int,
    *,
    max_snapshot_age: int = 10000,
    max_latency: float = 500,
    threads: int = 500,
    min_download_speed: int = 20,
    measurement_time: int = 7,
    max_speed_checks: int = 15,
    version_filter: str | None = None,
) -> list[SnapshotSource]:
    """Discover snapshot sources and benchmark their download speed.

    Candidates come from discover_sources(); they are ordered by latency
    and up to *max_speed_checks* of them are speed-tested. Only sources
    reaching *min_download_speed* MiB/s are returned, in the latency
    order they were checked.
    """
    candidates: list[SnapshotSource] = discover_sources(
        rpc_url, current_slot,
        max_age_slots=max_snapshot_age,
        max_latency_ms=max_latency,
        threads=threads,
        version_filter=version_filter,
    )
    if not candidates:
        return []

    # Benchmark the lowest-latency candidates first.
    candidates.sort(key=lambda src: src.latency_ms)

    log.info("Benchmarking download speed on top %d sources...", max_speed_checks)
    floor_bytes: int = min_download_speed * 1024 * 1024
    fast: list[SnapshotSource] = []
    for src in candidates[:max_speed_checks]:
        rate: float = measure_speed(src.rpc_address, measurement_time)
        src.download_speed = rate
        rate_mib: float = rate / (1024 ** 2)
        if rate < floor_bytes:
            log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)",
                     src.rpc_address, rate_mib, min_download_speed)
        else:
            log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)",
                     src.rpc_address, rate_mib,
                     src.latency_ms, src.slots_diff)
            fast.append(src)
    return fast
def _rolling_incremental_download(
    fast_sources: list[SnapshotSource],
    full_snap_slot: int,
    output_dir: str,
    convergence_slots: int,
    connections: int,
    rpc_url: str,
) -> str | None:
    """Download incrementals in a loop until converged.

    Probes fast_sources for incrementals matching full_snap_slot, downloads
    the freshest one, then re-probes until the gap to head is within
    convergence_slots. Returns the filename of the final incremental,
    or None if no incremental was found.
    """
    # Filename of the most recently downloaded incremental; stays None
    # until at least one download succeeds.
    prev_inc_filename: str | None = None
    loop_start: float = time.monotonic()
    max_convergence_time: float = 1800.0  # 30 min wall-clock limit
    while True:
        # Hard wall-clock cap: keep whatever we already have instead of
        # chasing convergence forever.
        if time.monotonic() - loop_start > max_convergence_time:
            if prev_inc_filename:
                log.warning("Convergence timeout (%.0fs) — using %s",
                            max_convergence_time, prev_inc_filename)
            else:
                log.warning("Convergence timeout (%.0fs) — no incremental downloaded",
                            max_convergence_time)
            break
        inc_fn, inc_mirrors = probe_incremental(fast_sources, full_snap_slot)
        if inc_fn is None:
            # Sources stopped advertising a matching incremental; succeed
            # with the previous one if we have it, otherwise give up.
            if prev_inc_filename is None:
                log.error("No matching incremental found for base slot %d",
                          full_snap_slot)
            else:
                log.info("No newer incremental available, using %s", prev_inc_filename)
            break
        # probe_incremental only returns names that matched this pattern,
        # so the match cannot fail here.
        m_inc: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn)
        assert m_inc is not None
        inc_slot: int = int(m_inc.group(2))
        head_slot: int | None = get_current_slot(rpc_url)
        if head_slot is None:
            # Can't measure the gap — force one download, then bail out
            # below (see the `head_slot is None: break` after download).
            log.warning("Cannot get current slot — downloading best available incremental")
            gap: int = convergence_slots + 1
        else:
            gap = head_slot - inc_slot
        if inc_fn == prev_inc_filename:
            # Probe returned the file we already have.
            if gap <= convergence_slots:
                log.info("Incremental %s already downloaded (gap %d slots, converged)",
                         inc_fn, gap)
                break
            # NOTE(review): if head_slot keeps coming back None while the
            # probe keeps returning the same file, this path re-polls every
            # 10s until the 30-min timeout fires — confirm that is intended.
            log.info("No newer incremental yet (slot %d, gap %d slots), waiting...",
                     inc_slot, gap)
            time.sleep(10)
            continue
        # A newer incremental supersedes the previous one on disk.
        if prev_inc_filename is not None:
            old_path: Path = Path(output_dir) / prev_inc_filename
            if old_path.exists():
                log.info("Removing superseded incremental %s", prev_inc_filename)
                old_path.unlink()
        log.info("Downloading incremental %s (%d mirrors, slot %d, gap %d slots)",
                 inc_fn, len(inc_mirrors), inc_slot, gap)
        if not download_aria2c(inc_mirrors, output_dir, inc_fn, connections):
            # Transient failure: keep prev_inc_filename and re-probe.
            log.warning("Failed to download incremental %s — re-probing in 10s", inc_fn)
            time.sleep(10)
            continue
        prev_inc_filename = inc_fn
        if gap <= convergence_slots:
            log.info("Converged: incremental slot %d is %d slots behind head",
                     inc_slot, gap)
            break
        if head_slot is None:
            # Gap was unmeasurable; we downloaded the best available, stop.
            break
        log.info("Not converged (gap %d > %d), re-probing in 10s...",
                 gap, convergence_slots)
        time.sleep(10)
    return prev_inc_filename
# -- Public API ---------------------------------------------------------------- # -- Public API ----------------------------------------------------------------
def download_incremental_for_slot(
    output_dir: str,
    full_snap_slot: int,
    *,
    cluster: str = "mainnet-beta",
    rpc_url: str | None = None,
    connections: int = 16,
    threads: int = 500,
    max_snapshot_age: int = 10000,
    max_latency: float = 500,
    min_download_speed: int = 20,
    measurement_time: int = 7,
    max_speed_checks: int = 15,
    version_filter: str | None = None,
    convergence_slots: int = 500,
) -> bool:
    """Download an incremental snapshot for an existing full snapshot.

    Discovers sources, benchmarks their speed, then runs the rolling
    incremental download loop against *full_snap_slot*. A full snapshot
    is never downloaded by this function.

    Returns True if an incremental was downloaded, False otherwise.
    """
    resolved_rpc: str = rpc_url if rpc_url else CLUSTER_RPC[cluster]

    # aria2c does the actual transfer; fail fast if it is missing.
    if not shutil.which("aria2c"):
        log.error("aria2c not found. Install with: apt install aria2")
        return False

    log.info("Incremental download for base slot %d", full_snap_slot)

    current_slot: int | None = get_current_slot(resolved_rpc)
    if current_slot is None:
        log.error("Cannot get current slot from %s", resolved_rpc)
        return False

    sources: list[SnapshotSource] = _discover_and_benchmark(
        resolved_rpc, current_slot,
        max_snapshot_age=max_snapshot_age,
        max_latency=max_latency,
        threads=threads,
        min_download_speed=min_download_speed,
        measurement_time=measurement_time,
        max_speed_checks=max_speed_checks,
        version_filter=version_filter,
    )
    if not sources:
        log.error("No fast sources found")
        return False

    os.makedirs(output_dir, exist_ok=True)
    downloaded: str | None = _rolling_incremental_download(
        sources, full_snap_slot, output_dir,
        convergence_slots, connections, resolved_rpc,
    )
    return downloaded is not None
def download_best_snapshot( def download_best_snapshot(
output_dir: str, output_dir: str,
*, *,
@ -500,183 +705,68 @@ def download_best_snapshot(
return False return False
log.info("Current slot: %d", current_slot) log.info("Current slot: %d", current_slot)
sources: list[SnapshotSource] = discover_sources( fast_sources: list[SnapshotSource] = _discover_and_benchmark(
resolved_rpc, current_slot, resolved_rpc, current_slot,
max_age_slots=max_snapshot_age, max_snapshot_age=max_snapshot_age,
max_latency_ms=max_latency, max_latency=max_latency,
threads=threads, threads=threads,
min_download_speed=min_download_speed,
measurement_time=measurement_time,
max_speed_checks=max_speed_checks,
version_filter=version_filter, version_filter=version_filter,
) )
if not sources:
log.error("No snapshot sources found")
return False
# Sort by latency (lowest first) for speed benchmarking
sources.sort(key=lambda s: s.latency_ms)
# Benchmark top candidates
log.info("Benchmarking download speed on top %d sources...", max_speed_checks)
fast_sources: list[SnapshotSource] = []
checked: int = 0
min_speed_bytes: int = min_download_speed * 1024 * 1024
for source in sources:
if checked >= max_speed_checks:
break
checked += 1
speed: float = measure_speed(source.rpc_address, measurement_time)
source.download_speed = speed
speed_mib: float = speed / (1024 ** 2)
if speed < min_speed_bytes:
log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)",
source.rpc_address, speed_mib, min_download_speed)
continue
log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)",
source.rpc_address, speed_mib,
source.latency_ms, source.slots_diff)
fast_sources.append(source)
if not fast_sources: if not fast_sources:
log.error("No source met minimum speed requirement (%d MiB/s)", log.error("No fast sources found")
min_download_speed)
return False return False
# Use the fastest source as primary, collect mirrors for each file # Use the fastest source as primary, build full snapshot download plan
best: SnapshotSource = fast_sources[0] best: SnapshotSource = fast_sources[0]
file_paths: list[str] = best.file_paths full_paths: list[str] = [fp for fp in best.file_paths
if full_only: if fp.rsplit("/", 1)[-1].startswith("snapshot-")]
file_paths = [fp for fp in file_paths if not full_paths:
if fp.rsplit("/", 1)[-1].startswith("snapshot-")] log.error("Best source has no full snapshot")
return False
# Build mirror URL lists # Build mirror URLs for the full snapshot
download_plan: list[tuple[str, list[str]]] = [] full_filename: str = full_paths[0].rsplit("/", 1)[-1]
for fp in file_paths: full_mirrors: list[str] = [f"http://{best.rpc_address}{full_paths[0]}"]
filename: str = fp.rsplit("/", 1)[-1] for other in fast_sources[1:]:
mirror_urls: list[str] = [f"http://{best.rpc_address}{fp}"] for other_fp in other.file_paths:
for other in fast_sources[1:]: if other_fp.rsplit("/", 1)[-1] == full_filename:
for other_fp in other.file_paths: full_mirrors.append(f"http://{other.rpc_address}{other_fp}")
if other_fp.rsplit("/", 1)[-1] == filename: break
mirror_urls.append(f"http://{other.rpc_address}{other_fp}")
break
download_plan.append((filename, mirror_urls))
speed_mib: float = best.download_speed / (1024 ** 2) speed_mib: float = best.download_speed / (1024 ** 2)
log.info("Best source: %s (%.1f MiB/s), %d mirrors total", log.info("Best source: %s (%.1f MiB/s), %d mirrors",
best.rpc_address, speed_mib, len(fast_sources)) best.rpc_address, speed_mib, len(full_mirrors))
for filename, mirror_urls in download_plan:
log.info(" %s (%d mirrors)", filename, len(mirror_urls))
# Download full snapshot first, then re-probe for fresh incremental # Download full snapshot
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
total_start: float = time.monotonic() total_start: float = time.monotonic()
# Separate full and incremental from the initial plan filepath: Path = Path(output_dir) / full_filename
full_downloads: list[tuple[str, list[str]]] = [] if filepath.exists() and filepath.stat().st_size > 0:
for filename, mirror_urls in download_plan: log.info("Skipping %s (already exists: %.1f GB)",
if filename.startswith("snapshot-"): full_filename, filepath.stat().st_size / (1024 ** 3))
full_downloads.append((filename, mirror_urls)) else:
if not download_aria2c(full_mirrors, output_dir, full_filename, connections):
# Download full snapshot(s) log.error("Failed to download %s", full_filename)
for filename, mirror_urls in full_downloads:
filepath: Path = Path(output_dir) / filename
if filepath.exists() and filepath.stat().st_size > 0:
log.info("Skipping %s (already exists: %.1f GB)",
filename, filepath.stat().st_size / (1024 ** 3))
continue
if not download_aria2c(mirror_urls, output_dir, filename, connections):
log.error("Failed to download %s", filename)
return False return False
# After full snapshot download, rolling incremental download loop. # Download incremental separately — the full download took minutes,
# The initial incremental is stale by now (full download takes 10+ min). # so any incremental from discovery is stale. Re-probe for fresh ones.
# Re-probe repeatedly until we find one close enough to head.
if not full_only: if not full_only:
full_filename: str = full_downloads[0][0] fm: re.Match[str] | None = FULL_SNAP_RE.match(full_filename)
fm_post: re.Match[str] | None = FULL_SNAP_RE.match(full_filename) if fm:
if fm_post: full_snap_slot: int = int(fm.group(1))
full_snap_slot: int = int(fm_post.group(1)) log.info("Downloading incremental for base slot %d...", full_snap_slot)
log.info("Rolling incremental download (base slot %d, convergence %d slots)...", _rolling_incremental_download(
full_snap_slot, convergence_slots) fast_sources, full_snap_slot, output_dir,
prev_inc_filename: str | None = None convergence_slots, connections, resolved_rpc,
loop_start: float = time.monotonic() )
max_convergence_time: float = 1800.0 # 30 min wall-clock limit
while True:
if time.monotonic() - loop_start > max_convergence_time:
if prev_inc_filename:
log.warning("Convergence timeout (%.0fs) — using %s",
max_convergence_time, prev_inc_filename)
else:
log.warning("Convergence timeout (%.0fs) — no incremental downloaded",
max_convergence_time)
break
inc_fn, inc_mirrors = probe_incremental(fast_sources, full_snap_slot)
if inc_fn is None:
if prev_inc_filename is None:
log.error("No matching incremental found for base slot %d "
"— validator will replay from full snapshot", full_snap_slot)
else:
log.info("No newer incremental available, using %s", prev_inc_filename)
break
# Parse the incremental slot from the filename
m_inc: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn)
assert m_inc is not None # probe_incremental already validated
inc_slot: int = int(m_inc.group(2))
# Check convergence against current mainnet slot
head_slot: int | None = get_current_slot(resolved_rpc)
if head_slot is None:
log.warning("Cannot get current slot — downloading best available incremental")
gap: int = convergence_slots + 1 # force download, then break
else:
gap = head_slot - inc_slot
# Skip download if we already have this exact incremental
if inc_fn == prev_inc_filename:
if gap <= convergence_slots:
log.info("Incremental %s already downloaded (gap %d slots, converged)", inc_fn, gap)
break
log.info("No newer incremental yet (slot %d, gap %d slots), waiting...",
inc_slot, gap)
time.sleep(10)
continue
# Delete previous incremental before downloading the new one
if prev_inc_filename is not None:
old_path: Path = Path(output_dir) / prev_inc_filename
if old_path.exists():
log.info("Removing superseded incremental %s", prev_inc_filename)
old_path.unlink()
log.info("Downloading incremental %s (%d mirrors, slot %d, gap %d slots)",
inc_fn, len(inc_mirrors), inc_slot, gap)
if not download_aria2c(inc_mirrors, output_dir, inc_fn, connections):
log.warning("Failed to download incremental %s — re-probing in 10s", inc_fn)
time.sleep(10)
continue
prev_inc_filename = inc_fn
if gap <= convergence_slots:
log.info("Converged: incremental slot %d is %d slots behind head", inc_slot, gap)
break
if head_slot is None:
break
log.info("Not converged (gap %d > %d), re-probing in 10s...", gap, convergence_slots)
time.sleep(10)
total_elapsed: float = time.monotonic() - total_start total_elapsed: float = time.monotonic() - total_start
log.info("All downloads complete in %.0fs", total_elapsed) log.info("All downloads complete in %.0fs", total_elapsed)
for filename, _ in download_plan:
fp_path: Path = Path(output_dir) / filename
if fp_path.exists():
log.info(" %s (%.1f GB)", fp_path.name, fp_path.stat().st_size / (1024 ** 3))
return True return True