stream2iceberg-mcp
What is stream2iceberg-mcp
stream2iceberg-mcp is a modular data pipeline designed to automate the retrieval, transformation, and loading of daily stock price data into an Apache Iceberg table stored on MinIO, facilitating incremental analysis.
Use cases
Use cases include automating daily stock data ingestion for analytics, performing historical data analysis on stock performance, and integrating with SQL analytics tools like Dremio for querying data.
How to use
To use stream2iceberg-mcp, install the required dependencies using `pip install -r requirements.txt`, configure the MinIO connection, set the bucket and object prefix, and run the main ETL script with `python daily_pipeline.py`.
Key features
Key features include fetching daily stock OHLC data using `yfinance`, calculating daily percent changes, storing data in Parquet format with Snappy compression, and ensuring compatibility with Iceberg for query engines like Dremio.
Where to use
stream2iceberg-mcp is suitable for financial analysis, stock market data processing, and any application requiring incremental data updates in a data lake environment.
Content
📈 Stock Daily Data Pipeline to Apache Iceberg on MinIO connected to MCP server
This project automates the retrieval, transformation, and loading of daily stock price data, streamed via Kafka from Yahoo Finance, into an Apache Iceberg table stored on MinIO. The data is queryable with Dremio. The daily and batch pipelines load into the Iceberg table, which backs an MCP client-server architecture through a JavaScript frontend.
🚀 Features
- Fetches daily stock OHLC data for the last n years using `yfinance` (see the sketch after this list).
- Calculates daily percent change for each stock.
- Writes data in Parquet format with Snappy compression.
- Stores the data in MinIO under a designated bucket and prefix.
- Iceberg-compatible format for data lake query engines like Dremio.
- TQDM progress bar for monitoring download status.
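A minimal sketch of the fetch-and-transform step, assuming `yf.Ticker(...).history()` and the column names shown later in this README (`trade_date`, `open_price`, `close_price`, `pct_change`); the actual logic lives in `daily_pipeline.py` and may differ:

```python
import pandas as pd
import yfinance as yf

def fetch_daily_ohlc(ticker: str, years: int) -> pd.DataFrame:
    # history() returns a DataFrame indexed by date with Open/Close columns.
    hist = yf.Ticker(ticker).history(period=f"{years}y", interval="1d")
    df = pd.DataFrame({
        "trade_date": hist.index.strftime("%Y-%m-%d"),
        "ticker": ticker,
        "open_price": hist["Open"].to_numpy(),
        "close_price": hist["Close"].to_numpy(),
    })
    # Daily percent change of the close, matching the pct_change field below.
    df["pct_change"] = (hist["Close"].pct_change() * 100).round(2).to_numpy()
    return df
```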
🧾 Tech Stack
- Python with `yfinance`, `pandas`, `s3fs`, and `pyarrow`
- Apache Iceberg format (optional Iceberg table promotion)
- Apache Kafka for daily streaming (can be extended to lower grains; see the producer sketch after this list)
- MinIO for S3-compatible object storage
- Dremio as query engine for SQL analytics
- Claude LLM powering MCP server
- Simple HTML+JS frontend
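The streaming leg is not shown in this README; under the assumption that the pipeline uses `kafka-python`, a daily producer could look like this sketch (the broker address and topic name are placeholders, not the repo's actual config):

```python
import json
from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# One record per ticker per day; a downstream consumer appends to Iceberg.
record = {"trade_date": "2025-05-15", "ticker": "AAPL",
          "open_price": 189.55, "close_price": 191.23, "pct_change": 0.89}
producer.send("daily_stock_prices", value=record)  # hypothetical topic name
producer.flush()
```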
📂 Project Structure
stock_daily_pipeline/
├── daily_pipeline.py    # Main ETL script
├── requirements.txt     # Dependencies
└── README.md            # Project documentation
🛠️ Setup Instructions
1. Install Requirements
pip install -r requirements.txt
2. Configure MinIO Connection
Make sure MinIO is running at http://127.0.0.1:9000 with:
fs = s3fs.S3FileSystem(
key='admin',
secret='password',
client_kwargs={'endpoint_url': 'http://127.0.0.1:9000'}
)
3. Set Bucket and Object Prefix
BUCKET_NAME = "lakehouse"
OBJECT_PREFIX = "stock_batch"
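With the `fs` connection and the names above, the Parquet write reduces to roughly this sketch (Snappy is pyarrow's default compression, made explicit here; the script's actual write logic may differ):

```python
# df is the combined DataFrame for all tickers (see the fetch sketch above).
path = f"s3://{BUCKET_NAME}/{OBJECT_PREFIX}/daily_stock_data.parquet"
with fs.open(path, "wb") as f:
    df.to_parquet(f, engine="pyarrow", compression="snappy", index=False)
```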
✅ How to Run
python daily_pipeline.py
You will be prompted to enter the number of years (e.g., `2`), and the script will:
- Fetch historical data for 20 large-cap tickers.
- Generate daily metrics (`trade_date`, `open_price`, `close_price`, `pct_change`).
- Write to `s3://lakehouse/stock_batch/daily_stock_data.parquet` in Parquet format.
📊 Example Query in Dremio
SELECT
trade_date,
ticker,
open_price,
close_price,
pct_change
FROM lakehouse.stock_batch
WHERE trade_date < DATE '2025-05-16';
If `trade_date` is a string, wrap it with `TO_DATE(trade_date, 'YYYY-MM-DD')`.
🧪 Sample Output Format
{
"trade_date": "2025-05-15",
"ticker": "AAPL",
"open_price": 189.55,
"close_price": 191.23,
"pct_change": 0.89
}
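To spot-check the file outside Dremio, it can be read back with pandas, passing the same MinIO credentials through `storage_options` (forwarded to `s3fs`):

```python
import pandas as pd

df = pd.read_parquet(
    "s3://lakehouse/stock_batch/daily_stock_data.parquet",
    storage_options={
        "key": "admin",
        "secret": "password",
        "client_kwargs": {"endpoint_url": "http://127.0.0.1:9000"},
    },
)
print(df.head())
```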
📈 Ticker List
The script currently processes the following tickers:
AAPL, MSFT, GOOGL, AMZN, NVDA, TSLA, META, BRK-B, LLY, JPM, V, UNH, AVGO, MA, JNJ, HD, XOM, PG, COST, BAC
(Note: `BRK-B` is internally converted to `BRK.B` for Yahoo Finance.)
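A hypothetical illustration of that normalization (the real ticker list and mapping live in `daily_pipeline.py`):

```python
TICKERS = [
    "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA", "META", "BRK-B", "LLY",
    "JPM", "V", "UNH", "AVGO", "MA", "JNJ", "HD", "XOM", "PG", "COST", "BAC",
]

def to_yahoo_symbol(ticker: str) -> str:
    # Mirrors the note above: BRK-B is rewritten to BRK.B before download.
    return "BRK.B" if ticker == "BRK-B" else ticker
```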
✅ Frontend
cd ~/frontend && python mcp_server.py
Enter your ANTHROPIC API key when prompted, then serve the static frontend:
cd frontend/public && python3 -m http.server 3001
🧪 System HLD
(High-level system design diagram in the repository.)
🔄 Pipeline LLD
(Low-level pipeline diagram and app screenshot in the repository.)
Chatbot Demo
https://github.com/user-attachments/assets/67df52f3-419a-4ebb-970f-bf090d1c290e
🔄 Future Enhancements
- Partition data by `year` or `ticker` for faster querying (see the sketch after this list).
- Automate Iceberg table promotion using the Dremio REST API or Nessie.
- Schedule with GitHub Actions for CI/CD.
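For the partitioning idea above, a sketch using `pyarrow.parquet.write_to_dataset` over the same `s3fs` filesystem (the derived `year` column and output path are assumptions, not the repo's current layout):

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Derive a partition column, then write one directory per year under the prefix.
df["year"] = pd.to_datetime(df["trade_date"]).dt.year
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_to_dataset(
    table,
    root_path=f"{BUCKET_NAME}/{OBJECT_PREFIX}/daily_stock_data",
    partition_cols=["year"],
    filesystem=fs,  # the s3fs filesystem from the setup step
)
```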
🧾 License
MIT License
🙋‍♂️ Questions?
Feel free to raise issues or reach out for improvements or support!