- Explore MCP Servers
- streamable-mcp-client
Streamable Mcp Client
What is Streamable Mcp Client
streamable-mcp-client is an extension for OpenAI Agents that enables real-time streaming of notifications and messages from an MCP server while the tool is still running.
Use cases
Use cases include tracking file uploads in real-time, providing live status updates during long-running processes, and enabling interactive tools that respond to ongoing events.
How to use
To use streamable-mcp-client, clone the repository, install dependencies, and run the SSE server using the provided command. This allows you to receive live updates from the MCP server.
Key features
Key features include real-time notification streaming, immediate response updates to front-ends, and integration with OpenAI Agents for enhanced tool interaction.
Where to use
streamable-mcp-client can be used in areas such as software development, data processing, and any application requiring real-time feedback from an MCP server.
Clients Supporting MCP
The following are the main client software that supports the Model Context Protocol. Click the link to visit the official website for more information.
Overview
What is Streamable Mcp Client
streamable-mcp-client is an extension for OpenAI Agents that enables real-time streaming of notifications and messages from an MCP server while the tool is still running.
Use cases
Use cases include tracking file uploads in real-time, providing live status updates during long-running processes, and enabling interactive tools that respond to ongoing events.
How to use
To use streamable-mcp-client, clone the repository, install dependencies, and run the SSE server using the provided command. This allows you to receive live updates from the MCP server.
Key features
Key features include real-time notification streaming, immediate response updates to front-ends, and integration with OpenAI Agents for enhanced tool interaction.
Where to use
streamable-mcp-client can be used in areas such as software development, data processing, and any application requiring real-time feedback from an MCP server.
Clients Supporting MCP
The following are the main client software that supports the Model Context Protocol. Click the link to visit the official website for more information.
Content
streamable‑mcp‑client
Real‑time streaming of MCP tools
Surface MCP “notifications/message” events while the tool is still running.
streamable‑mcp‑client extends OpenAI Agents to handle live notifications from an MCP server.
With it you can build tools that:
- push incremental results (e.g. “chunk #17 of your 1 GB file uploaded”)
- let the assistant comment on notifications (not yet working)
Behind the scenes the library:
- Surfaces every
notifications/messagechunk immediately as a normalResponseTextDeltaEvent, so front‑ends (web, CLI, etc.) print progress in real time. - Appends the chunk to the agent’s
RunResultStreaming.new_itemsright away, ensuring the LLM can reference it (in theory). - Steps the agent forward exactly once via a tiny helper patch (
Runner.continue_run) so the next model delta reflects the fresh tool output.
Reference servers streamable‑mcp‑server
Sibling repo that ships two demo servers, but ⚠ note:
OpenAI Agents SDK v0.0.14 supports only the legacy SSE transport.
The newer Streamable HTTP MCP endpoint is included for future‑proofing and interop tests, but the Python client in this project will ignore it until the SDK adds native support.
| server file (repo root) | spec served | default port | endpoint | start command |
|---|---|---|---|---|
sseServer.ts |
SSE MCP (legacy – supported by openai‑agents) | 3000 | http://localhost:3000/sse |
npm run sse |
mcpServer.ts |
Streamable HTTP MCP (latest spec – not yet supported by openai‑agents) | 3000 | http://localhost:3000/mcp |
npm run mcp |
Quick start:
git clone https://github.com/josephbharrison/streamable‑mcp‑servercd streamable‑mcp‑server && npm cinpm run sse(launches the compatible SSE server)- In this repo, set the client mode to
MCPServerMode.TYPESCRIPT_SSE(default) and runpython src/main.py.
Call flow
Diagram
graph LR %% ────────────────────── 1. Python application ───────────────────── subgraph Python_Application main[main.py] streamAgent[StreamableAgent] mux["StreamableAgentStream<br/>(multiplexer)"] main --> streamAgent streamAgent --> mux end %% ───────────────────── 2. MCP client wrapper ────────────────────── subgraph MCP_Client_Wrapper mcpWrap[MCPServerSseWithNotifications] mux --> mcpWrap end %% ────────────────────── 3. Remote MCP server ────────────────────── subgraph MCP_Server sse["SSE endpoint<br/>notifications/*"] mcpWrap --> sse end
-
main.py creates a normal OpenAI Agents Agent but wires in our custom MCPServerSseWithNotifications. It then calls StreamableAgent.run_streamed() for the long‑running tool request “stream up to 10 numbers.”.
-
StreamableAgent.run_streamed() just forwards to the SDK’s Runner.run_streamed() and wraps the returned RunResultStreaming instance inside a StreamableAgentStream multiplexer.
-
StreamableAgentStream keeps two asyncio tasks running:
- The agent’s own event generator (RunResultStreaming.stream_events()),
- The notification stream produced by MCPServerSseWithNotifications.stream_notifications().
- It emits a single, unified async stream that merges items from both.
- Every time an SSE notification chunk arrives, the multiplexer
- Exposes it immediately as a ResponseTextDeltaEvent (so the UI can print 1 2 3… in realtime),
- Copies the text into RunResultStreaming.new_items right now,
- Uses our patched helper Runner.continue_run() to step the outer agent forward once.
OpenAI Agent Extensions
| file | what it adds |
|---|---|
| mcp_extensions/server_with_notifications.py | Sub‑class of the SDK’s MCPServerSse that1. opens a second in‑memory stream for logging notifications, and 2. exposes a single async generator stream_notifications() that yields both tool notifications (from the SSE endpoint) and logging notifications injected by the server. |
| mcp_extensions/streamable_agent_stream.py | The realtime relay. • Multiplexes the agent‑event task and the notification‑task. • Converts each text chunk into the minimal set of UI events (ItemAdded → ContentPartAdded → TextDelta → ContentPartDone). • Appends a completed MessageOutputItem to run.new_items so the LLM can reference it.• Calls Runner.continue_run() (our SDK patch) to pull exactly one semantic event from the still‑running agent, then yields it downstream. |
| mcp_extensions/streamable_agent.py | Tiny convenience wrapper: given an Agent and an MCP server it returns a StreamableAgentStream each time you need a streamed call. |
| main.py | Diagnostic demo. • Shows how to spin up the SSE server. • Prints both raw model deltas and relay‑injected deltas in the console (see the two if branches in the loop). |
Sequence Diagram
sequenceDiagram %% concrete runtime objects participant Main as main.py run() participant SA as StreamableAgent participant SAS as StreamableAgentStream participant Runner as openai‑agents Runner participant MCP as MCPServerSseWithNotifications participant SSE as Remote SSE server %% 1 construction and first call Main ->> SA : run_streamed("stream up to 10 numbers") SA ->> Runner : run_streamed(agent,input) Runner-->> SA : RunResultStreaming base_stream SA -->> Main : StreamableAgentStream instance (SAS) Note over Main,SAS: Main now iterates SAS.stream_events() %% 2 background tasks inside SAS Note over SAS,MCP: Task A base_stream.stream_events() Task B MCP.stream_notifications() %% 2b open SSE channel for notifications SAS ->> MCP : subscribe_notifications MCP ->> SSE : HTTP GET /sse SSE -->> MCP : 200 OK (events begin) %% 2c legacy tool call via POST Runner->> MCP : tools/call "stream_numbers" MCP ->> SSE : HTTP POST /sse?sessionId=<id> JSON‑RPC body SSE -->> MCP : 200 OK (operation accepted) %% 3A normal model deltas SAS ->> Runner : Task A next base_stream event Runner-->> SAS : RawResponsesStreamEvent (model delta or tool‑call) SAS -->> Main : same RawResponsesStreamEvent %% 3B notifications from SSE stream SSE --) MCP : event notifications/number 1 MCP --) SAS : JSON‑RPC notification number 1 SAS --) Main : ResponseTextDelta 1 %% 4 commit chunk then advance agent one step SAS ->> Runner : continue_run(base_stream) Runner-->> SAS : next model delta SAS --) Main : ResponseTextDelta from LLM %% … numbers 2 through 10 follow the same 3B and 4 cycle … %% 5 server closes the stream SSE --) MCP : event notifications/stream_end MCP --) SAS : stream_end sentinel Runner-->> SAS : final assistant message SAS -->> Main : final assistant message
Key methods in StreamableAgentStream
| method | purpose |
|---|---|
stream_events() |
Main coroutine. Runs two tasks (agent_task, notif_task), waits on whichever completes first, and yields events. Also honors the grace period (_GRACE_TICKS) so late notifications are still processed after the agent has finished. |
_handle_notification() |
For one notifications/* payload:1 · Converts its text into delta events for the UI. 2 · Creates a completed assistant MessageOutputItem and appends it to the in‑flight run’s new_items.3 · Calls Runner.continue_run() once and yields that single event (usually a model delta or the final answer). |
_extract_text_chunks() |
Tiny helper that supports both the assistant‑style {"content":[…{"type":"text"}…]} payload and the flat {"data":{"type":"text","text":"…"} shape. |
Why patch OpenAI Agents SDK?
Runner.continue_run
- The public SDK lets you start a streamed run and then iterate:
async for evt in run.stream_events(): ...
- But you can’t say “give me just the next event and then pause”.
- After each notifification chunk, the realtime relay must:
- Wake the agent,
- Wait for one event,
- go back to waiting for the next notification.
- continue_run() is therefore a minimal, ~20‑line helper that peeks one item from the internal queue, taking care to propagate errors and to notice when the background task has already finished.
When the SDK one day exposes an official step() / poll() API the patch can be dropped.
Extending / modifying
-
Richer notification payloads (e.g. images, JSON)
Extend _extract_text_chunks() and the UI‑event construction logic.
-
Longer grace
Change _GRACE_TICKS (each tick = 100 ms).
-
Skip immediate model reaction (pass‑through only)
Remove the call to Runner.continue_run() in _handle_notification().
-
Multiple concurrent tools
Instantiate one StreamableAgentStream per tool invocation; each manages its own multiplexing.
Patching the openai‑agents SDK
This repo relies on a one‑liner helper (Runner.continue_run) that is not yet upstreamed to openai‑agents.
We ship that change as a standard git‑apply patch.
| path | |
|---|---|
| patch file | patches/continue_run.patch |
| target file | <venv‑site‑pkgs>/agents/runner.py |
Apply the patch
# from the repository root
git apply patches/continue_run.patch
or, if you prefer patch:
patch -p1 < patches/continue_run.patch
Tip 📦 If you vendor the SDK in ./libs/openai‑agents/, run the same command inside that folder.
Verify
python - <<'PY'
from agents.runner import Runner
assert hasattr(Runner, "continue_run"), "patch did not apply!"
print("continue_run helper is present")
PY
Revert / re‑apply after upgrades
git apply -R patches/continue_run.patch # ← undo pip install --upgrade openai-agents # upgrade SDK git apply patches/continue_run.patch # ← redo
Dev Tools Supporting MCP
The following are the main code editors that support the Model Context Protocol. Click the link to visit the official website for more information.










