MCP ExplorerExplorer

Apache Airflow

@yangkyeongmoon 14 days ago
46 MIT
FreeCommunity
Dev Tools
#Apache Airflow#DAG#Workflow#Data Pipeline
A MCP Server that connects to [Apache Airflow](https://airflow.apache.org/) using official python client.

Overview

What is Apache Airflow

This project is a Model Context Protocol (MCP) server implementation for Apache Airflow that facilitates integration with MCP clients, enabling standardized interactions with Airflow’s REST API.

Use cases

This server can be used for managing Apache Airflow tasks, including creating and monitoring DAGs (Directed Acyclic Graphs), managing task instances and XComs, handling variables and connections, and performing health checks.

How to use

To use this MCP server, set required environment variables such as AIRFLOW_HOST, AIRFLOW_USERNAME, and AIRFLOW_PASSWORD. Then run the server using commands in a configuration setup for Claude Desktop or manually via the command line.

Key features

The server offers complete API coverage for DAG management, task management, variable and connection handling, as well as event logging and monitoring features. Each functionality is mapped to its respective API endpoint in Apache Airflow.

Where to use

This MCP server is suitable for environments utilizing Apache Airflow for workflow management, particularly when integration with various MCP clients is required, enhancing the flexibility and usability of Apache Airflow’s capabilities.

Content

MseeP.ai Security Assessment Badge

mcp-server-apache-airflow

smithery badge

A Model Context Protocol (MCP) server implementation for Apache Airflow, enabling seamless integration with MCP clients. This project provides a standardized way to interact with Apache Airflow through the Model Context Protocol.

Server for Apache Airflow MCP server

About

This project implements a Model Context Protocol server that wraps Apache Airflow’s REST API, allowing MCP clients to interact with Airflow in a standardized way. It uses the official Apache Airflow client library to ensure compatibility and maintainability.

Feature Implementation Status

Feature API Path Status
DAG Management
List DAGs /api/v1/dags
Get DAG Details /api/v1/dags/{dag_id}
Pause DAG /api/v1/dags/{dag_id}
Unpause DAG /api/v1/dags/{dag_id}
Update DAG /api/v1/dags/{dag_id}
Delete DAG /api/v1/dags/{dag_id}
Get DAG Source /api/v1/dagSources/{file_token}
Patch Multiple DAGs /api/v1/dags
Reparse DAG File /api/v1/dagSources/{file_token}/reparse
DAG Runs
List DAG Runs /api/v1/dags/{dag_id}/dagRuns
Create DAG Run /api/v1/dags/{dag_id}/dagRuns
Get DAG Run Details /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}
Update DAG Run /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}
Delete DAG Run /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}
Get DAG Runs Batch /api/v1/dags/~/dagRuns/list
Clear DAG Run /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear
Set DAG Run Note /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/setNote
Get Upstream Dataset Events /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents
Tasks
List DAG Tasks /api/v1/dags/{dag_id}/tasks
Get Task Details /api/v1/dags/{dag_id}/tasks/{task_id}
Get Task Instance /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}
List Task Instances /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances
Update Task Instance /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}
Clear Task Instances /api/v1/dags/{dag_id}/clearTaskInstances
Set Task Instances State /api/v1/dags/{dag_id}/updateTaskInstancesState
Variables
List Variables /api/v1/variables
Create Variable /api/v1/variables
Get Variable /api/v1/variables/{variable_key}
Update Variable /api/v1/variables/{variable_key}
Delete Variable /api/v1/variables/{variable_key}
Connections
List Connections /api/v1/connections
Create Connection /api/v1/connections
Get Connection /api/v1/connections/{connection_id}
Update Connection /api/v1/connections/{connection_id}
Delete Connection /api/v1/connections/{connection_id}
Test Connection /api/v1/connections/test
Pools
List Pools /api/v1/pools
Create Pool /api/v1/pools
Get Pool /api/v1/pools/{pool_name}
Update Pool /api/v1/pools/{pool_name}
Delete Pool /api/v1/pools/{pool_name}
XComs
List XComs /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries
Get XCom Entry /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}
Datasets
List Datasets /api/v1/datasets
Get Dataset /api/v1/datasets/{uri}
Get Dataset Events /api/v1/datasetEvents
Create Dataset Event /api/v1/datasetEvents
Get DAG Dataset Queued Event /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri}
Get DAG Dataset Queued Events /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents
Delete DAG Dataset Queued Event /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri}
Delete DAG Dataset Queued Events /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents
Get Dataset Queued Events /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents
Delete Dataset Queued Events /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents
Monitoring
Get Health /api/v1/health
DAG Stats
Get DAG Stats /api/v1/dags/statistics
Config
Get Config /api/v1/config
Plugins
Get Plugins /api/v1/plugins
Providers
List Providers /api/v1/providers
Event Logs
List Event Logs /api/v1/eventLogs
Get Event Log /api/v1/eventLogs/{event_log_id}
System
Get Import Errors /api/v1/importErrors
Get Import Error Details /api/v1/importErrors/{import_error_id}
Get Health Status /api/v1/health
Get Version /api/v1/version

Setup

Dependencies

This project depends on the official Apache Airflow client library (apache-airflow-client). It will be automatically installed when you install this package.

Environment Variables

Set the following environment variables:

AIRFLOW_HOST=<your-airflow-host>
AIRFLOW_USERNAME=<your-airflow-username>
AIRFLOW_PASSWORD=<your-airflow-password>
AIRFLOW_API_VERSION=v1  # Optional, defaults to v1

Usage with Claude Desktop

Add to your claude_desktop_config.json:

{
  "mcpServers": {
    "mcp-server-apache-airflow": {
      "command": "uvx",
      "args": [
        "mcp-server-apache-airflow"
      ],
      "env": {
        "AIRFLOW_HOST": "https://your-airflow-host",
        "AIRFLOW_USERNAME": "your-username",
        "AIRFLOW_PASSWORD": "your-password"
      }
    }
  }
}

Alternative configuration using uv:

{
  "mcpServers": {
    "mcp-server-apache-airflow": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/mcp-server-apache-airflow",
        "run",
        "mcp-server-apache-airflow"
      ],
      "env": {
        "AIRFLOW_HOST": "https://your-airflow-host",
        "AIRFLOW_USERNAME": "your-username",
        "AIRFLOW_PASSWORD": "your-password"
      }
    }
  }
}

Replace /path/to/mcp-server-apache-airflow with the actual path where you’ve cloned the repository.

Selecting the API groups

You can select the API groups you want to use by setting the --apis flag.

uv run mcp-server-apache-airflow --apis "dag,dagrun"

The default is to use all APIs.

Allowed values are:

  • config
  • connections
  • dag
  • dagrun
  • dagstats
  • dataset
  • eventlog
  • importerror
  • monitoring
  • plugin
  • pool
  • provider
  • taskinstance
  • variable
  • xcom

Manual Execution

You can also run the server manually:

make run

make run accepts following options:

Options:

  • --port: Port to listen on for SSE (default: 8000)
  • --transport: Transport type (stdio/sse, default: stdio)

Or, you could run the sse server directly, which accepts same parameters:

make run-sse

Installing via Smithery

To install Apache Airflow MCP Server for Claude Desktop automatically via Smithery:

npx -y @smithery/cli install @yangkyeongmo/mcp-server-apache-airflow --client claude

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

The package is deployed automatically to PyPI when project.version is updated in pyproject.toml.
Follow semver for versioning.

Please include version update in the PR in order to apply the changes to core logic.

License

MIT License

Tools

get_config
Get current configuration
get_value
Get a specific option from configuration
list_connections
List all connections
create_connection
Create a connection
get_connection
Get a connection by ID
update_connection
Update a connection by ID
delete_connection
Delete a connection by ID
test_connection
Test a connection
fetch_dags
Fetch all DAGs
get_dag
Get a DAG by ID
get_dag_details
Get a simplified representation of DAG
get_dag_source
Get a source code
pause_dag
Pause a DAG by ID
unpause_dag
Unpause a DAG by ID
get_dag_tasks
Get tasks for DAG
get_task
Get a task by ID
get_tasks
Get tasks for DAG
patch_dag
Update a DAG
patch_dags
Update multiple DAGs
delete_dag
Delete a DAG
clear_task_instances
Clear a set of task instances
set_task_instances_state
Set a state of task instances
reparse_dag_file
Request re-parsing of a DAG file
post_dag_run
Trigger a DAG by ID
get_dag_runs
Get DAG runs by ID
get_dag_runs_batch
List DAG runs (batch)
get_dag_run
Get a DAG run by DAG ID and DAG run ID
update_dag_run_state
Update a DAG run state by DAG ID and DAG run ID
delete_dag_run
Delete a DAG run by DAG ID and DAG run ID
clear_dag_run
Clear a DAG run
set_dag_run_note
Update the DagRun note
get_upstream_dataset_events
Get dataset events for a DAG run
get_dag_stats
Get DAG stats
get_datasets
List datasets
get_dataset
Get a dataset by URI
get_dataset_events
Get dataset events
create_dataset_event
Create dataset event
get_dag_dataset_queued_event
Get a queued Dataset event for a DAG
get_dag_dataset_queued_events
Get queued Dataset events for a DAG
delete_dag_dataset_queued_event
Delete a queued Dataset event for a DAG
delete_dag_dataset_queued_events
Delete queued Dataset events for a DAG
get_dataset_queued_events
Get queued Dataset events for a Dataset
delete_dataset_queued_events
Delete queued Dataset events for a Dataset
get_event_logs
List log entries from event log
get_event_log
Get a specific log entry by ID
get_import_errors
List import errors
get_import_error
Get a specific import error by ID
get_health
Get instance status
get_version
Get version information
get_plugins
Get a list of loaded plugins
get_pools
List pools
get_pool
Get a pool by name
delete_pool
Delete a pool
post_pool
Create a pool
patch_pool
Update a pool
get_task_instance
Get a task instance by DAG ID, task ID, and DAG run ID
list_task_instances
List task instances by DAG ID and DAG run ID
update_task_instance
Update a task instance by DAG ID, DAG run ID, and task ID
list_variables
List all variables
create_variable
Create a variable
get_variable
Get a variable by key
update_variable
Update a variable by key
delete_variable
Delete a variable by key
get_xcom_entries
Get all XCom entries
get_xcom_entry
Get an XCom entry

Comments