feat: concurrent export, pagination bugfix, streaming writes, legacy import fix #1

Open
MarioAlessandroNapoli wants to merge 4 commits into farouk09:main from MarioAlessandroNapoli:feat/concurrent-export-pagination-fix-legacy-import

Summary

This PR addresses several critical issues discovered during a production migration of 165 threads (12,904 checkpoints, 2.1 GB) between LangGraph Cloud deployments.

Critical bugfix: history pagination cursor format

The before cursor for get_history() pagination must use the format {"configurable": {"checkpoint_id": "..."}}, not the flat {"checkpoint_id": "..."}. Without this fix, every page after the first returns a server-side KeyError: 'configurable' (500 error), silently truncating exports to ~100 checkpoints per thread. This is the root cause of slow/incomplete exports on any thread with significant history.
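
A minimal sketch of the corrected pagination loop follows. It is not the code from client.py: `fetch_page` is a hypothetical callable standing in for one `get_history()` request, and the flat `"checkpoint_id"` key on each returned checkpoint is an assumption made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Checkpoint = dict[str, Any]
# Hypothetical stand-in for a single get_history() request taking a `before` cursor.
FetchPage = Callable[[Optional[dict[str, Any]]], Awaitable[list[Checkpoint]]]


def before_cursor(checkpoint_id: str) -> dict[str, Any]:
    """Wrap a checkpoint ID in the nested shape the server expects."""
    return {"configurable": {"checkpoint_id": checkpoint_id}}


async def fetch_full_history(fetch_page: FetchPage, page_size: int = 100) -> list[Checkpoint]:
    """Follow the `before` cursor until a short page signals the end."""
    history: list[Checkpoint] = []
    before: Optional[dict[str, Any]] = None
    while True:
        page = await fetch_page(before)
        history.extend(page)
        if len(page) < page_size:
            break
        # Assumption: each checkpoint exposes its ID under "checkpoint_id".
        before = before_cursor(page[-1]["checkpoint_id"])
    return history
```

With the flat cursor shape, the second call to `fetch_page` is the one that would fail, which matches the ~100-checkpoint truncation described above.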

Performance improvements

  • Concurrent thread fetching via asyncio.Semaphore (configurable --concurrency, default 5) — reduced export time from >1 hour to ~15 minutes for 165 threads
  • Streaming JSON export using producer-consumer asyncio.Queue — constant memory usage instead of buffering all data (critical for multi-GB exports)
  • Per-page retry with exponential backoff + jitter — retries individual history pages instead of the entire thread, preventing cascading failures
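
The first two items can be sketched together as follows. This is an illustration of the pattern rather than the migrator.py implementation: `export_threads`, `fetch_thread`, and the JSON-array output layout are all hypothetical names and choices.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


async def export_threads(
    thread_ids: list[str],
    fetch_thread: Callable[[str], Awaitable[dict[str, Any]]],  # hypothetical fetcher
    out: Any,  # any object with a write(str) method, e.g. an open text file
    concurrency: int = 5,
) -> int:
    """Fetch threads concurrently and stream each one to `out` as it arrives."""
    semaphore = asyncio.Semaphore(concurrency)
    # A bounded queue keeps only a few fetched threads in memory at once.
    queue: asyncio.Queue = asyncio.Queue(maxsize=concurrency)

    async def producer(thread_id: str) -> None:
        async with semaphore:  # at most `concurrency` fetches in flight
            data = await fetch_thread(thread_id)
        await queue.put(data)

    async def consumer() -> int:
        written = 0
        out.write("[")
        while True:
            item = await queue.get()
            if item is None:  # sentinel: all producers have finished
                break
            if written:
                out.write(",")
            out.write(json.dumps(item))
            written += 1
        out.write("]")
        return written

    writer = asyncio.create_task(consumer())
    await asyncio.gather(*(producer(t) for t in thread_ids))
    await queue.put(None)
    return await writer
```

Threads are written in completion order, not input order. Error handling (for example, cancelling the writer when a fetch fails) is omitted to keep the sketch short.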

New features

  • --legacy-terminal-node NODE — When supersteps import fails (e.g., due to incompatible serialized objects in old checkpoints) and the tool falls back to create_thread() + update_thread_state(), the imported threads may have next=['SomeNode'] instead of next=[], making them non-continuable. This flag passes as_node to update_thread_state() to correctly set the terminal state. This is a LangGraph behavior (not a bug) — update_state without as_node defaults to the graph's entry node for routing.
  • --concurrency N — Control parallel thread fetches
  • Rich progress bars — Real-time progress with per-thread detail, elapsed time, and completion counts
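
How the flag might reach the state update can be sketched as below. `apply_final_state` is a hypothetical helper, and `threads_client` is assumed to behave like `client.threads` in the LangGraph SDK, whose `update_state()` accepts an `as_node` keyword; the tool's own `update_thread_state()` wrapper may differ.

```python
from typing import Any, Optional


async def apply_final_state(
    threads_client: Any,  # assumed to expose an async update_state(thread_id, values, as_node=...)
    thread_id: str,
    values: dict[str, Any],
    terminal_node: Optional[str] = None,  # value of --legacy-terminal-node, if given
) -> Any:
    """Write the final state, attributing it to the terminal node when one is set."""
    kwargs: dict[str, Any] = {}
    if terminal_node is not None:
        # Attributing the update to the terminal node is what lets the
        # thread end up with next=[] after the legacy import.
        kwargs["as_node"] = terminal_node
    return await threads_client.update_state(thread_id, values, **kwargs)
```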

Other fixes

  • json.loads(..., strict=False) in JSON loader to handle control characters in agent messages
  • Removed unused imports (time, Live, Layout, TaskProgressColumn)
  • Updated README with complete command reference, import strategies docs, troubleshooting, and key bugfix explanations
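
The effect of `strict=False` can be shown with the standard library alone. The sample payload below is made up for illustration.

```python
import json

# The JSON string value contains a raw newline and a raw tab (not the
# escaped forms \\n and \\t), which strict parsing rejects.
raw = '{"content": "line one\nline two\ttabbed"}'


def load_lenient(text: str) -> dict:
    """Parse JSON while tolerating control characters inside strings."""
    return json.loads(text, strict=False)


def load_strict_fails(text: str) -> bool:
    """Return True if the default strict parser rejects the text."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return True
    return False
```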

Changes

| File | Description |
| --- | --- |
| client.py | Per-page retry, correct pagination cursor format |
| migrator.py | Concurrent fetching, streaming export, --legacy-terminal-node support |
| migrate_threads.py | Rich progress bars, new CLI args, cleaned imports |
| json_exporter.py | strict=False for JSON loading |
| README.md | Complete rewrite with new features, troubleshooting, import strategies |

Test plan

  • Exported 165 threads / 12,904 checkpoints / 2.1 GB in ~15 minutes
  • Imported to new deployment: 163 created, 2 skipped (existing), 0 failed
  • Verified thread state matches between source and target
  • Tested that --legacy-terminal-node produces next=[] on legacy imports
  • Tested --test-single, --dry-run, --metadata-filter flags
  • JSON backup loadable with strict=False (control characters in agent messages)

Replace the current create+update_state import (which loses checkpoint
history) with supersteps-based import that preserves time-travel.

Two conversion strategies:
- Direct: uses metadata.writes when available (local checkpointer)
- Delta: computes state diffs between consecutive checkpoints (Cloud API)

Delta approach skips middleware no-ops, only generating supersteps for
state-changing steps (e.g. 100 checkpoints → 30 supersteps).
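
The delta idea can be sketched as below. This is a simplified illustration, not the converter in this PR: it assumes the state snapshots are plain dicts ordered oldest to newest, ignores removed keys, and does not account for reducer channels that append rather than overwrite.

```python
from typing import Any


def state_deltas(states: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return one delta per state-changing step, skipping no-op checkpoints.

    Assumption: `states` is ordered oldest to newest.
    """
    deltas: list[dict[str, Any]] = []
    previous: dict[str, Any] = {}
    for state in states:
        # Keep only keys that are new or whose value changed.
        delta = {
            key: value
            for key, value in state.items()
            if key not in previous or previous[key] != value
        }
        if delta:  # a checkpoint identical to its predecessor yields nothing
            deltas.append(delta)
        previous = state
    return deltas
```

Each resulting delta would then become the values of one superstep.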

Includes automatic fallback to legacy import if the target API doesn't
support supersteps (400/422 → switches for all remaining threads).
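
The fallback can be sketched as a sticky switch. The names here (`ApiError`, `import_all`, and the two importer callables) are hypothetical, and the sketch is synchronous for brevity.

```python
from typing import Callable


class ApiError(Exception):
    """Hypothetical error type carrying the HTTP status of a failed call."""

    def __init__(self, status: int) -> None:
        super().__init__(f"API error {status}")
        self.status = status


def import_all(
    thread_ids: list[str],
    import_supersteps: Callable[[str], None],
    import_legacy: Callable[[str], None],
) -> dict[str, str]:
    """Import each thread, switching to legacy for good after a 400/422."""
    use_supersteps = True
    strategy_used: dict[str, str] = {}
    for thread_id in thread_ids:
        if use_supersteps:
            try:
                import_supersteps(thread_id)
                strategy_used[thread_id] = "supersteps"
                continue
            except ApiError as err:
                if err.status not in (400, 422):
                    raise  # unrelated failure: do not mask it
                use_supersteps = False  # applies to all remaining threads
        import_legacy(thread_id)
        strategy_used[thread_id] = "legacy"
    return strategy_used
```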

Validation now optionally compares checkpoint history counts for a
sample thread (enabled with --test-single in full migration).
… (AE-260)

Add --source-api-key and --target-api-key CLI flags with LANGSMITH_SOURCE_API_KEY
and LANGSMITH_TARGET_API_KEY env vars. Backward compatible: LANGSMITH_API_KEY
used as fallback for both.
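
A sketch of the key resolution follows. `resolve_api_key` is a hypothetical helper, and giving the CLI flag priority over the environment variables is an assumption not stated in the commit message.

```python
import os
from typing import Mapping, Optional


def resolve_api_key(
    cli_value: Optional[str],
    specific_var: str,
    env: Mapping[str, str] = os.environ,
    fallback_var: str = "LANGSMITH_API_KEY",
) -> Optional[str]:
    """Pick the first available key: CLI flag, specific env var, shared fallback."""
    # Assumed precedence: the explicit flag wins over both environment variables.
    return cli_value or env.get(specific_var) or env.get(fallback_var)


# Usage: one call per side of the migration.
# source_key = resolve_api_key(args.source_api_key, "LANGSMITH_SOURCE_API_KEY")
# target_key = resolve_api_key(args.target_api_key, "LANGSMITH_TARGET_API_KEY")
```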
…ing JSON (AE-261, AE-262)

- Add --metadata-filter for server-side JSONB containment filtering
- Add --history-limit for optional checkpoint cap per thread
- Retry API calls with exponential backoff + jitter (3 attempts)
- Paginate checkpoint history via `before` cursor (no more limit=100)
- Rewrite JSONExporter for streaming writes (no in-memory buffering)
- Update README with new flags, examples, and feature docs
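
The retry behaviour can be sketched as below. `with_retry` and its default delays are hypothetical, and the jitter formula (a random extra of up to one full delay) is one common choice rather than necessarily the one used here.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable


async def with_retry(
    call: Callable[[], Awaitable[Any]],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], Awaitable[Any]] = asyncio.sleep,
) -> Any:
    """Await `call`, retrying with exponential backoff plus jitter on failure."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2**attempt)
            # Jitter spreads retries out so concurrent workers do not retry in lockstep.
            await sleep(delay + random.uniform(0, delay))
```

Wrapping a single page request in `with_retry` (rather than the whole history fetch) means a transient failure costs one page, not the entire thread.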
…import fix

Critical bugfix:
- Fix history pagination cursor format: server expects
  {"configurable": {"checkpoint_id": ...}}, not flat {"checkpoint_id": ...}.
  Without this fix, every page after the first returns a 500 error,
  silently truncating exports to ~100 checkpoints per thread.

Performance:
- Concurrent thread fetching with asyncio.Semaphore (configurable --concurrency, default 5)
- Streaming JSON export via producer-consumer queue (constant memory usage)
- Per-page retry with exponential backoff + jitter (instead of retrying entire history)

New features:
- --legacy-terminal-node: specify graph terminal node for legacy imports,
  ensuring next=[] so threads are continuable after migration
- --concurrency N: control parallel thread fetches
- Rich progress bars with per-thread detail and elapsed time

Fixes:
- JSON loader uses strict=False to handle control characters in agent messages
- Removed unused imports and dead code
- Updated README with complete command reference and troubleshooting