AI Spark MCP Server
What is AI Spark MCP Server?
ai-spark-mcp-server is a Model Context Protocol (MCP) server and client designed to optimize Apache Spark code. It provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
Use cases
Use cases include optimizing existing PySpark code for better performance, generating performance analysis reports, and integrating AI-driven suggestions for code improvements.
How to use
To use ai-spark-mcp-server, place your PySpark code in `spark_code_input.py` and run `run_client.py` to start the client. The client communicates with the MCP server, which processes the code and returns optimized code along with a performance analysis.
Key features
Key features include a client-server architecture, asynchronous HTTP communication, a tool registry for optimization and analysis, and integration with Claude AI for advanced code analysis and optimization.
Where to use
ai-spark-mcp-server can be used in data engineering, big data analytics, and machine learning projects where Apache Spark is employed for processing large datasets.
Content
Spark MCP (Model Context Protocol) Optimizer
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
How It Works
Code Optimization Workflow
```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
    end
    subgraph MCP Client
        B --> |Async HTTP| C[SparkMCPClient]
        C --> |Protocol Handler| D[Tools Interface]
    end
    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F --> |Tool Registry| G[optimize_spark_code]
        F --> |Tool Registry| H[analyze_performance]
        F --> |Protocol Handler| I[Claude AI Integration]
    end
    subgraph Resources
        I --> |Code Analysis| J[Claude AI Model]
        J --> |Optimization| K[Optimized Code Generation]
        K --> |Validation| L[PySpark Runtime]
    end
    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end
    D --> |MCP Request| F
    G --> |Generate| M
    H --> |Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100
    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
Component Details
- Input Layer
  - `spark_code_input.py`: Source PySpark code for optimization
  - `run_client.py`: Client startup and configuration
- MCP Client Layer
  - Tools Interface: Protocol-compliant tool invocation
- MCP Server Layer
  - `run_server.py`: Server initialization
  - Tool Registry: Optimization and analysis tools
  - Protocol Handler: MCP request/response management
- Resource Layer
  - Claude AI: Code analysis and optimization
  - PySpark Runtime: Code execution and validation
- Output Layer
  - `optimized_spark_code.py`: Optimized code
  - `performance_analysis.md`: Detailed analysis
This workflow illustrates:
- Input PySpark code submission
- MCP protocol handling and routing
- Claude AI analysis and optimization
- Code transformation and validation
- Performance analysis and reporting
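The steps above can be sketched end-to-end with a few async stand-ins. This is a toy illustration of the request flow only: `claude_optimize`, `server_handle`, and `client_optimize` are hypothetical stand-ins for the real Claude call, `SparkMCPServer`, and `SparkMCPClient`.

```python
import asyncio

async def claude_optimize(code: str) -> str:
    # Stand-in for the Claude AI analysis/optimization step.
    return code.replace(".join(", ".hint('broadcast').join(")

async def server_handle(request: dict) -> dict:
    # Stand-in for the MCP server: route the tool call to a handler.
    if request["method"] == "tools/call" and request["params"]["name"] == "optimize_spark_code":
        optimized = await claude_optimize(request["params"]["arguments"]["code"])
        return {"result": optimized}
    return {"error": "unknown tool"}

async def client_optimize(code: str) -> str:
    # Stand-in for the MCP client: wrap the code in a protocol request.
    request = {"method": "tools/call",
               "params": {"name": "optimize_spark_code", "arguments": {"code": code}}}
    response = await server_handle(request)
    return response["result"]

print(asyncio.run(client_optimize("df = a.join(b, 'dept')")))
```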
Architecture
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│    MCP Client    │      │    MCP Server    │      │    Resources     │
│ (SparkMCPClient) │      │ (SparkMCPServer) │      │                  │
│                  │      │                  │      │ ┌──────────────┐ │
│  ┌─────────┐     │      │  ┌─────────┐     │      │ │  Claude AI   │ │
│  │  Tools  │     │ <──> │  │  Tools  │     │ <──> │ │    Model     │ │
│  │Interface│     │      │  │Registry │     │      │ └──────────────┘ │
│  └─────────┘     │      │  └─────────┘     │      │ ┌──────────────┐ │
│                  │      │  ┌─────────┐     │      │ │   PySpark    │ │
│                  │      │  │Protocol │     │      │ │   Runtime    │ │
│                  │      │  │ Handler │     │      │ └──────────────┘ │
│                  │      │  └─────────┘     │      │                  │
└──────────────────┘      └──────────────────┘      └──────────────────┘
        │                         │                         │
        v                         v                         v
┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│  Available   │          │  Registered  │          │   External   │
│    Tools     │          │    Tools     │          │  Resources   │
├──────────────┤          ├──────────────┤          ├──────────────┤
│optimize_code │          │optimize_code │          │  Claude API  │
│analyze_perf  │          │analyze_perf  │          │ Spark Engine │
└──────────────┘          └──────────────┘          └──────────────┘
```
Components
- MCP Client
  - Provides tool interface for code optimization
  - Handles async communication with server
  - Manages file I/O for code generation
- MCP Server
  - Implements MCP protocol handlers
  - Manages tool registry and execution
  - Coordinates between client and resources
- Resources
  - Claude AI: Provides code optimization intelligence
  - PySpark Runtime: Executes and validates optimizations
Protocol Flow
1. Client sends optimization request via MCP protocol
2. Server validates request and invokes appropriate tool
3. Tool utilizes Claude AI for optimization
4. Optimized code is returned via MCP response
5. Client saves and validates the optimized code
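Assuming MCP's JSON-RPC 2.0 framing, a single round trip of this flow might be shaped roughly as follows (the tool arguments shown are hypothetical):

```python
import json

# A tools/call request as the client might frame it.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "optimize_spark_code",
        "arguments": {"code": "df.groupBy('dept').count()", "optimization_level": "advanced"},
    },
}

# The matching response carrying the optimized code back to the client.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "# optimized code ..."}]},
}

wire = json.dumps(request)  # what actually crosses the transport
assert json.loads(wire)["params"]["name"] == "optimize_spark_code"
```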
End-to-End Functionality
```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime
    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
- Code Submission
  - User places PySpark code in `v1/input/spark_code_input.py`
  - Code is read by the MCP client
- Optimization Process
  - MCP client connects to server via standardized protocol
  - Server forwards code to Claude AI for analysis
  - AI suggests optimizations based on best practices
  - Server validates and processes suggestions
- Code Generation
  - Optimized code saved to `v1/output/optimized_spark_code.py`
  - Includes detailed comments explaining optimizations
  - Maintains original code structure while improving performance
- Performance Analysis
  - Both versions executed in PySpark runtime
  - Execution times compared
  - Results validated for correctness
  - Metrics collected and analyzed
- Results Generation
  - Comprehensive analysis in `v1/output/performance_analysis.md`
  - Side-by-side execution comparison
  - Performance improvement statistics
  - Optimization explanations and rationale
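The performance-analysis step amounts to timing both versions and validating that they agree. A minimal stdlib sketch, with trivial placeholder functions standing in for the two scripts:

```python
import time

def run_original():
    # Placeholder for executing the original PySpark script.
    return sum(x * x for x in range(200_000))

def run_optimized():
    # Placeholder for executing the optimized script.
    return sum(x * x for x in range(200_000))

def compare(run_a, run_b):
    t0 = time.perf_counter(); result_a = run_a(); t_a = time.perf_counter() - t0
    t0 = time.perf_counter(); result_b = run_b(); t_b = time.perf_counter() - t0
    # Validate correctness before reporting any speedup.
    assert result_a == result_b, "optimized code changed the results"
    improvement = (t_a - t_b) / t_a * 100
    return t_a, t_b, improvement

t_orig, t_opt, pct = compare(run_original, run_optimized)
print(f"original {t_orig:.3f}s, optimized {t_opt:.3f}s, improvement {pct:.1f}%")
```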
Usage
Requirements
- Python 3.8+
- PySpark 3.2.0+
- Anthropic API Key (for Claude AI)
Installation
pip install -r requirements.txt
Quick Start
1. Add your Spark code to optimize in `input/spark_code_input.py`

2. Start the MCP server:

   ```
   python v1/run_server.py
   ```

3. Run the client to optimize your code:

   ```
   python v1/run_client.py
   ```

   This will generate two files:
   - `output/optimized_spark_example.py`: The optimized Spark code with detailed optimization comments
   - `output/performance_analysis.md`: Comprehensive performance analysis

4. Run and compare code versions:

   ```
   python v1/run_optimized.py
   ```

   This will:
   - Execute both original and optimized code
   - Compare execution times and results
   - Update the performance analysis with execution metrics
   - Show detailed performance improvement statistics
Project Structure
```
ai-mcp/
├── input/
│   └── spark_code_input.py        # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py # Generated optimized code
│   └── performance_analysis.md    # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── run_client.py                  # Client script to optimize code
├── run_server.py                  # Server startup script
└── run_optimized.py               # Script to run and compare code versions
```
Why MCP?
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
Direct Claude AI Call vs MCP Server
| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
Key Differences:
1. AI Integration
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |
2. Tool Management
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`  def register_tool(self, name, func):`<br>`    self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |
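The `@register_tool` style in the MCP row can be illustrated with a plain decorator over a dict registry. This is a simplified sketch, not the project's actual implementation:

```python
import asyncio
import inspect

TOOLS = {}

def register_tool(name):
    # Register an async tool under `name`; the function stays usable directly.
    def wrap(func):
        TOOLS[name] = func
        return func
    return wrap

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    return "# optimized\n" + code

async def call_tool(name, **kwargs):
    # Check the arguments against the tool's signature before invoking it.
    func = TOOLS[name]
    inspect.signature(func).bind(**kwargs)
    return await func(**kwargs)

print(asyncio.run(call_tool("optimize_spark_code", code="df.count()")))
```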
3. Resource Management
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`  self.claude = init_claude()`<br>`  self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |
4. Communication Protocol
| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request", "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call", "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
Features
- Intelligent Code Optimization: Leverages Claude AI to analyze and optimize PySpark code
- Performance Analysis: Provides detailed analysis of performance differences between original and optimized code
- MCP Architecture: Implements the Model Context Protocol for standardized AI model interactions
- Easy Integration: Simple client interface for code optimization requests
- Code Generation: Automatically saves optimized code to separate files
Advanced Usage
You can also use the client programmatically:
```python
import asyncio

from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True,  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True,  # Save to output/performance_analysis.md
    )

    # Run both versions and compare: use the run_optimized.py script
    # or implement your own comparison.

    await client.close()

asyncio.run(main())
```
Example Input and Output
The repository includes an example workflow:
- Input Code (`input/spark_code_input.py`):
```python
# Create DataFrames and join
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

# Join and analyze
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
```
- Optimized Code (`output/optimized_spark_example.py`):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# Performance-optimized version with caching and improved configurations
spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# Create and cache DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

# Optimized join and analysis
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
```
- Performance Analysis (`output/performance_analysis.md`):
## Execution Results Comparison
### Timing Comparison
- Original Code: 5.18 seconds
- Optimized Code: 0.65 seconds
- Performance Improvement: 87.4%
### Optimization Details
- Caching frequently used DataFrames
- Optimized shuffle partitions
- Improved column expressions
- Better memory management
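As a sanity check, the improvement figure follows directly from the two timings (the report's 87.4% versus a computed 87.5% is a rounding difference):

```python
# Timings reported above
original_s, optimized_s = 5.18, 0.65

# Percent improvement relative to the original runtime, and raw speedup
improvement = (original_s - optimized_s) / original_s * 100
speedup = original_s / optimized_s
print(f"improvement: {improvement:.1f}%  speedup: {speedup:.1f}x")
```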
Project Structure
```
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── examples/
│   ├── optimize_code.py           # Example usage
│   └── optimized_spark_example.py # Generated optimized code
├── requirements.txt
└── run_server.py                  # Server startup script
```
Available Tools
- `optimize_spark_code`
  - Optimizes PySpark code for better performance
  - Supports basic and advanced optimization levels
  - Automatically saves optimized code to `examples/optimized_spark_example.py`
- `analyze_performance`
  - Analyzes performance differences between original and optimized code
  - Provides insights on:
    - Performance improvements
    - Resource utilization
    - Scalability considerations
    - Potential trade-offs
Environment Variables
- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude AI
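A defensive startup check for this variable might look like the following sketch (the project's scripts may handle a missing key differently):

```python
import os

def require_api_key() -> str:
    # Fail fast with a clear message instead of a deep stack trace later.
    key = os.environ.get("ANTHROPIC_API_KEY")
    if not key:
        raise RuntimeError("ANTHROPIC_API_KEY is not set; export it before starting the server")
    return key
```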
Example Optimizations
The system implements various PySpark optimizations including:
- Broadcast joins for small-large table joins
- Efficient window function usage
- Strategic data caching
- Query plan optimizations
- Performance-oriented operation ordering
Contributing
Feel free to submit issues and enhancement requests!
License
MIT License