scanpy_mcp / tools /clustering.py
Paper2Agent's picture
Upload 5 files
ffcb052 verified
raw
history blame
31.3 kB
"""
Scanpy tutorial for single-cell RNA sequencing preprocessing and clustering analysis.
This MCP Server provides 7 tools:
1. quality_control: Calculate and visualize QC metrics, filter cells and genes, detect doublets
2. normalize_data: Normalize count data with median total counts and log transformation
3. select_features: Identify highly variable genes for feature selection
4. reduce_dimensionality: Perform PCA analysis and variance visualization
5. build_neighborhood_graph: Construct nearest neighbor graph and UMAP embedding
6. cluster_cells: Perform Leiden clustering with visualization
7. annotate_cell_types: Multi-resolution clustering, marker gene analysis, and differential expression
All tools extracted from `https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb`.
"""
# Standard imports
from typing import Annotated, Literal, Any
import pandas as pd
import numpy as np
from pathlib import Path
import os
from fastmcp import FastMCP
from datetime import datetime
import matplotlib.pyplot as plt
# Scanpy and related imports
import scanpy as sc
import anndata as ad
# Base persistent directory (HF Spaces guarantees /data is writable & persistent)
BASE_DIR = Path("/data")
DEFAULT_INPUT_DIR = BASE_DIR / "tmp_inputs"
DEFAULT_OUTPUT_DIR = BASE_DIR / "tmp_outputs"
INPUT_DIR = Path(os.environ.get("CLUSTERING_INPUT_DIR", DEFAULT_INPUT_DIR))
OUTPUT_DIR = Path(os.environ.get("CLUSTERING_OUTPUT_DIR", DEFAULT_OUTPUT_DIR))
# Ensure directories exist
INPUT_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Timestamp for unique outputs
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# MCP server instance
clustering_mcp = FastMCP(name="clustering")
# Set scanpy figure parameters
sc.settings.set_figure_params(dpi=300, facecolor="white")
@clustering_mcp.tool
def quality_control(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file or directory with 10X data. The h5ad file should contain raw count data in AnnData format."] = None,
# Analysis parameters with tutorial defaults
mt_prefix: Annotated[str, "Prefix for mitochondrial genes"] = "MT-",
ribo_prefixes: Annotated[list, "Prefixes for ribosomal genes"] = ["RPS", "RPL"],
hb_pattern: Annotated[str, "Pattern for hemoglobin genes"] = "^HB[^(P)]",
min_genes: Annotated[int, "Minimum number of genes expressed per cell"] = 100,
min_cells: Annotated[int, "Minimum number of cells expressing a gene"] = 3,
batch_key: Annotated[str | None, "Column name in adata.obs for batch information"] = None,
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Calculate quality control metrics, visualize QC distributions, and filter low-quality cells and genes.
Input is single-cell count data in AnnData format and output is QC plots, filtered data, and doublet scores.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file or 10X data directory must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"qc_{timestamp}"
# Load data
data_path = Path(data_path)
if data_path.is_dir():
# Assume 10X directory format
adata = sc.read_10x_mtx(data_path)
adata.var_names_make_unique()
elif data_path.suffix in ['.h5', '.h5ad']:
if data_path.suffix == '.h5':
adata = sc.read_10x_h5(data_path)
adata.var_names_make_unique()
else:
adata = ad.read_h5ad(data_path)
else:
raise ValueError("data_path must be a directory with 10X data or h5/h5ad file")
# Define gene categories
adata.var["mt"] = adata.var_names.str.startswith(mt_prefix)
adata.var["ribo"] = adata.var_names.str.startswith(tuple(ribo_prefixes))
adata.var["hb"] = adata.var_names.str.contains(hb_pattern)
# Calculate QC metrics
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mt", "ribo", "hb"], inplace=True, log1p=True
)
# Create QC violin plots
plt.figure(figsize=(12, 4))
sc.pl.violin(
adata,
["n_genes_by_counts", "total_counts", "pct_counts_mt"],
jitter=0.4,
multi_panel=True,
)
violin_path = OUTPUT_DIR / f"{out_prefix}_qc_violin.png"
plt.savefig(violin_path, dpi=300, bbox_inches='tight')
plt.close()
# Create QC scatter plot
plt.figure(figsize=(8, 6))
sc.pl.scatter(adata, "total_counts", "n_genes_by_counts", color="pct_counts_mt")
scatter_path = OUTPUT_DIR / f"{out_prefix}_qc_scatter.png"
plt.savefig(scatter_path, dpi=300, bbox_inches='tight')
plt.close()
# Filter cells and genes
print(f"Before filtering: {adata.n_obs} cells, {adata.n_vars} genes")
sc.pp.filter_cells(adata, min_genes=min_genes)
sc.pp.filter_genes(adata, min_cells=min_cells)
print(f"After filtering: {adata.n_obs} cells, {adata.n_vars} genes")
# Doublet detection
if batch_key and batch_key in adata.obs.columns:
sc.pp.scrublet(adata, batch_key=batch_key)
else:
sc.pp.scrublet(adata)
# Save processed data
output_file = OUTPUT_DIR / f"{out_prefix}_qc_processed.h5ad"
adata.write_h5ad(output_file)
# Save QC metrics summary
qc_summary = pd.DataFrame({
'metric': ['n_obs', 'n_vars', 'mean_n_genes_by_counts', 'mean_total_counts', 'mean_pct_counts_mt', 'doublet_rate'],
'value': [
adata.n_obs,
adata.n_vars,
adata.obs['n_genes_by_counts'].mean(),
adata.obs['total_counts'].mean(),
adata.obs['pct_counts_mt'].mean(),
adata.obs['predicted_doublet'].sum() / adata.n_obs
]
})
qc_summary_path = OUTPUT_DIR / f"{out_prefix}_qc_summary.csv"
qc_summary.to_csv(qc_summary_path, index=False)
return {
"message": f"Quality control completed for {adata.n_obs} cells and {adata.n_vars} genes",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": [
{
"description": "QC violin plots",
"path": str(violin_path.resolve())
},
{
"description": "QC scatter plot",
"path": str(scatter_path.resolve())
},
{
"description": "QC processed data",
"path": str(output_file.resolve())
},
{
"description": "QC metrics summary",
"path": str(qc_summary_path.resolve())
}
]
}
@clustering_mcp.tool
def normalize_data(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with QC-processed single-cell data. Should be output from quality_control tool."],
# Analysis parameters with tutorial defaults
target_sum: Annotated[float | None, "Target sum for normalization. None uses median total counts"] = None,
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Normalize count data using median total counts scaling followed by log1p transformation.
Input is quality-controlled AnnData object and output is normalized expression data.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"normalized_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Saving count data
adata.layers["counts"] = adata.X.copy()
# Normalizing to median total counts (or target_sum if specified)
sc.pp.normalize_total(adata, target_sum=target_sum)
# Logarithmize the data
sc.pp.log1p(adata)
# Save normalized data
output_file = OUTPUT_DIR / f"{out_prefix}_normalized.h5ad"
adata.write_h5ad(output_file)
# Create normalization summary
import numpy as np
from scipy import sparse
# Handle sparse matrices properly
if sparse.issparse(adata.layers["counts"]):
counts_mean = adata.layers["counts"].mean()
counts_std = np.sqrt(adata.layers["counts"].multiply(adata.layers["counts"]).mean() - counts_mean**2)
else:
counts_mean = np.mean(adata.layers["counts"])
counts_std = np.std(adata.layers["counts"])
if sparse.issparse(adata.X):
x_mean = adata.X.mean()
x_std = np.sqrt(adata.X.multiply(adata.X).mean() - x_mean**2)
else:
x_mean = np.mean(adata.X)
x_std = np.std(adata.X)
norm_summary = pd.DataFrame({
'layer': ['raw_counts', 'normalized_log1p'],
'mean_expression': [float(counts_mean), float(x_mean)],
'std_expression': [float(counts_std), float(x_std)]
})
summary_path = OUTPUT_DIR / f"{out_prefix}_normalization_summary.csv"
norm_summary.to_csv(summary_path, index=False)
return {
"message": f"Data normalized with log1p transformation for {adata.n_obs} cells",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": [
{
"description": "Normalized data",
"path": str(output_file.resolve())
},
{
"description": "Normalization summary",
"path": str(summary_path.resolve())
}
]
}
@clustering_mcp.tool
def select_features(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with normalized single-cell data. Should be output from normalize_data tool."],
# Analysis parameters with tutorial defaults
n_top_genes: Annotated[int, "Number of highly variable genes to select"] = 2000,
batch_key: Annotated[str | None, "Column name in adata.obs for batch correction"] = None,
flavor: Annotated[Literal["seurat", "cell_ranger", "seurat_v3"], "Method for highly variable gene selection"] = "seurat",
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Identify highly variable genes for feature selection using specified method.
Input is normalized AnnData object and output is feature selection plot and filtered data.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"features_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Find highly variable genes
if batch_key and batch_key in adata.obs.columns:
sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes, batch_key=batch_key, flavor=flavor)
else:
sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes, flavor=flavor)
# Plot highly variable genes
plt.figure(figsize=(10, 6))
sc.pl.highly_variable_genes(adata)
plot_path = OUTPUT_DIR / f"{out_prefix}_highly_variable_genes.png"
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
# Save data with feature selection
output_file = OUTPUT_DIR / f"{out_prefix}_feature_selected.h5ad"
adata.write_h5ad(output_file)
# Create feature selection summary
n_highly_var = adata.var['highly_variable'].sum()
feature_summary = pd.DataFrame({
'metric': ['total_genes', 'highly_variable_genes', 'selection_fraction'],
'value': [
adata.n_vars,
n_highly_var,
n_highly_var / adata.n_vars
]
})
summary_path = OUTPUT_DIR / f"{out_prefix}_feature_summary.csv"
feature_summary.to_csv(summary_path, index=False)
return {
"message": f"Selected {n_highly_var} highly variable genes from {adata.n_vars} total genes",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": [
{
"description": "Highly variable genes plot",
"path": str(plot_path.resolve())
},
{
"description": "Feature selected data",
"path": str(output_file.resolve())
},
{
"description": "Feature selection summary",
"path": str(summary_path.resolve())
}
]
}
@clustering_mcp.tool
def reduce_dimensionality(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with feature-selected data. Should be output from select_features tool."],
# Analysis parameters with tutorial defaults
n_comps: Annotated[int, "Number of principal components to compute"] = 50,
use_highly_variable: Annotated[bool, "Whether to use only highly variable genes"] = True,
n_pcs_plot: Annotated[int, "Number of PCs to show in variance plot"] = 50,
color_vars: Annotated[list, "Variables to color PCA plot by"] = ["sample", "pct_counts_mt"],
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Perform principal component analysis for dimensionality reduction and visualization.
Input is feature-selected AnnData object and output is PCA embeddings and variance plots.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"pca_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Perform PCA
sc.tl.pca(adata, n_comps=n_comps, use_highly_variable=use_highly_variable)
# Plot PCA variance ratio
plt.figure(figsize=(10, 6))
sc.pl.pca_variance_ratio(adata, n_pcs=n_pcs_plot, log=True)
variance_path = OUTPUT_DIR / f"{out_prefix}_pca_variance.png"
plt.savefig(variance_path, dpi=300, bbox_inches='tight')
plt.close()
# Plot PCA colored by specified variables
available_vars = [var for var in color_vars if var in adata.obs.columns]
if available_vars:
# Create combinations for plotting
plot_colors = []
plot_dims = []
for var in available_vars[:2]: # Limit to 2 variables to match tutorial
plot_colors.extend([var, var])
plot_dims.extend([(0, 1), (2, 3)])
plt.figure(figsize=(12, 8))
sc.pl.pca(
adata,
color=plot_colors,
dimensions=plot_dims,
ncols=2,
size=2,
)
pca_path = OUTPUT_DIR / f"{out_prefix}_pca_colored.png"
plt.savefig(pca_path, dpi=300, bbox_inches='tight')
plt.close()
pca_artifacts = [{"description": "PCA colored by variables", "path": str(pca_path.resolve())}]
else:
pca_artifacts = []
# Save data with PCA
output_file = OUTPUT_DIR / f"{out_prefix}_pca.h5ad"
adata.write_h5ad(output_file)
# Create PCA summary
pca_summary = pd.DataFrame({
'PC': [f'PC{i+1}' for i in range(min(10, n_comps))],
'variance_ratio': adata.uns['pca']['variance_ratio'][:min(10, n_comps)]
})
summary_path = OUTPUT_DIR / f"{out_prefix}_pca_summary.csv"
pca_summary.to_csv(summary_path, index=False)
artifacts = [
{
"description": "PCA variance plot",
"path": str(variance_path.resolve())
},
{
"description": "PCA processed data",
"path": str(output_file.resolve())
},
{
"description": "PCA summary",
"path": str(summary_path.resolve())
}
] + pca_artifacts
return {
"message": f"PCA completed with {n_comps} components explaining {adata.uns['pca']['variance_ratio'].sum():.2%} variance",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": artifacts
}
@clustering_mcp.tool
def build_neighborhood_graph(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with PCA data. Should be output from reduce_dimensionality tool."],
# Analysis parameters with tutorial defaults
n_neighbors: Annotated[int, "Number of neighbors for graph construction"] = 15,
n_pcs: Annotated[int, "Number of principal components to use"] = None,
color_by: Annotated[str, "Variable to color UMAP by"] = "sample",
point_size: Annotated[float, "Point size for UMAP plot"] = 2,
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Build nearest neighbor graph from PCA space and compute UMAP embedding for visualization.
Input is PCA-processed AnnData object and output is neighbor graph, UMAP embedding, and visualization.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"neighbors_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Compute the neighborhood graph
sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs)
# Compute UMAP
sc.tl.umap(adata)
# Plot UMAP
if color_by in adata.obs.columns:
plt.figure(figsize=(8, 6))
sc.pl.umap(adata, color=color_by, size=point_size)
umap_path = OUTPUT_DIR / f"{out_prefix}_umap.png"
plt.savefig(umap_path, dpi=300, bbox_inches='tight')
plt.close()
else:
# Plot without coloring if variable doesn't exist
plt.figure(figsize=(8, 6))
sc.pl.umap(adata, size=point_size)
umap_path = OUTPUT_DIR / f"{out_prefix}_umap.png"
plt.savefig(umap_path, dpi=300, bbox_inches='tight')
plt.close()
# Save data with neighborhood graph and UMAP
output_file = OUTPUT_DIR / f"{out_prefix}_neighbors.h5ad"
adata.write_h5ad(output_file)
# Create neighborhood summary
neighbor_summary = pd.DataFrame({
'metric': ['n_neighbors', 'n_pcs_used', 'umap_dimensions'],
'value': [n_neighbors, n_pcs, adata.obsm['X_umap'].shape[1]]
})
summary_path = OUTPUT_DIR / f"{out_prefix}_neighbor_summary.csv"
neighbor_summary.to_csv(summary_path, index=False)
return {
"message": f"Neighborhood graph and UMAP completed for {adata.n_obs} cells",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": [
{
"description": "UMAP visualization",
"path": str(umap_path.resolve())
},
{
"description": "Neighborhood graph data",
"path": str(output_file.resolve())
},
{
"description": "Neighborhood summary",
"path": str(summary_path.resolve())
}
]
}
@clustering_mcp.tool
def cluster_cells(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with neighborhood graph. Should be output from build_neighborhood_graph tool."],
# Analysis parameters with tutorial defaults
resolution: Annotated[float, "Resolution parameter for Leiden clustering"] = 0.5,
flavor: Annotated[Literal["igraph", "leidenalg"], "Leiden algorithm implementation"] = "igraph",
n_iterations: Annotated[int, "Number of iterations for clustering"] = 2,
cluster_key: Annotated[str, "Key name for storing clusters in adata.obs"] = "leiden",
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Perform Leiden clustering on the neighborhood graph and visualize results.
Input is AnnData with neighborhood graph and output is clustered data with UMAP visualization.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"clusters_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Perform Leiden clustering
sc.tl.leiden(
adata,
resolution=resolution,
flavor=flavor,
n_iterations=n_iterations,
key_added=cluster_key
)
# Plot UMAP colored by clusters
plt.figure(figsize=(8, 6))
sc.pl.umap(adata, color=[cluster_key])
cluster_path = OUTPUT_DIR / f"{out_prefix}_clusters_umap.png"
plt.savefig(cluster_path, dpi=300, bbox_inches='tight')
plt.close()
# Save clustered data
output_file = OUTPUT_DIR / f"{out_prefix}_clustered.h5ad"
adata.write_h5ad(output_file)
# Create clustering summary
n_clusters = len(adata.obs[cluster_key].unique())
cluster_counts = adata.obs[cluster_key].value_counts().sort_index()
cluster_summary = pd.DataFrame({
'cluster': cluster_counts.index,
'n_cells': cluster_counts.values,
'fraction': cluster_counts.values / adata.n_obs
})
summary_path = OUTPUT_DIR / f"{out_prefix}_cluster_summary.csv"
cluster_summary.to_csv(summary_path, index=False)
return {
"message": f"Leiden clustering identified {n_clusters} clusters at resolution {resolution}",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": [
{
"description": "Clusters UMAP plot",
"path": str(cluster_path.resolve())
},
{
"description": "Clustered data",
"path": str(output_file.resolve())
},
{
"description": "Cluster summary",
"path": str(summary_path.resolve())
}
]
}
@clustering_mcp.tool
def annotate_cell_types(
# Primary data inputs
data_path: Annotated[str, "Path to h5ad file with clustered data. Should be output from cluster_cells tool."],
# Analysis parameters with tutorial defaults
resolutions: Annotated[list, "List of resolutions for multi-resolution clustering"] = [0.02, 0.5, 2.0],
groupby_key: Annotated[str, "Clustering key to use for marker analysis"] = "leiden_res_0.50",
method: Annotated[Literal["wilcoxon", "t-test", "logreg"], "Method for differential expression"] = "wilcoxon",
n_genes: Annotated[int, "Number of top genes to show in plots"] = 5,
marker_genes: Annotated[dict | None, "Dictionary of cell type marker genes"] = None,
out_prefix: Annotated[str | None, "Output file prefix"] = None,
) -> dict:
"""
Perform multi-resolution clustering, marker gene analysis, and differential expression for cell type annotation.
Input is clustered AnnData object and output is multi-resolution plots, marker analysis, and differential expression results.
"""
# Validate exactly one input
if data_path is None:
raise ValueError("Path to h5ad file must be provided")
# Set output prefix
if out_prefix is None:
out_prefix = f"annotation_{timestamp}"
# Load data
adata = ad.read_h5ad(data_path)
# Define default marker genes if not provided
if marker_genes is None:
marker_genes = {
"CD14+ Mono": ["FCN1", "CD14"],
"CD16+ Mono": ["TCF7L2", "FCGR3A", "LYN"],
"cDC2": ["CST3", "COTL1", "LYZ", "DMXL2", "CLEC10A", "FCER1A"],
"Erythroblast": ["MKI67", "HBA1", "HBB"],
"Proerythroblast": ["CDK6", "SYNGR1", "HBM", "GYPA"],
"NK": ["GNLY", "NKG7", "CD247", "FCER1G", "TYROBP", "KLRG1", "FCGR3A"],
"ILC": ["ID2", "PLCG2", "GNLY", "SYNE1"],
"Naive CD20+ B": ["MS4A1", "IL4R", "IGHD", "FCRL1", "IGHM"],
"B cells": ["MS4A1", "ITGB1", "COL4A4", "PRDM1", "IRF4", "PAX5", "BCL11A", "BLK", "IGHD", "IGHM"],
"Plasma cells": ["MZB1", "HSP90B1", "FNDC3B", "PRDM1", "IGKC", "JCHAIN"],
"Plasmablast": ["XBP1", "PRDM1", "PAX5"],
"CD4+ T": ["CD4", "IL7R", "TRBC2"],
"CD8+ T": ["CD8A", "CD8B", "GZMK", "GZMA", "CCL5", "GZMB", "GZMH", "GZMA"],
"T naive": ["LEF1", "CCR7", "TCF7"],
"pDC": ["GZMB", "IL3RA", "COBLL1", "TCF4"],
}
# Perform multi-resolution clustering
for res in resolutions:
sc.tl.leiden(
adata, key_added=f"leiden_res_{res:4.2f}", resolution=res, flavor="igraph"
)
# Plot multi-resolution clustering
cluster_keys = [f"leiden_res_{res:4.2f}" for res in resolutions]
plt.figure(figsize=(15, 5))
sc.pl.umap(
adata,
color=cluster_keys,
legend_loc="on data",
)
multiresolution_path = OUTPUT_DIR / f"{out_prefix}_multiresolution_clusters.png"
plt.savefig(multiresolution_path, dpi=300, bbox_inches='tight')
plt.close()
# Check if groupby_key exists, if not use first resolution
if groupby_key not in adata.obs.columns:
groupby_key = cluster_keys[1] if len(cluster_keys) > 1 else cluster_keys[0]
# Plot marker genes
# Filter marker genes to only include those present in the data
available_markers = {}
for cell_type, genes in marker_genes.items():
available_genes = [g for g in genes if g in adata.var_names]
if available_genes:
available_markers[cell_type] = available_genes
if available_markers:
plt.figure(figsize=(12, 8))
sc.pl.dotplot(adata, available_markers, groupby=groupby_key, standard_scale="var")
marker_path = OUTPUT_DIR / f"{out_prefix}_marker_genes.png"
plt.savefig(marker_path, dpi=300, bbox_inches='tight')
plt.close()
marker_artifacts = [{"description": "Marker genes dotplot", "path": str(marker_path.resolve())}]
else:
marker_artifacts = []
# Differential expression analysis
sc.tl.rank_genes_groups(adata, groupby=groupby_key, method=method)
# Plot top differentially expressed genes
plt.figure(figsize=(10, 8))
sc.pl.rank_genes_groups_dotplot(
adata, groupby=groupby_key, standard_scale="var", n_genes=n_genes
)
de_path = OUTPUT_DIR / f"{out_prefix}_differential_expression.png"
plt.savefig(de_path, dpi=300, bbox_inches='tight')
plt.close()
# Create manual cell type annotations for coarse resolution
coarse_key = f"leiden_res_{resolutions[0]:4.2f}"
if coarse_key in adata.obs.columns:
adata.obs["cell_type_lvl1"] = adata.obs[coarse_key].map({
"0": "Lymphocytes",
"1": "Monocytes",
"2": "Erythroid",
"3": "B Cells",
})
# Save annotated data
output_file = OUTPUT_DIR / f"{out_prefix}_annotated.h5ad"
adata.write_h5ad(output_file)
# Export differential expression results
de_results = []
for cluster in adata.obs[groupby_key].unique():
cluster_genes = sc.get.rank_genes_groups_df(adata, group=cluster).head(n_genes)
cluster_genes['cluster'] = cluster
de_results.append(cluster_genes)
if de_results:
de_df = pd.concat(de_results, ignore_index=True)
de_path_csv = OUTPUT_DIR / f"{out_prefix}_differential_genes.csv"
de_df.to_csv(de_path_csv, index=False)
de_artifacts = [{"description": "Differential expression genes", "path": str(de_path_csv.resolve())}]
else:
de_artifacts = []
# Create annotation summary
annotation_summary = pd.DataFrame({
'resolution': resolutions,
'n_clusters': [len(adata.obs[f"leiden_res_{res:4.2f}"].unique()) for res in resolutions]
})
summary_path = OUTPUT_DIR / f"{out_prefix}_annotation_summary.csv"
annotation_summary.to_csv(summary_path, index=False)
artifacts = [
{
"description": "Multi-resolution clustering",
"path": str(multiresolution_path.resolve())
},
{
"description": "Differential expression plot",
"path": str(de_path.resolve())
},
{
"description": "Annotated data",
"path": str(output_file.resolve())
},
{
"description": "Annotation summary",
"path": str(summary_path.resolve())
}
] + marker_artifacts + de_artifacts
return {
"message": f"Cell type annotation completed with {len(resolutions)} resolutions and marker analysis",
"reference": "https://github.com/scverse/scanpy/tree/main/docs/tutorials/basics/clustering.ipynb",
"artifacts": artifacts
}
@clustering_mcp.prompt
def preprocess_and_cluster_scanpy(data_path: str) -> str:
"""
Complete preprocessing and clustering pipeline for single-cell RNA sequencing data analysis.
This comprehensive workflow performs all essential steps for analyzing scRNA-seq data from raw counts
to cell type annotation, following the standard Scanpy tutorial for single-cell analysis.
"""
return f"""
Execute a complete single-cell RNA-seq preprocessing and clustering pipeline on {data_path}.
First inspect the data to understand:
- Dataset size and complexity
- Organism (human/mouse) from gene names
- Batch information in adata.obs (e.g., "sample", "batch", "donor", "experiment", "condition")
- Data quality distribution
IMPORTANT: Adapt parameters intelligently based on data characteristics.
Stick to the defaults if there is no strong reason (e.g. unchanged leads to false results) to change.
Then run the pipeline sequentially, making smart parameter choices:
1. **quality_control** - Examine data and adapt:
- data_path="{data_path}"
- batch_key: Set if batch columns exist (for batch-aware doublet detection)
- mt_prefix: "MT-" (human) or "Mt-" (mouse) based on gene names
- min_genes/min_cells: Adjust based on quality distributions
- Review QC plots before proceeding
2. **normalize_data** - Use QC output:
- target_sum: None (median) or 10000 (CP10K)
3. **select_features** - Feature selection:
- batch_key: Use same as step 1 if batches present
- n_top_genes: 2000-3000 based on complexity
- flavor: "seurat" or "seurat_v3" for high dropout
4. **reduce_dimensionality** - PCA analysis:
- n_comps: 50 (or less for small datasets)
- Review variance plot for optimal PC selection
- color_vars: Include relevant metadata
5. **build_neighborhood_graph** - Graph construction:
- n_pcs: Based on elbow in variance plot (20-40)
- n_neighbors: 10-30 based on dataset size
- Check UMAP for batch effects
6. **cluster_cells** - Clustering:
- resolution: 0.1-0.4 (broad) or 0.6-1.5 (fine)
- Based on expected cell type diversity
7. **annotate_cell_types** - Annotation:
- resolutions: Test multiple [low, medium, high]
- marker_genes: Provide tissue-specific markers if known
- Validate with marker expression
KEY DECISIONS:
- Identify and consistently use batch_key throughout if batches exist
- Adjust all thresholds based on data quality
- Validate each step before proceeding
- Document any anomalies or batch effects
The pipeline produces a fully annotated dataset with QC metrics, embeddings, clusters, and cell type markers.
"""