by aci-labs
ms-fabric-mcp (Microsoft Fabric MCP Server) is a comprehensive Python-based server that implements the Model Context Protocol (MCP) to facilitate interaction with Microsoft Fabric APIs. It integrates Large Language Models (LLMs) to enhance PySpark notebook development, testing, and optimization within the Microsoft Fabric environment.
To use ms-fabric-mcp, you need Python 3.12+, Azure credentials, uv, and the Azure CLI; Node.js is optional and only needed for the MCP Inspector. After cloning the repository and setting up the virtual environment with uv sync and pip install -r requirements.txt, connect to Microsoft Fabric using az login --scope https://api.fabric.microsoft.com/.default. The server can be run via STDIO or HTTP. For STDIO, use uv run --with mcp mcp dev fabric_mcp.py to start the server with the MCP Inspector at http://localhost:6274. For HTTP, use uv run python .\fabric_mcp.py --port 8081. VSCode integration is supported by configuring launch.json for either the STDIO or HTTP server type.
Q: What are common issues and how to troubleshoot them?
A: Common issues include authentication problems (ensure az login with the correct scope), context issues (use clear_context() to reset session state), and workspace verification (check names and permissions). Validation tools and performance analysis can also help in troubleshooting.
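For example, if the session context appears stale, a minimal reset might look like this (the workspace name is illustrative):
clear_context()  # reset session state
set_workspace(workspace="Analytics-Workspace")  # re-establish the workspace context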
Q: What PySpark templates are available?
A: Basic templates include basic, etl, analytics, and ml. Advanced templates are fabric_integration and streaming.
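Templates are selected via the template_type parameter of the notebook-creation tools, for example (workspace and notebook names are illustrative):
create_pyspark_notebook(
    workspace="Analytics-Workspace",
    notebook_name="Churn-ML",
    template_type="ml"
)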
Q: How does ms-fabric-mcp help with performance optimization?
A: It provides tools for comprehensive performance analysis, identifies anti-patterns and bottlenecks, suggests specific optimizations, generates optimized code alternatives, and offers before/after comparisons. It also encourages best practices like caching DataFrames, using broadcast for small tables, and partitioning large datasets.
Q: How does the LLM integration work?
A: The LLM acts as an intelligent interface. Developers request assistance in their IDE, the LLM analyzes the request using context and reasoning, calls MCP server tools, and then the results flow back through the LLM with intelligent formatting, providing contextual and smart responses.
A comprehensive Python-based MCP (Model Context Protocol) server for interacting with Microsoft Fabric APIs, featuring advanced PySpark notebook development, testing, and optimization capabilities with LLM integration.
Clone the repository:
git clone https://github.com/your-repo/fabric-mcp.git
cd fabric-mcp
Set up virtual environment:
uv sync
Install dependencies:
pip install -r requirements.txt
Authenticate with Microsoft Fabric:
az login --scope https://api.fabric.microsoft.com/.default
Run the server via STDIO:
uv run --with mcp mcp dev fabric_mcp.py
This starts the server with the MCP Inspector at http://localhost:6274.
Add to your launch.json:
{
  "mcp": {
    "servers": {
      "ms-fabric-mcp": {
        "type": "stdio",
        "command": "<FullPathToProjectFolder>\\.venv\\Scripts\\python.exe",
        "args": ["<FullPathToProjectFolder>\\fabric_mcp.py"]
      }
    }
  }
}
Run the server via HTTP:
uv run python .\fabric_mcp.py --port 8081
Add to your launch.json:
{
  "mcp": {
    "servers": {
      "ms-fabric-mcp": {
        "type": "http",
        "url": "http://<localhost or remote IP>:8081/mcp/",
        "headers": {
          "Accept": "application/json,text/event-stream"
        }
      }
    }
  }
}
list_workspaces
List all available Fabric workspaces.
# Usage in LLM: "List all my Fabric workspaces"
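A direct call, which presumably takes no arguments, looks like:
list_workspaces()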
set_workspace
Set the current workspace context for the session.
set_workspace(workspace="Analytics-Workspace")
list_lakehouses
List all lakehouses in a workspace.
list_lakehouses(workspace="Analytics-Workspace")
create_lakehouse
Create a new lakehouse.
create_lakehouse(
name="Sales-Data-Lake",
workspace="Analytics-Workspace",
description="Sales data lakehouse"
)
set_lakehouse
Set current lakehouse context.
set_lakehouse(lakehouse="Sales-Data-Lake")
list_warehouses
List all warehouses in a workspace.
list_warehouses(workspace="Analytics-Workspace")
create_warehouse
Create a new warehouse.
create_warehouse(
name="Sales-DW",
workspace="Analytics-Workspace",
description="Sales data warehouse"
)
set_warehouse
Set current warehouse context.
set_warehouse(warehouse="Sales-DW")
list_tables
List all tables in a lakehouse.
list_tables(workspace="Analytics-Workspace", lakehouse="Sales-Data-Lake")
get_lakehouse_table_schema
Get schema for a specific table.
get_lakehouse_table_schema(
workspace="Analytics-Workspace",
lakehouse="Sales-Data-Lake",
table_name="transactions"
)
get_all_lakehouse_schemas
Get schemas for all tables in a lakehouse.
get_all_lakehouse_schemas(
workspace="Analytics-Workspace",
lakehouse="Sales-Data-Lake"
)
set_table
Set current table context.
set_table(table_name="transactions")
get_sql_endpoint
Get SQL endpoint for lakehouse or warehouse.
get_sql_endpoint(
workspace="Analytics-Workspace",
lakehouse="Sales-Data-Lake",
type="lakehouse"
)
run_query
Execute SQL queries.
run_query(
workspace="Analytics-Workspace",
lakehouse="Sales-Data-Lake",
query="SELECT COUNT(*) FROM transactions",
type="lakehouse"
)
load_data_from_url
Load data from URL into tables.
load_data_from_url(
url="https://example.com/data.csv",
destination_table="new_data",
workspace="Analytics-Workspace",
lakehouse="Sales-Data-Lake"
)
list_reports
List all reports in a workspace.
list_reports(workspace="Analytics-Workspace")
get_report
Get specific report details.
get_report(workspace="Analytics-Workspace", report_id="report-id")
list_semantic_models
List semantic models in workspace.
list_semantic_models(workspace="Analytics-Workspace")
get_semantic_model
Get specific semantic model.
get_semantic_model(workspace="Analytics-Workspace", model_id="model-id")
list_notebooks
List all notebooks in a workspace.
list_notebooks(workspace="Analytics-Workspace")
get_notebook_content
Retrieve notebook content.
get_notebook_content(
workspace="Analytics-Workspace",
notebook_id="notebook-id"
)
update_notebook_cell
Update specific notebook cells.
update_notebook_cell(
workspace="Analytics-Workspace",
notebook_id="notebook-id",
cell_index=0,
cell_content="print('Hello, Fabric!')",
cell_type="code"
)
create_pyspark_notebook
Create notebooks from basic templates.
create_pyspark_notebook(
workspace="Analytics-Workspace",
notebook_name="Data-Analysis",
template_type="analytics" # Options: basic, etl, analytics, ml
)
create_fabric_notebook
Create Fabric-optimized notebooks.
create_fabric_notebook(
workspace="Analytics-Workspace",
notebook_name="Fabric-Pipeline",
template_type="fabric_integration" # Options: fabric_integration, streaming
)
generate_pyspark_code
Generate code for common operations.
generate_pyspark_code(
operation="read_table",
source_table="sales.transactions",
columns="id,amount,date"
)
# Available operations:
# - read_table, write_table, transform, join, aggregate
# - schema_inference, data_quality, performance_optimization
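The same tool covers the other listed operations; for example, a data-quality check might be requested like this (the exact parameter set per operation is assumed, not documented here):
generate_pyspark_code(
    operation="data_quality",
    source_table="sales.transactions"  # hypothetical parameters for this operation
)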
generate_fabric_code
Generate Fabric-specific code.
generate_fabric_code(
operation="read_lakehouse",
lakehouse_name="Sales-Data-Lake",
table_name="transactions"
)
# Available operations:
# - read_lakehouse, write_lakehouse, merge_delta, performance_monitor
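For reference, the merge_delta operation targets standard Delta Lake upsert logic; a typical merge of this kind (illustrative only, not the tool's exact output) looks like:
from delta.tables import DeltaTable

# Upsert incoming rows into an existing Delta table; updates_df and the join key are illustrative
target = DeltaTable.forName(spark, "transactions")
(target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())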
validate_pyspark_code
Validate PySpark code syntax and best practices.
validate_pyspark_code(code="""
df = spark.table('transactions')
df.show()
""")
validate_fabric_code
Validate Fabric compatibility.
validate_fabric_code(code="""
df = spark.table('lakehouse.transactions')
df.write.format('delta').saveAsTable('summary')
""")
analyze_notebook_performance
Comprehensive performance analysis.
analyze_notebook_performance(
workspace="Analytics-Workspace",
notebook_id="notebook-id"
)
clear_context
Clear current session context.
clear_context()
# ✅ Use managed tables
df = spark.table("lakehouse.my_table")
# ✅ Use Delta Lake format
df.write.format("delta").mode("overwrite").saveAsTable("my_table")
# ✅ Leverage notebookutils
import notebookutils as nbu
workspace_id = nbu.runtime.context.workspaceId
# ✅ Cache frequently used DataFrames
df.cache()
# ✅ Use broadcast for small tables
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# ✅ Partition large datasets
df.write.partitionBy("year", "month").saveAsTable("partitioned_table")
# ✅ Define explicit schemas
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True)
])
# ✅ Handle null values
from pyspark.sql.functions import col
df.filter(col("column").isNotNull())
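An explicit schema (as defined above) is typically applied when reading untyped sources; a minimal sketch, with an illustrative file path:
df = spark.read.schema(schema).csv("Files/raw/customers.csv", header=True)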
Human: "Create a PySpark notebook that reads sales data, cleans it, and optimizes performance"
LLM Response:
1. Creates Fabric-optimized notebook with ETL template
2. Generates lakehouse reading code
3. Adds data cleaning transformations
4. Includes performance optimization patterns
5. Validates code for best practices
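The generated notebook typically ends up containing code along these lines (a minimal sketch; table and column names are illustrative, not the tool's exact output):
from pyspark.sql.functions import col, trim

# Read sales data from the lakehouse
sales_df = spark.table("lakehouse.transactions")

# Basic cleaning: drop duplicates, remove null keys, normalize a text column
clean_df = (sales_df
    .dropDuplicates(["id"])
    .filter(col("id").isNotNull())
    .withColumn("region", trim(col("region"))))

# Cache before repeated downstream use, then persist as a managed Delta table
clean_df.cache()
clean_df.write.format("delta").mode("overwrite").saveAsTable("transactions_clean")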
Human: "My PySpark notebook is slow. Help me optimize it."
LLM Response:
1. Analyzes notebook performance (scoring 0-100)
2. Identifies anti-patterns and bottlenecks
3. Suggests specific optimizations
4. Generates optimized code alternatives
5. Provides before/after comparisons
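A typical before/after suggestion from this kind of analysis might look like the following (illustrative, not the tool's literal output; column names are assumed):
# Before: collect() pulls all rows to the driver and aggregates them in Python
rows = df.collect()
totals = {}
for r in rows:
    totals[r["customer_id"]] = totals.get(r["customer_id"], 0) + r["amount"]

# After: keep the work distributed and let Spark aggregate
from pyspark.sql.functions import sum as spark_sum
totals_df = df.groupBy("customer_id").agg(spark_sum("amount").alias("total_amount"))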
Troubleshooting: authentication issues are usually resolved by running az login with the correct scope, and context issues by calling clear_context() to reset session state. The validation and performance-analysis tools provide additional diagnostics.
This project welcomes contributions! Please see our contributing guidelines for details.
This project is licensed under the MIT License. See the LICENSE file for details.
Inspired by: https://github.com/Augustab/microsoft_fabric_mcp/tree/main
Ready to supercharge your Microsoft Fabric development with intelligent PySpark assistance!