MCP ExplorerExplorer

Phantom Mcp

@dbernheiselon 12 days ago
1 MIT
FreeCommunity
AI Systems
MCP server implemented in Elixir

Overview

What is Phantom Mcp

phantom_mcp is an MCP server implemented in Elixir, designed to provide a complete implementation of the Model Context Protocol (MCP) specification using the Elixir Plug framework.

Use cases

Use cases for phantom_mcp include creating a code review tool that provides feedback on architecture, managing studies with structured data, and facilitating interactions between users and the server through defined prompts and resources.

How to use

To use phantom_mcp, you need to forward a path from your Plug or Phoenix Router to the MCP router. Define your MCP Router with available tooling, prompts, resources, and optional connect and close callbacks.

Key features

Key features of phantom_mcp include full compliance with the MCP server specification, support for JSON parsing, customizable prompts and resources, and integration with Elixir’s Plug framework.

Where to use

phantom_mcp can be used in web applications that require a structured protocol for managing model contexts, particularly in scenarios involving data processing, code review, and research studies.

Content

Phantom MCP

Hex.pm
Documentation

MCP (Model Context Protocol) framework for Elixir Plug.

This library provides a complete implementation of the MCP server specification with Plug.

Installation

Add Phantom to your dependencies:

  {:phantom_mcp, "~> 0.2.0"},

When using with Plug/Phoenix, configure MIME to accept SSE:

# config/config.exs
config :mime, :types, %{
  "text/event-stream" => ["sse"]
}

For Streamable HTTP access to your MCP server, forward
a path from your Plug or Phoenix Router to your MCP router.

Phoenix

defmodule MyAppWeb.Router do
  use MyAppWeb, :router
  # ...

  pipeline :mcp do
    plug :accepts, ["json", "sse"]

    plug Plug.Parsers,
      parsers: [{:json, length: 1_000_000}],
      pass: ["application/json"],
      json_decoder: JSON
  end

  scope "/mcp" do
    pipe_through :mcp

    forward "/", Phantom.Plug,
      router: MyApp.MCPRouter
  end
end

Plug.Router

defmodule MyAppWeb.Router do
  use Plug.Router

  plug :match

  plug Plug.Parsers,
    parsers: [{:json, length: 1_000_000}],
    pass: ["application/json"],
    json_decoder: JSON

  plug :dispatch

  forward "/mcp",
    to: Phantom.Plug,
    init_opts: [
      router: MyApp.MCP.Router
    ]
end

Now the fun begins: it’s time to define your MCP router that catalogs all your tools, prompts, and resources. When you’re creating your MCP server, make sure
you test it with the MCP Inspector.

We’ll go through each one and show how to respond synchronously or asynchronously.

Defining Prompts

defmodule MyApp.MCP.Router do
  use Phantom.Router,
    name: "MyApp",
    vsn: "1.0"

  # Prompts may contain arguments. If there are arguments
  # you may want to also provide a completion function to
  # help the client fill in the argument.

  @description """
  Review the provided Study and provide meaningful feedback about the study and let me know if there are gaps or missing questions. We want
  a meaningful study that can provide insight to the research goals stated
  in the study.
  """
  prompt :suggest_questions,
    completion_function: :study_complete,
    arguments: [
      %{
        name: "study_id",
        description: "The study to review",
        required: true
      }
    ]

Then implement it

Synchronously

require Phantom.Prompt, as: Prompt

def suggest_questions(%{"study_id" => study_id}, session) do
  case MyApp.MCP.Router.read_resource(session, :study, id: study_id) do
    {:ok, uri, resource} ->
      {:reply,
        Prompt.response(
          assistant: Prompt.embedded_resource(uri, resource),
          user: Prompt.text("Wowzers"),
          assistant: Prompt.image(File.read!("foo.png")),
          user: Prompt.text("Seriously, wowzers")
        ), session}

    error ->
      {:error, Phantom.Request.internal_error(), session}
  end
end

Asynchronously

require Phantom.Prompt, as: Prompt

def suggest_questions(%{"study_id" => study_id}, session) do
  Task.async(fn ->
    case MyApp.MCP.Router.read_resource(session, :study, id: study_id) do
      {:ok, uri, resource} ->
        Session.respond(session, Prompt.response(
          assistant: Prompt.embedded_resource(uri, resource),
          user: Prompt.text("Wowzers"),
          assistant: Prompt.image(File.read!("foo.png")),
          user: Prompt.text("Seriously, wowzers")
        ))

      error ->
        Session.respond(session, Phantom.Request.internal_error())
    end
  end)
  {:noreply, sessin}
end

Defining Resources

Let’s define a resource with a resource template:

@description """
Read the cover image of a Study to gain some context of the
audience, research goals, and questions.
"""
resource "myapp:///studies/:study_id/cover", :study_cover,
  completion_function: :study_complete,
  mime_type: "image/png"

@description """
Read the contents of a study. This includes the questions and general
context, which is helpful for understanding research goals.
"""
resource "https://example.com/studies/:study_id/md", :study,
  completion_function: :study_complete,
  mime_type: "text/markdown"

Then implement them:

Synchronously

require Phantom.Resource, as: Resource

def study(%{"study_id" => id} = params, session) do
  study = Repo.get(Study, id)
  text = Study.to_markdown(study)
  {:reply, Resource.text(text), session}
end

def study_cover(%{"study_id" => id} = params, session) do
  study = Repo.get(Study, id)
  blob = File.read!(study.cover)
  {:reply, Resource.blob(blob), session}
end

## Implement the completion handler:
import Ecto.Query

def study_complete("study_id", value, session) do
  study_ids = Repo.all(
    from s in Study,
      select: s.id,
      where: like(type(:id, :string), "#{value}%"),
      where: s.account_id == ^session.user.account_id,
      order_by: s.id,
      limit: 101
    )

  # You may also return a map with more info:
  # `%{values: study_ids, has_more: true, total: 1_000_000}`
  # If you return more than 100, then Phantom will set `has_more: true`
  # and only return the first 100.
  {:reply, study_ids, session}
end

Asynchronously

require Phantom.Resource, as: Resource

def study(%{"study_id" => id} = params, session) do
  Task.async(fn ->
    Process.sleep(1000)
    study = Repo.get(Study, id)
    text = Study.to_markdown(study)
    Session.respond(session, Resource.response(Resource.text(text)))
  end)

  {:noreply, session}
end

def study_cover(%{"study_id" => id} = params, session) do
  Task.async(fn ->
    Process.sleep(1000)
    study = Repo.get(Study, id)
    blob = File.read!(study.cover)
    Session.respond(session, Resource.response(Resource.blob(blob)))
  end)

  {:noreply, session}
end

## Implement the completion handler:
import Ecto.Query

def study_complete("study_id", value, session) do
  study_ids = Repo.all(
    from s in Study,
      select: s.id,
      where: like(type(:id, :string), "#{value}%"),
      where: s.account_id == ^session.user.account_id,
      order_by: s.id,
      limit: 101
    )

  # You may also return a map with more info:
  # `%{values: study_ids, has_more: true, total: 1_000_000}`
  # If you return more than 100, then Phantom will set `has_more: true`
  # and only return the first 100.
  {:reply, study_ids, session}
end

You’ll also want to implement list_resources/2 in your router which is
to provide a list of all available resources in your system and return
resource links to them.

@salt "cursor"
def list_resources(cursor, session) do
  # Remember to check for allowed resources according to `session.allowed_resource_templates`
  # Below is a toy implementation for illustrative purposes.
  cursor =
    if cursor do
      {:ok, cursor} = Phoenix.Token.verify(Test.Endpoint, @salt, cursor)
      cursor
    else
      0
    end

  {_before_cursor, after_cursor} = Enum.split_while(1..1000, fn i -> i < cursor end)
  {page, [next | _drop]} = Enum.split(after_cursor, 100)
  next_cursor = Phoenix.Token.sign(Test.Endpoint, @salt, next)

  resource_links =
    Enum.map(page, fn i ->
      {:ok, uri, spec} = resource_for(session, :study, id: i)
      Resource.resource_link(uri, spec, name: "Study #{i}")
    end)

  {:reply,
    Resource.list(resource_links, next_cursor),
    session}
end

Defining Tools

You can define tools that have an optional input_schema and an optional output_schema. If no schema is provided, then the client will not know to send arguments to your handlers.

# Defining available tools
# Be mindful, the input_schema is not validated upon requests.
@description """
Create a question for the provided Study.
"""
tool :create_question, MyApp.MCP,
  input_schema: %{
    required: ~w[description label study_id],
    properties: %{
      study_id: %{
        type: "integer",
        description: "The unique identifier for the Study"
      },
      label: %{
        type: "string",
        description: "The title of the Question. The first thing the participant will see when presented with the question"
      },
      description: %{
        type: "string",
        description: "The contents of the question. About one paragraph of detail that defines one question or task for the participant to perform or answer"
      }
    }
  }
}]

Then implement it:

Synchronously

require Phantom.Tool, as: Tool

def create_question(%{"study_id" => study_id} = params, session) do
  changeset = MyApp.Question.changeset(%Question{}, params)
  with {:ok, question} <- MyApp.Repo.insert(changeset),
       {:ok, uri, resource_template} <-
            Test.MCP.Router.resource_for(session, :question, id: question.id) do
    {:reply, Tool.resource_link(uri, resource_template), session}
  else
    _ -> {:reply, Tool.error("Invalid paramaters"), session}
  end
end

Asynchronously

require Phantom.Tool, as: Tool

def create_question(%{"study_id" => study_id} = params, session) do
  Task.async(fn ->
    Process.sleep(1000)
    changeset = MyApp.Question.changeset(%Question{}, params)
    with {:ok, question} <- MyApp.Repo.insert(changeset),
        {:ok, uri, resource_template} <-
              Test.MCP.Router.resource_for(session, :question, id: question.id) do

      Session.respond(session, Tool.resource_link(uri, resource_template))
    else
      _ ->  Session.respond(session, Tool.error("Invalid paramaters")))
    end
  end)
  {:noreply, session}
end

What PhantomMCP supports

Phantom will implement these MCP requests on your behalf:

  • initialize. Phantom will detect what capabilities are available to the client based on the provided tooling defined in the Phantom router.
  • prompts/list list either the allowed prompts provided in the connect/2 callback, or all prompts by default. To disable, supply [] in the connect/2 callback.
  • prompts/get dispatch the request to your handler if allowed. Read more in Phantom.Prompt.
  • resources/list dispatch to your MCP router. By default it will be an empty list until you implement it. Read more in Phantom.Resource.
  • resource/templates/list list either the allowed resources as provided in the connect/2 callback or all resource templates by default. To disable, supply [] in the connect/2 callback. Read more in Phantom.ResourceTemplate.
  • resources/read dispatch the request to your handler. Phantom.Resource.
  • resources/subscribe available if the MCP router is configured with pubsub. The session will subscribe to the Phantom.Session.resource_subscription_topic/0 and listen for the shapes {:resource_updated, uri} and {:resource_updated, name, path_params}. Upon receiving the update, a notification will be sent to the client.
  • logging/setLevel available if the MCP router is configured with pubsub. Logs can be sent to client with Session.log_{level}(session, map_content). See docs.
  • tools/list list either the allowed tools as provided in the connect/2 callback or all tools by default. To disable, supply [] in the connect/2 callback.
  • tools/call dispatch the request to your handler. Read more in Phantom.Tool.
  • completion/complete dispatch the request to your completion handler for the given prompt or resource.
  • notification/* no-op.
  • ping pong

Phantom does not yet support these methods:

  • notifications/prompts/list_changed - The server informs the client the list of prompts has updated.
  • notifications/resources/list_changed - The server informs the client the list of resources has updated.
  • notifications/tools/list_changed - The server informs the client the list of tools has updated.
  • roots/list - The server requests the client to provide a list of files available for interaction. This is like resources/list but for the client.
  • sampling/createMessage - The server requests the client to query their LLM and provide its response. This is for human-in-the-loop agentic actions and could be leveraged when the client requests a prompt from the server.
  • elicitation/create - The server requests input from
    the client in order to complete a request the client has made of it.

Batched Requests

Batched requests will also be handled transparently. please note there is not an abstraction for efficiently providing these as a group to your handler. Since the MCP specification is deprecating batched request support in the next version, there is no plan to make this more efficient.

Authentication and Authorization

Phantom does not implement authentication on its own. MCP applications needing authentication should investigate OAuth provider solutions like Oidcc or Boruta or ExOauth2Provider and configure the route to serve a discovery endpoint.

  1. MCP authentication and discovery is not handled by Phantom itself. You will need to implement OAuth2 and provide the discovery mechanisms as described in the specification. In the connect/2 callback you can return {:unauthorized, www_authenticate_info} or {:forbidden, "error message"} to inform the client of how to move forward. An {:ok, session} result will imply successful auth.

  2. Once the authentication flow has been completed, a request to the MCP router should land with an authorization header that can be received and verified in the connect/2 callback of your MCP router.

  3. You may also decide to limit the available tools, prompts, or resources depending on your authorization rules. An example is below.

defmodule MyApp.MCP.Router do
  use Phantom.Router,
    name: "MyApp",
    vsn: "1.0"

  require Logger

  def connect(session, auth_info) do
    # The `auth_info` will depend on the adapter, in this case it's from
    # Plug, so it will be the request headers.
    with {:ok, user} <- MyApp.authenticate(conn, auth_info),
         {:ok, my_session_state} <- MyApp.load_session(session.id) do
      {:ok,
        session
        |> assign(some_state: my_session_state, user: user)
        |> limit_for_plan(user.plan)}
    else
      :not_found ->
        # See `Phantom.Plug.www_authenticate/1`
        {:unauthorized, %{
          method: "Bearer",
          resource_metadata:  "https://myapp.com/.well-known/oauth-protected-resource"
        }}
      :not_allowed ->
        {:forbidden, "Please upgrade plan to use MCP server"}
    end
  end

  defp limit_for_plan(session, :ultra), do: session
  defp limit_for_plan(session, :basic) do
    # allow-list tools by stringified name. The name is either supplied as the `name` when defining it, or the stringified function name.
    session
    |> Phantom.Session.allowed_tools(~w[create_question])
    |> Phantom.Session.allowed_resource_templates(~w[study])
  end

Optional callbacks

There are several optional callbacks to help you hook into the lifecycle of the connections.

  • disconnect/1 means the request has closed, not that the session is finished.
  • terminate/1 means the session has finished and the client doesn’t intend to resume it.

For Telemetry, please see m:Phantom.Plug#module-telemetry and m:Phantom.Router#module-telemetry for emitted telemetry hooks.

Persistent Streams

MCP supports SSE streams to get notifications allow resource subscriptions.
To support this, Phantom needs to track connection pids in your cluster and uses
Phoenix.Tracker (phoenix_pubsub) to do this.

MCP defines a “Streamable HTTP” protocol, which is typical HTTP and SSE connections but with a certain behavior. MCP will typically have multiple connections to facilitate requests:

  1. POST for every command, such as tools/call or resources/read. Phantom will open an SSE stream and then immediately close the connection once work has completed.
  2. GET to start an SSE stream for any events such as logs and notifications. There is no work to complete with these requests, and therefore is just a “channel” for receiving server requests and notifications. The connection will remain open until either the client or server closes it.

All connections may provide an mcp-session-id header to resume a session.

Not yet supported is the ability to resume broken connections with missed messages with the last-event-id header, however this is planned to be supported with a ttl-expiring distributed circular buffer.

To make Phantom distributed, start the Phantom.Tracker and pass in your pubsub module to the Phantom.Plug options:

# Add to your application supervision tree:

{Phoenix.PubSub, name: MyApp.PubSub},
{Phantom.Tracker, [name: Phantom.Tracker, pubsub_server: MyApp.PubSub]},

Adjust the Phoenix router or Plug.Router options to include the PubSub server

Phoenix

forward "/", Phantom.Plug,
  router: MyApp.MCP.Router,
  pubsub: MyApp.PubSub

Plug.Router

forward "/mcp", to: Phantom.Plug, init_opts: [
  router: MyApp.MCP.Router,
  pubsub: MyApp.PubSub
]

Tools

No tools

Comments