diff --git a/scripts/agave-container/entrypoint.py b/scripts/agave-container/entrypoint.py
index 1122fc9c..519c7be2 100644
--- a/scripts/agave-container/entrypoint.py
+++ b/scripts/agave-container/entrypoint.py
@@ -158,7 +158,8 @@ def maybe_download_snapshot(snapshots_dir: str) -> None:
     sys.path.insert(0, str(script_dir))
     from snapshot_download import download_best_snapshot
 
-    ok = download_best_snapshot(snapshots_dir)
+    convergence = int(env("SNAPSHOT_CONVERGENCE_SLOTS", "500"))
+    ok = download_best_snapshot(snapshots_dir, convergence_slots=convergence)
     if not ok:
         log.error("Snapshot download failed — starting without fresh snapshot")
 
diff --git a/scripts/agave-container/snapshot_download.py b/scripts/agave-container/snapshot_download.py
index 9f9137ac..0abaa02a 100644
--- a/scripts/agave-container/snapshot_download.py
+++ b/scripts/agave-container/snapshot_download.py
@@ -343,6 +343,61 @@ def measure_speed(rpc_address: str, measure_time: int = 7) -> float:
     return 0.0
 
 
+# -- Incremental probing -------------------------------------------------------
+
+
+def probe_incremental(
+    fast_sources: list[SnapshotSource],
+    full_snap_slot: int,
+) -> tuple[str | None, list[str]]:
+    """Probe fast sources for the best incremental matching full_snap_slot.
+
+    Returns (filename, mirror_urls) or (None, []) if no match found.
+    The "best" incremental is the one with the highest slot (closest to head).
+    """
+    best_filename: str | None = None
+    best_slot: int = 0
+    best_source: SnapshotSource | None = None
+    best_path: str | None = None
+
+    for source in fast_sources:
+        inc_url: str = f"http://{source.rpc_address}/incremental-snapshot.tar.bz2"
+        inc_location, _ = head_no_follow(inc_url, timeout=2)
+        if not inc_location:
+            continue
+        inc_fn, inc_fp = _parse_snapshot_filename(inc_location)
+        m: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn)
+        if not m:
+            continue
+        if int(m.group(1)) != full_snap_slot:
+            log.debug(" %s: incremental base slot %s != full %d, skipping",
+                      source.rpc_address, m.group(1), full_snap_slot)
+            continue
+        inc_slot: int = int(m.group(2))
+        if inc_slot > best_slot:
+            best_slot = inc_slot
+            best_filename = inc_fn
+            best_source = source
+            best_path = inc_fp
+
+    if best_filename is None or best_source is None or best_path is None:
+        return None, []
+
+    # Build mirror list — check other sources for the same filename
+    mirror_urls: list[str] = [f"http://{best_source.rpc_address}{best_path}"]
+    for other in fast_sources:
+        if other.rpc_address == best_source.rpc_address:
+            continue
+        other_loc, _ = head_no_follow(
+            f"http://{other.rpc_address}/incremental-snapshot.tar.bz2", timeout=2)
+        if other_loc:
+            other_fn, other_fp = _parse_snapshot_filename(other_loc)
+            if other_fn == best_filename:
+                mirror_urls.append(f"http://{other.rpc_address}{other_fp}")
+
+    return best_filename, mirror_urls
+
+
 # -- Download ------------------------------------------------------------------
 
 
@@ -423,6 +478,7 @@ def download_best_snapshot(
     max_speed_checks: int = 15,
     version_filter: str | None = None,
     full_only: bool = False,
+    convergence_slots: int = 500,
 ) -> bool:
     """Download the best available snapshot to output_dir.
 
@@ -534,46 +590,75 @@ def download_best_snapshot(
         log.error("Failed to download %s", filename)
         return False
 
-    # After full snapshot download, re-probe for a fresh incremental.
+    # After full snapshot download, rolling incremental download loop.
     # The initial incremental is stale by now (full download takes 10+ min).
+    # Re-probe repeatedly until we find one close enough to head.
     if not full_only:
-        # Get the full snapshot slot from the filename we just downloaded
        full_filename: str = full_downloads[0][0]
         fm_post: re.Match[str] | None = FULL_SNAP_RE.match(full_filename)
         if fm_post:
             full_snap_slot: int = int(fm_post.group(1))
-            log.info("Re-probing for fresh incremental based on slot %d...", full_snap_slot)
-            inc_downloaded: bool = False
-            for source in fast_sources:
-                inc_url_re: str = f"http://{source.rpc_address}/incremental-snapshot.tar.bz2"
-                inc_location, _ = head_no_follow(inc_url_re, timeout=2)
-                if not inc_location:
-                    continue
-                inc_fn, inc_fp = _parse_snapshot_filename(inc_location)
+            log.info("Rolling incremental download (base slot %d, convergence %d slots)...",
+                     full_snap_slot, convergence_slots)
+            prev_inc_filename: str | None = None
+
+            while True:
+                inc_fn, inc_mirrors = probe_incremental(fast_sources, full_snap_slot)
+                if inc_fn is None:
+                    if prev_inc_filename is None:
+                        log.error("No matching incremental found for base slot %d "
+                                  "— validator will replay from full snapshot", full_snap_slot)
+                    else:
+                        log.info("No newer incremental available, using %s", prev_inc_filename)
+                    break
+
+                # Parse the incremental slot from the filename
                 m_inc: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn)
-                if not m_inc:
+                assert m_inc is not None  # probe_incremental already validated
+                inc_slot: int = int(m_inc.group(2))
+
+                # Check convergence against current mainnet slot
+                head_slot: int | None = get_current_slot(resolved_rpc)
+                if head_slot is None:
+                    log.warning("Cannot get current slot — downloading best available incremental")
+                    gap: int = convergence_slots + 1  # force download, then break
+                else:
+                    gap = head_slot - inc_slot
+
+                # Skip download if we already have this exact incremental
+                if inc_fn == prev_inc_filename:
+                    if gap <= convergence_slots:
+                        log.info("Incremental %s already downloaded (gap %d slots, converged)", inc_fn, gap)
+                        break
+                    log.info("No newer incremental yet (slot %d, gap %d slots), waiting...",
+                             inc_slot, gap)
+                    time.sleep(10)
                     continue
-                if int(m_inc.group(1)) != full_snap_slot:
-                    log.debug(" %s: incremental base slot %s != full %d, skipping",
-                              source.rpc_address, m_inc.group(1), full_snap_slot)
-                    continue
-                # Found a matching incremental — build mirror list and download
-                inc_mirrors: list[str] = [f"http://{source.rpc_address}{inc_fp}"]
-                for other in fast_sources:
-                    if other.rpc_address == source.rpc_address:
-                        continue
-                    other_loc, _ = head_no_follow(
-                        f"http://{other.rpc_address}/incremental-snapshot.tar.bz2", timeout=2)
-                    if other_loc:
-                        other_fn, other_fp = _parse_snapshot_filename(other_loc)
-                        if other_fn == inc_fn:
-                            inc_mirrors.append(f"http://{other.rpc_address}{other_fp}")
-                log.info("  Found incremental %s (%d mirrors)", inc_fn, len(inc_mirrors))
-                if download_aria2c(inc_mirrors, output_dir, inc_fn, connections):
-                    inc_downloaded = True
-                    break
-            if not inc_downloaded:
-                log.info("No matching incremental found — validator will replay from full snapshot")
+
+                # Delete previous incremental before downloading the new one
+                if prev_inc_filename is not None:
+                    old_path: Path = Path(output_dir) / prev_inc_filename
+                    if old_path.exists():
+                        log.info("Removing superseded incremental %s", prev_inc_filename)
+                        old_path.unlink()
+
+                log.info("Downloading incremental %s (%d mirrors, slot %d, gap %d slots)",
+                         inc_fn, len(inc_mirrors), inc_slot, gap)
+                if not download_aria2c(inc_mirrors, output_dir, inc_fn, connections):
+                    log.error("Failed to download incremental %s", inc_fn)
+                    break
+
+                prev_inc_filename = inc_fn
+
+                if gap <= convergence_slots:
+                    log.info("Converged: incremental slot %d is %d slots behind head", inc_slot, gap)
+                    break
+
+                if head_slot is None:
+                    break
+
+                log.info("Not converged (gap %d > %d), re-probing in 10s...", gap, convergence_slots)
+                time.sleep(10)
 
     total_elapsed: float = time.monotonic() - total_start
     log.info("All downloads complete in %.0fs", total_elapsed)
@@ -615,6 +700,8 @@ def main() -> int:
                    help="Max nodes to benchmark before giving up (default: 15)")
     p.add_argument("--version", default=None,
                    help="Filter nodes by version prefix (e.g. '2.2')")
+    p.add_argument("--convergence-slots", type=int, default=500,
+                   help="Max slot gap for incremental convergence (default: 500)")
    p.add_argument("--full-only", action="store_true",
                    help="Download only full snapshot, skip incremental")
     p.add_argument("--dry-run", action="store_true",
@@ -669,6 +756,7 @@ def main() -> int:
         max_speed_checks=args.max_speed_checks,
         version_filter=args.version,
         full_only=args.full_only,
+        convergence_slots=args.convergence_slots,
     )
 
     if ok and args.post_cmd: