
API Reference

App

Marimo notebook Flyte App entrypoint.

Glue between the note_env AppEnvironment (declared in stargazer.config) and the marimo notebook server. Researchers use Marimo notebooks to explore data, run tasks, and visualize results — bridging exploratory work and production workflows.

Local development

marimo edit src/stargazer/notebooks/byod.py

Run locally as a Docker container: docker run -p 8080:8080 ghcr.io/stargazerbio/stargazer-note:latest

Deploy the hosted app to Flyte:

stargazer-app

spec: docs/architecture/notebook.md

main()

Deploy the Marimo notebook app to Flyte.

Source code in src/stargazer/app.py
def main():
    """Deploy the Marimo notebook app to Flyte."""
    flyte.init_from_config(root_dir=Path(__file__).parent)
    app = flyte.serve(note_env)
    print(f"App URL: {app.url}")

    def _shutdown(signum, frame):
        """Handle SIGINT/SIGTERM by killing the entire process group."""
        pid = app._process.pid
        try:
            # Kill the whole process group (marimo + its children)
            os.killpg(os.getpgid(pid), signal.SIGTERM)
        except (ProcessLookupError, PermissionError):
            pass
        app.deactivate(wait=True)
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)
    app._process.wait()

Build Images

Build Stargazer's Flyte task images locally.

Iterates over the per-task Flyte environments declared in stargazer.config (scrna_env and gatk_env) and calls flyte.build_images() on each. With image.builder = local in .flyte/config.yaml and no registry= set on the images, the docker builder runs with --load and the result stays in the local Docker cache (no push, no registry credentials needed). CI is expected to publish to the hosted registry on merge to main.

Equivalent to running flyte build src/stargazer/config.py <env> once per env, but without the per-invocation init overhead.

The human-runnable images (note, chat) are built from the project's Dockerfile instead — see docs/guides/contributing.md for the docker build --target {note,chat} commands.

spec: docs/architecture/configuration.md

main()

Build and push images for every Flyte task environment.

Source code in src/stargazer/build_images.py
def main():
    """Build and push images for every Flyte task environment."""
    flyte.init_from_config(root_dir=Path(PROJECT_ROOT))

    for env in (scrna_env, gatk_env):
        logger.info(f"Building image for env: {env.name}")
        cache = flyte.build_images(env)
        logger.info(f"{env.name}: {cache!r}")

Config

Centralized configuration for Stargazer.

Sets environment variable defaults at import time. Consumers read os.environ directly rather than importing named values from this module.

Also the source of truth for the lean per-task Flyte environments (scrna_env, gatk_env) and the thin AppEnvironment that hosts the Marimo notebook UI (note_env). The human-runnable images (note, chat) are built from the project's Dockerfile; note_env consumes the pre-built stargazer-note image as its base.

Rules:

- PINATA_JWT: No default — absence means no authenticated Pinata.
- PINATA_GATEWAY: Defaults to dweb.link if unset. Set to empty string to force a failure on public downloads.
- PINATA_VISIBILITY: Defaults to "private" if unset. Only evaluated by PinataClient — if JWT is unset, downloads are always public.
- STARGAZER_LOCAL: Local storage directory. Defaults to ~/.stargazer/local.

spec: docs/architecture/configuration.md
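The defaulting behavior can be sketched with os.environ.setdefault — a minimal illustration of the rules above, not the module's actual import-time code:

```python
import os

# Simulate a fresh process for the two defaulted variables
os.environ.pop("PINATA_GATEWAY", None)
os.environ.pop("PINATA_VISIBILITY", None)

# Defaults per the rules above; PINATA_JWT deliberately gets no default
os.environ.setdefault("PINATA_GATEWAY", "dweb.link")
os.environ.setdefault("PINATA_VISIBILITY", "private")

print(os.environ["PINATA_GATEWAY"], os.environ["PINATA_VISIBILITY"])
# dweb.link private
```

Because setdefault is used, values already present in the environment win over the module defaults.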

log_execution()

Start a per-execution log sink and return the execution ID.

Derives the workflow name from the calling function, fetches the current git commit hash, and creates a dedicated logfile for this execution. Warns if the git tree has uncommitted changes.
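The resulting execution ID has the shape {workflow}-{commit}[-dirty]-{timestamp}; a quick sketch with assumed example values (the real function derives these at runtime from the call stack and git):

```python
workflow = "joint_genotyping"   # hypothetical workflow name
commit = "a1b2c3d"              # short git hash; "-dirty" is appended when the tree is dirty
timestamp = "20240101T120000Z"  # UTC, strftime("%Y%m%dT%H%M%SZ")

execution_id = f"{workflow}-{commit}-{timestamp}"
print(execution_id)
# joint_genotyping-a1b2c3d-20240101T120000Z
```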

Source code in src/stargazer/config.py
def log_execution() -> str:
    """Start a per-execution log sink and return the execution ID.

    Derives the workflow name from the calling function, fetches the current
    git commit hash, and creates a dedicated logfile for this execution.
    Warns if the git tree has uncommitted changes.
    """
    workflow = inspect.currentframe().f_back.f_code.co_name
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    result = subprocess.run(
        ["git", "rev-parse", "--short", "HEAD"],
        capture_output=True,
        text=True,
    )
    commit = result.stdout.strip() or "unknown"

    status = subprocess.run(
        ["git", "status", "--porcelain"],
        capture_output=True,
        text=True,
    )
    if status.stdout.strip():
        commit += "-dirty"
        logger.warning("Git tree is dirty — uncommitted changes present")

    execution_id = f"{workflow}-{commit}-{timestamp}"
    logger.add(_log_dir / f"{execution_id}.log")
    logger.info(f"Execution started: {execution_id}")
    return execution_id

Marshal

Output marshaling: typed object → dict (for MCP response serialization).

spec: docs/architecture/mcp-server.md

marshal_output(value)

Convert a typed Python object to a JSON-friendly structure for MCP transport.
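For illustration, a self-contained copy of the recursion with a sample call (the function body mirrors the source; the sample inputs are made up):

```python
from pathlib import Path
from typing import Any

def marshal_output(value: Any) -> Any:
    # Same recursion as the source, inlined so this example runs standalone
    if value is None:
        return None
    if hasattr(value, "to_dict"):
        return value.to_dict()
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, tuple):
        # Tuples become {"o0": ..., "o1": ...} — the multi-output convention
        return {f"o{i}": marshal_output(v) for i, v in enumerate(value)}
    if isinstance(value, list):
        return [marshal_output(v) for v in value]
    if isinstance(value, dict):
        return {k: marshal_output(v) for k, v in value.items()}
    return value

result = marshal_output((Path("/tmp/a.bam"), ["x", 1]))
print(result)
# {'o0': '/tmp/a.bam', 'o1': ['x', 1]}
```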

Source code in src/stargazer/marshal.py
def marshal_output(value: Any) -> Any:
    """Convert a typed Python object to a JSON-friendly structure for MCP transport."""
    if value is None:
        return None

    if hasattr(value, "to_dict"):
        return value.to_dict()

    if isinstance(value, Path):
        return str(value)

    if isinstance(value, tuple):
        return {f"o{i}": marshal_output(item) for i, item in enumerate(value)}

    if isinstance(value, list):
        return [marshal_output(item) for item in value]

    if isinstance(value, dict):
        return {k: marshal_output(v) for k, v in value.items()}

    return value

Registry

Task registry for auto-discovery of Flyte tasks and workflows.

Discovers all tasks from stargazer.tasks and stargazer.workflows modules, extracts parameter types, defaults, and return types for MCP catalog exposure.

spec: docs/architecture/mcp-server.md
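The introspection step can be shown standalone with inspect and get_type_hints on a toy function (toy_task is hypothetical, not a real Stargazer task):

```python
import inspect
from typing import Any, get_type_hints

def toy_task(sample_id: str, threads: int = 4) -> str:
    """Align one sample."""
    return sample_id

sig = inspect.signature(toy_task)
hints = get_type_hints(toy_task)

# (name, type name, required?) — required means no default was declared
params = []
for name, param in sig.parameters.items():
    has_default = param.default is not inspect.Parameter.empty
    params.append((name, hints.get(name, Any).__name__, not has_default))

print(params)
# [('sample_id', 'str', True), ('threads', 'int', False)]
print(hints["return"])
# <class 'str'>
```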

TaskInfo dataclass

Complete metadata about a registered task.

Source code in src/stargazer/registry.py
@dataclass
class TaskInfo:
    """Complete metadata about a registered task."""

    name: str
    category: str  # "task" or "workflow"
    description: str
    params: list[TaskParam]
    outputs: list[TaskOutput]
    task_obj: Any  # The Flyte task object

TaskOutput dataclass

Describes a single output of a task.

Source code in src/stargazer/registry.py
@dataclass
class TaskOutput:
    """Describes a single output of a task."""

    name: str
    type_hint: Any
    type_name: str

TaskParam dataclass

Describes a single parameter of a task.

Source code in src/stargazer/registry.py
@dataclass
class TaskParam:
    """Describes a single parameter of a task."""

    name: str
    type_hint: Any
    type_name: str
    required: bool
    default: Any = None

TaskRegistry dataclass

Discovers and provides access to all Flyte tasks and workflows.

Source code in src/stargazer/registry.py
@dataclass
class TaskRegistry:
    """Discovers and provides access to all Flyte tasks and workflows."""

    _tasks: dict[str, TaskInfo] = field(default_factory=dict)

    def __post_init__(self):
        """Discover all tasks and workflows on initialization."""
        self._discover()

    def _discover(self):
        """Walk task and workflow modules to register all Flyte tasks."""
        self._discover_tasks()
        self._discover_workflows()

    def _discover_tasks(self):
        """Register all tasks from stargazer.tasks.__all__."""
        import stargazer.tasks as tasks_mod

        for name in tasks_mod.__all__:
            obj = getattr(tasks_mod, name)
            if not hasattr(obj, "func"):
                continue
            self._register(obj.short_name, obj, category="task")

    def _discover_workflows(self):
        """Register all workflows from stargazer.workflows.__all__."""
        import stargazer.workflows as workflows_mod

        for name in workflows_mod.__all__:
            obj = getattr(workflows_mod, name)
            if not hasattr(obj, "func"):
                continue
            # Skip if already registered (e.g. duplicate short_name across modules)
            if obj.short_name in self._tasks:
                continue
            self._register(obj.short_name, obj, category="workflow")

    def _register(self, name: str, task_obj: Any, category: str):
        """Register a single task by introspecting its wrapped function."""
        func = task_obj.func
        sig = inspect.signature(func)
        hints = get_type_hints(func)

        # Extract parameters
        params = []
        for pname, param in sig.parameters.items():
            hint = hints.get(pname, Any)
            has_default = param.default is not inspect.Parameter.empty
            params.append(
                TaskParam(
                    name=pname,
                    type_hint=hint,
                    type_name=_type_name(hint),
                    required=not has_default,
                    default=param.default if has_default else None,
                )
            )

        # Extract outputs from return type
        return_hint = hints.get("return", type(None))
        outputs = _parse_outputs(return_hint)

        # Extract description from docstring
        doc = func.__doc__ or ""
        description = doc.strip().split("\n")[0] if doc.strip() else ""

        self._tasks[name] = TaskInfo(
            name=name,
            category=category,
            description=description,
            params=params,
            outputs=outputs,
            task_obj=task_obj,
        )

    def get(self, name: str) -> TaskInfo | None:
        """Look up a task by name."""
        return self._tasks.get(name)

    def list_tasks(self, category: str | None = None) -> list[TaskInfo]:
        """List all registered tasks, optionally filtered by category."""
        tasks = list(self._tasks.values())
        if category:
            tasks = [t for t in tasks if t.category == category]
        return tasks

    def to_catalog(self, category: str | None = None) -> list[dict]:
        """Return a JSON-serializable catalog of all tasks."""
        catalog = []
        for info in self.list_tasks(category=category):
            catalog.append(
                {
                    "name": info.name,
                    "category": info.category,
                    "description": info.description,
                    "params": [
                        {
                            "name": p.name,
                            "type": p.type_name,
                            "required": p.required,
                            "default": _serialize_default(p.default)
                            if not p.required
                            else None,
                        }
                        for p in info.params
                    ],
                    "outputs": [
                        {"name": o.name, "type": o.type_name} for o in info.outputs
                    ],
                }
            )
        return catalog

__post_init__()

Discover all tasks and workflows on initialization.

Source code in src/stargazer/registry.py
def __post_init__(self):
    """Discover all tasks and workflows on initialization."""
    self._discover()

get(name)

Look up a task by name.

Source code in src/stargazer/registry.py
def get(self, name: str) -> TaskInfo | None:
    """Look up a task by name."""
    return self._tasks.get(name)

list_tasks(category=None)

List all registered tasks, optionally filtered by category.

Source code in src/stargazer/registry.py
def list_tasks(self, category: str | None = None) -> list[TaskInfo]:
    """List all registered tasks, optionally filtered by category."""
    tasks = list(self._tasks.values())
    if category:
        tasks = [t for t in tasks if t.category == category]
    return tasks

to_catalog(category=None)

Return a JSON-serializable catalog of all tasks.

Source code in src/stargazer/registry.py
def to_catalog(self, category: str | None = None) -> list[dict]:
    """Return a JSON-serializable catalog of all tasks."""
    catalog = []
    for info in self.list_tasks(category=category):
        catalog.append(
            {
                "name": info.name,
                "category": info.category,
                "description": info.description,
                "params": [
                    {
                        "name": p.name,
                        "type": p.type_name,
                        "required": p.required,
                        "default": _serialize_default(p.default)
                        if not p.required
                        else None,
                    }
                    for p in info.params
                ],
                "outputs": [
                    {"name": o.name, "type": o.type_name} for o in info.outputs
                ],
            }
        )
    return catalog

Server

Stargazer MCP Server.

Exposes storage tools and a dynamic task runner via FastMCP. Tasks and workflows are auto-discovered from the registry and executed through the Flyte local run context.

Usage

stargazer          # stdio transport (default)
stargazer --http   # streamable-http transport

spec: docs/architecture/mcp-server.md

delete_file(cid) async

Delete a file by CID.

Source code in src/stargazer/server.py
@mcp.tool()
async def delete_file(cid: str) -> str:
    """Delete a file by CID."""
    comp = Asset(cid=cid)
    await default_client.delete(comp)
    return f"Deleted file {cid}"

download_file(cid) async

Download a file by CID to local cache. Returns the local path.

Source code in src/stargazer/server.py
@mcp.tool()
async def download_file(cid: str) -> str:
    """Download a file by CID to local cache. Returns the local path."""
    comp = Asset(cid=cid)
    await default_client.download(comp)
    return str(comp.path)

fetch_resource_bundle(bundle_name) async

Download a predefined resource bundle into local storage.

Bundles are curated sets of files (e.g. reference genomes, demo datasets) defined in the codebase. Each file is identified by CID and downloaded via the standard storage path (signed URL with JWT, or public IPFS gateway).

When PINATA_JWT is set, remote metadata is authoritative and overwrites local records. Without a JWT, the bundle manifest provides the metadata.

Parameters:

- bundle_name (str, required): Name of the bundle (from list_bundles).

Returns:

- list[dict]: List of fetched files with cid, keyvalues, and local path.

Source code in src/stargazer/server.py
@mcp.tool()
async def fetch_resource_bundle(bundle_name: str) -> list[dict]:
    """Download a predefined resource bundle into local storage.

    Bundles are curated sets of files (e.g. reference genomes, demo datasets)
    defined in the codebase. Each file is identified by CID and downloaded
    via the standard storage path (signed URL with JWT, or public IPFS gateway).

    When PINATA_JWT is set, remote metadata is authoritative and overwrites
    local records. Without a JWT, the bundle manifest provides the metadata.

    Args:
        bundle_name: Name of the bundle (from list_bundles).

    Returns:
        List of fetched files with cid, keyvalues, and local path.
    """
    from stargazer.bundles import fetch_bundle

    return await fetch_bundle(bundle_name)

list_bundles()

List available resource bundles.

Returns:

- list[dict]: List of bundles with name, description, and file_count.

Source code in src/stargazer/server.py
@mcp.tool()
def list_bundles() -> list[dict]:
    """List available resource bundles.

    Returns:
        List of bundles with name, description, and file_count.
    """
    from stargazer.bundles import list_bundles as _list_bundles

    return _list_bundles()

list_tasks(category=None)

List available tasks and workflows with their parameter signatures.

Parameters:

- category (str | None, default None): Filter by "task" or "workflow". Omit for all.

Returns:

- list[dict]: Catalog of tasks with name, category, description, params, and outputs.

Source code in src/stargazer/server.py
@mcp.tool()
def list_tasks(category: str | None = None) -> list[dict]:
    """List available tasks and workflows with their parameter signatures.

    Args:
        category: Filter by "task" or "workflow". Omit for all.

    Returns:
        Catalog of tasks with name, category, description, params, and outputs.
    """
    return _registry.to_catalog(category=category)

main()

Run the Stargazer MCP server.

Source code in src/stargazer/server.py
def main():
    """Run the Stargazer MCP server."""
    import sys

    transport = "stdio"
    if "--http" in sys.argv:
        transport = "streamable-http"
    mcp.run(transport=transport)

query_files(keyvalues) async

Query files by metadata key-value pairs. Returns matching files.

Source code in src/stargazer/server.py
@mcp.tool()
async def query_files(keyvalues: dict[str, str]) -> list[dict]:
    """Query files by metadata key-value pairs. Returns matching files."""
    return await default_client.query(keyvalues)

run_task(task_name, filters, inputs=None) async

Run a single task by name for ad-hoc experimentation.

Use this for testing individual tools in isolation. Asset parameters are assembled from storage using the provided filters — one call to assemble() resolves all required assets. Scalar and Path parameters are passed separately via inputs.

For reproducible pipeline runs, use run_workflow instead.
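The matching step described above can be sketched with plain dicts standing in for assembled assets (the records and CIDs here are hypothetical):

```python
# Hypothetical assembled assets, tagged by asset key
assets = [
    {"asset": "reference", "cid": "cid-ref"},
    {"asset": "alignment", "cid": "cid-bam-1"},
    {"asset": "alignment", "cid": "cid-bam-2"},
]

def match(asset_key: str, want_list: bool = False):
    """Pick assets for one parameter: all matches for list-typed params, last match otherwise."""
    hits = [a for a in assets if a["asset"] == asset_key]
    if not hits:
        return None
    return hits if want_list else hits[-1]

print(match("alignment")["cid"])       # cid-bam-2  (last match wins)
print(len(match("alignment", True)))   # 2
print(match("variants"))               # None — would raise if the param is required
```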

Parameters:

- task_name (str, required): Name of the task (from list_tasks with category="task").
- filters (dict, required): Keyvalue filters for assemble() to resolve asset parameters (e.g. {"build": "GRCh38", "sample_id": "NA12878"}).
- inputs (dict | None, default None): Optional scalar/Path keyword arguments (str, int, bool, list[str]).

Returns:

- dict: Serialized task output. Single outputs returned directly, multi-outputs as {"o0": ..., "o1": ...}.

Source code in src/stargazer/server.py
@mcp.tool()
async def run_task(task_name: str, filters: dict, inputs: dict | None = None) -> dict:
    """Run a single task by name for ad-hoc experimentation.

    Use this for testing individual tools in isolation. Asset parameters
    are assembled from storage using the provided filters — one call to
    assemble() resolves all required assets. Scalar and Path parameters
    are passed separately via inputs.

    For reproducible pipeline runs, use run_workflow instead.

    Args:
        task_name: Name of the task (from list_tasks with category="task").
        filters: Keyvalue filters for assemble() to resolve asset parameters
                 (e.g. {"build": "GRCh38", "sample_id": "NA12878"}).
        inputs: Optional scalar/Path keyword arguments (str, int, bool, list[str]).

    Returns:
        Serialized task output. Single outputs returned directly,
        multi-outputs as {"o0": ..., "o1": ...}.
    """
    info = _registry.get(task_name)
    if info is None:
        available = [t.name for t in _registry.list_tasks(category="task")]
        raise ValueError(f"Unknown task: {task_name!r}. Available: {available}")
    if info.category != "task":
        raise ValueError(f"{task_name!r} is a workflow — use run_workflow instead.")

    inputs = inputs or {}

    # Assemble all assets from storage in one query
    assets = await assemble(**filters) if filters else []

    # Build kwargs: match Asset params from the assembled list, scalars from inputs
    kwargs = {}
    for p in info.params:
        asset_key = _asset_key_for_hint(p.type_hint)
        if asset_key:
            matched = [a for a in assets if a._asset_key == asset_key]
            if not matched and p.required:
                raise ValueError(
                    f"Task {task_name!r} requires {p.name} ({asset_key}) "
                    f"but no matching asset found for filters: {filters}"
                )
            if matched:
                kwargs[p.name] = (
                    matched if _is_list_asset_hint(p.type_hint) else matched[-1]
                )
        elif p.name in inputs:
            value = inputs[p.name]
            if p.type_hint is Path and isinstance(value, str):
                value = Path(value)
            kwargs[p.name] = value

    return await _execute(info, kwargs)

run_workflow(workflow_name, inputs) async

Run a workflow by name for reproducible pipeline execution.

Workflows accept scalar parameters (str, int, bool, list[str]) and handle their own asset assembly internally. Pass inputs exactly as the workflow signature defines them — no automatic resolution is performed.

For ad-hoc experimentation with individual tools, use run_task instead.

Parameters:

- workflow_name (str, required): Name of the workflow (from list_tasks with category="workflow").
- inputs (dict, required): Keyword arguments as a JSON dict (scalars only).

Returns:

- dict: Serialized workflow output. Single outputs returned directly, multi-outputs as {"o0": ..., "o1": ...}.

Source code in src/stargazer/server.py
@mcp.tool()
async def run_workflow(workflow_name: str, inputs: dict) -> dict:
    """Run a workflow by name for reproducible pipeline execution.

    Workflows accept scalar parameters (str, int, bool, list[str]) and
    handle their own asset assembly internally. Pass inputs exactly as
    the workflow signature defines them — no automatic resolution is
    performed.

    For ad-hoc experimentation with individual tools, use run_task instead.

    Args:
        workflow_name: Name of the workflow (from list_tasks with category="workflow").
        inputs: Keyword arguments as a JSON dict (scalars only).

    Returns:
        Serialized workflow output. Single outputs returned directly,
        multi-outputs as {"o0": ..., "o1": ...}.
    """
    info = _registry.get(workflow_name)
    if info is None:
        available = [t.name for t in _registry.list_tasks(category="workflow")]
        raise ValueError(f"Unknown workflow: {workflow_name!r}. Available: {available}")
    if info.category != "workflow":
        raise ValueError(f"{workflow_name!r} is a task — use run_task instead.")

    return await _execute(info, dict(inputs))

show_config() async

Show current Stargazer configuration and available task counts.

Source code in src/stargazer/server.py
@mcp.resource("stargazer://config")
async def show_config() -> str:
    """Show current Stargazer configuration and available task counts."""
    tasks = _registry.list_tasks(category="task")
    workflows = _registry.list_tasks(category="workflow")
    config = {
        "pinata_jwt": "set" if os.environ.get("PINATA_JWT") else "unset",
        "pinata_visibility": os.environ["PINATA_VISIBILITY"],
        "local_dir": str(default_client.local_dir),
        "tasks": len(tasks),
        "workflows": len(workflows),
    }
    return json.dumps(config, indent=2)

upload_file(path, keyvalues) async

Upload a file with metadata key-value pairs.

keyvalues must include "asset". Valid asset keys are derived from the Asset registry (e.g. asset=reference component=fasta).

When displaying results, always show a table with the CID and all keyvalues.

Source code in src/stargazer/server.py
@mcp.tool()
async def upload_file(path: str, keyvalues: dict[str, str]) -> dict:
    """Upload a file with metadata key-value pairs.

    keyvalues must include "asset". Valid asset keys are derived
    from the Asset registry (e.g. asset=reference component=fasta).

    When displaying results, always show a table with the CID and all keyvalues.
    """
    asset_key = keyvalues.get("asset")
    if asset_key not in ASSET_REGISTRY:
        valid = sorted(ASSET_REGISTRY.keys())
        raise ValueError(f"Invalid asset key {asset_key!r}. Valid keys: {valid}")
    cls = ASSET_REGISTRY[asset_key]
    import dataclasses
    from stargazer.assets.asset import _BASE_FIELDS

    declared = {f.name for f in dataclasses.fields(cls)} - _BASE_FIELDS
    unknown = set(keyvalues) - declared - {"asset"}
    if unknown:
        raise ValueError(
            f"Unknown keys for {asset_key!r}: {unknown}. Allowed: {sorted(declared)}"
        )
    comp = cls.from_keyvalues(keyvalues, path=Path(path))
    await default_client.upload(comp)
    return comp.to_dict()

Tasks

apply_bqsr task for Stargazer.

Applies BQSR recalibration to BAM files using GATK ApplyBQSR.

spec: docs/architecture/tasks.md

apply_bqsr(alignment, ref, bqsr_report) async

Apply Base Quality Score Recalibration to a BAM file.

Parameters:

- alignment (Alignment, required): Input BAM asset
- ref (Reference, required): Reference FASTA asset
- bqsr_report (BQSRReport, required): Recalibration table from base_recalibrator

Returns:

- Alignment: Alignment asset with recalibrated BAM file

Reference:

- https://gatk.broadinstitute.org/hc/en-us/articles/360037055712-ApplyBQSR

Source code in src/stargazer/tasks/gatk/apply_bqsr.py
@gatk_env.task
async def apply_bqsr(
    alignment: Alignment,
    ref: Reference,
    bqsr_report: BQSRReport,
) -> Alignment:
    """
    Apply Base Quality Score Recalibration to a BAM file.

    Args:
        alignment: Input BAM asset
        ref: Reference FASTA asset
        bqsr_report: Recalibration table from base_recalibrator

    Returns:
        Alignment asset with recalibrated BAM file

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037055712-ApplyBQSR
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    logger.info(bqsr_report.to_dict())
    # fetch() auto-downloads companions (.fai, .dict, .bai, etc.)
    await alignment.fetch()
    await ref.fetch()
    await bqsr_report.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    recal_path = bqsr_report.path

    if not recal_path or not recal_path.exists():
        raise FileNotFoundError("BQSR recalibration report not found in cache.")

    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{alignment.sample_id}_recalibrated.bam"

    cmd = [
        "gatk",
        "ApplyBQSR",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "--bqsr-recal-file",
        str(recal_path),
        "-O",
        str(output_bam),
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(f"ApplyBQSR did not create output BAM at {output_bam}")

    recal_bam = Alignment()
    await recal_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=alignment.duplicates_marked,
        bqsr_applied=True,
        tool="gatk_apply_bqsr",
        reference_cid=ref.cid,
    )

    logger.info(recal_bam.to_dict())
    return recal_bam

ApplyVQSR task for Stargazer.

Applies VQSR recalibration to a VCF using GATK ApplyVQSR.

spec: docs/architecture/tasks.md

apply_vqsr(vcf, ref, vqsr_model, truth_sensitivity_filter_level=None) async

Apply VQSR recalibration to a VCF using GATK ApplyVQSR.

The recalibration mode (SNP or INDEL) is read from vqsr_model.keyvalues["mode"]. If truth_sensitivity_filter_level is not provided, defaults to 99.5 for SNP and 99.0 for INDEL.

Parameters:

- vcf (Variants, required): Raw (or SNP-filtered) VCF Variants asset
- ref (Reference, required): Reference FASTA asset
- vqsr_model (VQSRModel, required): Recalibration model from variant_recalibrator
- truth_sensitivity_filter_level (float | None, default None): VQSLOD filter threshold (optional)

Returns:

- Variants: Variants asset with VQSR-filtered VCF

Reference:

- https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR

Source code in src/stargazer/tasks/gatk/apply_vqsr.py
@gatk_env.task
async def apply_vqsr(
    vcf: Variants,
    ref: Reference,
    vqsr_model: VQSRModel,
    truth_sensitivity_filter_level: float | None = None,
) -> Variants:
    """
    Apply VQSR recalibration to a VCF using GATK ApplyVQSR.

    The recalibration mode (SNP or INDEL) is read from vqsr_model.keyvalues["mode"].
    If truth_sensitivity_filter_level is not provided, defaults to 99.5 for SNP
    and 99.0 for INDEL.

    Args:
        vcf: Raw (or SNP-filtered) VCF Variants asset
        ref: Reference FASTA asset
        vqsr_model: Recalibration model from variant_recalibrator
        truth_sensitivity_filter_level: VQSLOD filter threshold (optional)

    Returns:
        Variants asset with VQSR-filtered VCF

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR
    """
    logger.info(vcf.to_dict())
    logger.info(ref.to_dict())
    logger.info(vqsr_model.to_dict())
    mode = vqsr_model.mode or "SNP"
    if mode not in ("SNP", "INDEL"):
        raise ValueError(f"VQSRModel mode must be 'SNP' or 'INDEL', got {mode!r}")

    if not vqsr_model.tranches_path:
        raise ValueError("VQSRModel is missing tranches_path")
    tranches_path = Path(vqsr_model.tranches_path)

    filter_level = truth_sensitivity_filter_level or _DEFAULT_FILTER_LEVEL[mode]

    await vcf.fetch()
    await ref.fetch()
    await vqsr_model.fetch()

    output_dir = _storage.default_client.local_dir
    sample_id = vcf.sample_id or "cohort"
    output_vcf = output_dir / f"{sample_id}_vqsr_{mode.lower()}.vcf"

    cmd = [
        "gatk",
        "ApplyVQSR",
        "-R",
        str(ref.path),
        "-V",
        str(vcf.path),
        "--recal-file",
        str(vqsr_model.path),
        "--tranches-file",
        str(tranches_path),
        "--truth-sensitivity-filter-level",
        str(filter_level),
        "--create-output-variant-index",
        "true",
        "-mode",
        mode,
        "-O",
        str(output_vcf),
    ]

    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_vcf.exists():
        raise FileNotFoundError(
            f"ApplyVQSR did not create output VCF at {output_vcf}. stderr: {stderr}"
        )

    source_samples = vcf.source_samples or [sample_id]
    filtered_vcf = Variants()
    await filtered_vcf.update(
        output_vcf,
        sample_id=sample_id,
        caller="apply_vqsr",
        variant_type="vcf",
        vqsr_mode=mode,
        build=ref.build,
        sample_count=len(source_samples),
        source_samples=source_samples,
    )

    idx_path = output_dir / f"{output_vcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=sample_id, variants_cid=filtered_vcf.cid)

    logger.info(filtered_vcf.to_dict())
    return filtered_vcf

base_recalibrator task for Stargazer.

Creates BQSR recalibration table using GATK BaseRecalibrator.

spec: docs/architecture/tasks.md

base_recalibrator(alignment, ref, known_sites) async

Generate a Base Quality Score Recalibration report.

Uses GATK BaseRecalibrator to analyze patterns of covariation in the sequence dataset and produce a recalibration table.

Parameters:

- alignment (Alignment, required): Input BAM asset (should be sorted and have duplicates marked)
- ref (Reference, required): Reference FASTA asset
- known_sites (list[KnownSites], required): List of KnownSites VCF assets (dbSNP, known indels, etc.)

Returns:

- BQSRReport: BQSRReport asset containing the recalibration table

Reference:

- https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator

Source code in src/stargazer/tasks/gatk/base_recalibrator.py
@gatk_env.task
async def base_recalibrator(
    alignment: Alignment,
    ref: Reference,
    known_sites: list[KnownSites],
) -> BQSRReport:
    """
    Generate a Base Quality Score Recalibration report.

    Uses GATK BaseRecalibrator to analyze patterns of covariation in the
    sequence dataset and produce a recalibration table.

    Args:
        alignment: Input BAM asset (should be sorted and have duplicates marked)
        ref: Reference FASTA asset
        known_sites: List of KnownSites VCF assets (dbSNP, known indels, etc.)

    Returns:
        BQSRReport asset containing the recalibration table

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    logger.info([x.to_dict() for x in known_sites])
    if not known_sites:
        raise ValueError("known_sites list cannot be empty for BQSR")

    # fetch() auto-downloads companions (.fai, .dict, .bai, etc.)
    await alignment.fetch()
    await ref.fetch()
    for site in known_sites:
        await site.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_recal = output_dir / f"{alignment.sample_id}_bqsr.table"

    cmd = [
        "gatk",
        "BaseRecalibrator",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "-O",
        str(output_recal),
    ]
    for site in known_sites:
        cmd.extend(["--known-sites", str(site.path)])

    await _run(cmd, cwd=str(output_dir))

    if not output_recal.exists():
        raise FileNotFoundError(
            f"BaseRecalibrator did not create recalibration report at {output_recal}"
        )

    report = BQSRReport()
    await report.update(
        output_recal,
        sample_id=alignment.sample_id,
        tool="gatk_base_recalibrator",
        alignment_cid=alignment.cid,
    )

    logger.info(report.to_dict())
    return report
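As the source shows, every entry in known_sites contributes its own repeated --known-sites flag to the GATK command line. A minimal sketch of that flag assembly (the file paths here are hypothetical placeholders, not real assets):

```python
# Sketch of the --known-sites flag assembly in base_recalibrator.
# Paths are hypothetical placeholders.
known_site_paths = ["dbsnp.vcf", "known_indels.vcf"]

cmd = [
    "gatk", "BaseRecalibrator",
    "-R", "ref.fa",
    "-I", "sample.bam",
    "-O", "sample_bqsr.table",
]
for path in known_site_paths:
    cmd.extend(["--known-sites", path])

# Each known-sites VCF becomes its own repeated flag:
assert cmd[-4:] == ["--known-sites", "dbsnp.vcf", "--known-sites", "known_indels.vcf"]
```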

CombineGVCFs task for Stargazer.

Combines multiple per-sample GVCFs into a single multi-sample GVCF for joint genotyping using GATK CombineGVCFs.

spec: docs/architecture/tasks.md

combine_gvcfs(gvcfs, ref, cohort_id='cohort') async

Combine multiple per-sample GVCFs into a single multi-sample GVCF.

Parameters:

    gvcfs (list[Variants]): List of Variants assets, each containing a GVCF from a single sample. Required.
    ref (Reference): Reference FASTA asset. Required.
    cohort_id (str): Identifier for the combined cohort. Default: "cohort".

Returns:

    Variants: Variants asset with combined multi-sample GVCF.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037053272-CombineGVCFs

Source code in src/stargazer/tasks/gatk/combine_gvcfs.py
@gatk_env.task
async def combine_gvcfs(
    gvcfs: list[Variants],
    ref: Reference,
    cohort_id: str = "cohort",
) -> Variants:
    """
    Combine multiple per-sample GVCFs into a single multi-sample GVCF.

    Args:
        gvcfs: List of Variants assets, each containing a GVCF from a single sample
        ref: Reference FASTA asset
        cohort_id: Identifier for the combined cohort (default: "cohort")

    Returns:
        Variants asset with combined multi-sample GVCF

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037053272-CombineGVCFs
    """
    logger.info([x.to_dict() for x in gvcfs])
    logger.info(ref.to_dict())
    if not gvcfs:
        raise ValueError("gvcfs list cannot be empty")

    for i, gvcf in enumerate(gvcfs):
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"combine_gvcfs requires GVCF files, but gvcfs[{i}] is not a GVCF. "
                f"sample_id={gvcf.sample_id}"
            )

    # fetch() auto-downloads companions (.fai, .dict for ref; .idx for each gvcf)
    await ref.fetch()

    gvcf_paths: list[Path] = []
    sample_ids: list[str] = []

    for gvcf in gvcfs:
        await gvcf.fetch()
        gvcf_paths.append(gvcf.path)
        sample_ids.append(gvcf.sample_id)

    ref_path = ref.path
    output_dir = _storage.default_client.local_dir
    output_gvcf = output_dir / f"{cohort_id}_combined.g.vcf"

    cmd = [
        "gatk",
        "CombineGVCFs",
        "-R",
        str(ref_path),
        "-O",
        str(output_gvcf),
    ]

    for gvcf_path in gvcf_paths:
        cmd.extend(["-V", str(gvcf_path)])

    stdout, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_gvcf.exists():
        raise FileNotFoundError(
            f"CombineGVCFs did not create output GVCF at {output_gvcf}. "
            f"stderr: {stderr}"
        )

    combined_vcf = Variants()
    await combined_vcf.update(
        output_gvcf,
        sample_id=cohort_id,
        caller="combine_gvcfs",
        variant_type="gvcf",
        build=ref.build,
        sample_count=len(sample_ids),
        source_samples=sample_ids,
    )

    # Upload implicit index file produced by GATK
    idx_path = output_dir / f"{output_gvcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=cohort_id, variants_cid=combined_vcf.cid)

    logger.info(combined_vcf.to_dict())
    return combined_vcf

GATK CreateSequenceDictionary task for reference genome.

spec: docs/architecture/tasks.md

create_sequence_dictionary(ref) async

Create a sequence dictionary (.dict file) using GATK CreateSequenceDictionary.

Parameters:

    ref (Reference): Reference FASTA asset. Required.

Returns:

    SequenceDict: SequenceDict asset containing the .dict file.

Source code in src/stargazer/tasks/gatk/create_sequence_dictionary.py
@gatk_env.task
async def create_sequence_dictionary(ref: Reference) -> SequenceDict:
    """
    Create a sequence dictionary (.dict file) using GATK CreateSequenceDictionary.

    Args:
        ref: Reference FASTA asset

    Returns:
        SequenceDict asset containing the .dict file
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    output_dir = _storage.default_client.local_dir
    dict_path = output_dir / f"{ref_path.stem}.dict"
    dict_path.unlink(missing_ok=True)
    cmd = [
        "gatk",
        "CreateSequenceDictionary",
        "-R",
        str(ref_path),
        "-O",
        str(dict_path),
    ]
    await _run(cmd, cwd=str(output_dir))

    if not dict_path.exists():
        raise FileNotFoundError(
            f"Sequence dictionary file {dict_path.name} was not created"
        )

    seq_dict = SequenceDict()
    await seq_dict.update(
        dict_path,
        build=ref.build,
        tool="gatk_CreateSequenceDictionary",
        reference_cid=ref.cid,
    )

    logger.info(seq_dict.to_dict())
    return seq_dict

GenomicsDBImport task for Stargazer.

Import VCFs to GenomicsDB for efficient joint genotyping of large cohorts.

spec: docs/architecture/tasks.md

genomics_db_import(gvcfs, workspace_path, intervals=None) async

Import GVCFs to GenomicsDB workspace for scalable joint genotyping.

Parameters:

    gvcfs (list[Variants]): List of per-sample GVCF Variants assets to import. Required.
    workspace_path (Path): Path where GenomicsDB workspace will be created. Required.
    intervals (list[str] | None): Genomic intervals to process (e.g., ["chr1", "chr2:100000-200000"]). Default: None.

Returns:

    Path: Path to the created GenomicsDB workspace directory.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360036883491-GenomicsDBImport

Source code in src/stargazer/tasks/gatk/genomics_db_import.py
@gatk_env.task
async def genomics_db_import(
    gvcfs: list[Variants],
    workspace_path: Path,
    intervals: list[str] | None = None,
) -> Path:
    """
    Import GVCFs to GenomicsDB workspace for scalable joint genotyping.

    Args:
        gvcfs: List of per-sample GVCF Variants assets to import
        workspace_path: Path where GenomicsDB workspace will be created
        intervals: Genomic intervals to process (e.g., ["chr1", "chr2:100000-200000"])

    Returns:
        Path to the created GenomicsDB workspace directory

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360036883491-GenomicsDBImport
    """
    logger.info([x.to_dict() for x in gvcfs])
    if not gvcfs:
        raise ValueError("At least one GVCF must be provided")

    for gvcf in gvcfs:
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"All inputs must be GVCFs, got variant_type={gvcf.variant_type!r}: "
                f"sample_id={gvcf.sample_id}"
            )

    if workspace_path.exists():
        raise ValueError(
            f"GenomicsDB workspace already exists at {workspace_path}. "
            "Use --genomicsdb-update-workspace-path to add samples to existing workspace."
        )

    for gvcf in gvcfs:
        await gvcf.fetch()

    sample_map_path = workspace_path.parent / "sample_map.txt"
    sample_map_path.parent.mkdir(parents=True, exist_ok=True)

    with open(sample_map_path, "w") as f:
        for gvcf in gvcfs:
            if not gvcf.path:
                raise ValueError(
                    f"GVCF file not available for sample_id={gvcf.sample_id}"
                )
            f.write(f"{gvcf.sample_id}\t{gvcf.path}\n")

    cmd = [
        "gatk",
        "GenomicsDBImport",
        "--genomicsdb-workspace-path",
        str(workspace_path),
        "--sample-name-map",
        str(sample_map_path),
    ]

    if intervals:
        for interval in intervals:
            cmd.extend(["-L", interval])

    await _run(cmd, cwd=str(workspace_path.parent))

    if not workspace_path.exists():
        raise FileNotFoundError(
            f"GenomicsDBImport did not create workspace at {workspace_path}"
        )

    return workspace_path
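The --sample-name-map file the task writes is a plain two-column, tab-separated listing: one "sample<TAB>gvcf-path" line per input. A small sketch of that format (sample names and paths are hypothetical):

```python
# Sketch of the sample map written for GenomicsDBImport's --sample-name-map.
# Sample names and paths are hypothetical.
samples = {
    "NA12878": "NA12878.g.vcf",
    "NA12891": "NA12891.g.vcf",
}

# One tab-separated line per sample, exactly as the task writes it.
lines = [f"{sample_id}\t{path}\n" for sample_id, path in samples.items()]
sample_map = "".join(lines)

assert sample_map == "NA12878\tNA12878.g.vcf\nNA12891\tNA12891.g.vcf\n"
```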

haplotype_caller task for Stargazer.

Calls germline SNPs and indels via local re-assembly of haplotypes using GATK HaplotypeCaller in GVCF mode.

spec: docs/architecture/tasks.md

haplotype_caller(alignment, ref) async

Call germline variants in GVCF mode using GATK HaplotypeCaller.

Parameters:

    alignment (Alignment): Sorted, duplicate-marked BAM asset (BQSR-recalibrated recommended). Required.
    ref (Reference): Reference FASTA asset with sequence dictionary. Required.

Returns:

    Variants: Variants asset containing the per-sample GVCF.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller

Source code in src/stargazer/tasks/gatk/haplotype_caller.py
@gatk_env.task
async def haplotype_caller(
    alignment: Alignment,
    ref: Reference,
) -> Variants:
    """
    Call germline variants in GVCF mode using GATK HaplotypeCaller.

    Args:
        alignment: Sorted, duplicate-marked BAM asset (BQSR-recalibrated recommended)
        ref: Reference FASTA asset with sequence dictionary

    Returns:
        Variants asset containing the per-sample GVCF

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    # fetch() auto-downloads companions (.fai, .dict, .bai)
    await alignment.fetch()
    await ref.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_gvcf = output_dir / f"{alignment.sample_id}.g.vcf"

    cmd = [
        "gatk",
        "HaplotypeCaller",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "-O",
        str(output_gvcf),
        "--emit-ref-confidence",
        "GVCF",
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_gvcf.exists():
        raise FileNotFoundError(
            f"HaplotypeCaller did not create output GVCF at {output_gvcf}"
        )

    gvcf = Variants()
    await gvcf.update(
        output_gvcf,
        sample_id=alignment.sample_id,
        caller="haplotype_caller",
        variant_type="gvcf",
        build=ref.build,
        sample_count=1,
        source_samples=[alignment.sample_id],
    )

    # Upload implicit index file produced by GATK
    idx_path = output_dir / f"{output_gvcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(
            idx_path, sample_id=alignment.sample_id, variants_cid=gvcf.cid
        )

    logger.info(gvcf.to_dict())
    return gvcf

joint_call_gvcfs task for Stargazer.

Consolidates per-sample GVCFs into a GenomicsDB datastore and performs joint genotyping in a single task, avoiding the need to persist the GenomicsDB workspace between tasks.

spec: docs/architecture/tasks.md

joint_call_gvcfs(gvcfs, ref, intervals, cohort_id='cohort') async

Consolidate GVCFs into GenomicsDB and joint-genotype in a single task.

Runs GenomicsDBImport followed immediately by GenotypeGVCFs within the same execution context, so the workspace never needs to leave the pod.

Parameters:

    gvcfs (list[Variants]): Per-sample GVCF Variants assets from HaplotypeCaller. Required.
    ref (Reference): Reference FASTA asset. Required.
    intervals (list[str]): Genomic intervals to process (required by GenomicsDBImport). Required.
    cohort_id (str): Sample ID label for the output VCF. Default: "cohort".

Returns:

    Variants: Joint-genotyped Variants asset (VCF).

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035535932

Source code in src/stargazer/tasks/gatk/joint_call_gvcfs.py
@gatk_env.task
async def joint_call_gvcfs(
    gvcfs: list[Variants],
    ref: Reference,
    intervals: list[str],
    cohort_id: str = "cohort",
) -> Variants:
    """
    Consolidate GVCFs into GenomicsDB and joint-genotype in a single task.

    Runs GenomicsDBImport followed immediately by GenotypeGVCFs within the same
    execution context, so the workspace never needs to leave the pod.

    Args:
        gvcfs: Per-sample GVCF Variants assets from HaplotypeCaller
        ref: Reference FASTA asset
        intervals: Genomic intervals to process (required by GenomicsDBImport)
        cohort_id: Sample ID label for the output VCF (default: "cohort")

    Returns:
        Joint-genotyped Variants asset (VCF)

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035535932
    """
    logger.info([x.to_dict() for x in gvcfs])
    logger.info(ref.to_dict())
    if not gvcfs:
        raise ValueError("At least one GVCF must be provided")
    for gvcf in gvcfs:
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"All inputs must be GVCFs, got variant_type={gvcf.variant_type!r}: "
                f"sample_id={gvcf.sample_id}"
            )

    await ref.fetch()
    for gvcf in gvcfs:
        await gvcf.fetch()

    output_dir = _storage.default_client.local_dir

    with tempfile.TemporaryDirectory() as tmpdir:
        workspace = Path(tmpdir) / f"{cohort_id}_genomicsdb"

        # Write sample map
        sample_map = Path(tmpdir) / "sample_map.txt"
        with open(sample_map, "w") as f:
            for gvcf in gvcfs:
                f.write(f"{gvcf.sample_id}\t{gvcf.path}\n")

        # GenomicsDBImport
        import_cmd = [
            "gatk",
            "GenomicsDBImport",
            "--genomicsdb-workspace-path",
            str(workspace),
            "--sample-name-map",
            str(sample_map),
        ]
        for interval in intervals:
            import_cmd.extend(["-L", interval])

        await _run(import_cmd, cwd=tmpdir)

        if not workspace.exists():
            raise FileNotFoundError(
                f"GenomicsDBImport did not create workspace at {workspace}"
            )

        # GenotypeGVCFs
        output_vcf = output_dir / f"{cohort_id}_genotyped.vcf"
        genotype_cmd = [
            "gatk",
            "GenotypeGVCFs",
            "-R",
            str(ref.path),
            "-V",
            f"gendb://{workspace}",
            "-O",
            str(output_vcf),
        ]

        _, stderr = await _run(genotype_cmd, cwd=tmpdir)

    if not output_vcf.exists():
        raise FileNotFoundError(
            f"GenotypeGVCFs did not create output VCF at {output_vcf}. stderr: {stderr}"
        )

    source_samples = [g.sample_id for g in gvcfs]
    vcf = Variants()
    await vcf.update(
        output_vcf,
        sample_id=cohort_id,
        caller="joint_call_gvcfs",
        variant_type="vcf",
        build=ref.build,
        sample_count=len(source_samples),
        source_samples=source_samples,
    )

    idx_path = output_dir / f"{output_vcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=cohort_id, variants_cid=vcf.cid)

    logger.info(vcf.to_dict())
    return vcf
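Note that GenotypeGVCFs reads the GenomicsDB workspace through a gendb:// URI passed to -V, not a plain file path. A sketch of the argument the task builds (the workspace path below is a hypothetical example):

```python
# Sketch of the -V argument passed to GenotypeGVCFs when reading from a
# GenomicsDB workspace. The workspace path is a hypothetical example.
workspace = "/tmp/work/cohort_genomicsdb"

# GATK recognizes the gendb:// scheme as "open this path as GenomicsDB".
v_arg = f"gendb://{workspace}"

assert v_arg == "gendb:///tmp/work/cohort_genomicsdb"
```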

mark_duplicates task for Stargazer.

Marks duplicate reads in BAM files using GATK MarkDuplicates.

spec: docs/architecture/tasks.md

mark_duplicates(alignment) async

Mark duplicate reads in a BAM file.

Uses GATK MarkDuplicates to identify and tag duplicate reads that originated from the same DNA fragment (PCR or optical duplicates). Duplicates are marked with the 0x0400 SAM flag.

Parameters:

    alignment (Alignment): Input BAM asset (should be coordinate sorted). Required.

Returns:

    Alignment: Alignment asset with duplicates marked.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037052812-MarkDuplicates-Picard

Source code in src/stargazer/tasks/gatk/mark_duplicates.py
@gatk_env.task
async def mark_duplicates(alignment: Alignment) -> Alignment:
    """
    Mark duplicate reads in a BAM file.

    Uses GATK MarkDuplicates to identify and tag duplicate reads that
    originated from the same DNA fragment (PCR or optical duplicates).
    Duplicates are marked with the 0x0400 SAM flag.

    Args:
        alignment: Input BAM asset (should be coordinate sorted)

    Returns:
        Alignment asset with duplicates marked

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037052812-MarkDuplicates-Picard
    """
    logger.info(alignment.to_dict())
    await alignment.fetch()
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir

    output_bam = output_dir / f"{alignment.sample_id}_marked_duplicates.bam"
    metrics_file = output_dir / f"{alignment.sample_id}_duplicate_metrics.txt"

    cmd = [
        "gatk",
        "MarkDuplicates",
        "-I",
        str(bam_path),
        "-O",
        str(output_bam),
        "-M",
        str(metrics_file),
        "--CREATE_INDEX",
        "true",
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(
            f"MarkDuplicates did not create output BAM at {output_bam}"
        )

    marked_bam = Alignment()
    await marked_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=True,
        bqsr_applied=alignment.bqsr_applied,
        tool="gatk_mark_duplicates",
    )

    bam_index = output_dir / f"{output_bam.name}.bai"
    if bam_index.exists():
        idx = AlignmentIndex()
        await idx.update(
            bam_index,
            sample_id=alignment.sample_id,
            alignment_cid=marked_bam.cid,
        )

    if metrics_file.exists():
        metrics = DuplicateMetrics()
        await metrics.update(
            metrics_file,
            sample_id=alignment.sample_id,
            tool="gatk_mark_duplicates",
            alignment_cid=marked_bam.cid,
        )

    logger.info(marked_bam.to_dict())
    return marked_bam
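Because MarkDuplicates tags duplicates with the 0x0400 bit of the SAM FLAG field rather than removing them, downstream code can test a record's duplicate status with a bitwise AND. A small illustrative sketch (the flag values below are hypothetical example records):

```python
# The SAM spec reserves bit 0x0400 of the FLAG field for "PCR or optical
# duplicate", which is the bit MarkDuplicates sets.
DUPLICATE = 0x0400

def is_duplicate(sam_flag: int) -> bool:
    """Return True if the SAM FLAG has the duplicate-read bit set."""
    return bool(sam_flag & DUPLICATE)

# 1107 = 1024 (duplicate) + 83 (paired, proper pair, reverse strand, first in pair)
assert is_duplicate(1107)
# The same read without the duplicate bit:
assert not is_duplicate(83)
```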

merge_bam_alignment task for Stargazer.

Merges aligned BAM with unmapped BAM using GATK MergeBamAlignment.

spec: docs/architecture/tasks.md

merge_bam_alignment(aligned_bam, unmapped_bam, ref) async

Merge alignment data from aligned BAM with data in unmapped BAM.

Parameters:

    aligned_bam (Alignment): Aligned BAM asset from aligner. Required.
    unmapped_bam (Alignment): Original unmapped BAM asset (must be queryname sorted). Required.
    ref (Reference): Reference FASTA asset. Required.

Returns:

    Alignment: Alignment asset with merged BAM file.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037226472-MergeBamAlignment-Picard

Source code in src/stargazer/tasks/gatk/merge_bam_alignment.py
@gatk_env.task
async def merge_bam_alignment(
    aligned_bam: Alignment,
    unmapped_bam: Alignment,
    ref: Reference,
) -> Alignment:
    """
    Merge alignment data from aligned BAM with data in unmapped BAM.

    Args:
        aligned_bam: Aligned BAM asset from aligner
        unmapped_bam: Original unmapped BAM asset (must be queryname sorted)
        ref: Reference FASTA asset

    Returns:
        Alignment asset with merged BAM file

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037226472-MergeBamAlignment-Picard
    """
    logger.info(aligned_bam.to_dict())
    logger.info(unmapped_bam.to_dict())
    logger.info(ref.to_dict())
    # fetch() auto-downloads companions (.fai, .dict for ref)
    await aligned_bam.fetch()
    await unmapped_bam.fetch()
    await ref.fetch()

    ref_path = ref.path
    aligned_path = aligned_bam.path
    unmapped_path = unmapped_bam.path
    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{aligned_bam.sample_id}_merged.bam"

    cmd = [
        "gatk",
        "MergeBamAlignment",
        "-R",
        str(ref_path),
        "-ALIGNED",
        str(aligned_path),
        "-UNMAPPED",
        str(unmapped_path),
        "-O",
        str(output_bam),
        "--SORT_ORDER",
        "coordinate",
        "--CREATE_INDEX",
        "true",
        "--ADD_MATE_CIGAR",
        "true",
        "--CLIP_ADAPTERS",
        "true",
        "--CLIP_OVERLAPPING_READS",
        "true",
        "--INCLUDE_SECONDARY_ALIGNMENTS",
        "true",
        "--MAX_INSERTIONS_OR_DELETIONS",
        "-1",
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(
            f"MergeBamAlignment did not create output BAM at {output_bam}"
        )

    merged_bam = Alignment()
    await merged_bam.update(
        output_bam,
        sample_id=aligned_bam.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=aligned_bam.duplicates_marked,
        bqsr_applied=aligned_bam.bqsr_applied,
        tool="gatk_merge_bam_alignment",
    )

    bam_index = output_dir / f"{output_bam.name}.bai"
    if bam_index.exists():
        idx = AlignmentIndex()
        await idx.update(
            bam_index,
            sample_id=aligned_bam.sample_id,
            alignment_cid=merged_bam.cid,
        )

    logger.info(merged_bam.to_dict())
    return merged_bam

sort_sam task for Stargazer.

Sorts BAM files using GATK SortSam.

spec: docs/architecture/tasks.md

sort_sam(alignment, sort_order='coordinate') async

Sort a SAM/BAM file.

Parameters:

    alignment (Alignment): Input BAM asset to sort. Required.
    sort_order (str): Sort order; one of "coordinate", "queryname", "duplicate". Default: "coordinate".

Returns:

    Alignment: Alignment asset with sorted BAM file.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037056932-SortSam-Picard

Source code in src/stargazer/tasks/gatk/sort_sam.py
@gatk_env.task
async def sort_sam(
    alignment: Alignment,
    sort_order: str = "coordinate",
) -> Alignment:
    """
    Sort a SAM/BAM file.

    Args:
        alignment: Input BAM asset to sort
        sort_order: Sort order - one of "coordinate", "queryname", "duplicate"

    Returns:
        Alignment asset with sorted BAM file

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037056932-SortSam-Picard
    """
    logger.info(alignment.to_dict())
    valid_sort_orders = ["coordinate", "queryname", "duplicate"]
    if sort_order not in valid_sort_orders:
        raise ValueError(
            f"Invalid sort_order: {sort_order}. Must be one of {valid_sort_orders}"
        )

    await alignment.fetch()
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{alignment.sample_id}_sorted_{sort_order}.bam"

    cmd = [
        "gatk",
        "SortSam",
        "-I",
        str(bam_path),
        "-O",
        str(output_bam),
        "--SORT_ORDER",
        sort_order,
    ]

    if sort_order == "coordinate":
        cmd.extend(["--CREATE_INDEX", "true"])

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(f"SortSam did not create output BAM at {output_bam}")

    sorted_bam = Alignment()
    await sorted_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted=sort_order,
        duplicates_marked=alignment.duplicates_marked,
        bqsr_applied=alignment.bqsr_applied,
        tool="gatk_sort_sam",
    )

    if sort_order == "coordinate":
        bam_index = output_dir / f"{output_bam.name}.bai"
        if bam_index.exists():
            idx = AlignmentIndex()
            await idx.update(
                bam_index,
                sample_id=alignment.sample_id,
                alignment_cid=sorted_bam.cid,
            )

    logger.info(sorted_bam.to_dict())
    return sorted_bam
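The task only appends --CREATE_INDEX when sorting by coordinate, since BAM indices require coordinate order. A sketch of that conditional flag assembly (file names are hypothetical):

```python
# Sketch of the conditional --CREATE_INDEX flag in sort_sam.
# File names are hypothetical.
def sort_cmd(bam: str, out: str, sort_order: str) -> list[str]:
    cmd = ["gatk", "SortSam", "-I", bam, "-O", out, "--SORT_ORDER", sort_order]
    if sort_order == "coordinate":
        # BAM indexing is only meaningful for coordinate-sorted output.
        cmd.extend(["--CREATE_INDEX", "true"])
    return cmd

assert "--CREATE_INDEX" in sort_cmd("in.bam", "out.bam", "coordinate")
assert "--CREATE_INDEX" not in sort_cmd("in.bam", "out.bam", "queryname")
```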

VariantRecalibrator task for Stargazer.

Builds a recalibration model for VQSR using GATK VariantRecalibrator.

spec: docs/architecture/tasks.md

variant_recalibrator(vcf, ref, resources, mode='SNP') async

Build a VQSR recalibration model using GATK VariantRecalibrator.

Each KnownSites in resources must carry the following keyvalues:

    resource_name: e.g. "hapmap", "omni", "1000G", "dbsnp", "mills"
    known: "true" or "false"
    training: "true" or "false"
    truth: "true" or "false"
    prior: numeric string, e.g. "15"

Parameters:

    vcf (Variants): Raw genotyped VCF Variants asset. Required.
    ref (Reference): Reference FASTA asset. Required.
    resources (list[KnownSites]): Training/truth VCF resources for the recalibrator. Required.
    mode (str): Variant type to recalibrate, "SNP" or "INDEL". Default: "SNP".

Returns:

    VQSRModel: VQSRModel asset (recal file) with tranches_path stored in keyvalues.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR

Source code in src/stargazer/tasks/gatk/variant_recalibrator.py
@gatk_env.task
async def variant_recalibrator(
    vcf: Variants,
    ref: Reference,
    resources: list[KnownSites],
    mode: str = "SNP",
) -> VQSRModel:
    """
    Build a VQSR recalibration model using GATK VariantRecalibrator.

    Each KnownSites in ``resources`` must carry the following keyvalues:
        resource_name: e.g. "hapmap", "omni", "1000G", "dbsnp", "mills"
        known:         "true" or "false"
        training:      "true" or "false"
        truth:         "true" or "false"
        prior:         numeric string, e.g. "15"

    Args:
        vcf: Raw genotyped VCF Variants asset
        ref: Reference FASTA asset
        resources: Training/truth VCF resources for the recalibrator
        mode: Variant type to recalibrate — "SNP" or "INDEL"

    Returns:
        VQSRModel asset (recal file) with tranches_path stored in keyvalues

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR
    """
    logger.info(vcf.to_dict())
    logger.info(ref.to_dict())
    logger.info([x.to_dict() for x in resources])
    if mode not in ("SNP", "INDEL"):
        raise ValueError(f"mode must be 'SNP' or 'INDEL', got {mode!r}")
    if not resources:
        raise ValueError(f"At least one resource VCF is required for mode={mode!r}")

    await vcf.fetch()
    await ref.fetch()
    for r in resources:
        await r.fetch()

    output_dir = _storage.default_client.local_dir
    sample_id = vcf.sample_id or "cohort"
    output_recal = output_dir / f"{sample_id}_{mode.lower()}.recal"
    output_tranches = output_dir / f"{sample_id}_{mode.lower()}.tranches"

    annotations = _SNP_ANNOTATIONS if mode == "SNP" else _INDEL_ANNOTATIONS

    cmd = [
        "gatk",
        "VariantRecalibrator",
        "-R",
        str(ref.path),
        "-V",
        str(vcf.path),
        "-mode",
        mode,
        "-O",
        str(output_recal),
        "--tranches-file",
        str(output_tranches),
    ]

    for r in resources:
        name = r.resource_name or "unknown"
        known = r.known or "false"
        training = r.training or "false"
        truth = r.truth or "false"
        prior = r.prior or "10"
        cmd.extend(
            [
                f"--resource:{name},known={known},training={training},truth={truth},prior={prior}",
                str(r.path),
            ]
        )

    for ann in annotations:
        cmd.extend(["-an", ann])

    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_recal.exists():
        raise FileNotFoundError(
            f"VariantRecalibrator did not create recal file at {output_recal}. stderr: {stderr}"
        )

    model = VQSRModel()
    await model.update(
        output_recal,
        sample_id=sample_id,
        mode=mode,
        tranches_path=str(output_tranches),
        build=ref.build,
        variants_cid=vcf.cid,
    )

    logger.info(model.to_dict())
    return model
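Each KnownSites resource is collapsed into a single --resource flag whose value encodes the resource name and the known/training/truth/prior keyvalues. A sketch of that string construction, using hypothetical hapmap values:

```python
# Sketch of the --resource flag built per KnownSites in variant_recalibrator.
# The hapmap values below are hypothetical examples.
name, known, training, truth, prior = "hapmap", "false", "true", "true", "15"

flag = f"--resource:{name},known={known},training={training},truth={truth},prior={prior}"

assert flag == "--resource:hapmap,known=false,training=true,truth=true,prior=15"
```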

BWA tasks for reference genome indexing and alignment.

spec: docs/architecture/tasks.md

bwa_index(ref) async

Create BWA index files for a reference genome using bwa index.

Creates the following index files: .amb, .ann, .bwt, .pac, .sa

Parameters:

    ref (Reference): Reference FASTA asset. Required.

Returns:

    list[AlignerIndex]: List of AlignerIndex assets, one per index file.

Reference

https://bio-bwa.sourceforge.net/bwa.shtml

Source code in src/stargazer/tasks/general/bwa.py
@gatk_env.task
async def bwa_index(ref: Reference) -> list[AlignerIndex]:
    """
    Create BWA index files for a reference genome using bwa index.

    Creates the following index files:
    - .amb, .ann, .bwt, .pac, .sa

    Args:
        ref: Reference FASTA asset

    Returns:
        List of AlignerIndex assets, one per index file

    Reference:
        https://bio-bwa.sourceforge.net/bwa.shtml
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    index_extensions = [".amb", ".ann", ".bwt", ".pac", ".sa"]
    output_dir = _storage.default_client.local_dir
    base_name = ref_path.name

    prefix = output_dir / base_name
    cmd = ["bwa", "index", "-p", str(prefix), str(ref_path)]
    stdout, stderr = await _run(cmd, cwd=str(output_dir))

    indices = []
    for ext in index_extensions:
        index_path = output_dir / f"{base_name}{ext}"
        if not index_path.exists():
            raise FileNotFoundError(f"BWA index file {index_path.name} was not created")

        idx = AlignerIndex()
        await idx.update(
            index_path,
            aligner="bwa",
            build=ref.build,
            reference_cid=ref.cid,
        )
        indices.append(idx)

    logger.info([x.to_dict() for x in indices])
    return indices
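Since bwa index writes its five companion files next to the -p prefix, the task can predict every output path from the prefix alone. A sketch of that expectation (the prefix is a hypothetical example):

```python
# Sketch of the index file paths bwa_index expects after `bwa index -p <prefix>`.
# The prefix is a hypothetical example.
prefix = "ref.fa"
index_extensions = [".amb", ".ann", ".bwt", ".pac", ".sa"]

index_files = [f"{prefix}{ext}" for ext in index_extensions]

assert index_files == [
    "ref.fa.amb", "ref.fa.ann", "ref.fa.bwt", "ref.fa.pac", "ref.fa.sa",
]
```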

bwa_mem(ref, r1, r2=None, read_group=None) async

Align FASTQ reads to reference genome using BWA-MEM.

Produces an unsorted BAM file that typically needs to be sorted before downstream processing (e.g., with sort_sam).

Parameters:

    ref (Reference): Reference FASTA asset. Required.
    r1 (R1): R1 FASTQ read asset. Required.
    r2 (R2 | None): R2 FASTQ read asset (None for single-end). Default: None.
    read_group (dict[str, str] | None): Optional read group override (ID, SM, LB, PL, PU). Default: None.

Returns:

    Alignment: Alignment asset containing the unsorted BAM file.

Reference

https://bio-bwa.sourceforge.net/bwa.shtml

Source code in src/stargazer/tasks/general/bwa.py
@gatk_env.task
async def bwa_mem(
    ref: Reference,
    r1: R1,
    r2: R2 | None = None,
    read_group: dict[str, str] | None = None,
) -> Alignment:
    """
    Align FASTQ reads to reference genome using BWA-MEM.

    Produces an unsorted BAM file that typically needs to be sorted
    before downstream processing (e.g., with sort_sam).

    Args:
        ref: Reference FASTA asset
        r1: R1 FASTQ read asset
        r2: R2 FASTQ read asset (None for single-end)
        read_group: Optional read group override (ID, SM, LB, PL, PU)

    Returns:
        Alignment asset containing the unsorted BAM file

    Reference:
        https://bio-bwa.sourceforge.net/bwa.shtml
    """
    logger.info(ref.to_dict())
    logger.info(r1.to_dict())
    if r2:
        logger.info(r2.to_dict())
    # fetch() downloads ref + companions (aligner indices, .fai, .dict)
    await ref.fetch()
    await r1.fetch()
    if r2:
        await r2.fetch()

    ref_path = ref.path
    r1_path = r1.path
    r2_path = r2.path if r2 else None
    sample_id = r1.sample_id

    if read_group:
        rg = {"ID": sample_id, "SM": sample_id}
        rg.update(read_group)
        rg_parts = [f"{k}:{v}" for k, v in rg.items()]
        rg_string = r"@RG\t" + r"\t".join(rg_parts)
    else:
        rg_string = rf"@RG\tID:{sample_id}\tSM:{sample_id}\tLB:lib\tPL:ILLUMINA"

    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{sample_id}_aligned.bam"

    cmd = [
        "bwa",
        "mem",
        "-R",
        rg_string,
        "-t",
        "4",
        str(ref_path),
    ]

    if r2_path:
        cmd.extend([str(r1_path), str(r2_path)])
    else:
        cmd.append(str(r1_path))

    bwa_cmd = " ".join(shlex.quote(str(c)) for c in cmd)
    samtools_cmd = f"samtools view -bS -o {shlex.quote(str(output_bam))} -"
    # pipefail so a bwa failure is not masked by samtools' exit status
    full_cmd = f"set -o pipefail; {bwa_cmd} | {samtools_cmd}"

    logger.info(f"Running: {full_cmd}")
    proc = await asyncio.create_subprocess_shell(
        full_cmd,
        cwd=str(output_dir),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        executable="/bin/bash",
    )
    stdout, stderr = await proc.communicate()

    if proc.returncode != 0:
        stderr_text = stderr.decode() if stderr else ""
        stdout_text = stdout.decode() if stdout else ""
        raise RuntimeError(
            f"BWA-MEM failed with return code {proc.returncode}.\n"
            f"stdout: {stdout_text}\nstderr: {stderr_text}"
        )

    if not output_bam.exists():
        raise FileNotFoundError(f"BWA-MEM did not create output BAM at {output_bam}")

    bam = Alignment()
    await bam.update(
        output_bam,
        sample_id=sample_id,
        format="bam",
        duplicates_marked=False,
        bqsr_applied=False,
        reference_cid=ref.cid,
        r1_cid=r1.cid,
    )

    logger.info(bam.to_dict())
    return bam

BWA-MEM2 tasks for reference genome indexing and alignment.

spec: docs/architecture/tasks.md

bwa_mem2_index(ref) async

Create BWA-MEM2 index files for a reference genome.

Creates the following index files: .0123, .amb, .ann, .bwt.2bit.64, .pac

Parameters:

    ref (Reference): Reference FASTA asset. Required.

Returns:

    list[AlignerIndex]: List of AlignerIndex assets, one per index file.

Reference

https://github.com/bwa-mem2/bwa-mem2

Source code in src/stargazer/tasks/general/bwa_mem2.py
@gatk_env.task
async def bwa_mem2_index(ref: Reference) -> list[AlignerIndex]:
    """
    Create BWA-MEM2 index files for a reference genome.

    Creates the following index files:
    - .0123, .amb, .ann, .bwt.2bit.64, .pac

    Args:
        ref: Reference FASTA asset

    Returns:
        List of AlignerIndex assets, one per index file

    Reference:
        https://github.com/bwa-mem2/bwa-mem2
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    index_extensions = [".0123", ".amb", ".ann", ".bwt.2bit.64", ".pac"]
    output_dir = _storage.default_client.local_dir
    base_name = ref_path.name

    prefix = output_dir / base_name
    cmd = ["bwa-mem2", "index", "-p", str(prefix), str(ref_path)]
    await _run(cmd, cwd=str(output_dir))

    indices = []
    for ext in index_extensions:
        index_path = output_dir / f"{base_name}{ext}"
        if not index_path.exists():
            raise FileNotFoundError(
                f"BWA-MEM2 index file {index_path.name} was not created"
            )

        idx = AlignerIndex()
        await idx.update(
            index_path,
            aligner="bwa-mem2",
            build=ref.build,
            reference_cid=ref.cid,
        )
        indices.append(idx)

    logger.info([x.to_dict() for x in indices])
    return indices

bwa_mem2_mem(ref, r1, r2=None, read_group=None) async

Align FASTQ reads to reference genome using BWA-MEM2.

Produces an unsorted BAM file that typically needs to be sorted before downstream processing (e.g., with sort_sam).

Parameters:

    ref (Reference): Reference FASTA asset. Required.
    r1 (R1): R1 FASTQ read asset. Required.
    r2 (R2 | None): R2 FASTQ read asset (None for single-end). Default: None.
    read_group (dict[str, str] | None): Optional read group override (ID, SM, LB, PL, PU). Default: None.

Returns:

    Alignment: Alignment asset containing the unsorted BAM file.

Reference

https://github.com/bwa-mem2/bwa-mem2

Source code in src/stargazer/tasks/general/bwa_mem2.py
@gatk_env.task
async def bwa_mem2_mem(
    ref: Reference,
    r1: R1,
    r2: R2 | None = None,
    read_group: dict[str, str] | None = None,
) -> Alignment:
    """
    Align FASTQ reads to reference genome using BWA-MEM2.

    Produces an unsorted BAM file that typically needs to be sorted
    before downstream processing (e.g., with sort_sam).

    Args:
        ref: Reference FASTA asset
        r1: R1 FASTQ read asset
        r2: R2 FASTQ read asset (None for single-end)
        read_group: Optional read group override (ID, SM, LB, PL, PU)

    Returns:
        Alignment asset containing the unsorted BAM file

    Reference:
        https://github.com/bwa-mem2/bwa-mem2
    """
    logger.info(ref.to_dict())
    logger.info(r1.to_dict())
    if r2:
        logger.info(r2.to_dict())

    await ref.fetch()
    await r1.fetch()
    if r2:
        await r2.fetch()

    ref_path = ref.path
    r1_path = r1.path
    r2_path = r2.path if r2 else None
    sample_id = r1.sample_id

    if read_group:
        rg = {"ID": sample_id, "SM": sample_id}
        rg.update(read_group)
        rg_parts = [f"{k}:{v}" for k, v in rg.items()]
        rg_string = r"@RG\t" + r"\t".join(rg_parts)
    else:
        rg_string = rf"@RG\tID:{sample_id}\tSM:{sample_id}\tLB:lib\tPL:ILLUMINA"

    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{sample_id}_aligned.bam"

    cmd = [
        "bwa-mem2",
        "mem",
        "-R",
        rg_string,
        "-t",
        "4",
        str(ref_path),
    ]

    if r2_path:
        cmd.extend([str(r1_path), str(r2_path)])
    else:
        cmd.append(str(r1_path))

    bwa_cmd = " ".join(shlex.quote(str(c)) for c in cmd)
    samtools_cmd = f"samtools view -bS -o {shlex.quote(str(output_bam))} -"
    # pipefail so a bwa-mem2 failure is not masked by samtools' exit status
    full_cmd = f"set -o pipefail; {bwa_cmd} | {samtools_cmd}"

    logger.info(f"Running: {full_cmd}")
    proc = await asyncio.create_subprocess_shell(
        full_cmd,
        cwd=str(output_dir),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        executable="/bin/bash",
    )
    stdout, stderr = await proc.communicate()

    if proc.returncode != 0:
        stderr_text = stderr.decode() if stderr else ""
        stdout_text = stdout.decode() if stdout else ""
        raise RuntimeError(
            f"BWA-MEM2 failed with return code {proc.returncode}.\n"
            f"stdout: {stdout_text}\nstderr: {stderr_text}"
        )

    if not output_bam.exists():
        raise FileNotFoundError(f"BWA-MEM2 did not create output BAM at {output_bam}")

    bam = Alignment()
    await bam.update(
        output_bam,
        sample_id=sample_id,
        format="bam",
        duplicates_marked=False,
        bqsr_applied=False,
        reference_cid=ref.cid,
        r1_cid=r1.cid,
    )

    logger.info(bam.to_dict())
    return bam

Samtools tasks for reference genome indexing.

spec: docs/architecture/tasks.md

samtools_faidx(ref) async

Create a FASTA index (.fai file) using samtools faidx.

Parameters:

    ref (Reference): Reference FASTA asset. Required.

Returns:

    ReferenceIndex: ReferenceIndex asset containing the .fai file.

Source code in src/stargazer/tasks/general/samtools.py
@gatk_env.task
async def samtools_faidx(ref: Reference) -> ReferenceIndex:
    """
    Create a FASTA index (.fai file) using samtools faidx.

    Args:
        ref: Reference FASTA asset

    Returns:
        ReferenceIndex asset containing the .fai file
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    output_dir = _storage.default_client.local_dir
    fai_path = output_dir / f"{ref_path.name}.fai"
    cmd = ["samtools", "faidx", str(ref_path), "--fai-idx", str(fai_path)]
    await _run(cmd, cwd=str(output_dir))

    if not fai_path.exists():
        raise FileNotFoundError(f"FASTA index file {fai_path.name} was not created")

    faidx = ReferenceIndex()
    await faidx.update(
        fai_path,
        build=ref.build,
        tool="samtools_faidx",
        reference_cid=ref.cid,
    )

    logger.info(faidx.to_dict())
    return faidx

Leiden community detection clustering for scRNA-seq data.

spec: docs/workflows/scrna.md

cluster(adata, resolution=0.5, key_added='leiden') async

Assign cells to clusters using the Leiden algorithm.

Requires a precomputed neighbor graph (.uns["neighbors"]). Cluster labels are stored in .obs[key_added].

Parameters:

    adata (AnnData): AnnData asset with neighbor graph. Required.
    resolution (float): Leiden resolution parameter (higher = more clusters). Default: 0.5.
    key_added (str): .obs column name to store cluster labels. Default: 'leiden'.

Returns:

    AnnData: AnnData asset with cluster labels in .obs.

Source code in src/stargazer/tasks/scrna/cluster.py
@scrna_env.task
async def cluster(
    adata: AnnData,
    resolution: float = 0.5,
    key_added: str = "leiden",
) -> AnnData:
    """Assign cells to clusters using the Leiden algorithm.

    Requires a precomputed neighbor graph (.uns["neighbors"]).
    Cluster labels are stored in .obs[key_added].

    Args:
        adata: AnnData asset with neighbor graph
        resolution: Leiden resolution parameter (higher = more clusters)
        key_added: .obs column name to store cluster labels

    Returns:
        AnnData asset with cluster labels in .obs
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    sc.tl.leiden(
        ad, flavor="igraph", n_iterations=2, resolution=resolution, key_added=key_added
    )

    out_path = _storage.default_client.local_dir / "clustered.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="clustered")
    logger.info(result.to_dict())
    return result

Marker gene identification via differential expression for scRNA-seq data.

spec: docs/workflows/scrna.md

find_markers(adata, groupby='leiden', method='wilcoxon') async

Identify marker genes for each cluster using differential expression.

Uses raw count data from .layers["counts"] for statistical testing. Results are stored in .uns["rank_genes_groups"].

Parameters:

    adata (AnnData): Clustered AnnData asset with .layers["counts"]. Required.
    groupby (str): .obs column to group cells by (cluster labels). Default: 'leiden'.
    method (str): Statistical test method ("wilcoxon", "t-test", etc.). Default: 'wilcoxon'.

Returns:

    AnnData: AnnData asset with ranked marker genes in .uns["rank_genes_groups"].

Source code in src/stargazer/tasks/scrna/find_markers.py
@scrna_env.task
async def find_markers(
    adata: AnnData,
    groupby: str = "leiden",
    method: str = "wilcoxon",
) -> AnnData:
    """Identify marker genes for each cluster using differential expression.

    Uses raw count data from .layers["counts"] for statistical testing.
    Results are stored in .uns["rank_genes_groups"].

    Args:
        adata: Clustered AnnData asset with .layers["counts"]
        groupby: .obs column to group cells by (cluster labels)
        method: Statistical test method ("wilcoxon", "t-test", etc.)

    Returns:
        AnnData asset with ranked marker genes in .uns["rank_genes_groups"]
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    sc.tl.rank_genes_groups(ad, groupby=groupby, method=method, layer="counts")

    out_path = _storage.default_client.local_dir / "markers.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="annotated")
    logger.info(result.to_dict())
    return result

Normalization and log transformation for scRNA-seq data.

spec: docs/workflows/scrna.md

normalize(adata) async

Normalize counts and apply log1p transformation.

Stores raw counts in .layers["counts"] before normalization so they are available for downstream differential expression analysis.

Parameters:

    adata (AnnData): QC-filtered AnnData asset. Required.

Returns:

    AnnData: Normalized and log-transformed AnnData asset.

Source code in src/stargazer/tasks/scrna/normalize.py
@scrna_env.task
async def normalize(adata: AnnData) -> AnnData:
    """Normalize counts and apply log1p transformation.

    Stores raw counts in .layers["counts"] before normalization so they
    are available for downstream differential expression analysis.

    Args:
        adata: QC-filtered AnnData asset

    Returns:
        Normalized and log-transformed AnnData asset
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    # Snapshot raw counts before normalization, as documented above
    ad.layers["counts"] = ad.X.copy()
    sc.pp.normalize_total(ad)
    sc.pp.log1p(ad)

    out_path = _storage.default_client.local_dir / "normalized.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="normalized")
    logger.info(result.to_dict())
    return result

Quality control and cell/gene filtering for scRNA-seq data.

spec: docs/workflows/scrna.md

qc_filter(adata, min_genes=100, min_cells=3, max_pct_mt=20.0, batch_key='') async

Filter low-quality cells and genes from raw scRNA-seq data.

Applies standard QC filters: minimum gene/cell thresholds, mitochondrial gene percentage, and scrublet doublet detection.

Parameters:

    adata (AnnData): Raw AnnData asset (.h5ad). Required.
    min_genes (int): Minimum number of genes expressed per cell. Default: 100.
    min_cells (int): Minimum number of cells a gene must be expressed in. Default: 3.
    max_pct_mt (float): Maximum mitochondrial gene percentage allowed per cell. Default: 20.0.
    batch_key (str): Column in .obs to use as batch for scrublet (empty = no batch). Default: ''.

Returns:

    AnnData: Filtered AnnData asset with QC metrics in .obs.

Source code in src/stargazer/tasks/scrna/qc_filter.py
@scrna_env.task
async def qc_filter(
    adata: AnnData,
    min_genes: int = 100,
    min_cells: int = 3,
    max_pct_mt: float = 20.0,
    batch_key: str = "",
) -> AnnData:
    """Filter low-quality cells and genes from raw scRNA-seq data.

    Applies standard QC filters: minimum gene/cell thresholds, mitochondrial
    gene percentage, and scrublet doublet detection.

    Args:
        adata: Raw AnnData asset (.h5ad)
        min_genes: Minimum number of genes expressed per cell
        min_cells: Minimum number of cells a gene must be expressed in
        max_pct_mt: Maximum mitochondrial gene percentage allowed per cell
        batch_key: Column in .obs to use as batch for scrublet (empty = no batch)

    Returns:
        Filtered AnnData asset with QC metrics in .obs
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    sc.pp.filter_cells(ad, min_genes=min_genes)
    sc.pp.filter_genes(ad, min_cells=min_cells)

    # Annotate mitochondrial, ribosomal, and hemoglobin genes
    ad.var["mt"] = ad.var_names.str.startswith("MT-")
    ad.var["ribo"] = ad.var_names.str.startswith(("RPS", "RPL"))
    ad.var["hb"] = ad.var_names.str.contains("^HB[^(P)]")

    sc.pp.calculate_qc_metrics(
        ad, qc_vars=["mt", "ribo", "hb"], inplace=True, log1p=True
    )

    # Doublet detection
    scrublet_kwargs = {}
    if batch_key:
        scrublet_kwargs["batch_key"] = batch_key
    sc.pp.scrublet(ad, **scrublet_kwargs)

    # Filter by mitochondrial percentage and predicted doublets
    ad = ad[ad.obs["pct_counts_mt"] < max_pct_mt]
    ad = ad[~ad.obs["predicted_doublet"]]

    out_path = _storage.default_client.local_dir / "qc_filtered.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="qc_filtered")
    logger.info(result.to_dict())
    return result

PCA, neighbor graph, and UMAP dimensionality reduction for scRNA-seq data.

spec: docs/workflows/scrna.md

reduce_dimensions(adata, n_pcs=50, n_neighbors=15) async

Compute PCA, k-nearest neighbor graph, and UMAP embedding.

Operates on the highly variable gene subset to reduce noise. Embeddings are stored in .obsm for downstream clustering and visualization.

Parameters:

    adata (AnnData): AnnData asset with HVG annotations in .var. Required.
    n_pcs (int): Number of principal components to compute. Default: 50.
    n_neighbors (int): Number of neighbors for the kNN graph. Default: 15.

Returns:

    AnnData: AnnData asset with PCA (.obsm["X_pca"]), neighbors, and UMAP (.obsm["X_umap"]).

Source code in src/stargazer/tasks/scrna/reduce_dimensions.py
@scrna_env.task
async def reduce_dimensions(
    adata: AnnData,
    n_pcs: int = 50,
    n_neighbors: int = 15,
) -> AnnData:
    """Compute PCA, k-nearest neighbor graph, and UMAP embedding.

    Operates on the highly variable gene subset to reduce noise. Embeddings
    are stored in .obsm for downstream clustering and visualization.

    Args:
        adata: AnnData asset with HVG annotations in .var
        n_pcs: Number of principal components to compute
        n_neighbors: Number of neighbors for the kNN graph

    Returns:
        AnnData asset with PCA (.obsm["X_pca"]), neighbors, and UMAP (.obsm["X_umap"])
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    sc.tl.pca(ad, n_comps=n_pcs)
    sc.pp.neighbors(ad, n_neighbors=n_neighbors, n_pcs=n_pcs)
    sc.tl.umap(ad)

    out_path = _storage.default_client.local_dir / "reduced.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="reduced")
    logger.info(result.to_dict())
    return result

Highly variable gene selection for scRNA-seq data.

spec: docs/workflows/scrna.md

select_features(adata, n_top_genes=2000, batch_key='') async

Select highly variable genes for dimensionality reduction.

Annotates .var with highly_variable flags. Downstream tasks use only the highly variable subset.

Parameters:

    adata (AnnData): Normalized AnnData asset. Required.
    n_top_genes (int): Number of top highly variable genes to select. Default: 2000.
    batch_key (str): Column in .obs to use as batch (empty = no batch correction). Default: ''.

Returns:

    AnnData: AnnData asset with HVG annotations in .var.

Source code in src/stargazer/tasks/scrna/select_features.py
@scrna_env.task
async def select_features(
    adata: AnnData,
    n_top_genes: int = 2000,
    batch_key: str = "",
) -> AnnData:
    """Select highly variable genes for dimensionality reduction.

    Annotates .var with highly_variable flags. Downstream tasks use only
    the highly variable subset.

    Args:
        adata: Normalized AnnData asset
        n_top_genes: Number of top highly variable genes to select
        batch_key: Column in .obs to use as batch (empty = no batch correction)

    Returns:
        AnnData asset with HVG annotations in .var
    """
    import scanpy as sc

    logger.info(adata.to_dict())
    await adata.fetch()
    ad = sc.read_h5ad(adata.path)

    hvg_kwargs = {"n_top_genes": n_top_genes}
    if batch_key:
        hvg_kwargs["batch_key"] = batch_key
    sc.pp.highly_variable_genes(ad, **hvg_kwargs)

    out_path = _storage.default_client.local_dir / "features_selected.h5ad"
    ad.write_h5ad(out_path)

    result = AnnData(
        sample_id=adata.sample_id,
        organism=adata.organism,
        source_cid=adata.cid,
    )
    await result.update(out_path, n_obs=ad.n_obs, n_vars=ad.n_vars, stage="featured")
    logger.info(result.to_dict())
    return result

Types

Alignment asset types for Stargazer.

spec: docs/architecture/types.md

Alignment dataclass

Bases: Asset

BAM/CRAM alignment file asset.

Carries reference_cid and r1_cid for provenance (PROV entity derivation).

Source code in src/stargazer/assets/alignment.py
@dataclass
class Alignment(Asset):
    """BAM/CRAM alignment file asset.

    Carries reference_cid and r1_cid for provenance (PROV entity derivation).
    """

    _asset_key: ClassVar[str] = "alignment"
    sample_id: str = ""
    format: str = ""
    sorted: str = ""
    duplicates_marked: bool = False
    bqsr_applied: bool = False
    tool: str = ""
    reference_cid: str = ""
    r1_cid: str = ""

AlignmentIndex dataclass

Bases: Asset

BAI/CRAI alignment index file asset.

Carries alignment_cid linking to the Alignment it indexes.

Source code in src/stargazer/assets/alignment.py
@dataclass
class AlignmentIndex(Asset):
    """BAI/CRAI alignment index file asset.

    Carries alignment_cid linking to the Alignment it indexes.
    """

    _asset_key: ClassVar[str] = "alignment_index"
    sample_id: str = ""
    alignment_cid: str = ""

BQSRReport dataclass

Bases: Asset

BQSR recalibration table produced by GATK BaseRecalibrator.

Carries alignment_cid linking to the Alignment it was produced from.

Source code in src/stargazer/assets/alignment.py
@dataclass
class BQSRReport(Asset):
    """BQSR recalibration table produced by GATK BaseRecalibrator.

    Carries alignment_cid linking to the Alignment it was produced from.
    """

    _asset_key: ClassVar[str] = "bqsr_report"
    sample_id: str = ""
    tool: str = ""
    alignment_cid: str = ""

DuplicateMetrics dataclass

Bases: Asset

Duplicate metrics text file produced by GATK MarkDuplicates.

Carries alignment_cid linking to the Alignment it was produced from.

Source code in src/stargazer/assets/alignment.py
@dataclass
class DuplicateMetrics(Asset):
    """Duplicate metrics text file produced by GATK MarkDuplicates.

    Carries alignment_cid linking to the Alignment it was produced from.
    """

    _asset_key: ClassVar[str] = "duplicate_metrics"
    sample_id: str = ""
    tool: str = ""
    alignment_cid: str = ""

Asset base dataclass for Stargazer.

spec: docs/architecture/types.md

Asset dataclass

Base class for all typed file assets in Stargazer.

Attributes:

    cid (str): Content identifier (CID) for the stored file.
    path (Path | None): Local filesystem path (set after download or upload).
    keyvalues (dict[str, str]): Arbitrary metadata dict for base Asset instances only.

Subclasses declare typed fields as normal dataclass attributes:

@dataclass
class Alignment(Asset):
    _asset_key: ClassVar[str] = "alignment"
    sample_id: str = ""
    duplicates_marked: bool = False

Fields are plain Python attributes. to_keyvalues() serializes them to dict[str, str] at storage boundaries; from_keyvalues() reconstructs from storage. str fields pass through directly; all other types use json.dumps / json.loads.

Source code in src/stargazer/assets/asset.py
@dataclass
class Asset:
    """Base class for all typed file assets in Stargazer.

    Attributes:
        cid: Content identifier (CID) for the stored file
        path: Local filesystem path (set after download or upload)
        keyvalues: Arbitrary metadata dict for base Asset instances only

    Subclasses declare typed fields as normal dataclass attributes:

        @dataclass
        class Alignment(Asset):
            _asset_key: ClassVar[str] = "alignment"
            sample_id: str = ""
            duplicates_marked: bool = False

    Fields are plain Python attributes. ``to_keyvalues()`` serializes them to
    ``dict[str, str]`` at storage boundaries; ``from_keyvalues()`` reconstructs
    from storage. ``str`` fields pass through directly; all other types use
    ``json.dumps`` / ``json.loads``.
    """

    _registry: ClassVar[dict[str, type["Asset"]]] = {}
    _asset_key: ClassVar[str] = ""

    cid: str = ""
    path: Path | None = None

    def __init_subclass__(cls, **kwargs):
        """Register subclass in the asset registry."""
        super().__init_subclass__(**kwargs)
        ak = cls.__dict__.get("_asset_key", "")
        if ak:
            Asset._registry[ak] = cls

    def __setattr__(self, name: str, value: Any) -> None:
        """Enforce declared fields on typed subclasses; pass through on base Asset."""
        if self._asset_key and not name.startswith("_") and name not in _BASE_FIELDS:
            allowed = {f.name for f in dataclasses.fields(type(self))} - _BASE_FIELDS
            if name not in allowed:
                raise AttributeError(
                    f"{type(self).__name__} has no field '{name}'. "
                    f"Allowed: {sorted(allowed)}"
                )
        super().__setattr__(name, value)

    def to_keyvalues(self) -> dict[str, str]:
        """Serialize to storage format.

        str fields pass through as-is; all other types are serialized with
        json.dumps. Base Asset instances return their keyvalues dict directly.
        """
        if not self._asset_key:
            return {}
        hints = get_type_hints(type(self))
        result: dict[str, str] = {"asset": self._asset_key}
        for f in dataclasses.fields(self):
            if f.name in _BASE_FIELDS:
                continue
            val = getattr(self, f.name)
            result[f.name] = val if hints.get(f.name) is str else json.dumps(val)
        return result

    @classmethod
    def from_keyvalues(
        cls, kv: dict[str, str], cid: str = "", path: Path | None = None
    ) -> "Asset":
        """Reconstruct from a storage keyvalues dict.

        str fields are assigned directly; all other types are deserialized with
        json.loads. Base Asset receives keyvalues as-is.
        """
        if not cls._asset_key:
            return cls(cid=cid, path=path)
        hints = get_type_hints(cls)
        kwargs = {}
        for f in dataclasses.fields(cls):
            if f.name in _BASE_FIELDS:
                continue
            if f.name in kv:
                kwargs[f.name] = (
                    kv[f.name] if hints.get(f.name) is str else json.loads(kv[f.name])
                )
        return cls(cid=cid, path=path, **kwargs)

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict."""
        return {
            "cid": self.cid,
            "path": str(self.path) if self.path else None,
            "keyvalues": self.to_keyvalues(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> Self:
        """Reconstruct from a serialized dict."""
        return cls.from_keyvalues(
            data.get("keyvalues", {}),
            cid=data.get("cid", ""),
            path=Path(data["path"]) if data.get("path") else None,
        )

    async def fetch(self) -> None:
        """Download this asset and all its companions from storage.

        Downloads the asset itself, then queries storage for any assets linked
        via ``{_asset_key}_cid`` to auto-download companions (e.g. indices,
        mate reads).
        """
        import stargazer.utils.local_storage as _storage

        await _storage.default_client.download(self)

        if self._asset_key and self.cid:
            companions = await assemble(**{f"{self._asset_key}_cid": self.cid})
            for a in companions:
                await _storage.default_client.download(a)

    async def update(self, path: Path, **kwargs) -> None:
        """Upload file and set cid. Shared by all asset types."""
        from stargazer.utils.local_storage import default_client

        for key, value in kwargs.items():
            if value is not None:
                setattr(self, key, value)
        self.path = path
        await default_client.upload(self)

__init_subclass__(**kwargs)

Register subclass in the asset registry.

Source code in src/stargazer/assets/asset.py
def __init_subclass__(cls, **kwargs):
    """Register subclass in the asset registry."""
    super().__init_subclass__(**kwargs)
    ak = cls.__dict__.get("_asset_key", "")
    if ak:
        Asset._registry[ak] = cls

__setattr__(name, value)

Enforce declared fields on typed subclasses; pass through on base Asset.

Source code in src/stargazer/assets/asset.py
def __setattr__(self, name: str, value: Any) -> None:
    """Enforce declared fields on typed subclasses; pass through on base Asset."""
    if self._asset_key and not name.startswith("_") and name not in _BASE_FIELDS:
        allowed = {f.name for f in dataclasses.fields(type(self))} - _BASE_FIELDS
        if name not in allowed:
            raise AttributeError(
                f"{type(self).__name__} has no field '{name}'. "
                f"Allowed: {sorted(allowed)}"
            )
    super().__setattr__(name, value)
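The guard rejects assignments to undeclared fields, which turns attribute typos into immediate errors instead of silently created attributes. A self-contained sketch of the same pattern (the `Guarded` class is hypothetical; it drops the `_asset_key` gating of the real implementation):

```python
import dataclasses
from dataclasses import dataclass

BASE_FIELDS = {"cid", "path"}

@dataclass
class Guarded:
    """Hypothetical asset mimicking Asset.__setattr__'s field enforcement."""
    cid: str = ""
    path: object = None
    sample_id: str = ""

    def __setattr__(self, name, value):
        if not name.startswith("_") and name not in BASE_FIELDS:
            allowed = {f.name for f in dataclasses.fields(type(self))} - BASE_FIELDS
            if name not in allowed:
                raise AttributeError(f"{type(self).__name__} has no field '{name}'")
        super().__setattr__(name, value)

g = Guarded(sample_id="s1")
g.sample_id = "s2"           # declared field: allowed
try:
    g.samlpe_id = "typo"     # misspelled field: rejected
except AttributeError as e:
    print(e)                 # Guarded has no field 'samlpe_id'
```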

fetch() async

Download this asset and all its companions from storage.

Downloads the asset itself, then queries storage for any assets linked via {_asset_key}_cid to auto-download companions (e.g. indices, mate reads).

Source code in src/stargazer/assets/asset.py
async def fetch(self) -> None:
    """Download this asset and all its companions from storage.

    Downloads the asset itself, then queries storage for any assets linked
    via ``{_asset_key}_cid`` to auto-download companions (e.g. indices,
    mate reads).
    """
    import stargazer.utils.local_storage as _storage

    await _storage.default_client.download(self)

    if self._asset_key and self.cid:
        companions = await assemble(**{f"{self._asset_key}_cid": self.cid})
        for a in companions:
            await _storage.default_client.download(a)
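The companion query keys on a reverse link: any stored asset whose `{_asset_key}_cid` keyvalue equals this asset's cid is considered a companion. A minimal sketch with an in-memory list standing in for the storage-backed `assemble()` query (all cids and the `companions` helper are hypothetical):

```python
# In-memory stand-in for the storage index queried by assemble():
# companions point back at a parent via a "<asset_key>_cid" keyvalue.
STORE = [
    {"asset": "reference", "cid": "cid-ref"},
    {"asset": "aligner_index", "cid": "cid-idx1", "reference_cid": "cid-ref"},
    {"asset": "reference_index", "cid": "cid-fai", "reference_cid": "cid-ref"},
    {"asset": "alignment", "cid": "cid-bam"},
]

def companions(asset_key: str, cid: str) -> list[dict]:
    """Everything whose '<asset_key>_cid' keyvalue points at `cid`."""
    link = f"{asset_key}_cid"
    return [kv for kv in STORE if kv.get(link) == cid]

print([c["cid"] for c in companions("reference", "cid-ref")])
# ['cid-idx1', 'cid-fai']
```

So fetching a Reference also pulls down its aligner indices and .fai, which is why bwa_mem only awaits `ref.fetch()` before running.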

from_dict(data) classmethod

Reconstruct from a serialized dict.

Source code in src/stargazer/assets/asset.py
@classmethod
def from_dict(cls, data: dict) -> Self:
    """Reconstruct from a serialized dict."""
    return cls.from_keyvalues(
        data.get("keyvalues", {}),
        cid=data.get("cid", ""),
        path=Path(data["path"]) if data.get("path") else None,
    )

from_keyvalues(kv, cid='', path=None) classmethod

Reconstruct from a storage keyvalues dict.

str fields are assigned directly; all other types are deserialized with json.loads. Base Asset receives keyvalues as-is.

Source code in src/stargazer/assets/asset.py
@classmethod
def from_keyvalues(
    cls, kv: dict[str, str], cid: str = "", path: Path | None = None
) -> "Asset":
    """Reconstruct from a storage keyvalues dict.

    str fields are assigned directly; all other types are deserialized with
    json.loads. Base Asset receives keyvalues as-is.
    """
    if not cls._asset_key:
        return cls(cid=cid, path=path)
    hints = get_type_hints(cls)
    kwargs = {}
    for f in dataclasses.fields(cls):
        if f.name in _BASE_FIELDS:
            continue
        if f.name in kv:
            kwargs[f.name] = (
                kv[f.name] if hints.get(f.name) is str else json.loads(kv[f.name])
            )
    return cls(cid=cid, path=path, **kwargs)

to_dict()

Serialize to a JSON-friendly dict.

Source code in src/stargazer/assets/asset.py
def to_dict(self) -> dict:
    """Serialize to a JSON-friendly dict."""
    return {
        "cid": self.cid,
        "path": str(self.path) if self.path else None,
        "keyvalues": self.to_keyvalues(),
    }

to_keyvalues()

Serialize to storage format.

str fields pass through as-is; all other types are serialized with json.dumps. Base Asset instances return their keyvalues dict directly.

Source code in src/stargazer/assets/asset.py
def to_keyvalues(self) -> dict[str, str]:
    """Serialize to storage format.

    str fields pass through as-is; all other types are serialized with
    json.dumps. Base Asset instances return their keyvalues dict directly.
    """
    if not self._asset_key:
        return {}
    hints = get_type_hints(type(self))
    result: dict[str, str] = {"asset": self._asset_key}
    for f in dataclasses.fields(self):
        if f.name in _BASE_FIELDS:
            continue
        val = getattr(self, f.name)
        result[f.name] = val if hints.get(f.name) is str else json.dumps(val)
    return result

update(path, **kwargs) async

Upload file and set cid. Shared by all asset types.

Source code in src/stargazer/assets/asset.py
async def update(self, path: Path, **kwargs) -> None:
    """Upload file and set cid. Shared by all asset types."""
    from stargazer.utils.local_storage import default_client

    for key, value in kwargs.items():
        if value is not None:
            setattr(self, key, value)
    self.path = path
    await default_client.upload(self)

assemble(**filters) async

Query storage by keyvalue filters and return specialized assets.

The asset filter key accepts a string or list of strings to narrow by asset type. Other filters are passed through as keyvalue matchers.

Parameters:

Name Type Description Default
**filters Any

Keyvalue filters. Values may be scalars or lists (cartesian product).

{}

Returns:

Type Description
list[Asset]

Flat list of specialized Asset subclass instances.

Examples:

assets = await assemble(build="GRCh38", asset="reference")
ref = next(a for a in assets if isinstance(a, Reference))

assets = await assemble(sample_id="NA12878", asset=["r1", "r2"])
r1 = next(a for a in assets if isinstance(a, R1))

Source code in src/stargazer/assets/asset.py
async def assemble(**filters: Any) -> list["Asset"]:
    """Query storage by keyvalue filters and return specialized assets.

    The ``asset`` filter key accepts a string or list of strings to narrow
    by asset type. Other filters are passed through as keyvalue matchers.

    Args:
        **filters: Keyvalue filters. Values may be scalars or lists
                   (cartesian product).

    Returns:
        Flat list of specialized Asset subclass instances.

    Examples:
        assets = await assemble(build="GRCh38", asset="reference")
        ref = next(a for a in assets if isinstance(a, Reference))

        assets = await assemble(sample_id="NA12878", asset=["r1", "r2"])
        r1 = next(a for a in assets if isinstance(a, R1))
    """
    import stargazer.utils.local_storage as _storage
    from stargazer.assets import specialize
    from stargazer.utils.query import generate_query_combinations

    query_combinations = generate_query_combinations(base_query={}, filters=filters)

    seen: dict[str, dict] = {}
    for query in query_combinations:
        for record in await _storage.default_client.query(query):
            seen[record["cid"]] = record

    return [specialize(r) for r in seen.values()]
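The cartesian-product expansion of list-valued filters can be sketched as follows (`expand_filters` is an assumed reimplementation of what `generate_query_combinations` does, not the real helper):

```python
from itertools import product


def expand_filters(filters: dict) -> list[dict]:
    """Expand list-valued filters into one scalar query per combination."""
    keys = list(filters)
    value_lists = [v if isinstance(v, list) else [v] for v in filters.values()]
    return [dict(zip(keys, combo)) for combo in product(*value_lists)]


queries = expand_filters({"sample_id": "NA12878", "asset": ["r1", "r2"]})
print(queries)
# [{'sample_id': 'NA12878', 'asset': 'r1'}, {'sample_id': 'NA12878', 'asset': 'r2'}]
```

Each resulting query is run against storage, results are deduplicated by CID, and `specialize` dispatches each record to its Asset subclass via the registry.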

Read file asset types for Stargazer.

spec: docs/architecture/types.md

R1 dataclass

Bases: Asset

R1 (forward) FASTQ read file asset.

Carries mate_cid pointing to the paired R2 asset's CID (empty for single-end reads).

Source code in src/stargazer/assets/reads.py
@dataclass
class R1(Asset):
    """R1 (forward) FASTQ read file asset.

    Carries mate_cid pointing to the paired R2 asset's CID (empty for single-end).
    """

    _asset_key: ClassVar[str] = "r1"
    sample_id: str = ""
    mate_cid: str = ""
    sequencing_platform: str = ""

R2 dataclass

Bases: Asset

R2 (reverse) FASTQ read file asset.

Carries mate_cid pointing to the paired R1 asset's CID (empty for single-end reads).

Source code in src/stargazer/assets/reads.py
@dataclass
class R2(Asset):
    """R2 (reverse) FASTQ read file asset.

    Carries mate_cid pointing to the paired R1 asset's CID (empty for single-end).
    """

    _asset_key: ClassVar[str] = "r2"
    sample_id: str = ""
    mate_cid: str = ""
    sequencing_platform: str = ""

Reference genome asset types for Stargazer.

spec: docs/architecture/types.md

AlignerIndex dataclass

Bases: Asset

Aligner index file asset (one asset instance per file for multi-file indices).

Source code in src/stargazer/assets/reference.py
@dataclass
class AlignerIndex(Asset):
    """Aligner index file asset (one file per index file for multi-file indices)."""

    _asset_key: ClassVar[str] = "aligner_index"
    build: str = ""
    aligner: str = ""
    reference_cid: str = ""

Reference dataclass

Bases: Asset

Reference FASTA file asset.

Source code in src/stargazer/assets/reference.py
@dataclass
class Reference(Asset):
    """Reference FASTA file asset."""

    _asset_key: ClassVar[str] = "reference"
    build: str = ""

    @property
    def contigs(self) -> list[str]:
        """Read contig names from the companion .fai index.

        Requires fetch() to have been called first so the ReferenceIndex
        companion is downloaded alongside this reference.
        """
        if self.path is None:
            raise ValueError("Reference has no local path — call fetch() first")
        fai_path = self.path.parent / (self.path.name + ".fai")
        if not fai_path.exists():
            raise FileNotFoundError(
                f"Reference index not found at {fai_path}. "
                f"Run samtools_faidx first, then fetch() to download companions."
            )
        contigs = []
        with open(fai_path) as f:
            for line in f:
                name = line.split("\t", 1)[0].strip()
                if name:
                    contigs.append(name)
        return contigs

contigs property

Read contig names from the companion .fai index.

Requires fetch() to have been called first so the ReferenceIndex companion is downloaded alongside this reference.
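The parse itself is simple: a `.fai` index is tab-separated with the contig name in the first column. A self-contained illustration (the file contents here are example data):

```python
import tempfile
from pathlib import Path

# Two example .fai records: name, length, offset, linebases, linewidth.
fai_text = "chr1\t248956422\t112\t60\t61\nchr2\t242193529\t253105714\t60\t61\n"

with tempfile.TemporaryDirectory() as d:
    fai_path = Path(d) / "GRCh38.fa.fai"
    fai_path.write_text(fai_text)
    # Keep only the first tab-separated column of each non-empty line.
    contigs = [
        line.split("\t", 1)[0].strip()
        for line in fai_path.read_text().splitlines()
        if line.strip()
    ]

print(contigs)  # ['chr1', 'chr2']
```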

ReferenceIndex dataclass

Bases: Asset

FASTA index (.fai) file asset.

Carries reference_cid linking back to the Reference it was built from.

Source code in src/stargazer/assets/reference.py
@dataclass
class ReferenceIndex(Asset):
    """FASTA index (.fai) file asset.

    Carries reference_cid linking back to the Reference it was built from.
    """

    _asset_key: ClassVar[str] = "reference_index"
    build: str = ""
    tool: str = ""
    reference_cid: str = ""

SequenceDict dataclass

Bases: Asset

Sequence dictionary (.dict) file asset.

Source code in src/stargazer/assets/reference.py
@dataclass
class SequenceDict(Asset):
    """Sequence dictionary (.dict) file asset."""

    _asset_key: ClassVar[str] = "sequence_dict"
    build: str = ""
    tool: str = ""
    reference_cid: str = ""

scRNA-seq asset types for Stargazer.

spec: docs/workflows/scrna.md

AnnData dataclass

Bases: Asset

AnnData (.h5ad) file asset for single-cell RNA-seq data.

Tracks pipeline stage, cell/gene counts, and provenance through the scRNA-seq processing steps.

Source code in src/stargazer/assets/scrna.py
@dataclass
class AnnData(Asset):
    """AnnData (.h5ad) file asset for single-cell RNA-seq data.

    Tracks pipeline stage, cell/gene counts, and provenance through
    the scRNA-seq processing steps.
    """

    _asset_key: ClassVar[str] = "anndata"
    sample_id: str = ""
    n_obs: int = 0
    n_vars: int = 0
    stage: str = ""
    organism: str = ""
    source_cid: str = ""

Variant call asset types for Stargazer.

spec: docs/architecture/types.md

KnownSites dataclass

Bases: Asset

Known variant sites VCF used for BQSR.

Standalone asset — carries build and source fields, no container needed.

Source code in src/stargazer/assets/variants.py
@dataclass
class KnownSites(Asset):
    """Known variant sites VCF used for BQSR.

    Standalone asset — carries build and source fields, no container needed.
    """

    _asset_key: ClassVar[str] = "known_sites"
    build: str = ""
    resource_name: str = ""
    known: str = "false"
    training: str = "false"
    truth: str = "false"
    prior: str = "10"
    vqsr_mode: str = ""

KnownSitesIndex dataclass

Bases: Asset

VCF index (.idx) file for a KnownSites asset.

Carries known_sites_cid linking to the KnownSites VCF it indexes. Fetched automatically alongside the VCF via Asset.fetch().

Source code in src/stargazer/assets/variants.py
@dataclass
class KnownSitesIndex(Asset):
    """VCF index (.idx) file for a KnownSites asset.

    Carries known_sites_cid linking to the KnownSites VCF it indexes.
    Fetched automatically alongside the VCF via Asset.fetch().
    """

    _asset_key: ClassVar[str] = "known_sites_index"
    known_sites_cid: str = ""

VQSRModel dataclass

Bases: Asset

VQSR recalibration model (.recal file + tranches path).

Produced by VariantRecalibrator. The recal file is the primary path; the companion tranches file path is stored in keyvalues["tranches_path"].

Source code in src/stargazer/assets/variants.py
@dataclass
class VQSRModel(Asset):
    """VQSR recalibration model (.recal file + tranches path).

    Produced by VariantRecalibrator. The recal file is the primary path;
    the companion tranches file path is stored in keyvalues["tranches_path"].
    """

    _asset_key: ClassVar[str] = "vqsr_model"
    sample_id: str = ""
    mode: str = "SNP"
    tranches_path: str = ""
    build: str = ""
    variants_cid: str = ""

Variants dataclass

Bases: Asset

VCF/GVCF variant call file asset.

Source code in src/stargazer/assets/variants.py
@dataclass
class Variants(Asset):
    """VCF/GVCF variant call file asset."""

    _asset_key: ClassVar[str] = "variants"
    sample_id: str = ""
    caller: str = ""
    variant_type: str = ""
    build: str = ""
    vqsr_mode: str = ""
    sample_count: int = 0
    source_samples: list | None = None

VariantsIndex dataclass

Bases: Asset

VCF index (.tbi) file asset.

Carries variants_cid linking to the Variants file it indexes.

Source code in src/stargazer/assets/variants.py
@dataclass
class VariantsIndex(Asset):
    """VCF index (.tbi) file asset.

    Carries variants_cid linking to the Variants file it indexes.
    """

    _asset_key: ClassVar[str] = "variants_index"
    sample_id: str = ""
    variants_cid: str = ""

Utils

Local filesystem storage client for Stargazer.

Always the primary storage client. Stores files locally with TinyDB metadata indexing and delegates to a remote backend (PinataClient) or the public IPFS gateway for cache misses.

Also provides the module-level factory and singleton:

- get_client(): create a LocalStorageClient based on available config
- default_client: pre-built singleton used across the application

spec: docs/architecture/configuration.md

LocalStorageClient

Local filesystem storage client with optional remote backend.

Always handles caching and TinyDB metadata. Downloads follow this order:

  1. Return if file already exists at component.path
  2. Check local cache by CID
  3. If remote backend (PinataClient) is attached, fetch via signed URL
  4. Fall back to public IPFS gateway

When a PinataClient remote is attached, upload/query/delete delegate to it. Without a remote, upload/query/delete operate locally only.

Usage

client = LocalStorageClient()
comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
await client.upload(comp)
files = await client.query({"type": "alignment"})
await client.download(comp)

Source code in src/stargazer/utils/local_storage.py
class LocalStorageClient:
    """Local filesystem storage client with optional remote backend.

    Always handles caching and TinyDB metadata. Downloads follow this order:

    1. Return if file already exists at component.path
    2. Check local cache by CID
    3. If remote backend (PinataClient) is attached, fetch via signed URL
    4. Fall back to public IPFS gateway

    When a PinataClient remote is attached, upload/query/delete delegate to it.
    Without a remote, upload/query/delete operate locally only.

    Usage:
        client = LocalStorageClient()
        comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
        await client.upload(comp)
        files = await client.query({"type": "alignment"})
        await client.download(comp)
    """

    def __init__(
        self,
        local_dir: Optional[Path] = None,
        remote: Optional[PinataClient] = None,
        public_gateway: Optional[str] = None,
    ):
        """Initialize local storage client.

        Args:
            local_dir: Local directory for file storage (defaults to STARGAZER_LOCAL)
            remote: Optional PinataClient for authenticated Pinata operations
            public_gateway: Public IPFS gateway URL (defaults to PINATA_GATEWAY)
        """
        self.local_dir = local_dir or Path(os.environ["STARGAZER_LOCAL"])
        self.local_dir.mkdir(parents=True, exist_ok=True)
        self.remote = remote
        self.public_gateway = (
            public_gateway
            if public_gateway is not None
            else os.environ["PINATA_GATEWAY"]
        )

        # TinyDB for local metadata storage (lazy initialized)
        self.local_db_path = self.local_dir / "stargazer_local.json"
        self._db: Optional[TinyDB] = None
        self._db_mtime: float = 0.0

    @property
    def db(self) -> TinyDB:
        """Get TinyDB instance for local metadata storage (lazy initialized).

        Re-opens if the DB file has been deleted or modified externally,
        keeping _last_id in sync when other processes write to the same file.
        """
        mtime = (
            self.local_db_path.stat().st_mtime if self.local_db_path.exists() else 0.0
        )
        if self._db is None or mtime != self._db_mtime:
            if self._db is not None:
                self._db.close()
            self._db = TinyDB(self.local_db_path)
            self._db_mtime = mtime
        return self._db

    async def upload(self, component: Asset) -> None:
        """Upload a file. Delegates to remote if attached, otherwise stores locally.

        Args:
            component: Asset with path and keyvalues set
        """
        if self.remote:
            await self.remote.upload(component)
            return

        path = component.path
        if path is None:
            raise ValueError("component.path must be set before uploading")

        # Generate MD5 hash of file content (streamed to handle large files)
        h = hashlib.md5()
        with path.open("rb") as fh:
            for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
                h.update(chunk)
        md5_hash = h.hexdigest()
        cid = f"local_{md5_hash}"

        # Determine relative path: preserve subdirectory if file is inside local_dir
        try:
            rel_path = path.resolve().relative_to(self.local_dir.resolve())
        except ValueError:
            rel_path = Path(path.name)

        local_path = self.local_dir / rel_path
        local_path.parent.mkdir(parents=True, exist_ok=True)
        if path.resolve() != local_path.resolve():
            shutil.copy2(path, local_path)

        # Upsert metadata in TinyDB (avoid duplicates on re-upload)
        now = datetime.now(timezone.utc)
        File = Query()
        self.db.upsert(
            {
                "cid": cid,
                "keyvalues": component.to_keyvalues(),
                "created_at": now.isoformat(),
                "rel_path": str(rel_path),
            },
            File.cid == cid,
        )

        component.cid = cid

    async def download(
        self,
        component: Asset,
        dest: Optional[Path] = None,
        name: Optional[str] = None,
    ) -> bool:
        """Download a file by CID. Checks cache, then remote, then public gateway.

        Args:
            component: Asset with cid set
            dest: Optional destination path (copies file there)
            name: Optional filename to use instead of the CID

        Returns:
            True if the file was already cached, False if freshly downloaded.
        """
        # Skip if path is already set and file exists
        if component.path and component.path.exists():
            return True

        cid = component.cid

        # 1. Check local cache by name or CID
        cache_key = name if name else cid.replace("/", "_")
        cache_path = self.local_dir / cache_key

        if cache_path.exists():
            self._resolve_dest(component, cache_path, dest)
            return True

        # 2. Check TinyDB for local_ CIDs
        if cid.startswith("local_"):
            File = Query()
            record = self.db.get(File.cid == cid)
            if record:
                local_path = self.local_dir / record["rel_path"]
                if local_path.exists():
                    self._resolve_dest(component, local_path, dest)
                    return True
            raise FileNotFoundError(
                f"Local file {cid} not found in local directory or database."
            )

        # 3. Remote backend (signed URLs for private visibility)
        if self.remote and self.remote.visibility == "private":
            await self.remote.download_to(cid, cache_path)
            self._resolve_dest(component, cache_path, dest)
            return False

        # 4. Public IPFS gateway (default for public visibility or no remote)
        await self._fetch_public(cid, cache_path)
        self._resolve_dest(component, cache_path, dest)
        return False

    async def query(self, keyvalues: dict[str, str]) -> list[dict]:
        """Query files by keyvalue metadata. Delegates to remote if attached.

        Args:
            keyvalues: Metadata key-value pairs to filter by

        Returns:
            List of raw storage records with 'cid', 'path', and 'keyvalues' keys
        """
        if self.remote:
            return await self.remote.query(keyvalues)

        results = []
        for record in self.db.all():
            record_kv = record.get("keyvalues", {})
            if all(record_kv.get(k) == v for k, v in keyvalues.items()):
                results.append(
                    {
                        "cid": record["cid"],
                        "name": record.get("name", ""),
                        "path": self.local_dir / record["rel_path"],
                        "keyvalues": record_kv,
                    }
                )
        return results

    async def delete(self, component: Asset) -> None:
        """Delete a file. Delegates to remote if attached, otherwise deletes locally.

        Args:
            component: Asset with cid set
        """
        if self.remote:
            await self.remote.delete(component)
            return

        File = Query()
        record = self.db.get(File.cid == component.cid)
        if record:
            local_path = self.local_dir / record["rel_path"]
            if local_path.exists():
                local_path.unlink()
            self.db.remove(File.cid == component.cid)

    def _resolve_dest(
        self, component: Asset, source: Path, dest: Optional[Path]
    ) -> None:
        """Set component.path, optionally copying to dest."""
        if dest:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(source, dest)
            component.path = dest
        else:
            component.path = source

    async def _fetch_public(self, cid: str, dest: Path) -> None:
        """Fetch a file from the public IPFS gateway."""
        url = f"{self.public_gateway}/ipfs/{cid}"

        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                response.raise_for_status()

                dest.parent.mkdir(parents=True, exist_ok=True)
                async with aiofiles.open(dest, "wb") as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)

db property

Get TinyDB instance for local metadata storage (lazy initialized).

Re-opens if the DB file has been deleted or modified externally, keeping _last_id in sync when other processes write to the same file.
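The mtime-guarded lazy reopen can be sketched with a stdlib stand-in (plain JSON instead of TinyDB; `LazyDB` is illustrative only):

```python
import json
import tempfile
from pathlib import Path


class LazyDB:
    """Reload the metadata file only when its mtime differs from the last open."""

    def __init__(self, path: Path):
        self.path = path
        self._data = None
        self._mtime = 0.0

    @property
    def data(self) -> dict:
        mtime = self.path.stat().st_mtime if self.path.exists() else 0.0
        if self._data is None or mtime != self._mtime:
            # Missing file behaves like an empty database.
            self._data = json.loads(self.path.read_text()) if self.path.exists() else {}
            self._mtime = mtime
        return self._data


with tempfile.TemporaryDirectory() as d:
    p = Path(d) / "meta.json"
    db = LazyDB(p)
    assert db.data == {}                    # no file yet
    p.write_text('{"cid": "local_abc"}')    # simulate an external writer
    assert db.data == {"cid": "local_abc"}  # mtime changed, so access reloads
```

The real property additionally closes the stale TinyDB handle before reopening, which keeps its internal document-ID counter in sync with the file.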

__init__(local_dir=None, remote=None, public_gateway=None)

Initialize local storage client.

Parameters:

Name Type Description Default
local_dir Optional[Path]

Local directory for file storage (defaults to STARGAZER_LOCAL)

None
remote Optional[PinataClient]

Optional PinataClient for authenticated Pinata operations

None
public_gateway Optional[str]

Public IPFS gateway URL (defaults to PINATA_GATEWAY)

None
Source code in src/stargazer/utils/local_storage.py
def __init__(
    self,
    local_dir: Optional[Path] = None,
    remote: Optional[PinataClient] = None,
    public_gateway: Optional[str] = None,
):
    """Initialize local storage client.

    Args:
        local_dir: Local directory for file storage (defaults to STARGAZER_LOCAL)
        remote: Optional PinataClient for authenticated Pinata operations
        public_gateway: Public IPFS gateway URL (defaults to PINATA_GATEWAY)
    """
    self.local_dir = local_dir or Path(os.environ["STARGAZER_LOCAL"])
    self.local_dir.mkdir(parents=True, exist_ok=True)
    self.remote = remote
    self.public_gateway = (
        public_gateway
        if public_gateway is not None
        else os.environ["PINATA_GATEWAY"]
    )

    # TinyDB for local metadata storage (lazy initialized)
    self.local_db_path = self.local_dir / "stargazer_local.json"
    self._db: Optional[TinyDB] = None
    self._db_mtime: float = 0.0

delete(component) async

Delete a file. Delegates to remote if attached, otherwise deletes locally.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
Source code in src/stargazer/utils/local_storage.py
async def delete(self, component: Asset) -> None:
    """Delete a file. Delegates to remote if attached, otherwise deletes locally.

    Args:
        component: Asset with cid set
    """
    if self.remote:
        await self.remote.delete(component)
        return

    File = Query()
    record = self.db.get(File.cid == component.cid)
    if record:
        local_path = self.local_dir / record["rel_path"]
        if local_path.exists():
            local_path.unlink()
        self.db.remove(File.cid == component.cid)

download(component, dest=None, name=None) async

Download a file by CID. Checks cache, then remote, then public gateway.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
dest Optional[Path]

Optional destination path (copies file there)

None
name Optional[str]

Optional filename to use instead of the CID

None

Returns:

Type Description
bool

True if the file was already cached, False if freshly downloaded.

Source code in src/stargazer/utils/local_storage.py
async def download(
    self,
    component: Asset,
    dest: Optional[Path] = None,
    name: Optional[str] = None,
) -> bool:
    """Download a file by CID. Checks cache, then remote, then public gateway.

    Args:
        component: Asset with cid set
        dest: Optional destination path (copies file there)
        name: Optional filename to use instead of the CID

    Returns:
        True if the file was already cached, False if freshly downloaded.
    """
    # Skip if path is already set and file exists
    if component.path and component.path.exists():
        return True

    cid = component.cid

    # 1. Check local cache by name or CID
    cache_key = name if name else cid.replace("/", "_")
    cache_path = self.local_dir / cache_key

    if cache_path.exists():
        self._resolve_dest(component, cache_path, dest)
        return True

    # 2. Check TinyDB for local_ CIDs
    if cid.startswith("local_"):
        File = Query()
        record = self.db.get(File.cid == cid)
        if record:
            local_path = self.local_dir / record["rel_path"]
            if local_path.exists():
                self._resolve_dest(component, local_path, dest)
                return True
        raise FileNotFoundError(
            f"Local file {cid} not found in local directory or database."
        )

    # 3. Remote backend (signed URLs for private visibility)
    if self.remote and self.remote.visibility == "private":
        await self.remote.download_to(cid, cache_path)
        self._resolve_dest(component, cache_path, dest)
        return False

    # 4. Public IPFS gateway (default for public visibility or no remote)
    await self._fetch_public(cid, cache_path)
    self._resolve_dest(component, cache_path, dest)
    return False

query(keyvalues) async

Query files by keyvalue metadata. Delegates to remote if attached.

Parameters:

Name Type Description Default
keyvalues dict[str, str]

Metadata key-value pairs to filter by

required

Returns:

Type Description
list[dict]

List of raw storage records with 'cid', 'path', and 'keyvalues' keys

Source code in src/stargazer/utils/local_storage.py
async def query(self, keyvalues: dict[str, str]) -> list[dict]:
    """Query files by keyvalue metadata. Delegates to remote if attached.

    Args:
        keyvalues: Metadata key-value pairs to filter by

    Returns:
        List of raw storage records with 'cid', 'path', and 'keyvalues' keys
    """
    if self.remote:
        return await self.remote.query(keyvalues)

    results = []
    for record in self.db.all():
        record_kv = record.get("keyvalues", {})
        if all(record_kv.get(k) == v for k, v in keyvalues.items()):
            results.append(
                {
                    "cid": record["cid"],
                    "name": record.get("name", ""),
                    "path": self.local_dir / record["rel_path"],
                    "keyvalues": record_kv,
                }
            )
    return results

upload(component) async

Upload a file. Delegates to remote if attached, otherwise stores locally.

Parameters:

Name Type Description Default
component Asset

Asset with path and keyvalues set

required
Source code in src/stargazer/utils/local_storage.py
async def upload(self, component: Asset) -> None:
    """Upload a file. Delegates to remote if attached, otherwise stores locally.

    Args:
        component: Asset with path and keyvalues set
    """
    if self.remote:
        await self.remote.upload(component)
        return

    path = component.path
    if path is None:
        raise ValueError("component.path must be set before uploading")

    # Generate MD5 hash of file content (streamed to handle large files)
    h = hashlib.md5()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
            h.update(chunk)
    md5_hash = h.hexdigest()
    cid = f"local_{md5_hash}"

    # Determine relative path: preserve subdirectory if file is inside local_dir
    try:
        rel_path = path.resolve().relative_to(self.local_dir.resolve())
    except ValueError:
        rel_path = Path(path.name)

    local_path = self.local_dir / rel_path
    local_path.parent.mkdir(parents=True, exist_ok=True)
    if path.resolve() != local_path.resolve():
        shutil.copy2(path, local_path)

    # Upsert metadata in TinyDB (avoid duplicates on re-upload)
    now = datetime.now(timezone.utc)
    File = Query()
    self.db.upsert(
        {
            "cid": cid,
            "keyvalues": component.to_keyvalues(),
            "created_at": now.isoformat(),
            "rel_path": str(rel_path),
        },
        File.cid == cid,
    )

    component.cid = cid
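The local-CID derivation can be exercised on its own — a streamed MD5 in 8 MiB chunks, prefixed with `local_` so `download()` can route it to the TinyDB lookup instead of the IPFS gateways:

```python
import hashlib
import tempfile
from pathlib import Path


def local_cid(path: Path) -> str:
    """Derive the local_-prefixed content ID used for files stored without a remote."""
    h = hashlib.md5()
    with path.open("rb") as fh:
        # Stream in 8 MiB chunks so large files never load fully into memory.
        for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
            h.update(chunk)
    return f"local_{h.hexdigest()}"


with tempfile.TemporaryDirectory() as d:
    p = Path(d) / "data.bam"
    p.write_bytes(b"example")
    cid = local_cid(p)

print(cid)
```

Because the CID is content-derived, re-uploading an unchanged file produces the same CID and the TinyDB upsert leaves a single record.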

get_client()

Create a storage client based on available credentials.

Always returns a LocalStorageClient. When PINATA_JWT is available, a PinataClient remote is attached for authenticated operations (upload, query, delete, private downloads). Public IPFS gateway access is always available for downloading public CIDs.

Resolution logic
  • PINATA_JWT set -> LocalStorageClient + PinataClient remote
  • No JWT -> LocalStorageClient (public gateway only)

Returns:

Type Description
LocalStorageClient

A LocalStorageClient, optionally with a PinataClient remote

Source code in src/stargazer/utils/local_storage.py
def get_client() -> "LocalStorageClient":
    """Create a storage client based on available credentials.

    Always returns a LocalStorageClient. When PINATA_JWT is available,
    a PinataClient remote is attached for authenticated operations (upload,
    query, delete, private downloads). Public IPFS gateway access is always
    available for downloading public CIDs.

    Resolution logic:
        - PINATA_JWT set -> LocalStorageClient + PinataClient remote
        - No JWT -> LocalStorageClient (public gateway only)

    Returns:
        A LocalStorageClient, optionally with a PinataClient remote
    """
    if os.environ.get("PINATA_JWT"):
        return LocalStorageClient(remote=PinataClient())

    return LocalStorageClient()

Pinata API v3 client for IPFS file storage.

Provides async interface for authenticated Pinata operations:

- Uploading files with keyvalue metadata
- Downloading private files via signed gateway URLs
- Querying files by keyvalue pairs
- Deleting files

Used as a remote backend by LocalStorageClient when PINATA_JWT is available.

spec: docs/architecture/configuration.md

PinataClient

Async client for Pinata API v3.

Handles authenticated operations against the Pinata API: uploads, private downloads via signed URLs, metadata queries, and deletions.

This is a pure remote transport — caching is handled by LocalStorageClient.

PINATA_VISIBILITY controls upload network and query/download behavior: - "private": uploads as private, downloads via signed URLs, queries /files/private - "public": uploads as public, downloads via public gateway (handled by LocalStorageClient), queries /files/public

If JWT is unset, only public downloads are possible (via LocalStorageClient's public gateway fallback).

Usage

client = PinataClient()
comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
await client.upload(comp)  # sets comp.cid
files = await client.query({"type": "alignment", "sample": "NA12878"})
await client.delete(comp)

Source code in src/stargazer/utils/pinata.py
class PinataClient:
    """Async client for Pinata API v3.

    Handles authenticated operations against the Pinata API: uploads,
    private downloads via signed URLs, metadata queries, and deletions.

    This is a pure remote transport — caching is handled by LocalStorageClient.

    PINATA_VISIBILITY controls upload network and query/download behavior:
    - "private": uploads as private, downloads via signed URLs, queries /files/private
    - "public": uploads as public, downloads via public gateway (handled by
      LocalStorageClient), queries /files/public

    If JWT is unset, only public downloads are possible (via LocalStorageClient's
    public gateway fallback).

    Usage:
        client = PinataClient()
        comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
        await client.upload(comp)  # sets comp.cid
        files = await client.query({"type": "alignment", "sample": "NA12878"})
        await client.delete(comp)
    """

    API_BASE = "https://api.pinata.cloud/v3"
    UPLOAD_BASE = "https://uploads.pinata.cloud/v3"

    def __init__(
        self,
        jwt: Optional[str] = None,
        visibility: Optional[str] = None,
    ):
        """Initialize Pinata client.

        Args:
            jwt: Pinata JWT token (defaults to PINATA_JWT from config)
            visibility: "public" or "private" (defaults to PINATA_VISIBILITY from config)
        """
        self._jwt = jwt or os.environ.get("PINATA_JWT") or None
        self.visibility = visibility or os.environ["PINATA_VISIBILITY"]

    @property
    def jwt(self) -> str:
        """Get JWT token, raising error if not set."""
        if not self._jwt:
            raise ValueError(
                "PINATA_JWT not set. Provide jwt= argument or "
                "set PINATA_JWT environment variable."
            )
        return self._jwt

    def _headers(self) -> dict:
        """Get authorization headers."""
        return {"Authorization": f"Bearer {self.jwt}"}

    async def _get_gateway_domain(self) -> str:
        """Fetch the dedicated gateway domain from Pinata API."""
        if not hasattr(self, "_gateway_domain") or self._gateway_domain is None:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.API_BASE}/ipfs/gateways", headers=self._headers()
                ) as response:
                    response.raise_for_status()
                    data = await response.json()
                    rows = data["data"]["rows"]
                    if not rows:
                        raise ValueError(
                            "No gateway configured in Pinata account. "
                            "Create one at https://app.pinata.cloud/gateway"
                        )
                    domain = rows[0]["domain"]
                    self._gateway_domain = f"https://{domain}.mypinata.cloud"
        return self._gateway_domain

    async def _get_signed_url(self, cid: str, expires: int = 300) -> str:
        """Get a signed download URL for a private file."""
        gateway = await self._get_gateway_domain()
        payload = {
            "url": f"{gateway}/files/{cid}",
            "expires": expires,
            "date": int(time.time()),
            "method": "GET",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.API_BASE}/files/sign",
                headers=self._headers(),
                json=payload,
            ) as response:
                response.raise_for_status()
                data = await response.json()
                return data["data"]

    async def upload(self, component: Asset) -> None:
        """Upload a file to IPFS via Pinata. Sets component.cid.

        Args:
            component: Asset with path and keyvalues set
        """
        path = component.path
        if path is None:
            raise ValueError("component.path must be set before uploading")

        url = f"{self.UPLOAD_BASE}/files"

        async with aiohttp.ClientSession() as session:
            data = aiohttp.FormData()
            data.add_field("file", open(path, "rb"), filename=path.name)
            data.add_field("name", path.name)
            data.add_field("network", self.visibility)

            kv = component.to_keyvalues()
            if kv:
                data.add_field("keyvalues", json.dumps(kv))

            async with session.post(
                url, headers=self._headers(), data=data
            ) as response:
                response.raise_for_status()
                result = await response.json()
                data_obj = result.get("data", result)
                component.cid = data_obj["cid"]

    async def download_to(self, cid: str, dest: Path) -> None:
        """Download a file to dest. Uses signed URL for private, raises for public.

        Public downloads are handled by LocalStorageClient's public gateway
        fallback, so this method is only called for private visibility.

        Args:
            cid: Content identifier
            dest: Destination path to write to
        """
        if self.visibility == "public":
            raise ValueError(
                "Public downloads should use the public IPFS gateway, "
                "not signed URLs. This is a bug — LocalStorageClient "
                "should have handled this download."
            )

        download_url = await self._get_signed_url(cid)

        async with aiohttp.ClientSession() as session:
            async with session.get(download_url) as response:
                response.raise_for_status()

                dest.parent.mkdir(parents=True, exist_ok=True)
                async with aiofiles.open(dest, "wb") as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)

    async def query(self, keyvalues: dict[str, str]) -> list[dict]:
        """Query files by keyvalue metadata from Pinata API.

        Paginates through all results automatically. Queries the private or
        public file endpoint based on visibility.

        Args:
            keyvalues: Metadata key-value pairs to filter by

        Returns:
            List of dicts with cid, name, and keyvalues for each matching file
        """
        url = f"{self.API_BASE}/files/{self.visibility}"
        params: dict = {"pageLimit": 1000, "order": "DESC"}

        # Add metadata filters using the correct format: metadata[key]=value
        if keyvalues:
            for key, value in keyvalues.items():
                params[f"metadata[{key}]"] = value

        results: list[dict] = []
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.get(
                    url, headers=self._headers(), params=params
                ) as response:
                    response.raise_for_status()
                    data = await response.json()

                    for f in data.get("data", {}).get("files", []):
                        results.append(
                            {
                                "cid": f["cid"],
                                "name": f.get("name", ""),
                                "keyvalues": f.get("keyvalues", {}),
                            }
                        )

                    next_token = data.get("data", {}).get("next_page_token")
                    if not next_token:
                        break
                    params["pageToken"] = next_token

        return results

    async def delete(self, component: Asset) -> None:
        """Delete a file from Pinata by querying for its internal ID first.

        Args:
            component: Asset with cid set
        """
        url = f"{self.API_BASE}/files/{self.visibility}"
        params = {"cid": component.cid}

        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, headers=self._headers(), params=params
            ) as response:
                response.raise_for_status()
                data = await response.json()
                files = data.get("data", {}).get("files", [])
                if not files:
                    return
                file_id = files[0]["id"]

            async with session.delete(
                f"{self.API_BASE}/files/{self.visibility}/{file_id}",
                headers=self._headers(),
            ) as response:
                response.raise_for_status()

jwt property

Get JWT token, raising error if not set.

__init__(jwt=None, visibility=None)

Initialize Pinata client.

Parameters:

- jwt (Optional[str], default: None): Pinata JWT token (defaults to PINATA_JWT from config)
- visibility (Optional[str], default: None): "public" or "private" (defaults to PINATA_VISIBILITY from config)
Source code in src/stargazer/utils/pinata.py
def __init__(
    self,
    jwt: Optional[str] = None,
    visibility: Optional[str] = None,
):
    """Initialize Pinata client.

    Args:
        jwt: Pinata JWT token (defaults to PINATA_JWT from config)
        visibility: "public" or "private" (defaults to PINATA_VISIBILITY from config)
    """
    self._jwt = jwt or os.environ.get("PINATA_JWT") or None
    self.visibility = visibility or os.environ["PINATA_VISIBILITY"]

delete(component) async

Delete a file from Pinata by querying for its internal ID first.

Parameters:

- component (Asset, required): Asset with cid set
Source code in src/stargazer/utils/pinata.py
async def delete(self, component: Asset) -> None:
    """Delete a file from Pinata by querying for its internal ID first.

    Args:
        component: Asset with cid set
    """
    url = f"{self.API_BASE}/files/{self.visibility}"
    params = {"cid": component.cid}

    async with aiohttp.ClientSession() as session:
        async with session.get(
            url, headers=self._headers(), params=params
        ) as response:
            response.raise_for_status()
            data = await response.json()
            files = data.get("data", {}).get("files", [])
            if not files:
                return
            file_id = files[0]["id"]

        async with session.delete(
            f"{self.API_BASE}/files/{self.visibility}/{file_id}",
            headers=self._headers(),
        ) as response:
            response.raise_for_status()

download_to(cid, dest) async

Download a file to dest. Uses signed URL for private, raises for public.

Public downloads are handled by LocalStorageClient's public gateway fallback, so this method is only called for private visibility.

Parameters:

- cid (str, required): Content identifier
- dest (Path, required): Destination path to write to
Source code in src/stargazer/utils/pinata.py
async def download_to(self, cid: str, dest: Path) -> None:
    """Download a file to dest. Uses signed URL for private, raises for public.

    Public downloads are handled by LocalStorageClient's public gateway
    fallback, so this method is only called for private visibility.

    Args:
        cid: Content identifier
        dest: Destination path to write to
    """
    if self.visibility == "public":
        raise ValueError(
            "Public downloads should use the public IPFS gateway, "
            "not signed URLs. This is a bug — LocalStorageClient "
            "should have handled this download."
        )

    download_url = await self._get_signed_url(cid)

    async with aiohttp.ClientSession() as session:
        async with session.get(download_url) as response:
            response.raise_for_status()

            dest.parent.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(dest, "wb") as f:
                async for chunk in response.content.iter_chunked(8192):
                    await f.write(chunk)
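The chunked write keeps memory bounded: the body is streamed to disk in 8 KiB pieces rather than buffered whole. A dependency-free sketch of the same pattern, where `iter_chunked` is a stand-in for aiohttp's `response.content.iter_chunked` and the synchronous `open` stands in for aiofiles:

```python
import asyncio
from pathlib import Path

async def iter_chunked(data: bytes, size: int):
    # Stand-in for response.content.iter_chunked(size).
    for i in range(0, len(data), size):
        yield data[i:i + size]

async def download_to_sketch(payload: bytes, dest: Path) -> None:
    # Create parent dirs, then stream chunks to the destination file.
    dest.parent.mkdir(parents=True, exist_ok=True)
    with open(dest, "wb") as f:  # the real client uses aiofiles here
        async for chunk in iter_chunked(payload, 8192):
            f.write(chunk)
```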

query(keyvalues) async

Query files by keyvalue metadata from Pinata API.

Paginates through all results automatically. Queries the private or public file endpoint based on visibility.

Parameters:

- keyvalues (dict[str, str], required): Metadata key-value pairs to filter by

Returns:

- list[dict]: List of dicts with cid, name, and keyvalues for each matching file

Source code in src/stargazer/utils/pinata.py
async def query(self, keyvalues: dict[str, str]) -> list[dict]:
    """Query files by keyvalue metadata from Pinata API.

    Paginates through all results automatically. Queries the private or
    public file endpoint based on visibility.

    Args:
        keyvalues: Metadata key-value pairs to filter by

    Returns:
        List of dicts with cid, name, and keyvalues for each matching file
    """
    url = f"{self.API_BASE}/files/{self.visibility}"
    params: dict = {"pageLimit": 1000, "order": "DESC"}

    # Add metadata filters using the correct format: metadata[key]=value
    if keyvalues:
        for key, value in keyvalues.items():
            params[f"metadata[{key}]"] = value

    results: list[dict] = []
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(
                url, headers=self._headers(), params=params
            ) as response:
                response.raise_for_status()
                data = await response.json()

                for f in data.get("data", {}).get("files", []):
                    results.append(
                        {
                            "cid": f["cid"],
                            "name": f.get("name", ""),
                            "keyvalues": f.get("keyvalues", {}),
                        }
                    )

                next_token = data.get("data", {}).get("next_page_token")
                if not next_token:
                    break
                params["pageToken"] = next_token

    return results

upload(component) async

Upload a file to IPFS via Pinata. Sets component.cid.

Parameters:

- component (Asset, required): Asset with path and keyvalues set
Source code in src/stargazer/utils/pinata.py
async def upload(self, component: Asset) -> None:
    """Upload a file to IPFS via Pinata. Sets component.cid.

    Args:
        component: Asset with path and keyvalues set
    """
    path = component.path
    if path is None:
        raise ValueError("component.path must be set before uploading")

    url = f"{self.UPLOAD_BASE}/files"

    async with aiohttp.ClientSession() as session:
        data = aiohttp.FormData()
        data.add_field("file", open(path, "rb"), filename=path.name)
        data.add_field("name", path.name)
        data.add_field("network", self.visibility)

        kv = component.to_keyvalues()
        if kv:
            data.add_field("keyvalues", json.dumps(kv))

        async with session.post(
            url, headers=self._headers(), data=data
        ) as response:
            response.raise_for_status()
            result = await response.json()
            data_obj = result.get("data", result)
            component.cid = data_obj["cid"]
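The `result.get("data", result)` line tolerates both an enveloped response and a bare payload. Isolated as a tiny helper for illustration (`unwrap` is a hypothetical name, not part of the client):

```python
def unwrap(result: dict) -> dict:
    # Accept {"data": {...}} or a bare payload, as upload() does.
    return result.get("data", result)

wrapped = unwrap({"data": {"cid": "QmA"}})  # -> {'cid': 'QmA'}
bare = unwrap({"cid": "QmB"})               # -> {'cid': 'QmB'}
```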

Query generation utilities for Stargazer.

Utilities for generating metadata queries, including support for cartesian product queries across multiple dimensions.

spec: docs/architecture/types.md

generate_query_combinations(base_query, filters)

Generate query combinations from filters using cartesian product.

Takes a base query dict and filters dict, where filters can contain scalar values or lists. For any list-valued filter, generates all combinations using cartesian product, while preserving scalar filters and the base query in all combinations.

Parameters:

- base_query (Dict[str, Any], required): Base query dict to include in all combinations
- filters (Dict[str, Any], required): Filter dict with scalar or list values

Returns:

- List[Dict[str, Any]]: List of query dicts representing all combinations

Example

    >>> base = {"type": "reference"}
    >>> filters = {"build": "GRCh38", "tool": ["fasta", "bwa"]}
    >>> generate_query_combinations(base, filters)
    [
        {"type": "reference", "build": "GRCh38", "tool": "fasta"},
        {"type": "reference", "build": "GRCh38", "tool": "bwa"}
    ]

    >>> base = {"type": "reference"}
    >>> filters = {"build": ["GRCh38", "GRCh37"], "tool": ["fasta", "bwa"]}
    >>> generate_query_combinations(base, filters)
    [
        {"type": "reference", "build": "GRCh38", "tool": "fasta"},
        {"type": "reference", "build": "GRCh38", "tool": "bwa"},
        {"type": "reference", "build": "GRCh37", "tool": "fasta"},
        {"type": "reference", "build": "GRCh37", "tool": "bwa"}
    ]

Source code in src/stargazer/utils/query.py
def generate_query_combinations(
    base_query: Dict[str, Any],
    filters: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Generate query combinations from filters using cartesian product.

    Takes a base query dict and filters dict, where filters can contain
    scalar values or lists. For any list-valued filter, generates all
    combinations using cartesian product, while preserving scalar filters
    and the base query in all combinations.

    Args:
        base_query: Base query dict to include in all combinations
        filters: Filter dict with scalar or list values

    Returns:
        List of query dicts representing all combinations

    Example:
        >>> base = {"type": "reference"}
        >>> filters = {"build": "GRCh38", "tool": ["fasta", "bwa"]}
        >>> generate_query_combinations(base, filters)
        [
            {"type": "reference", "build": "GRCh38", "tool": "fasta"},
            {"type": "reference", "build": "GRCh38", "tool": "bwa"}
        ]

        >>> base = {"type": "reference"}
        >>> filters = {"build": ["GRCh38", "GRCh37"], "tool": ["fasta", "bwa"]}
        >>> generate_query_combinations(base, filters)
        [
            {"type": "reference", "build": "GRCh38", "tool": "fasta"},
            {"type": "reference", "build": "GRCh38", "tool": "bwa"},
            {"type": "reference", "build": "GRCh37", "tool": "fasta"},
            {"type": "reference", "build": "GRCh37", "tool": "bwa"}
        ]
    """
    # Separate list-valued and scalar-valued filters
    list_filters = {}
    scalar_filters = {}

    for key, value in filters.items():
        if isinstance(value, list):
            list_filters[key] = value
        else:
            scalar_filters[key] = value

    # Generate cartesian product of list-valued filters
    if list_filters:
        # Get keys and values for cartesian product
        keys = list(list_filters.keys())
        value_lists = [list_filters[k] for k in keys]

        # Generate all combinations
        query_combinations = []
        for combo in product(*value_lists):
            query = {**base_query, **scalar_filters}
            query.update(dict(zip(keys, combo)))
            query_combinations.append(query)
    else:
        # No list filters, just one query
        query_combinations = [{**base_query, **scalar_filters}]

    return query_combinations
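The expansion is a plain cartesian product over the list-valued filters. An equivalent self-contained sketch using `itertools.product` (`expand` is an illustrative name, not the project function):

```python
from itertools import product

def expand(base: dict, filters: dict) -> list[dict]:
    # Split list-valued from scalar filters, then take the cartesian
    # product of the list values, merging base and scalars into each combo.
    lists = {k: v for k, v in filters.items() if isinstance(v, list)}
    scalars = {k: v for k, v in filters.items() if not isinstance(v, list)}
    if not lists:
        return [{**base, **scalars}]
    keys = list(lists)
    return [
        {**base, **scalars, **dict(zip(keys, combo))}
        for combo in product(*(lists[k] for k in keys))
    ]

queries = expand({"type": "reference"}, {"build": "GRCh38", "tool": ["fasta", "bwa"]})
# -> [{'type': 'reference', 'build': 'GRCh38', 'tool': 'fasta'},
#     {'type': 'reference', 'build': 'GRCh38', 'tool': 'bwa'}]
```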

Subprocess utilities for running external commands.

spec: docs/architecture/tasks.md

Workflows

GATK Best Practices: Data Pre-processing for Variant Discovery

Implements:

1. Reference preparation — FASTA index, sequence dictionary, BWA index
2. Sample preprocessing — align, sort, mark duplicates, BQSR

References

spec: docs/architecture/workflows.md

prepare_reference(build) async

Prepare reference genome for alignment and variant calling.

Assembles the reference FASTA from storage and creates necessary indices:

1. FASTA index (samtools faidx)
2. Sequence dictionary (GATK CreateSequenceDictionary)
3. BWA index (bwa index)

All indices are uploaded to storage as side-effects.

Parameters:

- build (str, required): Reference genome build identifier (e.g. "GRCh38")

Returns:

- Reference: Reference asset (FASTA file)

Source code in src/stargazer/workflows/gatk_data_preprocessing.py
@gatk_env.task
async def prepare_reference(build: str) -> Reference:
    """
    Prepare reference genome for alignment and variant calling.

    Assembles the reference FASTA from storage and creates necessary indices:
    1. FASTA index (samtools faidx)
    2. Sequence dictionary (GATK CreateSequenceDictionary)
    3. BWA index (bwa index)

    All indices are uploaded to storage as side-effects.

    Args:
        build: Reference genome build identifier (e.g. "GRCh38")

    Returns:
        Reference asset (FASTA file)
    """
    log_execution()
    assets = await assemble(build=build, asset="reference")
    refs = [a for a in assets if isinstance(a, Reference)]
    if not refs:
        raise ValueError(f"No reference found for build={build!r}")
    ref = refs[0]

    await samtools_faidx(ref)
    await create_sequence_dictionary(ref)
    await bwa_mem2_index(ref)

    return ref

preprocess_sample(build, sample_id) async

Pre-process a single sample's reads for variant calling.

Assembles reference and reads from storage, then runs:

1. BWA-MEM alignment
2. Coordinate sort (GATK SortSam)
3. Mark duplicates (GATK MarkDuplicates)

Parameters:

- build (str, required): Reference genome build identifier
- sample_id (str, required): Sample identifier used to query reads

Returns:

- Alignment: Alignment asset with the preprocessed BAM file

Source code in src/stargazer/workflows/gatk_data_preprocessing.py
@gatk_env.task
async def preprocess_sample(
    build: str,
    sample_id: str,
) -> Alignment:
    """
    Pre-process a single sample's reads for variant calling.

    Assembles reference and reads from storage, then runs:
    1. BWA-MEM alignment
    2. Coordinate sort (GATK SortSam)
    3. Mark duplicates (GATK MarkDuplicates)

    Args:
        build: Reference genome build identifier
        sample_id: Sample identifier used to query reads

    Returns:
        Alignment asset with the preprocessed BAM file
    """
    log_execution()
    # Assemble reference
    ref_assets = await assemble(build=build, asset="reference")
    refs = [a for a in ref_assets if isinstance(a, Reference)]
    if not refs:
        raise ValueError(f"No reference found for build={build!r}")
    ref = refs[0]

    # Assemble reads
    read_assets = await assemble(sample_id=sample_id, asset=["r1", "r2"])
    r1_list = [a for a in read_assets if isinstance(a, R1)]
    if not r1_list:
        raise ValueError(f"No R1 reads found for sample_id={sample_id!r}")
    r1 = r1_list[0]
    r2_list = [a for a in read_assets if isinstance(a, R2)]
    r2 = r2_list[0] if r2_list else None

    # Alignment pipeline — tasks call fetch() internally
    alignment = await bwa_mem2_mem(ref=ref, r1=r1, r2=r2)
    alignment = await sort_sam(alignment=alignment, sort_order="coordinate")
    alignment = await mark_duplicates(alignment=alignment)

    return alignment
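assemble() returns a mixed list of asset types, so the task picks each kind out with isinstance filters, treating a missing R2 as single-end reads. That selection generalizes to a small helper; `first_of` is hypothetical, mirroring the filters above:

```python
from typing import Optional, Sequence, Type, TypeVar

T = TypeVar("T")

def first_of(assets: Sequence[object], kind: Type[T]) -> Optional[T]:
    # Return the first asset of the requested type, or None if absent
    # (the R2 case above treats "missing" as single-end input).
    for a in assets:
        if isinstance(a, kind):
            return a
    return None
```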

GATK Best Practices: Germline Short Variant Discovery (SNPs + Indels)

End-to-end GATK pipeline from raw reads to joint-genotyped variants:

1. prepare_reference — FASTA index, sequence dictionary, BWA index
2. preprocess_sample — align, sort, mark duplicates (per sample, parallel)
3. haplotype_caller — per-sample GVCF (parallel)
4. joint_call_gvcfs — GenomicsDBImport + GenotypeGVCFs
Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035535932-Germline-short-variant-discovery-SNPs-Indels

spec: docs/architecture/workflows.md

germline_short_variant_discovery(build, sample_ids, cohort_id='cohort') async

End-to-end germline short variant discovery from raw reads.

Runs the full GATK best-practices pipeline:

1. Reference preparation (indexing)
2. Per-sample preprocessing (align, sort, mark duplicates) in parallel
3. HaplotypeCaller per sample in parallel
4. Joint genotyping (GenomicsDBImport + GenotypeGVCFs)

Parameters:

- build (str, required): Reference genome build identifier (e.g. "GRCh38")
- sample_ids (list[str], required): List of sample identifiers to process
- cohort_id (str, default: "cohort"): Identifier for the cohort output

Returns:

- Variants: Joint-genotyped Variants asset

Source code in src/stargazer/workflows/germline_short_variant_discovery.py
@gatk_env.task(cache="disable")
async def germline_short_variant_discovery(
    build: str,
    sample_ids: list[str],
    cohort_id: str = "cohort",
) -> Variants:
    """
    End-to-end germline short variant discovery from raw reads.

    Runs the full GATK best-practices pipeline:
    1. Reference preparation (indexing)
    2. Per-sample preprocessing (align, sort, mark duplicates) in parallel
    3. HaplotypeCaller per sample in parallel
    4. Joint genotyping (GenomicsDBImport + GenotypeGVCFs)

    Args:
        build: Reference genome build identifier (e.g. "GRCh38")
        sample_ids: List of sample identifiers to process
        cohort_id: Identifier for the cohort output (default: "cohort")

    Returns:
        Joint-genotyped Variants asset
    """
    log_execution()

    # 1. Reference preparation
    ref = await prepare_reference(build=build)

    # 2. Per-sample preprocessing — parallel across samples
    alignments = list(
        await asyncio.gather(
            *[preprocess_sample(build=build, sample_id=sid) for sid in sample_ids]
        )
    )

    # 3. HaplotypeCaller — per-sample GVCFs in parallel
    gvcfs = list(
        await asyncio.gather(
            *[haplotype_caller(alignment=aln, ref=ref) for aln in alignments]
        )
    )

    # 4. GenomicsDBImport + GenotypeGVCFs — joint calling
    return await joint_call_gvcfs(
        gvcfs=gvcfs, ref=ref, intervals=ref.contigs, cohort_id=cohort_id
    )
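Stages 2 and 3 fan out per sample with asyncio.gather, which returns results in input order, so alignments line up with sample_ids. A standalone sketch of the pattern with a dummy coroutine (`preprocess` here is a stand-in, not the Flyte task):

```python
import asyncio

async def preprocess(sample_id: str) -> str:
    # Dummy stand-in for the per-sample preprocessing task.
    await asyncio.sleep(0)
    return f"{sample_id}.bam"

async def run_all(sample_ids: list[str]) -> list[str]:
    # Fan out across samples; gather preserves input order in its results.
    return list(await asyncio.gather(*[preprocess(s) for s in sample_ids]))

bams = asyncio.run(run_all(["NA12878", "NA12891"]))
# -> ['NA12878.bam', 'NA12891.bam']
```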

scRNA-seq clustering pipeline: QC → normalization → clustering → marker genes.

Implements the scanpy clustering tutorial workflow as Flyte v2 tasks. Assembles a raw AnnData from storage by sample_id, then runs the full preprocessing and clustering stack.

Prerequisites

A raw .h5ad file must be uploaded to storage with asset="anndata" and stage="raw".

Reference

https://scanpy.readthedocs.io/en/stable/tutorials/basics/clustering.html

spec: docs/workflows/scrna.md

scrna_clustering_pipeline(sample_id, organism='human', n_top_genes=2000, resolution=0.5, max_pct_mt=20.0) async

End-to-end scRNA-seq clustering pipeline.

Runs QC filtering, normalization, feature selection, dimensionality reduction, Leiden clustering, and marker gene identification in sequence.

Parameters:

- sample_id (str, required): Sample identifier used to look up the raw AnnData in storage
- organism (str, default: "human"): Organism name (e.g. "human", "mouse")
- n_top_genes (int, default: 2000): Number of highly variable genes to select
- resolution (float, default: 0.5): Leiden clustering resolution (higher = more clusters)
- max_pct_mt (float, default: 20.0): Maximum mitochondrial gene percentage per cell

Returns:

- AnnData: Annotated AnnData asset with cluster labels and ranked marker genes

Source code in src/stargazer/workflows/scrna_clustering.py
@scrna_env.task(cache="disable")
async def scrna_clustering_pipeline(
    sample_id: str,
    organism: str = "human",
    n_top_genes: int = 2000,
    resolution: float = 0.5,
    max_pct_mt: float = 20.0,
) -> AnnData:
    """End-to-end scRNA-seq clustering pipeline.

    Runs QC filtering, normalization, feature selection, dimensionality
    reduction, Leiden clustering, and marker gene identification in sequence.

    Args:
        sample_id: Sample identifier used to look up the raw AnnData in storage
        organism: Organism name (e.g. "human", "mouse")
        n_top_genes: Number of highly variable genes to select
        resolution: Leiden clustering resolution (higher = more clusters)
        max_pct_mt: Maximum mitochondrial gene percentage per cell

    Returns:
        Annotated AnnData asset with cluster labels and ranked marker genes
    """
    log_execution()

    assets = await assemble(sample_id=sample_id, asset="anndata", stage="raw")
    if not assets:
        raise ValueError(
            f"No raw AnnData found for sample_id={sample_id!r}. "
            "Upload a .h5ad file with asset='anndata' and stage='raw' first."
        )
    raw = assets[0]

    filtered = await qc_filter(adata=raw, max_pct_mt=max_pct_mt, organism=organism)
    normalized = await normalize(adata=filtered)
    featured = await select_features(adata=normalized, n_top_genes=n_top_genes)
    reduced = await reduce_dimensions(adata=featured)
    clustered = await cluster(adata=reduced, resolution=resolution)
    annotated = await find_markers(adata=clustered)

    return annotated