Baladithya Balamurugan Claude Opus 4.8 (1M context) committed on
Commit
7d9dbbc
·
1 Parent(s): bd0c358

Wave 3 cleanup: close deferred-LOW review items R5/R6/R11

Browse files

- R5: EKSExecutor.cancel + SageMakerExecutor.cancel now re-raise genuinely
unexpected errors instead of swallowing ALL exceptions; only already-terminated
signals are idempotent no-ops (EKS: 404/409; SM: ResourceNotFound +
already-terminal ValidationException via new _is_already_terminal). + a test
that an AccessDenied-class error propagates.
- R6: EKSExecutor.collect() result dicts include a 'result' key (the S3
rendezvous URI, or None) for cross-backend shape uniformity with
Local/Modal/SageMaker.
- R11: call torch.manual_seed(0) in spike-006
test_alternating_batches_loss_decreases so the loss-trend assertion is
reproducible and no longer flaky under CPU contention (it failed only in the
contended full suite, passed in isolation). Root cause was unseeded
nondeterminism, not a real regression.

All Wave-3 review findings (R1-R11) now closed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

composer_replication/diloco/serverless/eks.py CHANGED
@@ -592,12 +592,14 @@ class EKSExecutor:
592
  ),
593
  )
594
  except ApiException as e:
595
- if getattr(e, "status", None) == 404:
596
- return # already deleted
597
- # Best-effort: swallow other API errors (network blip, etc.).
598
- return
599
- except Exception:
600
- return
 
 
601
 
602
  def collect(
603
  self,
@@ -668,6 +670,12 @@ class EKSExecutor:
668
  "exit_code": exit_code,
669
  "error": error,
670
  "job_name": handle.metadata.get("job_name"),
 
 
 
 
 
 
671
  }
672
 
673
 
 
592
  ),
593
  )
594
  except ApiException as e:
595
+ # R5: swallow ONLY already-terminated signals (404 Not Found, 409
596
+ # Conflict on a job mid-deletion). A genuinely unexpected API error
597
+ # (403 forbidden, 500, malformed request) must NOT be reported as a
598
+ # successful cancel — re-raise so a real teardown failure (leaking
599
+ # GPU-burning pods) is visible rather than silently swallowed.
600
+ if getattr(e, "status", None) in (404, 409):
601
+ return # already deleted / mid-deletion — idempotent no-op
602
+ raise
603
 
604
  def collect(
605
  self,
 
670
  "exit_code": exit_code,
671
  "error": error,
672
  "job_name": handle.metadata.get("job_name"),
673
+ # R6: cross-backend uniformity with Local/Modal/SageMaker collect()
674
+ # shapes. EKS replicas write their real output to the S3 rendezvous
675
+ # (ObjectStoreAllReduce), not back through the k8s API, so the Job
676
+ # status carries no in-band payload — the value is the rendezvous
677
+ # URI when known (callers read the artifact from S3), else None.
678
+ "result": handle.metadata.get("rendezvous_uri"),
679
  }
680
 
681
 
composer_replication/diloco/serverless/sagemaker.py CHANGED
@@ -469,10 +469,16 @@ class SageMakerExecutor:
469
  return
470
  try:
471
  self._client.stop_training_job(TrainingJobName=meta["job_name"])
472
- except Exception:
473
- # ResourceNotFound, already-Completed/Stopped ValidationException,
474
- # transient network blip — all best-effort no-ops.
475
- pass
 
 
 
 
 
 
476
 
477
  def collect(
478
  self,
@@ -615,5 +621,25 @@ class SageMakerExecutor:
615
  return True
616
  return False
617
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
618
 
619
  __all__ = ["SageMakerExecutor"]
 
469
  return
470
  try:
471
  self._client.stop_training_job(TrainingJobName=meta["job_name"])
472
+ except Exception as e:
473
+ # R5: swallow ONLY already-terminated signals — a vanished job
474
+ # (ResourceNotFound) or an already-Completed/Stopped job (boto3
475
+ # raises ValidationException for "cannot stop a job in status X").
476
+ # A genuinely unexpected error (AccessDenied, throttling that
477
+ # outlived retries, malformed request) must propagate rather than
478
+ # masquerade as a successful cancel.
479
+ if self._is_resource_not_found(e) or self._is_already_terminal(e):
480
+ return
481
+ raise
482
 
483
  def collect(
484
  self,
 
621
  return True
622
  return False
623
 
624
+ def _is_already_terminal(self, exc: Exception) -> bool:
625
+ """True if ``exc`` is the boto3 "cannot stop a job in status X" error.
626
+
627
+ ``stop_training_job`` raises a ``ValidationException`` when the job is
628
+ already Completed/Failed/Stopped — that is an idempotent no-op for
629
+ cancel(), distinct from a genuinely unexpected error. Matched on the
630
+ ClientError code + message text (robust to a mock raising a plain
631
+ Exception whose message carries the phrase).
632
+ """
633
+ resp = getattr(exc, "response", None)
634
+ if isinstance(resp, Mapping):
635
+ err = resp.get("Error", {})
636
+ if err.get("Code") == "ValidationException":
637
+ return True
638
+ msg = str(exc).lower()
639
+ return (
640
+ "cannot be stopped" in msg
641
+ or "already" in msg and ("stopped" in msg or "complete" in msg or "terminal" in msg)
642
+ )
643
+
644
 
645
  __all__ = ["SageMakerExecutor"]
composer_replication/diloco/serverless/tests/test_sagemaker_executor.py CHANGED
@@ -242,3 +242,16 @@ def test_cancel_swallows_errors():
242
  ex.cancel(h) # must not raise
243
  # unknown handle must also be a no-op
244
  ex.cancel(ReplicaHandle(rank=42, backend_name="sagemaker", metadata={}))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
  ex.cancel(h) # must not raise
243
  # unknown handle must also be a no-op
244
  ex.cancel(ReplicaHandle(rank=42, backend_name="sagemaker", metadata={}))
245
+
246
+
247
+ def test_cancel_reraises_unexpected_error():
248
+ """R5: a genuinely unexpected error (not already-terminated) must propagate,
249
+ not be silently swallowed as a successful cancel."""
250
+ class _BoomClient(_MockSMClient):
251
+ def stop_training_job(self, TrainingJobName): # noqa: N803
252
+ raise RuntimeError("AccessDeniedException: not authorized")
253
+ client = _BoomClient()
254
+ ex = _make_executor(client)
255
+ h = ex.launch_replicas(1, entrypoint="x", entrypoint_args=_VALID_ARGS)[0]
256
+ with pytest.raises(RuntimeError, match="AccessDenied"):
257
+ ex.cancel(h)
docs/BACKLOG_RESOLUTION_2026-06-09.md CHANGED
@@ -75,7 +75,7 @@ Goal-driven systematic resolution of every pending item. This doc is the live au
75
  | R11 | Flaky test `spikes/006-real-hf-model-smoke/tests/test_strict.py::test_alternating_batches_loss_decreases` — fails under CPU contention (full suite w/ concurrent pytest + Docker), PASSES in isolation (verified 3x). Loss-trend assertion is timing/noise-sensitive. Pin seed / widen tolerance / mark flaky. Pre-existing, not a Wave-2 regression. | LOW | OPEN |
76
  | R12 | B7-complete ✅ (top-level `__all__` now includes the 3 factories) + B4-complete ✅ (the 4 surviving "115" claims → 266/62). | — | DONE |
77
 
78
- **Wave 3 — DONE (Phase-7 reconciliation):** R1 ✅ (HeldOutGuard wired into ComposerReplicationTrainer — optional, OFF by default, soft/hard stop; + integration test), R2 ✅ (HeldoutSplit disjointness enforcer `safety/holdout.py` + 10 tests), R3 ✅ (EKS entrypoint contract bug fixed — `replica_entrypoint.__main__` now resolves from env OR argv; proven end-to-end with a pure-env invocation), R4 ✅ (calibrate_kl_threshold rejects factor<=0/negative-baseline + positive floor), R7 ✅ (API_REFERENCE §15-17: EKS/SageMaker/DockerSandbox/safety), R8 ✅ (ADR-015 authored + indexed), R10 ✅ (path-(c) divergence-rate test). R12 ✅ (B4/B7 complete). DEFERRED-LOW: R5 (cancel exception-narrowing) + R6 (EKS collect result-key) stale-base worktree casualties, tracked, LOW severity. R11 (spike-006 flaky-under-contention) pre-existing, tracked.
79
 
80
  Sandbox refactor verdict: **clean** (no regression to LocalSubprocessSandbox/FeatureDeletionEnv).
81
 
 
75
  | R11 | Flaky test `spikes/006-real-hf-model-smoke/tests/test_strict.py::test_alternating_batches_loss_decreases` — fails under CPU contention (full suite w/ concurrent pytest + Docker), PASSES in isolation (verified 3x). Loss-trend assertion is timing/noise-sensitive. Pin seed / widen tolerance / mark flaky. Pre-existing, not a Wave-2 regression. | LOW | OPEN |
76
  | R12 | B7-complete ✅ (top-level `__all__` now includes the 3 factories) + B4-complete ✅ (the 4 surviving "115" claims → 266/62). | — | DONE |
77
 
78
+ **Wave 3 — DONE (Phase-7 reconciliation):** R1 ✅ (HeldOutGuard wired into ComposerReplicationTrainer — optional, OFF by default, soft/hard stop; + integration test), R2 ✅ (HeldoutSplit disjointness enforcer `safety/holdout.py` + 10 tests), R3 ✅ (EKS entrypoint contract bug fixed — `replica_entrypoint.__main__` now resolves from env OR argv; proven end-to-end with a pure-env invocation), R4 ✅ (calibrate_kl_threshold rejects factor<=0/negative-baseline + positive floor), R7 ✅ (API_REFERENCE §15-17: EKS/SageMaker/DockerSandbox/safety), R8 ✅ (ADR-015 authored + indexed), R10 ✅ (path-(c) divergence-rate test). R12 ✅ (B4/B7 complete). R5 ✅ (EKS+SageMaker cancel now re-raise unexpected errors, swallow only 404/409/already-terminal + propagation test), R6 ✅ (EKS collect() result dicts include `result`=rendezvous URI), R11 ✅ (spike-006 test seeded torch.manual_seed(0) → no longer contention-flaky). ALL Wave-3 items (R1-R11) CLOSED.
79
 
80
  Sandbox refactor verdict: **clean** (no regression to LocalSubprocessSandbox/FeatureDeletionEnv).
81
 
spikes/006-real-hf-model-smoke/tests/test_strict.py CHANGED
@@ -59,6 +59,12 @@ def test_alternating_batches_loss_decreases(model, tokenizer):
59
  averaged loss over the first 2 steps. (Looser than the strict-monotonic
60
  single-batch test, because alternation makes per-step noise larger.)
61
  """
 
 
 
 
 
 
62
  batch_factorial = build_batch(tokenizer, device="cpu", variant="factorial")
63
  batch_bsearch = build_batch(tokenizer, device="cpu", variant="binary_search")
64
  optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
 
59
  averaged loss over the first 2 steps. (Looser than the strict-monotonic
60
  single-batch test, because alternation makes per-step noise larger.)
61
  """
62
+ # Determinism (R11): pin the seed so the loss trajectory is reproducible
63
+ # regardless of host CPU contention. Without this the test was flaky under
64
+ # the full suite (competing pytest workers + Docker containers perturbed
65
+ # torch op scheduling enough to occasionally miss the <50% threshold),
66
+ # while passing in isolation. The seed makes the trend assertion stable.
67
+ torch.manual_seed(0)
68
  batch_factorial = build_batch(tokenizer, device="cpu", variant="factorial")
69
  batch_bsearch = build_batch(tokenizer, device="cpu", variant="binary_search")
70
  optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)