Skip to content

API Reference

Config

Task environment and logger configuration for Stargazer.

spec: docs/architecture/tasks.md

Marshal

Output marshaling: typed object → dict (for MCP response serialization).

spec: docs/architecture/mcp-server.md

marshal_output(value)

Convert a typed Python object to a JSON-friendly structure for MCP transport.

Source code in src/stargazer/marshal.py
def marshal_output(value: Any) -> Any:
    """Convert a typed Python object to a JSON-friendly structure for MCP transport.
    """
    if value is None:
        return None

    if hasattr(value, "to_dict"):
        return value.to_dict()

    if isinstance(value, Path):
        return str(value)

    if isinstance(value, tuple):
        return {f"o{i}": marshal_output(item) for i, item in enumerate(value)}

    if isinstance(value, list):
        return [marshal_output(item) for item in value]

    if isinstance(value, dict):
        return {k: marshal_output(v) for k, v in value.items()}

    return value

Registry

Task registry for auto-discovery of Flyte tasks and workflows.

Discovers all tasks from stargazer.tasks and stargazer.workflows modules, extracts parameter types, defaults, and return types for MCP catalog exposure.

spec: docs/architecture/mcp-server.md

TaskInfo dataclass

Complete metadata about a registered task.

Source code in src/stargazer/registry.py
@dataclass
class TaskInfo:
    """Complete metadata about a registered task.

    Built by TaskRegistry introspection of a Flyte task's wrapped
    function; everything except ``task_obj`` is exposed in the catalog.
    """

    name: str  # Short name the task is registered (and looked up) under
    category: str  # "task" or "workflow"
    description: str  # First line of the wrapped function's docstring
    params: list[TaskParam]  # Input parameters, in signature order
    outputs: list[TaskOutput]  # Outputs parsed from the return annotation
    task_obj: Any  # The Flyte task object

TaskOutput dataclass

Describes a single output of a task.

Source code in src/stargazer/registry.py
@dataclass
class TaskOutput:
    """Describes a single output of a task."""

    name: str  # Output name
    type_hint: Any  # Raw annotation object from the return type
    type_name: str  # Human-readable type name used for catalog display

TaskParam dataclass

Describes a single parameter of a task.

Source code in src/stargazer/registry.py
@dataclass
class TaskParam:
    """Describes a single parameter of a task."""

    name: str  # Parameter name from the function signature
    type_hint: Any  # Raw annotation object (Any when unannotated)
    type_name: str  # Human-readable type name used for catalog display
    required: bool  # True when the parameter declares no default
    default: Any = None  # Declared default value; None for required params

TaskRegistry dataclass

Discovers and provides access to all Flyte tasks and workflows.

Source code in src/stargazer/registry.py
@dataclass
class TaskRegistry:
    """Discovers and provides access to all Flyte tasks and workflows.

    Discovery runs once at construction time (see ``__post_init__``) and
    indexes every exported task/workflow by its short name.
    """

    # Short name -> TaskInfo; populated by _discover() during __post_init__.
    _tasks: dict[str, TaskInfo] = field(default_factory=dict)

    def __post_init__(self):
        """Discover all tasks and workflows on initialization."""
        self._discover()

    def _discover(self):
        """Walk task and workflow modules to register all Flyte tasks.

        Tasks are registered before workflows, so a workflow whose
        short_name collides with a task is skipped (see _discover_workflows).
        """
        self._discover_tasks()
        self._discover_workflows()

    def _discover_tasks(self):
        """Register all tasks from stargazer.tasks.__all__."""
        import stargazer.tasks as tasks_mod

        for name in tasks_mod.__all__:
            obj = getattr(tasks_mod, name)
            # Only Flyte task objects carry a wrapped .func; skip other exports.
            if not hasattr(obj, "func"):
                continue
            self._register(obj.short_name, obj, category="task")

    def _discover_workflows(self):
        """Register all workflows from stargazer.workflows.__all__."""
        import stargazer.workflows as workflows_mod

        for name in workflows_mod.__all__:
            obj = getattr(workflows_mod, name)
            if not hasattr(obj, "func"):
                continue
            # Skip if already registered (e.g. duplicate short_name across modules)
            if obj.short_name in self._tasks:
                continue
            self._register(obj.short_name, obj, category="workflow")

    def _register(self, name: str, task_obj: Any, category: str):
        """Register a single task by introspecting its wrapped function.

        Args:
            name: Short name to index the task under.
            task_obj: Flyte task object whose ``.func`` is introspected.
            category: Either "task" or "workflow".
        """
        func = task_obj.func
        sig = inspect.signature(func)
        hints = get_type_hints(func)

        # Extract parameters
        params = []
        for pname, param in sig.parameters.items():
            # Fall back to Any for unannotated parameters.
            hint = hints.get(pname, Any)
            has_default = param.default is not inspect.Parameter.empty
            params.append(
                TaskParam(
                    name=pname,
                    type_hint=hint,
                    type_name=_type_name(hint),
                    required=not has_default,
                    default=param.default if has_default else None,
                )
            )

        # Extract outputs from return type.
        # An unannotated return is treated as returning None.
        return_hint = hints.get("return", type(None))
        outputs = _parse_outputs(return_hint)

        # Extract description from docstring — only the first line is kept.
        doc = func.__doc__ or ""
        description = doc.strip().split("\n")[0] if doc.strip() else ""

        self._tasks[name] = TaskInfo(
            name=name,
            category=category,
            description=description,
            params=params,
            outputs=outputs,
            task_obj=task_obj,
        )

    def get(self, name: str) -> TaskInfo | None:
        """Look up a task by name.

        Returns:
            The registered TaskInfo, or None when *name* is unknown.
        """
        return self._tasks.get(name)

    def list_tasks(self, category: str | None = None) -> list[TaskInfo]:
        """List all registered tasks, optionally filtered by category.

        Args:
            category: "task" or "workflow"; falsy values disable filtering.
        """
        tasks = list(self._tasks.values())
        if category:
            tasks = [t for t in tasks if t.category == category]
        return tasks

    def to_catalog(self, category: str | None = None) -> list[dict]:
        """Return a JSON-serializable catalog of all tasks.

        The internal task_obj is deliberately omitted so the result can
        travel over MCP transport.
        """
        catalog = []
        for info in self.list_tasks(category=category):
            catalog.append(
                {
                    "name": info.name,
                    "category": info.category,
                    "description": info.description,
                    "params": [
                        {
                            "name": p.name,
                            "type": p.type_name,
                            "required": p.required,
                            # Defaults are only serialized for optional params.
                            "default": _serialize_default(p.default)
                            if not p.required
                            else None,
                        }
                        for p in info.params
                    ],
                    "outputs": [
                        {"name": o.name, "type": o.type_name} for o in info.outputs
                    ],
                }
            )
        return catalog

__post_init__()

Discover all tasks and workflows on initialization.

Source code in src/stargazer/registry.py
def __post_init__(self):
    """Discover all tasks and workflows on initialization.

    Runs automatically after the dataclass-generated __init__.
    """
    self._discover()

get(name)

Look up a task by name.

Source code in src/stargazer/registry.py
def get(self, name: str) -> TaskInfo | None:
    """Look up a task by name.

    Returns:
        The registered TaskInfo, or None when *name* is unknown.
    """
    try:
        return self._tasks[name]
    except KeyError:
        return None

list_tasks(category=None)

List all registered tasks, optionally filtered by category.

Source code in src/stargazer/registry.py
def list_tasks(self, category: str | None = None) -> list[TaskInfo]:
    """List all registered tasks, optionally filtered by category.

    Args:
        category: "task" or "workflow"; falsy values disable filtering.
    """
    if category:
        return [info for info in self._tasks.values() if info.category == category]
    return list(self._tasks.values())

to_catalog(category=None)

Return a JSON-serializable catalog of all tasks.

Source code in src/stargazer/registry.py
def to_catalog(self, category: str | None = None) -> list[dict]:
    """Return a JSON-serializable catalog of all tasks.

    Each entry carries name, category, description, params, and outputs;
    the internal task object is deliberately omitted.
    """

    def _param_entry(p):
        # Defaults are only serialized for optional parameters.
        return {
            "name": p.name,
            "type": p.type_name,
            "required": p.required,
            "default": None if p.required else _serialize_default(p.default),
        }

    return [
        {
            "name": info.name,
            "category": info.category,
            "description": info.description,
            "params": [_param_entry(p) for p in info.params],
            "outputs": [
                {"name": o.name, "type": o.type_name} for o in info.outputs
            ],
        }
        for info in self.list_tasks(category=category)
    ]

Server

Stargazer MCP Server.

Exposes storage tools and a dynamic task runner via FastMCP. Tasks and workflows are auto-discovered from the registry and executed through the Flyte local run context.

Usage

stargazer          # stdio transport (default)
stargazer --http   # streamable-http transport

spec: docs/architecture/mcp-server.md

delete_file(cid) async

Delete a file by CID.

Source code in src/stargazer/server.py
@mcp.tool()
async def delete_file(cid: str) -> str:
    """Delete a file by CID.
    """
    asset = Asset(cid=cid)
    await default_client.delete(asset)
    # Confirmation string echoed back to the MCP client.
    return f"Deleted file {cid}"

download_file(cid) async

Download a file by CID to local cache. Returns the local path.

Source code in src/stargazer/server.py
@mcp.tool()
async def download_file(cid: str) -> str:
    """Download a file by CID to local cache. Returns the local path.
    """
    asset = Asset(cid=cid)
    await default_client.download(asset)
    # The client populates asset.path with the local cache location.
    return str(asset.path)

list_tasks(category=None)

List available tasks and workflows with their parameter signatures.

Parameters:

Name Type Description Default
category str | None

Filter by "task" or "workflow". Omit for all.

None

Returns:

Type Description
list[dict]

Catalog of tasks with name, category, description, params, and outputs.

Source code in src/stargazer/server.py
@mcp.tool()
def list_tasks(category: str | None = None) -> list[dict]:
    """List available tasks and workflows with their parameter signatures.

    Args:
        category: Filter by "task" or "workflow". Omit for all.

    Returns:
        Catalog of tasks with name, category, description, params, and outputs.
    """
    # Delegate to the registry's JSON-serializable catalog view.
    catalog = _registry.to_catalog(category=category)
    return catalog

main()

Run the Stargazer MCP server.

Source code in src/stargazer/server.py
def main():
    """Run the Stargazer MCP server.

    Defaults to stdio transport; pass ``--http`` on the command line to
    use streamable-http instead.
    """
    import sys

    transport = "streamable-http" if "--http" in sys.argv else "stdio"
    mcp.run(transport=transport)

query_files(keyvalues) async

Query files by metadata key-value pairs. Returns matching files.

Source code in src/stargazer/server.py
@mcp.tool()
async def query_files(keyvalues: dict[str, str]) -> list[dict]:
    """Query files by metadata key-value pairs. Returns matching files.
    """
    matches = await default_client.query(keyvalues)
    return [match.to_dict() for match in matches]

run_task(task_name, filters, inputs=None) async

Run a single task by name for ad-hoc experimentation.

Use this for testing individual tools in isolation. Asset parameters are assembled from storage using the provided filters — one call to assemble() resolves all required assets. Scalar and Path parameters are passed separately via inputs.

For reproducible pipeline runs, use run_workflow instead.

Parameters:

Name Type Description Default
task_name str

Name of the task (from list_tasks with category="task").

required
filters dict

Keyvalue filters for assemble() to resolve asset parameters (e.g. {"build": "GRCh38", "sample_id": "NA12878"}).

required
inputs dict | None

Optional scalar/Path keyword arguments (str, int, bool, list[str]).

None

Returns:

Type Description
dict

Serialized task output. Single outputs returned directly,

dict

multi-outputs as {"o0": ..., "o1": ...}.

Source code in src/stargazer/server.py
@mcp.tool()
async def run_task(task_name: str, filters: dict, inputs: dict | None = None) -> dict:
    """Run a single task by name for ad-hoc experimentation.

    Use this for testing individual tools in isolation. Asset parameters
    are assembled from storage using the provided filters — one call to
    assemble() resolves all required assets. Scalar and Path parameters
    are passed separately via inputs.

    For reproducible pipeline runs, use run_workflow instead.

    Args:
        task_name: Name of the task (from list_tasks with category="task").
        filters: Keyvalue filters for assemble() to resolve asset parameters
                 (e.g. {"build": "GRCh38", "sample_id": "NA12878"}).
        inputs: Optional scalar/Path keyword arguments (str, int, bool, list[str]).

    Returns:
        Serialized task output. Single outputs returned directly,
        multi-outputs as {"o0": ..., "o1": ...}.

    Raises:
        ValueError: If the task name is unknown, names a workflow, or a
            required asset parameter cannot be satisfied by the filters.
    """
    info = _registry.get(task_name)
    if info is None:
        # Surface the valid task names so the caller can self-correct.
        available = [t.name for t in _registry.list_tasks(category="task")]
        raise ValueError(f"Unknown task: {task_name!r}. Available: {available}")
    if info.category != "task":
        raise ValueError(f"{task_name!r} is a workflow — use run_workflow instead.")

    inputs = inputs or {}

    # Assemble all assets from storage in one query
    assets = await assemble(**filters) if filters else []

    # Build kwargs: match Asset params from the assembled list, scalars from inputs
    kwargs = {}
    for p in info.params:
        # _asset_key_for_hint returns a key for Asset-typed params, falsy otherwise.
        asset_key = _asset_key_for_hint(p.type_hint)
        if asset_key:
            matched = [a for a in assets if a._asset_key == asset_key]
            if not matched and p.required:
                # Fail fast with the filters included for diagnosis.
                raise ValueError(
                    f"Task {task_name!r} requires {p.name} ({asset_key}) "
                    f"but no matching asset found for filters: {filters}"
                )
            if matched:
                # list[Asset] params get every match; a scalar Asset param gets
                # the last match (NOTE(review): presumably the most recently
                # assembled one — confirm assemble() ordering).
                kwargs[p.name] = (
                    matched if _is_list_asset_hint(p.type_hint) else matched[-1]
                )
        elif p.name in inputs:
            value = inputs[p.name]
            # JSON transports paths as strings; coerce when the task declares Path.
            if p.type_hint is Path and isinstance(value, str):
                value = Path(value)
            kwargs[p.name] = value

    return await _execute(info, kwargs)

run_workflow(workflow_name, inputs) async

Run a workflow by name for reproducible pipeline execution.

Workflows accept scalar parameters (str, int, bool, list[str]) and handle their own asset assembly internally. Pass inputs exactly as the workflow signature defines them — no automatic resolution is performed.

For ad-hoc experimentation with individual tools, use run_task instead.

Parameters:

Name Type Description Default
workflow_name str

Name of the workflow (from list_tasks with category="workflow").

required
inputs dict

Keyword arguments as a JSON dict (scalars only).

required

Returns:

Type Description
dict

Serialized workflow output. Single outputs returned directly,

dict

multi-outputs as {"o0": ..., "o1": ...}.

Source code in src/stargazer/server.py
@mcp.tool()
async def run_workflow(workflow_name: str, inputs: dict) -> dict:
    """Run a workflow by name for reproducible pipeline execution.

    Workflows accept scalar parameters (str, int, bool, list[str]) and
    handle their own asset assembly internally. Pass inputs exactly as
    the workflow signature defines them — no automatic resolution is
    performed.

    For ad-hoc experimentation with individual tools, use run_task instead.

    Args:
        workflow_name: Name of the workflow (from list_tasks with category="workflow").
        inputs: Keyword arguments as a JSON dict (scalars only).

    Returns:
        Serialized workflow output. Single outputs returned directly,
        multi-outputs as {"o0": ..., "o1": ...}.
    """
    entry = _registry.get(workflow_name)
    if entry is None:
        # Surface the valid workflow names so the caller can self-correct.
        available = [t.name for t in _registry.list_tasks(category="workflow")]
        raise ValueError(f"Unknown workflow: {workflow_name!r}. Available: {available}")
    if entry.category != "workflow":
        raise ValueError(f"{workflow_name!r} is a task — use run_task instead.")

    # Copy the inputs defensively before handing them to the executor.
    return await _execute(entry, dict(inputs))

show_config() async

Show current Stargazer configuration and available task counts.

Source code in src/stargazer/server.py
@mcp.resource("stargazer://config")
async def show_config() -> str:
    """Show current Stargazer configuration and available task counts.
    """
    config = {
        # STARGAZER_MODE defaults to "local" when unset.
        "stargazer_mode": os.environ.get("STARGAZER_MODE", "local"),
        "local_dir": str(default_client.local_dir),
        "tasks": len(_registry.list_tasks(category="task")),
        "workflows": len(_registry.list_tasks(category="workflow")),
    }
    return json.dumps(config, indent=2)

upload_file(path, keyvalues) async

Upload a file with metadata key-value pairs.

keyvalues must include "asset". Valid asset keys are derived from the Asset registry (e.g. asset=reference component=fasta).

When displaying results, always show a table with the CID and all keyvalues.

Source code in src/stargazer/server.py
@mcp.tool()
async def upload_file(path: str, keyvalues: dict[str, str]) -> dict:
    """Upload a file with metadata key-value pairs.

    keyvalues must include "asset". Valid asset keys are derived
    from the Asset registry (e.g. asset=reference component=fasta).

    When displaying results, always show a table with the CID and all keyvalues.
    """
    asset_key = keyvalues.get("asset")
    if asset_key not in ASSET_REGISTRY:
        valid = sorted(ASSET_REGISTRY.keys())
        raise ValueError(f"Invalid asset key {asset_key!r}. Valid keys: {valid}")

    asset_cls = ASSET_REGISTRY[asset_key]
    # Fields the asset class declares, via either a default or a type.
    allowed = set(asset_cls._field_defaults) | set(asset_cls._field_types)
    extra = set(keyvalues) - allowed - {"asset"}
    if extra:
        raise ValueError(
            f"Unknown keys for {asset_key!r}: {extra}. Allowed: {sorted(allowed)}"
        )

    field_kwargs = {key: val for key, val in keyvalues.items() if key in allowed}
    asset = asset_cls(path=Path(path), **field_kwargs)
    await default_client.upload(asset)
    return asset.to_dict()

Tasks

apply_bqsr task for Stargazer.

Applies BQSR recalibration to BAM files using GATK ApplyBQSR.

spec: docs/architecture/tasks.md

apply_bqsr(alignment, ref, bqsr_report) async

Apply Base Quality Score Recalibration to a BAM file.

Parameters:

Name Type Description Default
alignment Alignment

Input BAM asset

required
ref Reference

Reference FASTA asset

required
bqsr_report BQSRReport

Recalibration table from base_recalibrator

required

Returns:

Type Description
Alignment

Alignment asset with recalibrated BAM file

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037055712-ApplyBQSR

Source code in src/stargazer/tasks/gatk/apply_bqsr.py
@gatk_env.task
async def apply_bqsr(
    alignment: Alignment,
    ref: Reference,
    bqsr_report: BQSRReport,
) -> Alignment:
    """
    Apply Base Quality Score Recalibration to a BAM file.

    Args:
        alignment: Input BAM asset
        ref: Reference FASTA asset
        bqsr_report: Recalibration table from base_recalibrator

    Returns:
        Alignment asset with recalibrated BAM file

    Raises:
        FileNotFoundError: If the recalibration report is missing from the
            cache, or GATK fails to produce the output BAM.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037055712-ApplyBQSR
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    logger.info(bqsr_report.to_dict())
    # fetch() auto-downloads companions (.fai, .dict, .bai, etc.)
    await alignment.fetch()
    await ref.fetch()
    await bqsr_report.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    recal_path = bqsr_report.path

    if not recal_path or not recal_path.exists():
        raise FileNotFoundError("BQSR recalibration report not found in cache.")

    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{alignment.sample_id}_recalibrated.bam"

    cmd = [
        "gatk",
        "ApplyBQSR",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "--bqsr-recal-file",
        str(recal_path),
        "-O",
        str(output_bam),
    ]

    # Capture stderr so a failed run is diagnosable; matches the
    # error-reporting style of the other GATK tasks (e.g. apply_vqsr).
    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(
            f"ApplyBQSR did not create output BAM at {output_bam}. stderr: {stderr}"
        )

    # Register the recalibrated BAM as a new Alignment asset, carrying
    # provenance (tool, reference CID) in its keyvalues.
    recal_bam = Alignment()
    await recal_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=alignment.duplicates_marked,
        bqsr_applied=True,
        tool="gatk_apply_bqsr",
        reference_cid=ref.cid,
    )

    logger.info(recal_bam.to_dict())
    return recal_bam

ApplyVQSR task for Stargazer.

Applies VQSR recalibration to a VCF using GATK ApplyVQSR.

spec: docs/architecture/tasks.md

apply_vqsr(vcf, ref, vqsr_model, truth_sensitivity_filter_level=None) async

Apply VQSR recalibration to a VCF using GATK ApplyVQSR.

The recalibration mode (SNP or INDEL) is read from vqsr_model.keyvalues["mode"]. If truth_sensitivity_filter_level is not provided, defaults to 99.5 for SNP and 99.0 for INDEL.

Parameters:

Name Type Description Default
vcf Variants

Raw (or SNP-filtered) VCF Variants asset

required
ref Reference

Reference FASTA asset

required
vqsr_model VQSRModel

Recalibration model from variant_recalibrator

required
truth_sensitivity_filter_level float | None

VQSLOD filter threshold (optional)

None

Returns:

Type Description
Variants

Variants asset with VQSR-filtered VCF

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR

Source code in src/stargazer/tasks/gatk/apply_vqsr.py
@gatk_env.task
async def apply_vqsr(
    vcf: Variants,
    ref: Reference,
    vqsr_model: VQSRModel,
    truth_sensitivity_filter_level: float | None = None,
) -> Variants:
    """
    Apply VQSR recalibration to a VCF using GATK ApplyVQSR.

    The recalibration mode (SNP or INDEL) is read from vqsr_model.keyvalues["mode"].
    If truth_sensitivity_filter_level is not provided, defaults to 99.5 for SNP
    and 99.0 for INDEL.

    Args:
        vcf: Raw (or SNP-filtered) VCF Variants asset
        ref: Reference FASTA asset
        vqsr_model: Recalibration model from variant_recalibrator
        truth_sensitivity_filter_level: VQSLOD filter threshold (optional)

    Returns:
        Variants asset with VQSR-filtered VCF

    Raises:
        ValueError: If the model mode is invalid or tranches_path is missing.
        FileNotFoundError: If GATK fails to produce the output VCF.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR
    """
    logger.info(vcf.to_dict())
    logger.info(ref.to_dict())
    logger.info(vqsr_model.to_dict())
    mode = vqsr_model.keyvalues.get("mode", "SNP")
    if mode not in ("SNP", "INDEL"):
        raise ValueError(f"VQSRModel mode must be 'SNP' or 'INDEL', got {mode!r}")

    tranches_str = vqsr_model.keyvalues.get("tranches_path")
    if not tranches_str:
        raise ValueError("VQSRModel is missing tranches_path in keyvalues")
    tranches_path = Path(tranches_str)

    # Explicit None check: a caller-supplied 0.0 must not silently fall back
    # to the mode default (a bare `or` would treat 0.0 as unset).
    if truth_sensitivity_filter_level is not None:
        filter_level = truth_sensitivity_filter_level
    else:
        filter_level = _DEFAULT_FILTER_LEVEL[mode]

    await vcf.fetch()
    await ref.fetch()
    await vqsr_model.fetch()

    output_dir = _storage.default_client.local_dir
    sample_id = vcf.keyvalues.get("sample_id", "cohort")
    output_vcf = output_dir / f"{sample_id}_vqsr_{mode.lower()}.vcf"

    cmd = [
        "gatk",
        "ApplyVQSR",
        "-R",
        str(ref.path),
        "-V",
        str(vcf.path),
        "--recal-file",
        str(vqsr_model.path),
        "--tranches-file",
        str(tranches_path),
        "--truth-sensitivity-filter-level",
        str(filter_level),
        "--create-output-variant-index",
        "true",
        "-mode",
        mode,
        "-O",
        str(output_vcf),
    ]

    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_vcf.exists():
        raise FileNotFoundError(
            f"ApplyVQSR did not create output VCF at {output_vcf}. stderr: {stderr}"
        )

    source_samples = vcf.source_samples or [sample_id]
    filtered_vcf = Variants()
    await filtered_vcf.update(
        output_vcf,
        sample_id=sample_id,
        caller="apply_vqsr",
        variant_type="vcf",
        vqsr_mode=mode,
        build=ref.build,
        sample_count=len(source_samples),
        source_samples=source_samples,
    )

    # Upload the implicit .idx companion if GATK produced one.
    idx_path = output_dir / f"{output_vcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=sample_id, variants_cid=filtered_vcf.cid)

    logger.info(filtered_vcf.to_dict())
    return filtered_vcf

base_recalibrator task for Stargazer.

Creates BQSR recalibration table using GATK BaseRecalibrator.

spec: docs/architecture/tasks.md

base_recalibrator(alignment, ref, known_sites) async

Generate a Base Quality Score Recalibration report.

Uses GATK BaseRecalibrator to analyze patterns of covariation in the sequence dataset and produce a recalibration table.

Parameters:

Name Type Description Default
alignment Alignment

Input BAM asset (should be sorted and have duplicates marked)

required
ref Reference

Reference FASTA asset

required
known_sites list[KnownSites]

List of KnownSites VCF assets (dbSNP, known indels, etc.)

required

Returns:

Type Description
BQSRReport

BQSRReport asset containing the recalibration table

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator

Source code in src/stargazer/tasks/gatk/base_recalibrator.py
@gatk_env.task
async def base_recalibrator(
    alignment: Alignment,
    ref: Reference,
    known_sites: list[KnownSites],
) -> BQSRReport:
    """
    Generate a Base Quality Score Recalibration report.

    Uses GATK BaseRecalibrator to analyze patterns of covariation in the
    sequence dataset and produce a recalibration table.

    Args:
        alignment: Input BAM asset (should be sorted and have duplicates marked)
        ref: Reference FASTA asset
        known_sites: List of KnownSites VCF assets (dbSNP, known indels, etc.)

    Returns:
        BQSRReport asset containing the recalibration table

    Raises:
        ValueError: If known_sites is empty.
        FileNotFoundError: If GATK fails to produce the recalibration table.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    logger.info([x.to_dict() for x in known_sites])
    if not known_sites:
        raise ValueError("known_sites list cannot be empty for BQSR")

    # fetch() auto-downloads companions (.fai, .dict, .bai, etc.)
    await alignment.fetch()
    await ref.fetch()
    for site in known_sites:
        await site.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_recal = output_dir / f"{alignment.sample_id}_bqsr.table"

    cmd = [
        "gatk",
        "BaseRecalibrator",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "-O",
        str(output_recal),
    ]
    # Each known-sites VCF is passed as its own --known-sites argument.
    for site in known_sites:
        cmd.extend(["--known-sites", str(site.path)])

    # Capture stderr so a failed run is diagnosable; matches the
    # error-reporting style of the other GATK tasks (e.g. apply_vqsr).
    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_recal.exists():
        raise FileNotFoundError(
            f"BaseRecalibrator did not create recalibration report at {output_recal}. "
            f"stderr: {stderr}"
        )

    # Register the table as a BQSRReport asset with provenance keyvalues.
    report = BQSRReport()
    await report.update(
        output_recal,
        sample_id=alignment.sample_id,
        tool="gatk_base_recalibrator",
        alignment_cid=alignment.cid,
    )

    logger.info(report.to_dict())
    return report

CombineGVCFs task for Stargazer.

Combines multiple per-sample GVCFs into a single multi-sample GVCF for joint genotyping using GATK CombineGVCFs.

spec: docs/architecture/tasks.md

combine_gvcfs(gvcfs, ref, cohort_id='cohort') async

Combine multiple per-sample GVCFs into a single multi-sample GVCF.

Parameters:

Name Type Description Default
gvcfs list[Variants]

List of Variants assets, each containing a GVCF from a single sample

required
ref Reference

Reference FASTA asset

required
cohort_id str

Identifier for the combined cohort (default: "cohort")

'cohort'

Returns:

Type Description
Variants

Variants asset with combined multi-sample GVCF

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037053272-CombineGVCFs

Source code in src/stargazer/tasks/gatk/combine_gvcfs.py
@gatk_env.task
async def combine_gvcfs(
    gvcfs: list[Variants],
    ref: Reference,
    cohort_id: str = "cohort",
) -> Variants:
    """
    Combine multiple per-sample GVCFs into a single multi-sample GVCF.

    Args:
        gvcfs: List of Variants assets, each containing a GVCF from a single sample
        ref: Reference FASTA asset
        cohort_id: Identifier for the combined cohort (default: "cohort")

    Returns:
        Variants asset with combined multi-sample GVCF

    Raises:
        ValueError: If gvcfs is empty or contains a non-GVCF Variants asset.
        FileNotFoundError: If GATK fails to produce the combined GVCF.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037053272-CombineGVCFs
    """
    logger.info([x.to_dict() for x in gvcfs])
    logger.info(ref.to_dict())
    if not gvcfs:
        raise ValueError("gvcfs list cannot be empty")

    # Validate up front that every input is a GVCF — CombineGVCFs cannot
    # merge plain VCFs.
    for i, gvcf in enumerate(gvcfs):
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"combine_gvcfs requires GVCF files, but gvcfs[{i}] is not a GVCF. "
                f"sample_id={gvcf.sample_id}"
            )

    # fetch() auto-downloads companions (.fai, .dict for ref; .idx for each gvcf)
    await ref.fetch()

    gvcf_paths: list[Path] = []
    sample_ids: list[str] = []

    for gvcf in gvcfs:
        await gvcf.fetch()
        gvcf_paths.append(gvcf.path)
        sample_ids.append(gvcf.sample_id)

    ref_path = ref.path
    output_dir = _storage.default_client.local_dir
    output_gvcf = output_dir / f"{cohort_id}_combined.g.vcf"

    cmd = [
        "gatk",
        "CombineGVCFs",
        "-R",
        str(ref_path),
        "-O",
        str(output_gvcf),
    ]

    # One -V argument per input GVCF.
    for gvcf_path in gvcf_paths:
        cmd.extend(["-V", str(gvcf_path)])

    # stdout is not needed — only stderr is used for error reporting.
    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_gvcf.exists():
        raise FileNotFoundError(
            f"CombineGVCFs did not create output GVCF at {output_gvcf}. "
            f"stderr: {stderr}"
        )

    # Register the combined GVCF under the cohort id, keeping the full list
    # of contributing samples in its keyvalues.
    combined_vcf = Variants()
    await combined_vcf.update(
        output_gvcf,
        sample_id=cohort_id,
        caller="combine_gvcfs",
        variant_type="gvcf",
        build=ref.build,
        sample_count=len(sample_ids),
        source_samples=sample_ids,
    )

    # Upload implicit index file produced by GATK
    idx_path = output_dir / f"{output_gvcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=cohort_id, variants_cid=combined_vcf.cid)

    logger.info(combined_vcf.to_dict())
    return combined_vcf

GATK CreateSequenceDictionary task for reference genome.

spec: docs/architecture/tasks.md

create_sequence_dictionary(ref) async

Create a sequence dictionary (.dict file) using GATK CreateSequenceDictionary.

Parameters:

Name Type Description Default
ref Reference

Reference FASTA asset

required

Returns:

Type Description
SequenceDict

SequenceDict asset containing the .dict file

Source code in src/stargazer/tasks/gatk/create_sequence_dictionary.py
@gatk_env.task
async def create_sequence_dictionary(ref: Reference) -> SequenceDict:
    """
    Create a sequence dictionary (.dict file) using GATK CreateSequenceDictionary.

    Args:
        ref: Reference FASTA asset

    Returns:
        SequenceDict asset containing the .dict file
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    fasta = ref.path

    if not fasta or not fasta.exists():
        raise FileNotFoundError(f"Reference file not found at {fasta}")

    output_dir = _storage.default_client.local_dir
    dict_path = output_dir / f"{fasta.stem}.dict"
    # Remove any stale dictionary first. NOTE(review): presumably because
    # GATK refuses to overwrite an existing .dict — confirm.
    dict_path.unlink(missing_ok=True)

    cmd = [
        "gatk",
        "CreateSequenceDictionary",
        "-R",
        str(fasta),
        "-O",
        str(dict_path),
    ]
    await _run(cmd, cwd=str(output_dir))

    if not dict_path.exists():
        raise FileNotFoundError(
            f"Sequence dictionary file {dict_path.name} was not created"
        )

    # Register the .dict as a SequenceDict asset with provenance keyvalues.
    seq_dict = SequenceDict()
    await seq_dict.update(
        dict_path,
        build=ref.build,
        tool="gatk_CreateSequenceDictionary",
        reference_cid=ref.cid,
    )

    logger.info(seq_dict.to_dict())
    return seq_dict

GenomicsDBImport task for Stargazer.

Import VCFs to GenomicsDB for efficient joint genotyping of large cohorts.

spec: docs/architecture/tasks.md

genomics_db_import(gvcfs, workspace_path, intervals=None) async

Import GVCFs to GenomicsDB workspace for scalable joint genotyping.

Parameters:

Name Type Description Default
gvcfs list[Variants]

List of per-sample GVCF Variants assets to import

required
workspace_path Path

Path where GenomicsDB workspace will be created

required
intervals list[str] | None

Genomic intervals to process (e.g., ["chr1", "chr2:100000-200000"])

None

Returns:

Type Description
Path

Path to the created GenomicsDB workspace directory

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360036883491-GenomicsDBImport

Source code in src/stargazer/tasks/gatk/genomics_db_import.py
@gatk_env.task
async def genomics_db_import(
    gvcfs: list[Variants],
    workspace_path: Path,
    intervals: list[str] | None = None,
) -> Path:
    """
    Import GVCFs to GenomicsDB workspace for scalable joint genotyping.

    Args:
        gvcfs: List of per-sample GVCF Variants assets to import
        workspace_path: Path where GenomicsDB workspace will be created
        intervals: Genomic intervals to process (e.g., ["chr1", "chr2:100000-200000"])

    Returns:
        Path to the created GenomicsDB workspace directory

    Raises:
        ValueError: if no GVCFs are given, an input is not a GVCF, the
            workspace already exists, or a fetched GVCF has no local path.
        FileNotFoundError: if GATK fails to create the workspace.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360036883491-GenomicsDBImport
    """
    logger.info([x.to_dict() for x in gvcfs])
    if not gvcfs:
        raise ValueError("At least one GVCF must be provided")

    # Validate inputs up front, before any downloads happen.
    for gvcf in gvcfs:
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"All inputs must be GVCFs, got variant_type={gvcf.variant_type!r}: "
                f"sample_id={gvcf.sample_id}"
            )

    if workspace_path.exists():
        raise ValueError(
            f"GenomicsDB workspace already exists at {workspace_path}. "
            "Use --genomicsdb-update-workspace-path to add samples to existing workspace."
        )

    for gvcf in gvcfs:
        await gvcf.fetch()

    # Build the sample map GATK consumes: one "<sample_id>\t<path>" per line.
    sample_map_path = workspace_path.parent / "sample_map.txt"
    sample_map_path.parent.mkdir(parents=True, exist_ok=True)

    map_lines = []
    for gvcf in gvcfs:
        if not gvcf.path:
            raise ValueError(
                f"GVCF file not available for sample_id={gvcf.sample_id}"
            )
        map_lines.append(f"{gvcf.sample_id}\t{gvcf.path}\n")
    with open(sample_map_path, "w") as f:
        f.writelines(map_lines)

    cmd = [
        "gatk",
        "GenomicsDBImport",
        "--genomicsdb-workspace-path",
        str(workspace_path),
        "--sample-name-map",
        str(sample_map_path),
    ]
    for interval in intervals or []:
        cmd.extend(["-L", interval])

    await _run(cmd, cwd=str(workspace_path.parent))

    if not workspace_path.exists():
        raise FileNotFoundError(
            f"GenomicsDBImport did not create workspace at {workspace_path}"
        )

    return workspace_path

haplotype_caller task for Stargazer.

Calls germline SNPs and indels via local re-assembly of haplotypes using GATK HaplotypeCaller in GVCF mode.

spec: docs/architecture/tasks.md

haplotype_caller(alignment, ref) async

Call germline variants in GVCF mode using GATK HaplotypeCaller.

Parameters:

Name Type Description Default
alignment Alignment

Sorted, duplicate-marked BAM asset (BQSR-recalibrated recommended)

required
ref Reference

Reference FASTA asset with sequence dictionary

required

Returns:

Type Description
Variants

Variants asset containing the per-sample GVCF

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller

Source code in src/stargazer/tasks/gatk/haplotype_caller.py
@gatk_env.task
async def haplotype_caller(
    alignment: Alignment,
    ref: Reference,
) -> Variants:
    """
    Call germline variants in GVCF mode using GATK HaplotypeCaller.

    Args:
        alignment: Sorted, duplicate-marked BAM asset (BQSR-recalibrated recommended)
        ref: Reference FASTA asset with sequence dictionary

    Returns:
        Variants asset containing the per-sample GVCF

    Raises:
        FileNotFoundError: if HaplotypeCaller does not produce the output GVCF.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
    """
    logger.info(alignment.to_dict())
    logger.info(ref.to_dict())
    # fetch() auto-downloads companions (.fai, .dict, .bai)
    await alignment.fetch()
    await ref.fetch()

    ref_path = ref.path
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_gvcf = output_dir / f"{alignment.sample_id}.g.vcf"

    cmd = [
        "gatk",
        "HaplotypeCaller",
        "-R",
        str(ref_path),
        "-I",
        str(bam_path),
        "-O",
        str(output_gvcf),
        "--emit-ref-confidence",
        "GVCF",
    ]

    # Capture stderr so failures can be diagnosed (matches combine_gvcfs).
    _, stderr = await _run(cmd, cwd=str(output_dir))

    if not output_gvcf.exists():
        raise FileNotFoundError(
            f"HaplotypeCaller did not create output GVCF at {output_gvcf}. "
            f"stderr: {stderr}"
        )

    gvcf = Variants()
    await gvcf.update(
        output_gvcf,
        sample_id=alignment.sample_id,
        caller="haplotype_caller",
        variant_type="gvcf",
        build=ref.build,
        sample_count=1,
        # Store a list (not a bare string) so source_samples has the same
        # shape as the metadata written by combine_gvcfs / joint_call_gvcfs.
        source_samples=[alignment.sample_id],
    )

    # Upload implicit index file produced by GATK
    idx_path = output_dir / f"{output_gvcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(
            idx_path, sample_id=alignment.sample_id, variants_cid=gvcf.cid
        )

    logger.info(gvcf.to_dict())
    return gvcf

joint_call_gvcfs task for Stargazer.

Consolidates per-sample GVCFs into a GenomicsDB datastore and performs joint genotyping in a single task, avoiding the need to persist the GenomicsDB workspace between tasks.

spec: docs/architecture/tasks.md

joint_call_gvcfs(gvcfs, ref, intervals, cohort_id='cohort') async

Consolidate GVCFs into GenomicsDB and joint-genotype in a single task.

Runs GenomicsDBImport followed immediately by GenotypeGVCFs within the same execution context, so the workspace never needs to leave the pod.

Parameters:

Name Type Description Default
gvcfs list[Variants]

Per-sample GVCF Variants assets from HaplotypeCaller

required
ref Reference

Reference FASTA asset

required
intervals list[str]

Genomic intervals to process (required by GenomicsDBImport)

required
cohort_id str

Sample ID label for the output VCF (default: "cohort")

'cohort'

Returns:

Type Description
Variants

Joint-genotyped Variants asset (VCF)

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035535932

Source code in src/stargazer/tasks/gatk/joint_call_gvcfs.py
@gatk_env.task
async def joint_call_gvcfs(
    gvcfs: list[Variants],
    ref: Reference,
    intervals: list[str],
    cohort_id: str = "cohort",
) -> Variants:
    """
    Consolidate GVCFs into GenomicsDB and joint-genotype in a single task.

    Runs GenomicsDBImport followed immediately by GenotypeGVCFs within the same
    execution context, so the workspace never needs to leave the pod.

    Args:
        gvcfs: Per-sample GVCF Variants assets from HaplotypeCaller
        ref: Reference FASTA asset
        intervals: Genomic intervals to process (required by GenomicsDBImport)
        cohort_id: Sample ID label for the output VCF (default: "cohort")

    Returns:
        Joint-genotyped Variants asset (VCF)

    Raises:
        ValueError: if no GVCFs are given, an input is not a GVCF, or a
            fetched GVCF has no local path.
        FileNotFoundError: if either GATK step fails to produce its output.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035535932
    """
    logger.info([x.to_dict() for x in gvcfs])
    logger.info(ref.to_dict())
    if not gvcfs:
        raise ValueError("At least one GVCF must be provided")
    for gvcf in gvcfs:
        if gvcf.variant_type != "gvcf":
            raise ValueError(
                f"All inputs must be GVCFs, got variant_type={gvcf.variant_type!r}: "
                f"sample_id={gvcf.sample_id}"
            )

    await ref.fetch()
    for gvcf in gvcfs:
        await gvcf.fetch()

    output_dir = _storage.default_client.local_dir

    with tempfile.TemporaryDirectory() as tmpdir:
        workspace = Path(tmpdir) / f"{cohort_id}_genomicsdb"

        # Write sample map ("<sample_id>\t<path>" per line) for GenomicsDBImport.
        sample_map = Path(tmpdir) / "sample_map.txt"
        with open(sample_map, "w") as f:
            for gvcf in gvcfs:
                # Guard against assets that fetched without a local path —
                # mirrors the check in genomics_db_import; without it a
                # literal "None" would be written into the sample map.
                if not gvcf.path:
                    raise ValueError(
                        f"GVCF file not available for sample_id={gvcf.sample_id}"
                    )
                f.write(f"{gvcf.sample_id}\t{gvcf.path}\n")

        # GenomicsDBImport: consolidate all GVCFs into a workspace.
        import_cmd = [
            "gatk",
            "GenomicsDBImport",
            "--genomicsdb-workspace-path",
            str(workspace),
            "--sample-name-map",
            str(sample_map),
        ]
        for interval in intervals:
            import_cmd.extend(["-L", interval])

        await _run(import_cmd, cwd=tmpdir)

        if not workspace.exists():
            raise FileNotFoundError(
                f"GenomicsDBImport did not create workspace at {workspace}"
            )

        # GenotypeGVCFs: joint-genotype directly from the workspace.
        output_vcf = output_dir / f"{cohort_id}_genotyped.vcf"
        genotype_cmd = [
            "gatk",
            "GenotypeGVCFs",
            "-R",
            str(ref.path),
            "-V",
            f"gendb://{workspace}",
            "-O",
            str(output_vcf),
        ]

        _, stderr = await _run(genotype_cmd, cwd=tmpdir)

    # output_vcf lives in output_dir, so it survives the tmpdir cleanup above.
    if not output_vcf.exists():
        raise FileNotFoundError(
            f"GenotypeGVCFs did not create output VCF at {output_vcf}. stderr: {stderr}"
        )

    source_samples = [g.sample_id for g in gvcfs]
    vcf = Variants()
    await vcf.update(
        output_vcf,
        sample_id=cohort_id,
        caller="joint_call_gvcfs",
        variant_type="vcf",
        build=ref.build,
        sample_count=len(source_samples),
        source_samples=source_samples,
    )

    # Upload implicit index file produced by GATK
    idx_path = output_dir / f"{output_vcf.name}.idx"
    if idx_path.exists():
        vidx = VariantsIndex()
        await vidx.update(idx_path, sample_id=cohort_id, variants_cid=vcf.cid)

    logger.info(vcf.to_dict())
    return vcf

mark_duplicates task for Stargazer.

Marks duplicate reads in BAM files using GATK MarkDuplicates.

spec: docs/architecture/tasks.md

mark_duplicates(alignment) async

Mark duplicate reads in a BAM file.

Uses GATK MarkDuplicates to identify and tag duplicate reads that originated from the same DNA fragment (PCR or optical duplicates). Duplicates are marked with the 0x0400 SAM flag.

Parameters:

Name Type Description Default
alignment Alignment

Input BAM asset (should be coordinate sorted)

required

Returns:

Type Description
Alignment

Alignment asset with duplicates marked

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037052812-MarkDuplicates-Picard

Source code in src/stargazer/tasks/gatk/mark_duplicates.py
@gatk_env.task
async def mark_duplicates(alignment: Alignment) -> Alignment:
    """
    Mark duplicate reads in a BAM file.

    Uses GATK MarkDuplicates to identify and tag duplicate reads that
    originated from the same DNA fragment (PCR or optical duplicates).
    Duplicates are marked with the 0x0400 SAM flag.

    Args:
        alignment: Input BAM asset (should be coordinate sorted)

    Returns:
        Alignment asset with duplicates marked

    Raises:
        FileNotFoundError: if MarkDuplicates does not produce the output BAM.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037052812-MarkDuplicates-Picard
    """
    logger.info(alignment.to_dict())
    await alignment.fetch()
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir

    output_bam = output_dir / f"{alignment.sample_id}_marked_duplicates.bam"
    metrics_file = output_dir / f"{alignment.sample_id}_duplicate_metrics.txt"

    cmd = [
        "gatk",
        "MarkDuplicates",
        "-I",
        str(bam_path),
        "-O",
        str(output_bam),
        "-M",
        str(metrics_file),
        "--CREATE_INDEX",
        "true",
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(
            f"MarkDuplicates did not create output BAM at {output_bam}"
        )

    marked_bam = Alignment()
    await marked_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=True,
        bqsr_applied=alignment.bqsr_applied,
        tool="gatk_mark_duplicates",
    )

    # Picard's --CREATE_INDEX names the index by replacing the ".bam" suffix
    # with ".bai" (e.g. "X_marked_duplicates.bai"); check that name first and
    # fall back to "<name>.bam.bai" for tool versions that append instead.
    # Previously only the appended form was checked, so the index was never
    # uploaded.
    bam_index = output_bam.with_suffix(".bai")
    if not bam_index.exists():
        bam_index = output_dir / f"{output_bam.name}.bai"
    if bam_index.exists():
        idx = AlignmentIndex()
        await idx.update(
            bam_index,
            sample_id=alignment.sample_id,
            alignment_cid=marked_bam.cid,
        )

    if metrics_file.exists():
        metrics = DuplicateMetrics()
        await metrics.update(
            metrics_file,
            sample_id=alignment.sample_id,
            tool="gatk_mark_duplicates",
            alignment_cid=marked_bam.cid,
        )

    logger.info(marked_bam.to_dict())
    return marked_bam

merge_bam_alignment task for Stargazer.

Merges aligned BAM with unmapped BAM using GATK MergeBamAlignment.

spec: docs/architecture/tasks.md

merge_bam_alignment(aligned_bam, unmapped_bam, ref) async

Merge alignment data from aligned BAM with data in unmapped BAM.

Parameters:

Name Type Description Default
aligned_bam Alignment

Aligned BAM asset from aligner

required
unmapped_bam Alignment

Original unmapped BAM asset (must be queryname sorted)

required
ref Reference

Reference FASTA asset

required

Returns:

Type Description
Alignment

Alignment asset with merged BAM file

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037226472-MergeBamAlignment-Picard

Source code in src/stargazer/tasks/gatk/merge_bam_alignment.py
@gatk_env.task
async def merge_bam_alignment(
    aligned_bam: Alignment,
    unmapped_bam: Alignment,
    ref: Reference,
) -> Alignment:
    """
    Merge alignment data from aligned BAM with data in unmapped BAM.

    Args:
        aligned_bam: Aligned BAM asset from aligner
        unmapped_bam: Original unmapped BAM asset (must be queryname sorted)
        ref: Reference FASTA asset

    Returns:
        Alignment asset with merged BAM file

    Raises:
        FileNotFoundError: if MergeBamAlignment does not produce the output BAM.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037226472-MergeBamAlignment-Picard
    """
    logger.info(aligned_bam.to_dict())
    logger.info(unmapped_bam.to_dict())
    logger.info(ref.to_dict())
    # fetch() auto-downloads companions (.fai, .dict for ref)
    await aligned_bam.fetch()
    await unmapped_bam.fetch()
    await ref.fetch()

    ref_path = ref.path
    aligned_path = aligned_bam.path
    unmapped_path = unmapped_bam.path
    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{aligned_bam.sample_id}_merged.bam"

    cmd = [
        "gatk",
        "MergeBamAlignment",
        "-R",
        str(ref_path),
        "-ALIGNED",
        str(aligned_path),
        "-UNMAPPED",
        str(unmapped_path),
        "-O",
        str(output_bam),
        "--SORT_ORDER",
        "coordinate",
        "--CREATE_INDEX",
        "true",
        "--ADD_MATE_CIGAR",
        "true",
        "--CLIP_ADAPTERS",
        "true",
        "--CLIP_OVERLAPPING_READS",
        "true",
        "--INCLUDE_SECONDARY_ALIGNMENTS",
        "true",
        "--MAX_INSERTIONS_OR_DELETIONS",
        "-1",
    ]

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(
            f"MergeBamAlignment did not create output BAM at {output_bam}"
        )

    merged_bam = Alignment()
    await merged_bam.update(
        output_bam,
        sample_id=aligned_bam.sample_id,
        format="bam",
        sorted="coordinate",
        duplicates_marked=aligned_bam.duplicates_marked,
        bqsr_applied=aligned_bam.bqsr_applied,
        tool="gatk_merge_bam_alignment",
    )

    # Picard's --CREATE_INDEX names the index by replacing the ".bam" suffix
    # with ".bai" (e.g. "X_merged.bai"); check that name first and fall back
    # to "<name>.bam.bai" for tool versions that append instead. Previously
    # only the appended form was checked, so the index was never uploaded.
    bam_index = output_bam.with_suffix(".bai")
    if not bam_index.exists():
        bam_index = output_dir / f"{output_bam.name}.bai"
    if bam_index.exists():
        idx = AlignmentIndex()
        await idx.update(
            bam_index,
            sample_id=aligned_bam.sample_id,
            alignment_cid=merged_bam.cid,
        )

    logger.info(merged_bam.to_dict())
    return merged_bam

sort_sam task for Stargazer.

Sorts BAM files using GATK SortSam.

spec: docs/architecture/tasks.md

sort_sam(alignment, sort_order='coordinate') async

Sort a SAM/BAM file.

Parameters:

Name Type Description Default
alignment Alignment

Input BAM asset to sort

required
sort_order str

Sort order - one of "coordinate", "queryname", "duplicate"

'coordinate'

Returns:

Type Description
Alignment

Alignment asset with sorted BAM file

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360037056932-SortSam-Picard

Source code in src/stargazer/tasks/gatk/sort_sam.py
@gatk_env.task
async def sort_sam(
    alignment: Alignment,
    sort_order: str = "coordinate",
) -> Alignment:
    """
    Sort a SAM/BAM file.

    Args:
        alignment: Input BAM asset to sort
        sort_order: Sort order - one of "coordinate", "queryname", "duplicate"

    Returns:
        Alignment asset with sorted BAM file

    Raises:
        ValueError: if sort_order is not a recognized value.
        FileNotFoundError: if SortSam does not produce the output BAM.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360037056932-SortSam-Picard
    """
    logger.info(alignment.to_dict())
    valid_sort_orders = ["coordinate", "queryname", "duplicate"]
    if sort_order not in valid_sort_orders:
        raise ValueError(
            f"Invalid sort_order: {sort_order}. Must be one of {valid_sort_orders}"
        )

    await alignment.fetch()
    bam_path = alignment.path
    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{alignment.sample_id}_sorted_{sort_order}.bam"

    cmd = [
        "gatk",
        "SortSam",
        "-I",
        str(bam_path),
        "-O",
        str(output_bam),
        "--SORT_ORDER",
        sort_order,
    ]

    # A BAM index is only valid for coordinate-sorted output.
    if sort_order == "coordinate":
        cmd.extend(["--CREATE_INDEX", "true"])

    await _run(cmd, cwd=str(output_dir))

    if not output_bam.exists():
        raise FileNotFoundError(f"SortSam did not create output BAM at {output_bam}")

    sorted_bam = Alignment()
    await sorted_bam.update(
        output_bam,
        sample_id=alignment.sample_id,
        format="bam",
        sorted=sort_order,
        duplicates_marked=alignment.duplicates_marked,
        bqsr_applied=alignment.bqsr_applied,
        tool="gatk_sort_sam",
    )

    if sort_order == "coordinate":
        # Picard's --CREATE_INDEX names the index by replacing the ".bam"
        # suffix with ".bai"; check that name first and fall back to
        # "<name>.bam.bai" for tool versions that append instead. Previously
        # only the appended form was checked, so the index was never uploaded.
        bam_index = output_bam.with_suffix(".bai")
        if not bam_index.exists():
            bam_index = output_dir / f"{output_bam.name}.bai"
        if bam_index.exists():
            idx = AlignmentIndex()
            await idx.update(
                bam_index,
                sample_id=alignment.sample_id,
                alignment_cid=sorted_bam.cid,
            )

    logger.info(sorted_bam.to_dict())
    return sorted_bam

VariantRecalibrator task for Stargazer.

Builds a recalibration model for VQSR using GATK VariantRecalibrator.

spec: docs/architecture/tasks.md

variant_recalibrator(vcf, ref, resources, mode='SNP') async

Build a VQSR recalibration model using GATK VariantRecalibrator.

Each KnownSites in resources must carry the following keyvalues: resource_name (e.g. "hapmap", "omni", "1000G", "dbsnp", "mills"); known ("true" or "false"); training ("true" or "false"); truth ("true" or "false"); prior (a numeric string, e.g. "15").

Parameters:

Name Type Description Default
vcf Variants

Raw genotyped VCF Variants asset

required
ref Reference

Reference FASTA asset

required
resources list[KnownSites]

Training/truth VCF resources for the recalibrator

required
mode str

Variant type to recalibrate — "SNP" or "INDEL"

'SNP'

Returns:

Type Description
VQSRModel

VQSRModel asset (recal file) with tranches_path stored in keyvalues

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR

Source code in src/stargazer/tasks/gatk/variant_recalibrator.py
@gatk_env.task
async def variant_recalibrator(
    vcf: Variants,
    ref: Reference,
    resources: list[KnownSites],
    mode: str = "SNP",
) -> VQSRModel:
    """
    Build a VQSR recalibration model using GATK VariantRecalibrator.

    Each KnownSites in ``resources`` must carry the following keyvalues:
        resource_name: e.g. "hapmap", "omni", "1000G", "dbsnp", "mills"
        known:         "true" or "false"
        training:      "true" or "false"
        truth:         "true" or "false"
        prior:         numeric string, e.g. "15"

    Args:
        vcf: Raw genotyped VCF Variants asset
        ref: Reference FASTA asset
        resources: Training/truth VCF resources for the recalibrator
        mode: Variant type to recalibrate — "SNP" or "INDEL"

    Returns:
        VQSRModel asset (recal file) with tranches_path stored in keyvalues

    Raises:
        ValueError: if mode is invalid or no resources are supplied.
        FileNotFoundError: if GATK fails to produce the recal file.

    Reference:
        https://gatk.broadinstitute.org/hc/en-us/articles/360035531612-Variant-Quality-Score-Recalibration-VQSR
    """
    logger.info(vcf.to_dict())
    logger.info(ref.to_dict())
    logger.info([x.to_dict() for x in resources])
    if mode not in ("SNP", "INDEL"):
        raise ValueError(f"mode must be 'SNP' or 'INDEL', got {mode!r}")
    if not resources:
        raise ValueError(f"At least one resource VCF is required for mode={mode!r}")

    await vcf.fetch()
    await ref.fetch()
    for resource in resources:
        await resource.fetch()

    work_dir = _storage.default_client.local_dir
    sample_id = vcf.keyvalues.get("sample_id", "cohort")
    recal_path = work_dir / f"{sample_id}_{mode.lower()}.recal"
    tranches_file = work_dir / f"{sample_id}_{mode.lower()}.tranches"

    cmd = [
        "gatk",
        "VariantRecalibrator",
        "-R",
        str(ref.path),
        "-V",
        str(vcf.path),
        "-mode",
        mode,
        "-O",
        str(recal_path),
        "--tranches-file",
        str(tranches_file),
    ]

    # One "--resource:<name>,known=...,training=...,truth=...,prior=..." pair
    # per training/truth VCF.
    for resource in resources:
        kv = resource.keyvalues
        spec = (
            f"--resource:{kv.get('resource_name', 'unknown')}"
            f",known={kv.get('known', 'false')}"
            f",training={kv.get('training', 'false')}"
            f",truth={kv.get('truth', 'false')}"
            f",prior={kv.get('prior', '10')}"
        )
        cmd.extend([spec, str(resource.path)])

    # Mode-specific annotation set, one "-an" flag per annotation.
    for ann in _SNP_ANNOTATIONS if mode == "SNP" else _INDEL_ANNOTATIONS:
        cmd.extend(["-an", ann])

    _, stderr = await _run(cmd, cwd=str(work_dir))

    if not recal_path.exists():
        raise FileNotFoundError(
            f"VariantRecalibrator did not create recal file at {recal_path}. stderr: {stderr}"
        )

    model = VQSRModel()
    await model.update(
        recal_path,
        sample_id=sample_id,
        mode=mode,
        tranches_path=str(tranches_file),
        build=ref.build,
        variants_cid=vcf.cid,
    )

    logger.info(model.to_dict())
    return model

BWA tasks for reference genome indexing and alignment.

spec: docs/architecture/tasks.md

bwa_index(ref) async

Create BWA index files for a reference genome using bwa index.

Creates the following index files: .amb, .ann, .bwt, .pac, and .sa.

Parameters:

Name Type Description Default
ref Reference

Reference FASTA asset

required

Returns:

Type Description
list[AlignerIndex]

List of AlignerIndex assets, one per index file

Reference

https://bio-bwa.sourceforge.net/bwa.shtml

Source code in src/stargazer/tasks/general/bwa.py
@gatk_env.task
async def bwa_index(ref: Reference) -> list[AlignerIndex]:
    """
    Create BWA index files for a reference genome using bwa index.

    Creates the following index files:
    - .amb, .ann, .bwt, .pac, .sa

    Args:
        ref: Reference FASTA asset

    Returns:
        List of AlignerIndex assets, one per index file

    Raises:
        FileNotFoundError: if the reference is missing locally or any
            expected index file is not created.

    Reference:
        https://bio-bwa.sourceforge.net/bwa.shtml
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    out_dir = _storage.default_client.local_dir
    prefix = out_dir / ref_path.name

    # "-p" pins the output prefix so the index files land in our local dir.
    await _run(["bwa", "index", "-p", str(prefix), str(ref_path)], cwd=str(out_dir))

    indices: list[AlignerIndex] = []
    for ext in (".amb", ".ann", ".bwt", ".pac", ".sa"):
        index_path = out_dir / f"{ref_path.name}{ext}"
        if not index_path.exists():
            raise FileNotFoundError(f"BWA index file {index_path.name} was not created")

        idx = AlignerIndex()
        await idx.update(
            index_path,
            aligner="bwa",
            build=ref.build,
            reference_cid=ref.cid,
        )
        indices.append(idx)

    logger.info([x.to_dict() for x in indices])
    return indices

bwa_mem(ref, r1, r2=None, read_group=None) async

Align FASTQ reads to reference genome using BWA-MEM.

Produces an unsorted BAM file that typically needs to be sorted before downstream processing (e.g., with sort_sam).

Parameters:

Name Type Description Default
ref Reference

Reference FASTA asset

required
r1 R1

R1 FASTQ read asset

required
r2 R2 | None

R2 FASTQ read asset (None for single-end)

None
read_group dict[str, str] | None

Optional read group override (ID, SM, LB, PL, PU)

None

Returns:

Type Description
Alignment

Alignment asset containing the unsorted BAM file

Reference

https://bio-bwa.sourceforge.net/bwa.shtml

Source code in src/stargazer/tasks/general/bwa.py
@gatk_env.task
async def bwa_mem(
    ref: Reference,
    r1: R1,
    r2: R2 | None = None,
    read_group: dict[str, str] | None = None,
) -> Alignment:
    """
    Align FASTQ reads to reference genome using BWA-MEM.

    Produces an unsorted BAM file that typically needs to be sorted
    before downstream processing (e.g., with sort_sam).

    Args:
        ref: Reference FASTA asset
        r1: R1 FASTQ read asset
        r2: R2 FASTQ read asset (None for single-end)
        read_group: Optional read group override (ID, SM, LB, PL, PU)

    Returns:
        Alignment asset containing the unsorted BAM file

    Raises:
        RuntimeError: if the bwa/samtools pipeline exits non-zero.
        FileNotFoundError: if no output BAM is produced.

    Reference:
        https://bio-bwa.sourceforge.net/bwa.shtml
    """
    logger.info(ref.to_dict())
    logger.info(r1.to_dict())
    if r2:
        logger.info(r2.to_dict())
    # fetch() downloads ref + companions (aligner indices, .fai, .dict)
    await ref.fetch()
    await r1.fetch()
    if r2:
        await r2.fetch()

    ref_path = ref.path
    r1_path = r1.path
    r2_path = r2.path if r2 else None
    sample_id = r1.sample_id

    # Build the @RG header line; caller-supplied fields override the defaults.
    if read_group:
        rg = {"ID": sample_id, "SM": sample_id}
        rg.update(read_group)
        rg_parts = [f"{k}:{v}" for k, v in rg.items()]
        rg_string = "@RG\\t" + "\\t".join(rg_parts)
    else:
        rg_string = f"@RG\\tID:{sample_id}\\tSM:{sample_id}\\tLB:lib\\tPL:ILLUMINA"

    output_dir = _storage.default_client.local_dir
    output_bam = output_dir / f"{sample_id}_aligned.bam"

    cmd = [
        "bwa",
        "mem",
        "-K",
        "10000000",
        "-R",
        rg_string,
        "-t",
        "4",
        str(ref_path),
    ]

    if r2_path:
        cmd.extend([str(r1_path), str(r2_path)])
    else:
        cmd.append(str(r1_path))

    # Stream bwa's SAM output straight into samtools to produce BAM.
    # Every token interpolated into the shell string is shlex-quoted —
    # including the output path, which was previously interpolated raw and
    # would break on paths containing spaces or shell metacharacters.
    bwa_cmd = " ".join(shlex.quote(str(c)) for c in cmd)
    samtools_cmd = f"samtools view -bS -o {shlex.quote(str(output_bam))} -"
    full_cmd = f"{bwa_cmd} | {samtools_cmd}"

    logger.info(f"Running: {full_cmd}")
    proc = await asyncio.create_subprocess_shell(
        full_cmd,
        cwd=str(output_dir),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    if proc.returncode != 0:
        stderr_text = stderr.decode() if stderr else ""
        stdout_text = stdout.decode() if stdout else ""
        raise RuntimeError(
            f"BWA-MEM failed with return code {proc.returncode}.\n"
            f"stdout: {stdout_text}\nstderr: {stderr_text}"
        )

    if not output_bam.exists():
        raise FileNotFoundError(f"BWA-MEM did not create output BAM at {output_bam}")

    bam = Alignment()
    await bam.update(
        output_bam,
        sample_id=sample_id,
        format="bam",
        duplicates_marked=False,
        bqsr_applied=False,
        reference_cid=ref.cid,
        r1_cid=r1.cid,
    )

    logger.info(bam.to_dict())
    return bam

Samtools tasks for reference genome indexing.

spec: docs/architecture/tasks.md

samtools_faidx(ref) async

Create a FASTA index (.fai file) using samtools faidx.

Parameters:

Name Type Description Default
ref Reference

Reference FASTA asset

required

Returns:

Type Description
ReferenceIndex

ReferenceIndex asset containing the .fai file

Source code in src/stargazer/tasks/general/samtools.py
@gatk_env.task
async def samtools_faidx(ref: Reference) -> ReferenceIndex:
    """
    Create a FASTA index (.fai file) using samtools faidx.

    Args:
        ref: Reference FASTA asset

    Returns:
        ReferenceIndex asset containing the .fai file

    Raises:
        FileNotFoundError: if the reference is missing locally or the
            .fai file is not created.
    """
    logger.info(ref.to_dict())
    await ref.fetch()
    ref_path = ref.path

    if not ref_path or not ref_path.exists():
        raise FileNotFoundError(f"Reference file not found at {ref_path}")

    out_dir = _storage.default_client.local_dir
    fai_path = out_dir / f"{ref_path.name}.fai"
    # "--fai-idx" pins the index location to our local output dir.
    await _run(
        ["samtools", "faidx", str(ref_path), "--fai-idx", str(fai_path)],
        cwd=str(out_dir),
    )

    if not fai_path.exists():
        raise FileNotFoundError(f"FASTA index file {fai_path.name} was not created")

    faidx = ReferenceIndex()
    await faidx.update(
        fai_path,
        build=ref.build,
        tool="samtools_faidx",
        reference_cid=ref.cid,
    )

    logger.info(faidx.to_dict())
    return faidx

Types

Alignment asset types for Stargazer.

spec: docs/architecture/types.md

Alignment dataclass

Bases: Asset

BAM/CRAM alignment file asset.

Carries reference_cid and r1_cid for provenance (PROV entity derivation).

Source code in src/stargazer/types/alignment.py
@dataclass
class Alignment(Asset):
    """BAM/CRAM alignment file asset.

    Carries reference_cid and r1_cid for provenance (PROV entity derivation).
    """

    _asset_key: ClassVar[str] = "alignment"
    sample_id: str = ""  # sample this alignment belongs to
    format: str = ""  # container format, e.g. "bam"
    sorted: str = ""  # sort order, e.g. "coordinate" or "queryname"
    duplicates_marked: bool = False  # True once MarkDuplicates has run
    bqsr_applied: bool = False  # True once BQSR recalibration has been applied
    tool: str = ""  # producing tool label, e.g. "gatk_sort_sam"
    reference_cid: str = ""  # CID of the Reference used for alignment (provenance)
    r1_cid: str = ""  # CID of the R1 FASTQ input (provenance)

AlignmentIndex dataclass

Bases: Asset

BAI/CRAI alignment index file asset.

Carries alignment_cid linking to the Alignment it indexes.

Source code in src/stargazer/types/alignment.py
@dataclass
class AlignmentIndex(Asset):
    """BAI/CRAI alignment index file asset.

    Carries alignment_cid linking to the Alignment it indexes.
    """

    _asset_key: ClassVar[str] = "alignment_index"
    sample_id: str = ""  # sample the indexed alignment belongs to
    alignment_cid: str = ""  # CID of the Alignment asset this index covers

BQSRReport dataclass

Bases: Asset

BQSR recalibration table produced by GATK BaseRecalibrator.

Carries alignment_cid linking to the Alignment it was produced from.

Source code in src/stargazer/types/alignment.py
@dataclass
class BQSRReport(Asset):
    """BQSR recalibration table produced by GATK BaseRecalibrator.

    Carries alignment_cid linking to the Alignment it was produced from.
    """

    _asset_key: ClassVar[str] = "bqsr_report"
    sample_id: str = ""
    tool: str = ""  # producing tool name
    alignment_cid: str = ""  # CID of the source Alignment (provenance link)

DuplicateMetrics dataclass

Bases: Asset

Duplicate metrics text file produced by GATK MarkDuplicates.

Carries alignment_cid linking to the Alignment it was produced from.

Source code in src/stargazer/types/alignment.py
@dataclass
class DuplicateMetrics(Asset):
    """Duplicate metrics text file produced by GATK MarkDuplicates.

    Carries alignment_cid linking to the Alignment it was produced from.
    """

    _asset_key: ClassVar[str] = "duplicate_metrics"
    sample_id: str = ""
    tool: str = ""  # producing tool name
    alignment_cid: str = ""  # CID of the source Alignment (provenance link)

Asset base dataclass for Stargazer.

spec: docs/architecture/types.md

Asset dataclass

Base class for all typed file assets in Stargazer.

Attributes:

Name Type Description
cid str

Content identifier (CID) for the stored file

path Path | None

Local filesystem path (set after download or upload)

keyvalues dict[str, str]

Metadata key-value pairs for querying and routing

Subclasses declare typed field annotations directly:

@dataclass
class Alignment(Asset):
    _asset_key: ClassVar[str] = "alignment"
    sample_id: str = ""
    duplicates_marked: bool = False

__init_subclass__ auto-derives _field_types (non-str fields) and _field_defaults (all defaults) from the annotations. The _field_types and _field_defaults ClassVars on the base class are empty-dict defaults inherited by subclasses that declare no fields.

Source code in src/stargazer/types/asset.py
@dataclass
class Asset:
    """Base class for all typed file assets in Stargazer.

    Attributes:
        cid: Content identifier (CID) for the stored file
        path: Local filesystem path (set after download or upload)
        keyvalues: Metadata key-value pairs for querying and routing

    Subclasses declare typed field annotations directly:

        @dataclass
        class Alignment(Asset):
            _asset_key: ClassVar[str] = "alignment"
            sample_id: str = ""
            duplicates_marked: bool = False

    ``__init_subclass__`` auto-derives ``_field_types`` (non-str fields) and
    ``_field_defaults`` (all defaults) from the annotations. The ``_field_types``
    and ``_field_defaults`` ClassVars on the base class are empty-dict defaults
    inherited by subclasses that declare no fields.
    """

    _registry: ClassVar[dict[str, type["Asset"]]] = {}  # _asset_key -> subclass; populated by __init_subclass__
    _field_types: ClassVar[dict[str, type]] = {}  # declared non-str fields; drives read/write coercion
    _field_defaults: ClassVar[dict[str, Any]] = {}  # per-field defaults returned when the keyvalue is absent
    _own_attrs: ClassVar[frozenset] = frozenset(("cid", "path", "keyvalues"))  # real instance attrs; all else lives in keyvalues
    _asset_key: ClassVar[str] = ""  # registry key; empty string on the base class

    cid: str = ""  # content identifier assigned on upload
    path: Path | None = None  # local file path; set by download()/update()
    keyvalues: dict[str, str] = field(default_factory=dict)  # string-serialized metadata store

    def __init_subclass__(cls, **kwargs):
        """Register subclass in the asset registry and derive field metadata."""
        super().__init_subclass__(**kwargs)
        ak = cls.__dict__.get("_asset_key", "")
        if ak:
            Asset._registry[ak] = cls

        field_types: dict[str, type] = {}
        field_defaults: dict[str, Any] = {}
        for name, annotation in cls.__dict__.get("__annotations__", {}).items():
            if name.startswith("_"):
                continue
            if typing.get_origin(annotation) is ClassVar:
                continue
            # str is the native storage type; only non-str fields need coercion info
            if annotation is not str:
                field_types[name] = annotation
            default = cls.__dict__.get(name, _MISSING)
            if default is not _MISSING:
                field_defaults[name] = default

        # Only shadow the inherited ClassVars when this subclass declares fields,
        # so field-less subclasses keep sharing the base empty dicts.
        if field_types:
            cls._field_types = field_types
        if field_defaults:
            cls._field_defaults = field_defaults

    def __post_init__(self):
        """Seed the 'asset' keyvalue from _asset_key on construction."""
        # setdefault keeps a caller-provided "asset" keyvalue intact
        if self._asset_key:
            self.keyvalues.setdefault("asset", self._asset_key)

    def __getattribute__(self, name: str) -> Any:
        """Read declared fields from keyvalues with type coercion; delegate everything else."""
        # Fast-path: internal/private attrs skip all keyvalues logic
        if name.startswith("_"):
            return object.__getattribute__(self, name)

        # Get the field registries without triggering recursion
        field_defaults = object.__getattribute__(self, "_field_defaults")
        field_types = object.__getattribute__(self, "_field_types")

        # Only intercept declared fields; let everything else through
        if name in field_defaults or name in field_types:
            # keyvalues may not exist yet mid-__init__; fall back to the raw attribute
            try:
                kv = object.__getattribute__(self, "keyvalues")
            except AttributeError:
                return object.__getattribute__(self, name)
            val = kv.get(name)
            ftype = field_types.get(name)
            if val is None:
                if ftype is bool:
                    return field_defaults.get(name, False)
                return field_defaults.get(name)
            if ftype is bool:
                return val == "true"
            if ftype is int:
                return int(val)
            if ftype is list:
                return val.split(",") if val else None  # empty string reads back as None, not []
            return val

        return object.__getattribute__(self, name)

    def __getattr__(self, name: str) -> Any:
        """Fall back to keyvalues lookup for undeclared attributes on base Asset."""
        # Fallback for undeclared keys on base Asset instances
        # (only reached when normal lookup fails; unknown keys yield None)
        kv = self.__dict__.get("keyvalues", {})
        return kv.get(name)

    def __setattr__(self, name: str, value: Any) -> None:
        """Coerce and store declared fields into keyvalues; bypass for core attrs."""
        if name in self._own_attrs or name.startswith("_"):
            super().__setattr__(name, value)
            return
        # Enforce allowed keys — only on subclasses that declare _asset_key
        if self._asset_key:
            allowed = (
                frozenset(self._field_defaults)
                | frozenset(self._field_types)
                | {"asset"}
            )
            if name not in allowed:
                raise ValueError(
                    f"{type(self).__name__} does not allow keyvalue '{name}'. "
                    f"Allowed: {sorted(allowed)}"
                )
        # Coerce and store in keyvalues
        ftype = self._field_types.get(name)
        if ftype is bool:
            # Preserve already-serialized strings; coerce bool/other by truthiness
            if isinstance(value, str):
                self.keyvalues[name] = "true" if value == "true" else "false"
            else:
                self.keyvalues[name] = "true" if value else "false"
        elif isinstance(value, bool):
            # bool assigned to a str-typed field still serializes as "true"/"false"
            self.keyvalues[name] = "true" if value else "false"
        elif ftype is list or isinstance(value, list):
            # lists serialize as a comma-joined string; None clears to ""
            if value is None:
                self.keyvalues[name] = ""
            elif isinstance(value, list):
                self.keyvalues[name] = ",".join(value)
            else:
                self.keyvalues[name] = str(value)
        else:
            self.keyvalues[name] = str(value)

    async def fetch(self) -> None:
        """Download this asset and all its companions from storage.

        Downloads the asset itself, then queries storage for any assets linked
        via ``{_asset_key}_cid`` to auto-download companions (e.g. indices,
        mate reads).
        """
        import stargazer.utils.storage as _storage

        await _storage.default_client.download(self)

        # companions reference this asset through a "<asset_key>_cid" keyvalue
        if self._asset_key and self.cid:
            companions = await assemble(**{f"{self._asset_key}_cid": self.cid})
            for a in companions:
                await _storage.default_client.download(a)

    async def update(self, path: Path, **kwargs) -> None:
        """Upload file and set cid. Shared by all asset types.

        Args:
            path: Local file to upload.
            **kwargs: Field values to set before uploading; None values are
                skipped so callers may pass optional metadata unconditionally.
        """
        from stargazer.utils.storage import default_client

        for key, value in kwargs.items():
            if value is not None:
                setattr(self, key, value)
        self.path = path
        await default_client.upload(self)

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict.

        ``path`` is rendered as a string (or None) so the result is JSON-safe.
        """
        return {
            "cid": self.cid,
            "path": str(self.path) if self.path else None,
            "keyvalues": self.keyvalues,
        }

    @classmethod
    def from_dict(cls, data: dict) -> Self:
        """Reconstruct from a serialized dict.

        Inverse of :meth:`to_dict`; a missing or empty ``path`` becomes None.
        """
        kv = data.get("keyvalues", {})
        if cls._asset_key:
            # Subclass: unpack declared fields as kwargs; keyvalues rebuilt by __setattr__
            declared = set(cls._field_defaults) | set(cls._field_types)
            field_kwargs = {k: v for k, v in kv.items() if k in declared}
            return cls(
                cid=data.get("cid", ""),
                path=Path(data["path"]) if data.get("path") else None,
                **field_kwargs,
            )
        # Base Asset: pass keyvalues directly
        return cls(
            cid=data.get("cid", ""),
            path=Path(data["path"]) if data.get("path") else None,
            keyvalues=dict(kv),
        )

__getattr__(name)

Fall back to keyvalues lookup for undeclared attributes on base Asset.

Source code in src/stargazer/types/asset.py
def __getattr__(self, name: str) -> Any:
    """Fall back to keyvalues lookup for undeclared attributes on base Asset."""
    # Fallback for undeclared keys on base Asset instances
    # (only reached when normal lookup fails; unknown keys yield None)
    kv = self.__dict__.get("keyvalues", {})
    return kv.get(name)

__getattribute__(name)

Read declared fields from keyvalues with type coercion; delegate everything else.

Source code in src/stargazer/types/asset.py
def __getattribute__(self, name: str) -> Any:
    """Read declared fields from keyvalues with type coercion; delegate everything else."""
    # Fast-path: internal/private attrs skip all keyvalues logic
    if name.startswith("_"):
        return object.__getattribute__(self, name)

    # Get the field registries without triggering recursion
    field_defaults = object.__getattribute__(self, "_field_defaults")
    field_types = object.__getattribute__(self, "_field_types")

    # Only intercept declared fields; let everything else through
    if name in field_defaults or name in field_types:
        # keyvalues may not exist yet mid-__init__; fall back to the raw attribute
        try:
            kv = object.__getattribute__(self, "keyvalues")
        except AttributeError:
            return object.__getattribute__(self, name)
        val = kv.get(name)
        ftype = field_types.get(name)
        if val is None:
            if ftype is bool:
                return field_defaults.get(name, False)
            return field_defaults.get(name)
        if ftype is bool:
            return val == "true"
        if ftype is int:
            return int(val)
        if ftype is list:
            return val.split(",") if val else None  # empty string reads back as None, not []
        return val

    return object.__getattribute__(self, name)

__init_subclass__(**kwargs)

Register subclass in the asset registry and derive field metadata.

Source code in src/stargazer/types/asset.py
def __init_subclass__(cls, **kwargs):
    """Register subclass in the asset registry and derive field metadata."""
    super().__init_subclass__(**kwargs)
    ak = cls.__dict__.get("_asset_key", "")
    if ak:
        Asset._registry[ak] = cls

    field_types: dict[str, type] = {}
    field_defaults: dict[str, Any] = {}
    for name, annotation in cls.__dict__.get("__annotations__", {}).items():
        if name.startswith("_"):
            continue
        if typing.get_origin(annotation) is ClassVar:
            continue
        # str is the native storage type; only non-str fields need coercion info
        if annotation is not str:
            field_types[name] = annotation
        default = cls.__dict__.get(name, _MISSING)
        if default is not _MISSING:
            field_defaults[name] = default

    # Only shadow the inherited ClassVars when this subclass declares fields,
    # so field-less subclasses keep sharing the base empty dicts.
    if field_types:
        cls._field_types = field_types
    if field_defaults:
        cls._field_defaults = field_defaults

__post_init__()

Seed the 'asset' keyvalue from _asset_key on construction.

Source code in src/stargazer/types/asset.py
def __post_init__(self):
    """Seed the 'asset' keyvalue from _asset_key on construction."""
    # setdefault keeps a caller-provided "asset" keyvalue intact
    if self._asset_key:
        self.keyvalues.setdefault("asset", self._asset_key)

__setattr__(name, value)

Coerce and store declared fields into keyvalues; bypass for core attrs.

Source code in src/stargazer/types/asset.py
def __setattr__(self, name: str, value: Any) -> None:
    """Coerce and store declared fields into keyvalues; bypass for core attrs."""
    if name in self._own_attrs or name.startswith("_"):
        super().__setattr__(name, value)
        return
    # Enforce allowed keys — only on subclasses that declare _asset_key
    if self._asset_key:
        allowed = (
            frozenset(self._field_defaults)
            | frozenset(self._field_types)
            | {"asset"}
        )
        if name not in allowed:
            raise ValueError(
                f"{type(self).__name__} does not allow keyvalue '{name}'. "
                f"Allowed: {sorted(allowed)}"
            )
    # Coerce and store in keyvalues
    ftype = self._field_types.get(name)
    if ftype is bool:
        # Preserve already-serialized strings; coerce bool/other by truthiness
        if isinstance(value, str):
            self.keyvalues[name] = "true" if value == "true" else "false"
        else:
            self.keyvalues[name] = "true" if value else "false"
    elif isinstance(value, bool):
        # bool assigned to a str-typed field still serializes as "true"/"false"
        self.keyvalues[name] = "true" if value else "false"
    elif ftype is list or isinstance(value, list):
        # lists serialize as a comma-joined string; None clears to ""
        if value is None:
            self.keyvalues[name] = ""
        elif isinstance(value, list):
            self.keyvalues[name] = ",".join(value)
        else:
            self.keyvalues[name] = str(value)
    else:
        self.keyvalues[name] = str(value)

fetch() async

Download this asset and all its companions from storage.

Downloads the asset itself, then queries storage for any assets linked via {_asset_key}_cid to auto-download companions (e.g. indices, mate reads).

Source code in src/stargazer/types/asset.py
async def fetch(self) -> None:
    """Download this asset and all its companions from storage.

    Downloads the asset itself, then queries storage for any assets linked
    via ``{_asset_key}_cid`` to auto-download companions (e.g. indices,
    mate reads).
    """
    import stargazer.utils.storage as _storage

    await _storage.default_client.download(self)

    # companions reference this asset through a "<asset_key>_cid" keyvalue
    if self._asset_key and self.cid:
        companions = await assemble(**{f"{self._asset_key}_cid": self.cid})
        for a in companions:
            await _storage.default_client.download(a)

from_dict(data) classmethod

Reconstruct from a serialized dict.

Source code in src/stargazer/types/asset.py
@classmethod
def from_dict(cls, data: dict) -> Self:
    """Reconstruct from a serialized dict.

    Inverse of :meth:`to_dict`; a missing or empty ``path`` becomes None.
    """
    kv = data.get("keyvalues", {})
    if cls._asset_key:
        # Subclass: unpack declared fields as kwargs; keyvalues rebuilt by __setattr__
        declared = set(cls._field_defaults) | set(cls._field_types)
        field_kwargs = {k: v for k, v in kv.items() if k in declared}
        return cls(
            cid=data.get("cid", ""),
            path=Path(data["path"]) if data.get("path") else None,
            **field_kwargs,
        )
    # Base Asset: pass keyvalues directly
    return cls(
        cid=data.get("cid", ""),
        path=Path(data["path"]) if data.get("path") else None,
        keyvalues=dict(kv),
    )

to_dict()

Serialize to a JSON-friendly dict.

Source code in src/stargazer/types/asset.py
def to_dict(self) -> dict:
    """Serialize to a JSON-friendly dict.

    ``path`` is rendered as a string (or None) so the result is JSON-safe.
    """
    return {
        "cid": self.cid,
        "path": str(self.path) if self.path else None,
        "keyvalues": self.keyvalues,
    }

update(path, **kwargs) async

Upload file and set cid. Shared by all asset types.

Source code in src/stargazer/types/asset.py
async def update(self, path: Path, **kwargs) -> None:
    """Upload file and set cid. Shared by all asset types.

    Args:
        path: Local file to upload.
        **kwargs: Field values to set before uploading; None values are
            skipped so callers may pass optional metadata unconditionally.
    """
    from stargazer.utils.storage import default_client

    for key, value in kwargs.items():
        if value is not None:
            setattr(self, key, value)
    self.path = path
    await default_client.upload(self)

assemble(**filters) async

Query storage by keyvalue filters and return specialized assets.

The asset filter key accepts a string or list of strings to narrow by asset type. Other filters are passed through as keyvalue matchers.

Parameters:

Name Type Description Default
**filters Any

Keyvalue filters. Values may be scalars or lists (cartesian product).

{}

Returns:

Type Description
list[Asset]

Flat list of specialized Asset subclass instances.

Examples:

assets = await assemble(build="GRCh38", asset="reference")
ref = next(a for a in assets if isinstance(a, Reference))

assets = await assemble(sample_id="NA12878", asset=["r1", "r2"])
r1 = next(a for a in assets if isinstance(a, R1))

Source code in src/stargazer/types/asset.py
async def assemble(**filters: Any) -> list["Asset"]:
    """Query storage by keyvalue filters and return specialized assets.

    The ``asset`` filter key accepts a string or list of strings to narrow
    by asset type. Other filters are passed through as keyvalue matchers.

    Args:
        **filters: Keyvalue filters. Values may be scalars or lists
                   (cartesian product).

    Returns:
        Flat list of specialized Asset subclass instances.

    Examples:
        assets = await assemble(build="GRCh38", asset="reference")
        ref = next(a for a in assets if isinstance(a, Reference))

        assets = await assemble(sample_id="NA12878", asset=["r1", "r2"])
        r1 = next(a for a in assets if isinstance(a, R1))
    """
    import stargazer.utils.storage as _storage
    from stargazer.types import specialize
    from stargazer.utils.query import generate_query_combinations

    combos = generate_query_combinations(base_query={}, filters=filters)

    # De-duplicate by cid: overlapping query combinations may match the same file.
    unique: dict[str, Asset] = {}
    for combo in combos:
        matches = await _storage.default_client.query(combo)
        for match in matches:
            unique[match.cid] = match

    return [specialize(raw) for raw in unique.values()]

Read file asset types for Stargazer.

spec: docs/architecture/types.md

R1 dataclass

Bases: Asset

R1 (forward) FASTQ read file asset.

Carries mate_cid pointing to the paired R2 asset's CID (None for single-end).

Source code in src/stargazer/types/reads.py
@dataclass
class R1(Asset):
    """R1 (forward) FASTQ read file asset.

    Carries mate_cid pointing to the paired R2 asset's CID (None for single-end).
    """

    _asset_key: ClassVar[str] = "r1"
    sample_id: str = ""
    mate_cid: str = ""  # CID of the paired R2; stored as "" when unset — TODO confirm whether None is ever stored
    sequencing_platform: str = ""

R2 dataclass

Bases: Asset

R2 (reverse) FASTQ read file asset.

Carries mate_cid pointing to the paired R1 asset's CID (None for single-end).

Source code in src/stargazer/types/reads.py
@dataclass
class R2(Asset):
    """R2 (reverse) FASTQ read file asset.

    Carries mate_cid pointing to the paired R1 asset's CID (None for single-end).
    """

    _asset_key: ClassVar[str] = "r2"
    sample_id: str = ""
    mate_cid: str = ""  # CID of the paired R1; stored as "" when unset — TODO confirm whether None is ever stored
    sequencing_platform: str = ""

Reference genome asset types for Stargazer.

spec: docs/architecture/types.md

AlignerIndex dataclass

Bases: Asset

Aligner index file asset (one file per index file for multi-file indices).

Source code in src/stargazer/types/reference.py
@dataclass
class AlignerIndex(Asset):
    """Aligner index file asset (one file per index file for multi-file indices).
    """

    _asset_key: ClassVar[str] = "aligner_index"
    build: str = ""  # genome build identifier
    aligner: str = ""  # aligner the index was built for
    reference_cid: str = ""  # CID of the source Reference (provenance link)

Reference dataclass

Bases: Asset

Reference FASTA file asset.

Source code in src/stargazer/types/reference.py
@dataclass
class Reference(Asset):
    """Reference FASTA file asset."""

    _asset_key: ClassVar[str] = "reference"
    build: str = ""

    @property
    def contigs(self) -> list[str]:
        """Read contig names from the companion .fai index.

        Requires fetch() to have been called first so the ReferenceIndex
        companion is downloaded alongside this reference.
        """
        if self.path is None:
            raise ValueError("Reference has no local path — call fetch() first")
        fai_path = self.path.parent / (self.path.name + ".fai")
        if not fai_path.exists():
            raise FileNotFoundError(
                f"Reference index not found at {fai_path}. "
                f"Run samtools_faidx first, then fetch() to download companions."
            )
        # Contig name is the first tab-separated column of each .fai line;
        # blank names are dropped, original order is preserved.
        with open(fai_path) as handle:
            names = [line.split("\t", 1)[0].strip() for line in handle]
        return [name for name in names if name]

contigs property

Read contig names from the companion .fai index.

Requires fetch() to have been called first so the ReferenceIndex companion is downloaded alongside this reference.

ReferenceIndex dataclass

Bases: Asset

FASTA index (.fai) file asset.

Carries reference_cid linking back to the Reference it was built from.

Source code in src/stargazer/types/reference.py
@dataclass
class ReferenceIndex(Asset):
    """FASTA index (.fai) file asset.

    Carries reference_cid linking back to the Reference it was built from.
    """

    _asset_key: ClassVar[str] = "reference_index"
    build: str = ""  # genome build identifier
    tool: str = ""  # producing tool name (e.g. set by the samtools_faidx task)
    reference_cid: str = ""  # CID of the source Reference (provenance link)

SequenceDict dataclass

Bases: Asset

Sequence dictionary (.dict) file asset.

Source code in src/stargazer/types/reference.py
@dataclass
class SequenceDict(Asset):
    """Sequence dictionary (.dict) file asset.
    """

    _asset_key: ClassVar[str] = "sequence_dict"
    build: str = ""  # genome build identifier
    tool: str = ""  # producing tool name
    reference_cid: str = ""  # CID of the source Reference (provenance link)

Variant call asset types for Stargazer.

spec: docs/architecture/types.md

KnownSites dataclass

Bases: Asset

Known variant sites VCF used for BQSR.

Standalone asset — carries build and source fields, no container needed.

Source code in src/stargazer/types/variants.py
@dataclass
class KnownSites(Asset):
    """Known variant sites VCF used for BQSR.

    Standalone asset — carries build and source fields, no container needed.
    """

    _asset_key: ClassVar[str] = "known_sites"
    build: str = ""
    resource_name: str = ""
    known: str = "false"  # string flag ("true"/"false"); str-typed so Asset's bool coercion is bypassed — TODO confirm intended
    training: str = "false"  # string flag ("true"/"false")
    truth: str = "false"  # string flag ("true"/"false")
    prior: str = "10"  # kept as a string — presumably for CLI pass-through; verify
    vqsr_mode: str = ""

KnownSitesIndex dataclass

Bases: Asset

VCF index (.idx) file for a KnownSites asset.

Carries known_sites_cid linking to the KnownSites VCF it indexes. Fetched automatically alongside the VCF via Asset.fetch().

Source code in src/stargazer/types/variants.py
@dataclass
class KnownSitesIndex(Asset):
    """VCF index (.idx) file for a KnownSites asset.

    Carries known_sites_cid linking to the KnownSites VCF it indexes.
    Fetched automatically alongside the VCF via Asset.fetch().
    """

    _asset_key: ClassVar[str] = "known_sites_index"
    known_sites_cid: str = ""  # CID of the KnownSites VCF this index belongs to

VQSRModel dataclass

Bases: Asset

VQSR recalibration model (.recal file + tranches path).

Produced by VariantRecalibrator. The recal file is the primary path; the companion tranches file path is stored in keyvalues["tranches_path"].

Source code in src/stargazer/types/variants.py
@dataclass
class VQSRModel(Asset):
    """VQSR recalibration model (.recal file + tranches path).

    Produced by VariantRecalibrator. The recal file is the primary path;
    the companion tranches file path is stored in keyvalues["tranches_path"].
    """

    _asset_key: ClassVar[str] = "vqsr_model"
    sample_id: str = ""
    mode: str = "SNP"  # recalibration mode — presumably "SNP" or "INDEL"; confirm
    tranches_path: str = ""  # path to the companion tranches file
    build: str = ""
    variants_cid: str = ""  # CID of the Variants the model was trained on (provenance link)

Variants dataclass

Bases: Asset

VCF/GVCF variant call file asset.

Source code in src/stargazer/types/variants.py
@dataclass
class Variants(Asset):
    """VCF/GVCF variant call file asset.
    """

    _asset_key: ClassVar[str] = "variants"
    sample_id: str = ""
    caller: str = ""  # variant caller name
    variant_type: str = ""
    build: str = ""
    vqsr_mode: str = ""
    sample_count: int = 0  # read back as int via Asset's field-type coercion
    # NOTE(review): list-typed field with a None default — Asset.__setattr__
    # serializes None to "" and the read path returns None for "", never [].
    # Confirm this round-trip is intended rather than a default_factory list.
    source_samples: list = None

VariantsIndex dataclass

Bases: Asset

VCF index (.tbi) file asset.

Carries variants_cid linking to the Variants file it indexes.

Source code in src/stargazer/types/variants.py
@dataclass
class VariantsIndex(Asset):
    """VCF index (.tbi) file asset.

    Carries variants_cid linking to the Variants file it indexes.
    """

    _asset_key: ClassVar[str] = "variants_index"
    sample_id: str = ""
    variants_cid: str = ""  # CID of the Variants file this index belongs to

Utils

Local filesystem storage client for Stargazer.

Stores files locally with TinyDB metadata indexing. No network access required.

spec: docs/architecture/modes.md

LocalStorageClient

Local filesystem storage client.

Stores files in a local directory and indexes metadata in TinyDB. No network access or API credentials required.

Usage

client = LocalStorageClient()
comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
await client.upload(comp)
files = await client.query({"type": "alignment"})
await client.download(comp)

Source code in src/stargazer/utils/local_storage.py
class LocalStorageClient:
    """
    Local filesystem storage client.

    Stores files in a local directory and indexes metadata in TinyDB.
    No network access or API credentials required.

    Usage:
        client = LocalStorageClient()
        comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
        await client.upload(comp)
        files = await client.query({"type": "alignment"})
        await client.download(comp)
    """

    def __init__(
        self,
        local_dir: Optional[Path] = None,
    ):
        """
        Initialize local storage client.

        Args:
            local_dir: Local directory for file storage (defaults to STARGAZER_LOCAL env var
                       or ~/.stargazer/local)
        """
        self.local_dir = local_dir or Path(
            os.environ.get("STARGAZER_LOCAL", str(Path.home() / ".stargazer" / "local"))
        )
        # Create the storage root eagerly so later copies never fail on a missing dir
        self.local_dir.mkdir(parents=True, exist_ok=True)

        # TinyDB for local metadata storage (lazy initialized)
        self.local_db_path = self.local_dir / "stargazer_local.json"
        self._db: Optional[TinyDB] = None
        self._db_mtime: float = 0.0

    @property
    def db(self) -> TinyDB:
        """Get TinyDB instance for local metadata storage (lazy initialized).

        Re-opens if the DB file has been deleted or modified externally,
        keeping _last_id in sync when other processes write to the same file.
        """
        # A changed mtime means another process wrote the file; reopen to pick up its state
        mtime = (
            self.local_db_path.stat().st_mtime if self.local_db_path.exists() else 0.0
        )
        if self._db is None or mtime != self._db_mtime:
            if self._db is not None:
                self._db.close()
            self._db = TinyDB(self.local_db_path)
            self._db_mtime = mtime
        return self._db

    async def upload(self, component: Asset) -> None:
        """
        Copy a file to local storage, index metadata in TinyDB, and set component.cid.

        Args:
            component: Asset with path and keyvalues set
        """
        path = component.path
        if path is None:
            raise ValueError("component.path must be set before uploading")

        # Generate MD5 hash of file content
        # (content-addressed: identical bytes always map to the same cid)
        md5_hash = hashlib.md5(path.read_bytes()).hexdigest()
        cid = f"local_{md5_hash}"

        # Copy to local dir using original filename
        # NOTE(review): files are stored flat by basename — same-named files with
        # different content overwrite each other on disk; confirm acceptable.
        local_path = self.local_dir / path.name
        local_path.parent.mkdir(parents=True, exist_ok=True)
        if path.resolve() != local_path.resolve():
            shutil.copy2(path, local_path)

        # Upsert metadata in TinyDB (avoid duplicates on re-upload)
        now = datetime.now(timezone.utc)
        File = Query()
        self.db.upsert(
            {
                "cid": cid,
                "keyvalues": component.keyvalues,
                "created_at": now.isoformat(),
                "rel_path": path.name,
            },
            File.cid == cid,
        )

        component.cid = cid

    async def download(self, component: Asset, dest: Optional[Path] = None) -> None:
        """
        Resolve a local file path and set component.path. For local storage, files are
        already on disk.

        Args:
            component: Asset with cid set
            dest: Optional destination path (copies file there)
        """
        # Skip if path is already set and file exists
        if component.path and component.path.exists():
            return

        cid = component.cid
        # Check local dir first (cache key)
        # — flatten any "/" in the cid so it can serve as a single filename
        cache_key = cid.replace("/", "_")
        cache_path = self.local_dir / cache_key

        if cache_path.exists():
            if dest:
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(cache_path, dest)
                component.path = dest
            else:
                component.path = cache_path
            return

        # Look up in TinyDB for path resolution
        if cid.startswith("local_"):
            File = Query()
            record = self.db.get(File.cid == cid)
            if record:
                local_path = self.local_dir / record["rel_path"]
                if local_path.exists():
                    if dest:
                        dest.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy(local_path, dest)
                        component.path = dest
                    else:
                        component.path = local_path
                    return
            raise FileNotFoundError(
                f"Local file {cid} not found in local directory or database."
            )

        raise FileNotFoundError(
            f"File {cid} not found in local storage. "
            "Use a PinataClient for remote files."
        )

    async def query(self, keyvalues: dict[str, str]) -> list[Asset]:
        """
        Query files by keyvalue metadata from TinyDB.

        Args:
            keyvalues: Metadata key-value pairs to filter by

        Returns:
            List of matching Asset objects
        """
        # Linear scan over all records; acceptable for local-dev dataset sizes
        results = []
        for record in self.db.all():
            record_kv = record.get("keyvalues", {})
            if all(record_kv.get(k) == v for k, v in keyvalues.items()):
                cid = record["cid"]
                results.append(
                    Asset(
                        cid=cid,
                        path=self.local_dir / record["rel_path"],
                        keyvalues=record.get("keyvalues", {}),
                    )
                )
        return results

    async def delete(self, component: Asset) -> None:
        """
        Delete a file from local storage and TinyDB.

        Args:
            component: Asset with cid set
        """
        # Remove the file first, then its metadata record; unknown cid is a no-op
        File = Query()
        record = self.db.get(File.cid == component.cid)
        if record:
            local_path = self.local_dir / record["rel_path"]
            if local_path.exists():
                local_path.unlink()
            self.db.remove(File.cid == component.cid)

db property

Get TinyDB instance for local metadata storage (lazy initialized).

Re-opens if the DB file has been deleted or modified externally, keeping _last_id in sync when other processes write to the same file.

__init__(local_dir=None)

Initialize local storage client.

Parameters:

Name Type Description Default
local_dir Optional[Path]

Local directory for file storage (defaults to STARGAZER_LOCAL env var or ~/.stargazer/local)

None
Source code in src/stargazer/utils/local_storage.py
def __init__(
    self,
    local_dir: Optional[Path] = None,
):
    """
    Initialize local storage client.

    Args:
        local_dir: Local directory for file storage (defaults to STARGAZER_LOCAL env var
                   or ~/.stargazer/local)
    """
    default_dir = Path(
        os.environ.get("STARGAZER_LOCAL", str(Path.home() / ".stargazer" / "local"))
    )
    self.local_dir = local_dir if local_dir else default_dir
    self.local_dir.mkdir(parents=True, exist_ok=True)

    # TinyDB for local metadata storage (lazy initialized)
    self.local_db_path = self.local_dir / "stargazer_local.json"
    self._db: Optional[TinyDB] = None
    self._db_mtime: float = 0.0

delete(component) async

Delete a file from local storage and TinyDB.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
Source code in src/stargazer/utils/local_storage.py
async def delete(self, component: Asset) -> None:
    """
    Remove a file and its metadata record from local storage.

    Args:
        component: Asset with cid set
    """
    cond = Query()
    record = self.db.get(cond.cid == component.cid)
    if not record:
        # Unknown cid: nothing to remove.
        return
    stored = self.local_dir / record["rel_path"]
    if stored.exists():
        stored.unlink()
    self.db.remove(cond.cid == component.cid)

download(component, dest=None) async

Resolve a local file path and set component.path. For local storage, files are already on disk.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
dest Optional[Path]

Optional destination path (copies file there)

None
Source code in src/stargazer/utils/local_storage.py
async def download(self, component: Asset, dest: Optional[Path] = None) -> None:
    """
    Resolve a local file path and set component.path. For local storage, files are
    already on disk.

    Args:
        component: Asset with cid set
        dest: Optional destination path (copies file there)

    Raises:
        FileNotFoundError: If the cid cannot be resolved to a local file.
    """
    # Already resolved to an existing file: nothing to do.
    if component.path and component.path.exists():
        return

    cid = component.cid

    def _deliver(src: Path) -> None:
        # Copy to dest when requested, otherwise point at the stored file.
        if dest:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dest)
            component.path = dest
        else:
            component.path = src

    # Cache key: cid with "/" sanitized so it forms a single filename.
    cache_path = self.local_dir / cid.replace("/", "_")
    if cache_path.exists():
        _deliver(cache_path)
        return

    # Look up in TinyDB for path resolution
    if cid.startswith("local_"):
        record = self.db.get(Query().cid == cid)
        if record:
            stored = self.local_dir / record["rel_path"]
            if stored.exists():
                _deliver(stored)
                return
        raise FileNotFoundError(
            f"Local file {cid} not found in local directory or database."
        )

    raise FileNotFoundError(
        f"File {cid} not found in local storage. "
        "Use a PinataClient for remote files."
    )

query(keyvalues) async

Query files by keyvalue metadata from TinyDB.

Parameters:

Name Type Description Default
keyvalues dict[str, str]

Metadata key-value pairs to filter by

required

Returns:

Type Description
list[Asset]

List of matching Asset objects

Source code in src/stargazer/utils/local_storage.py
async def query(self, keyvalues: dict[str, str]) -> list[Asset]:
    """
    Return Assets whose TinyDB metadata matches every given key/value pair.

    Args:
        keyvalues: Metadata key-value pairs that must all match

    Returns:
        List of matching Asset objects
    """
    matches: list[Asset] = []
    wanted = keyvalues.items()
    for rec in self.db.all():
        meta = rec.get("keyvalues", {})
        # Every requested pair must be present and equal.
        if any(meta.get(key) != val for key, val in wanted):
            continue
        matches.append(
            Asset(
                cid=rec["cid"],
                path=self.local_dir / rec["rel_path"],
                keyvalues=rec.get("keyvalues", {}),
            )
        )
    return matches

upload(component) async

Copy a file to local storage, index metadata in TinyDB, and set component.cid.

Parameters:

Name Type Description Default
component Asset

Asset with path and keyvalues set

required
Source code in src/stargazer/utils/local_storage.py
async def upload(self, component: Asset) -> None:
    """
    Copy a file to local storage, index metadata in TinyDB, and set component.cid.

    Args:
        component: Asset with path and keyvalues set

    Raises:
        ValueError: If component.path is not set.
    """
    src = component.path
    if src is None:
        raise ValueError("component.path must be set before uploading")

    # Content-addressed id: MD5 digest of the file bytes (not security-relevant).
    digest = hashlib.md5(src.read_bytes()).hexdigest()
    cid = f"local_{digest}"

    # Store under the original filename inside the local dir.
    target = self.local_dir / src.name
    target.parent.mkdir(parents=True, exist_ok=True)
    if src.resolve() != target.resolve():
        shutil.copy2(src, target)

    # Upsert metadata in TinyDB (avoid duplicates on re-upload)
    cond = Query()
    self.db.upsert(
        {
            "cid": cid,
            "keyvalues": component.keyvalues,
            "created_at": datetime.now(timezone.utc).isoformat(),
            "rel_path": src.name,
        },
        cond.cid == cid,
    )

    component.cid = cid

Pinata API v3 client for IPFS file storage.

Provides an async interface for:

- Uploading files with keyvalue metadata
- Downloading files via IPFS gateway with local caching
- Querying files by keyvalue pairs
- Deleting files

spec: docs/architecture/modes.md

PinataClient

Async client for Pinata API v3.

Used when STARGAZER_MODE=local and PINATA_JWT is available. Handles uploads to IPFS via Pinata, downloads via IPFS gateways, and metadata queries against the Pinata API.

Usage

client = PinataClient()

Upload with metadata

comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
await client.upload(comp)  # sets comp.cid

Query by keyvalues

files = await client.query({"type": "alignment", "sample": "NA12878"})

Download

await client.download(comp) # sets comp.path

Delete file

await client.delete(comp)

Source code in src/stargazer/utils/pinata.py
class PinataClient:
    """
    Async client for Pinata API v3.

    Used when STARGAZER_MODE=local and PINATA_JWT is available.
    Handles uploads to IPFS via Pinata, downloads via IPFS gateways,
    and metadata queries against the Pinata API.

    Usage:
        client = PinataClient()

        # Upload with metadata
        comp = Asset(path=Path("data.bam"), keyvalues={"type": "alignment"})
        await client.upload(comp)  # sets comp.cid

        # Query by keyvalues
        files = await client.query({"type": "alignment", "sample": "NA12878"})

        # Download
        await client.download(comp)  # sets comp.path

        # Delete file
        await client.delete(comp)
    """

    API_BASE = "https://api.pinata.cloud/v3"
    UPLOAD_BASE = "https://uploads.pinata.cloud/v3"

    def __init__(
        self,
        jwt: Optional[str] = None,
        gateway: Optional[str] = None,
        local_dir: Optional[Path] = None,
    ):
        """
        Initialize Pinata client.

        Args:
            jwt: Pinata JWT token (defaults to PINATA_JWT env var)
            gateway: IPFS gateway URL (defaults to gateway.pinata.cloud)
            local_dir: Local directory for download caching
        """
        self._jwt = jwt or os.environ.get("PINATA_JWT")
        self.gateway = gateway or os.environ.get(
            "PINATA_GATEWAY", "https://gateway.pinata.cloud"
        )
        self.local_dir = local_dir or Path(
            os.environ.get("STARGAZER_LOCAL", str(Path.home() / ".stargazer" / "local"))
        )
        self.local_dir.mkdir(parents=True, exist_ok=True)
        # Dedicated gateway domain; fetched lazily by _get_gateway_domain().
        self._gateway_domain: Optional[str] = None

    @property
    def jwt(self) -> str:
        """Get JWT token, raising error if not set.

        Raises:
            ValueError: If no JWT was provided and PINATA_JWT is unset.
        """
        if not self._jwt:
            raise ValueError(
                "PINATA_JWT not set. Provide jwt= argument or "
                "set PINATA_JWT environment variable."
            )
        return self._jwt

    def _headers(self) -> dict:
        """Get authorization headers for Pinata API requests."""
        return {"Authorization": f"Bearer {self.jwt}"}

    async def _get_gateway_domain(self) -> str:
        """Fetch the dedicated gateway domain from Pinata API.

        The result is cached on the instance; only the first call hits the API.
        """
        # _gateway_domain is initialized to None in __init__, so a plain None
        # check replaces the previous hasattr() probe.
        if self._gateway_domain is None:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.API_BASE}/ipfs/gateways", headers=self._headers()
                ) as response:
                    response.raise_for_status()
                    data = await response.json()
                    rows = data["data"]["rows"]
                    if not rows:
                        raise ValueError(
                            "No gateway configured in Pinata account. "
                            "Create one at https://app.pinata.cloud/gateway"
                        )
                    domain = rows[0]["domain"]
                    self._gateway_domain = f"https://{domain}.mypinata.cloud"
        return self._gateway_domain

    async def _get_signed_url(self, cid: str, expires: int = 300) -> str:
        """Get a signed download URL for a private file.

        Args:
            cid: Content identifier of the file
            expires: URL validity window in seconds (default 300)
        """
        import time

        gateway = await self._get_gateway_domain()
        payload = {
            "url": f"{gateway}/files/{cid}",
            "expires": expires,
            "date": int(time.time()),
            "method": "GET",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.API_BASE}/files/sign",
                headers=self._headers(),
                json=payload,
            ) as response:
                response.raise_for_status()
                data = await response.json()
                return data["data"]

    async def upload(self, component: Asset) -> None:
        """
        Upload a file to IPFS via Pinata. Sets component.cid.

        Args:
            component: Asset with path and keyvalues set

        Raises:
            ValueError: If component.path is not set
        """
        path = component.path
        if path is None:
            raise ValueError("component.path must be set before uploading")

        url = f"{self.UPLOAD_BASE}/files"

        async with aiohttp.ClientSession() as session:
            # Open inside a context manager so the handle is always closed
            # (the previous version leaked the file object).
            with open(path, "rb") as file_obj:
                data = aiohttp.FormData()
                data.add_field("file", file_obj, filename=path.name)
                data.add_field("name", path.name)
                data.add_field("network", "private")

                if component.keyvalues:
                    data.add_field("keyvalues", json.dumps(component.keyvalues))

                async with session.post(
                    url, headers=self._headers(), data=data
                ) as response:
                    response.raise_for_status()
                    result = await response.json()
                    data_obj = result.get("data", result)
                    component.cid = data_obj["cid"]

    async def download(self, component: Asset, dest: Optional[Path] = None) -> None:
        """
        Download a file from IPFS and set component.path.
        Uses local cache to avoid re-downloading.

        Args:
            component: Asset with cid set
            dest: Optional destination path (otherwise uses cache)
        """
        # Skip download if path is already set and file exists
        if component.path and component.path.exists():
            return

        cid = component.cid

        # Check local dir first; sanitize "/" so the key is a single filename
        cache_key = cid.replace("/", "_")
        cache_path = self.local_dir / cache_key

        if cache_path.exists():
            if dest:
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(cache_path, dest)
                component.path = dest
            else:
                component.path = cache_path
            return

        # Private files: get a signed URL via Pinata API
        download_url = await self._get_signed_url(cid)

        async with aiohttp.ClientSession() as session:
            async with session.get(download_url) as response:
                response.raise_for_status()

                cache_path.parent.mkdir(parents=True, exist_ok=True)
                async with aiofiles.open(cache_path, "wb") as f:
                    async for chunk in response.content.iter_chunked(8192):
                        await f.write(chunk)

        if dest:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(cache_path, dest)
            component.path = dest
        else:
            component.path = cache_path

    async def query(self, keyvalues: dict[str, str]) -> list[Asset]:
        """
        Query files by keyvalue metadata from Pinata API.
        Paginates through all results automatically.

        Args:
            keyvalues: Metadata key-value pairs to filter by

        Returns:
            List of matching Asset objects
        """
        url = f"{self.API_BASE}/files/private"
        params: dict = {"pageLimit": 1000, "order": "DESC"}

        # Add metadata filters using the correct format: metadata[key]=value
        if keyvalues:
            for key, value in keyvalues.items():
                params[f"metadata[{key}]"] = value

        results: list[Asset] = []
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.get(
                    url, headers=self._headers(), params=params
                ) as response:
                    response.raise_for_status()
                    data = await response.json()

                    for f in data.get("data", {}).get("files", []):
                        results.append(
                            Asset(
                                cid=f["cid"],
                                keyvalues=f.get("keyvalues", {}),
                            )
                        )

                    next_token = data.get("data", {}).get("next_page_token")
                    if not next_token:
                        break
                    params["pageToken"] = next_token

        return results

    async def delete(self, component: Asset) -> None:
        """
        Delete a file from Pinata by querying for its internal ID first.

        Args:
            component: Asset with cid set
        """
        url = f"{self.API_BASE}/files/private"
        params = {"cid": component.cid}

        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, headers=self._headers(), params=params
            ) as response:
                response.raise_for_status()
                data = await response.json()
                files = data.get("data", {}).get("files", [])
                if not files:
                    # No match for this cid: deletion is a no-op.
                    return
                file_id = files[0]["id"]

            async with session.delete(
                f"{self.API_BASE}/files/private/{file_id}",
                headers=self._headers(),
            ) as response:
                response.raise_for_status()

jwt property

Get JWT token, raising error if not set.

__init__(jwt=None, gateway=None, local_dir=None)

Initialize Pinata client.

Parameters:

Name Type Description Default
jwt Optional[str]

Pinata JWT token (defaults to PINATA_JWT env var)

None
gateway Optional[str]

IPFS gateway URL (defaults to gateway.pinata.cloud)

None
local_dir Optional[Path]

Local directory for download caching

None
Source code in src/stargazer/utils/pinata.py
def __init__(
    self,
    jwt: Optional[str] = None,
    gateway: Optional[str] = None,
    local_dir: Optional[Path] = None,
):
    """
    Initialize Pinata client.

    Args:
        jwt: Pinata JWT token (defaults to PINATA_JWT env var)
        gateway: IPFS gateway URL (defaults to gateway.pinata.cloud)
        local_dir: Local directory for download caching
    """
    env = os.environ
    self._jwt = jwt if jwt else env.get("PINATA_JWT")
    self.gateway = gateway if gateway else env.get(
        "PINATA_GATEWAY", "https://gateway.pinata.cloud"
    )
    if local_dir is None:
        local_dir = Path(
            env.get("STARGAZER_LOCAL", str(Path.home() / ".stargazer" / "local"))
        )
    self.local_dir = local_dir
    self.local_dir.mkdir(parents=True, exist_ok=True)

delete(component) async

Delete a file from Pinata by querying for its internal ID first.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
Source code in src/stargazer/utils/pinata.py
async def delete(self, component: Asset) -> None:
    """
    Delete a file from Pinata by querying for its internal ID first.

    Args:
        component: Asset with cid set
    """
    list_url = f"{self.API_BASE}/files/private"

    async with aiohttp.ClientSession() as session:
        # Resolve the Pinata-internal file id for this cid.
        async with session.get(
            list_url, headers=self._headers(), params={"cid": component.cid}
        ) as response:
            response.raise_for_status()
            payload = await response.json()
            matches = payload.get("data", {}).get("files", [])
            if not matches:
                # Nothing stored under this cid: deletion is a no-op.
                return
            file_id = matches[0]["id"]

        async with session.delete(
            f"{self.API_BASE}/files/private/{file_id}",
            headers=self._headers(),
        ) as response:
            response.raise_for_status()

download(component, dest=None) async

Download a file from IPFS and set component.path. Uses local cache to avoid re-downloading.

Parameters:

Name Type Description Default
component Asset

Asset with cid set

required
dest Optional[Path]

Optional destination path (otherwise uses cache)

None
Source code in src/stargazer/utils/pinata.py
async def download(self, component: Asset, dest: Optional[Path] = None) -> None:
    """
    Download a file from IPFS and set component.path.
    Uses local cache to avoid re-downloading.

    Args:
        component: Asset with cid set
        dest: Optional destination path (otherwise uses cache)
    """
    # Already resolved to an existing file: nothing to do.
    if component.path and component.path.exists():
        return

    cid = component.cid
    # Cache key: cid with "/" sanitized so it forms a single filename.
    cache_path = self.local_dir / cid.replace("/", "_")

    def _deliver(src: Path) -> None:
        # Copy into dest when given, otherwise expose the cached file.
        if dest:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(src, dest)
            component.path = dest
        else:
            component.path = src

    if cache_path.exists():
        _deliver(cache_path)
        return

    # Private files: get a signed URL via Pinata API
    signed_url = await self._get_signed_url(cid)

    async with aiohttp.ClientSession() as session:
        async with session.get(signed_url) as response:
            response.raise_for_status()

            cache_path.parent.mkdir(parents=True, exist_ok=True)
            async with aiofiles.open(cache_path, "wb") as sink:
                async for chunk in response.content.iter_chunked(8192):
                    await sink.write(chunk)

    _deliver(cache_path)

query(keyvalues) async

Query files by keyvalue metadata from Pinata API. Paginates through all results automatically.

Parameters:

Name Type Description Default
keyvalues dict[str, str]

Metadata key-value pairs to filter by

required

Returns:

Type Description
list[Asset]

List of matching Asset objects

Source code in src/stargazer/utils/pinata.py
async def query(self, keyvalues: dict[str, str]) -> list[Asset]:
    """
    Query files by keyvalue metadata from Pinata API.
    Paginates through all results automatically.

    Args:
        keyvalues: Metadata key-value pairs to filter by

    Returns:
        List of matching Asset objects
    """
    list_url = f"{self.API_BASE}/files/private"
    params: dict = {"pageLimit": 1000, "order": "DESC"}

    # Pinata expects metadata filters as metadata[key]=value query params.
    for key, value in (keyvalues or {}).items():
        params[f"metadata[{key}]"] = value

    found: list[Asset] = []
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(
                list_url, headers=self._headers(), params=params
            ) as response:
                response.raise_for_status()
                payload = await response.json()

            page = payload.get("data", {})
            found.extend(
                Asset(cid=entry["cid"], keyvalues=entry.get("keyvalues", {}))
                for entry in page.get("files", [])
            )

            next_token = page.get("next_page_token")
            if not next_token:
                break
            params["pageToken"] = next_token

    return found

upload(component) async

Upload a file to IPFS via Pinata. Sets component.cid.

Parameters:

Name Type Description Default
component Asset

Asset with path and keyvalues set

required
Source code in src/stargazer/utils/pinata.py
async def upload(self, component: Asset) -> None:
    """
    Upload a file to IPFS via Pinata. Sets component.cid.

    Args:
        component: Asset with path and keyvalues set

    Raises:
        ValueError: If component.path is not set
    """
    path = component.path
    if path is None:
        raise ValueError("component.path must be set before uploading")

    url = f"{self.UPLOAD_BASE}/files"

    async with aiohttp.ClientSession() as session:
        # Open inside a context manager so the handle is always closed
        # (the previous version leaked the file object).
        with open(path, "rb") as file_obj:
            data = aiohttp.FormData()
            data.add_field("file", file_obj, filename=path.name)
            data.add_field("name", path.name)
            data.add_field("network", "private")

            if component.keyvalues:
                data.add_field("keyvalues", json.dumps(component.keyvalues))

            async with session.post(
                url, headers=self._headers(), data=data
            ) as response:
                response.raise_for_status()
                result = await response.json()
                data_obj = result.get("data", result)
                component.cid = data_obj["cid"]

Query generation utilities for Stargazer.

Utilities for generating metadata queries, including support for cartesian product queries across multiple dimensions.

spec: docs/architecture/types.md

generate_query_combinations(base_query, filters)

Generate query combinations from filters using cartesian product.

Takes a base query dict and filters dict, where filters can contain scalar values or lists. For any list-valued filter, generates all combinations using cartesian product, while preserving scalar filters and the base query in all combinations.

Parameters:

Name Type Description Default
base_query Dict[str, Any]

Base query dict to include in all combinations

required
filters Dict[str, Any]

Filter dict with scalar or list values

required

Returns:

Type Description
List[Dict[str, Any]]

List of query dicts representing all combinations

Example

base = {"type": "reference"}
filters = {"build": "GRCh38", "tool": ["fasta", "bwa"]}
generate_query_combinations(base, filters)
# -> [{"type": "reference", "build": "GRCh38", "tool": "fasta"},
#     {"type": "reference", "build": "GRCh38", "tool": "bwa"}]

base = {"type": "reference"}
filters = {"build": ["GRCh38", "GRCh37"], "tool": ["fasta", "bwa"]}
generate_query_combinations(base, filters)
# -> [{"type": "reference", "build": "GRCh38", "tool": "fasta"},
#     {"type": "reference", "build": "GRCh38", "tool": "bwa"},
#     {"type": "reference", "build": "GRCh37", "tool": "fasta"},
#     {"type": "reference", "build": "GRCh37", "tool": "bwa"}]

Source code in src/stargazer/utils/query.py
def generate_query_combinations(
    base_query: Dict[str, Any],
    filters: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Generate query combinations from filters using cartesian product.

    Takes a base query dict and filters dict, where filters can contain
    scalar values or lists. For any list-valued filter, generates all
    combinations using cartesian product, while preserving scalar filters
    and the base query in all combinations.

    Args:
        base_query: Base query dict to include in all combinations
        filters: Filter dict with scalar or list values

    Returns:
        List of query dicts representing all combinations

    Example:
        >>> base = {"type": "reference"}
        >>> filters = {"build": "GRCh38", "tool": ["fasta", "bwa"]}
        >>> generate_query_combinations(base, filters)
        [
            {"type": "reference", "build": "GRCh38", "tool": "fasta"},
            {"type": "reference", "build": "GRCh38", "tool": "bwa"}
        ]
    """
    # Normalize every filter value to a choice list (scalars become
    # one-element lists) so a single cartesian product handles both cases.
    keys = list(filters.keys())
    choices = [
        value if isinstance(value, list) else [value]
        for value in filters.values()
    ]

    # One query dict per element of the cartesian product; with no filters
    # product() yields a single empty combination, i.e. just the base query.
    return [
        {**base_query, **dict(zip(keys, combo))}
        for combo in product(*choices)
    ]

Storage abstraction for Stargazer.

Defines the StorageClient protocol, StargazerMode enum, and factory function for creating storage clients based on STARGAZER_MODE configuration.

Configuration

STARGAZER_MODE=local (default) -> local exec, local storage (or Pinata if JWT present) STARGAZER_MODE=cloud -> union exec, Pinata storage (PINATA_JWT required)

spec: docs/architecture/modes.md

StargazerMode

Bases: Enum

Execution and storage mode for Stargazer.

Source code in src/stargazer/utils/storage.py
class StargazerMode(Enum):
    """Execution and storage mode for Stargazer.
    """

    LOCAL = "local"  # local execution; local storage (or Pinata if JWT present)
    CLOUD = "cloud"  # union execution; Pinata storage (PINATA_JWT required)

StorageClient

Bases: Protocol

Protocol defining the storage client interface.

All storage backends (local, Pinata) implement this interface.

Source code in src/stargazer/utils/storage.py
class StorageClient(Protocol):
    """Protocol defining the storage client interface.

    All storage backends (local, Pinata) implement this interface.
    Implementations communicate results by mutating the passed Asset
    (setting ``cid`` on upload and ``path`` on download).
    """

    # Directory used for locally stored or cached files.
    local_dir: Path

    async def upload(self, component: Asset) -> None:
        """Upload a file from component.path; sets component.cid.
        """
        ...

    async def download(
        self,
        component: Asset,
        dest: Optional[Path] = None,
    ) -> None:
        """Download a file by cid; sets component.path.
        """
        ...

    async def query(
        self,
        keyvalues: dict[str, str],
    ) -> list[Asset]:
        """Query files by keyvalue metadata; returns Asset list.
        """
        ...

    async def delete(self, component: Asset) -> None:
        """Delete a file from storage.
        """
        ...

delete(component) async

Delete a file from storage.

Source code in src/stargazer/utils/storage.py
async def delete(self, component: Asset) -> None:
    """Delete a file from storage.

    Args:
        component: Asset whose ``cid`` identifies the file to remove.
    """
    ...

download(component, dest=None) async

Download a file by cid; sets component.path.

Source code in src/stargazer/utils/storage.py
async def download(
    self,
    component: Asset,
    dest: Optional[Path] = None,
) -> None:
    """Download a file by cid; sets component.path.

    Args:
        component: Asset whose ``cid`` identifies the file.
        dest: Optional destination path; implementations copy the file
            there when given, otherwise ``component.path`` points at the
            stored/cached copy.
    """
    ...

query(keyvalues) async

Query files by keyvalue metadata; returns Asset list.

Source code in src/stargazer/utils/storage.py
async def query(
    self,
    keyvalues: dict[str, str],
) -> list[Asset]:
    """Query files by keyvalue metadata; returns Asset list.

    Args:
        keyvalues: Metadata key-value pairs that must all match.
    """
    ...

upload(component) async

Upload a file from component.path; sets component.cid.

Source code in src/stargazer/utils/storage.py
async def upload(self, component: Asset) -> None:
    """Upload a file from component.path; sets component.cid.

    Args:
        component: Asset with ``path`` (and optional ``keyvalues``) set.
    """
    ...

get_client()

Create a storage client based on STARGAZER_MODE and available credentials.

Resolution logic
  • STARGAZER_MODE=cloud -> PinataClient (PINATA_JWT required)
  • STARGAZER_MODE=local + PINATA_JWT -> PinataClient
  • STARGAZER_MODE=local (no JWT) -> LocalStorageClient

Returns:

Type Description
StorageClient

A StorageClient implementation

Source code in src/stargazer/utils/storage.py
def get_client() -> StorageClient:
    """Create a storage client based on STARGAZER_MODE and available credentials.

    Resolution logic:
        - STARGAZER_MODE=cloud -> PinataClient (PINATA_JWT required)
        - STARGAZER_MODE=local + PINATA_JWT -> PinataClient
        - STARGAZER_MODE=local (no JWT) -> LocalStorageClient

    Returns:
        A StorageClient implementation
    """
    mode = resolve_mode()
    jwt = os.environ.get("PINATA_JWT")

    if mode == StargazerMode.CLOUD and not jwt:
        raise ValueError("PINATA_JWT is required when STARGAZER_MODE=cloud.")

    # Cloud mode always uses Pinata; local mode upgrades to Pinata whenever
    # a JWT is available.
    if jwt:
        from stargazer.utils.pinata import PinataClient

        return PinataClient()

    from stargazer.utils.local_storage import LocalStorageClient

    return LocalStorageClient()

resolve_mode()

Resolve the current Stargazer mode from STARGAZER_MODE env var.

Returns:

Type Description
StargazerMode

StargazerMode.LOCAL or StargazerMode.CLOUD

Raises:

Type Description
ValueError

If STARGAZER_MODE is set to an invalid value

Source code in src/stargazer/utils/storage.py
def resolve_mode() -> StargazerMode:
    """Resolve the current Stargazer mode from STARGAZER_MODE env var.

    Returns:
        StargazerMode.LOCAL or StargazerMode.CLOUD

    Raises:
        ValueError: If STARGAZER_MODE is set to an invalid value
    """
    # Unset defaults to "local"; matching is case-insensitive.
    mode_str = os.environ.get("STARGAZER_MODE", "local").lower()
    if mode_str not in {member.value for member in StargazerMode}:
        raise ValueError(
            f"Invalid STARGAZER_MODE: '{mode_str}'. Must be 'local' or 'cloud'."
        )
    return StargazerMode(mode_str)

Subprocess utilities for running external commands.

spec: docs/architecture/tasks.md

Workflows

GATK Best Practices: Data Pre-processing for Variant Discovery

Implements: 1. Reference preparation — FASTA index, sequence dictionary, BWA index 2. Sample preprocessing — align, sort, mark duplicates, BQSR

References

spec: docs/architecture/workflows.md

prepare_reference(build) async

Prepare reference genome for alignment and variant calling.

Assembles the reference FASTA from storage and creates necessary indices: 1. FASTA index (samtools faidx) 2. BWA index (bwa index)

All indices are uploaded to storage as side-effects.

Parameters:

Name Type Description Default
build str

Reference genome build identifier (e.g. "GRCh38")

required

Returns:

Type Description
Reference

Reference asset (FASTA file)

Source code in src/stargazer/workflows/gatk_data_preprocessing.py
@gatk_env.task
async def prepare_reference(build: str) -> Reference:
    """
    Prepare a reference genome for alignment and variant calling.

    Assembles the reference FASTA from storage, then builds the indices
    required by downstream steps:
    1. FASTA index (samtools faidx)
    2. BWA index (bwa index)

    Both indexing tasks upload their outputs to storage as side-effects.

    Args:
        build: Reference genome build identifier (e.g. "GRCh38")

    Returns:
        Reference asset (FASTA file)
    """
    assembled = await assemble(build=build, asset="reference")
    reference = next((a for a in assembled if isinstance(a, Reference)), None)
    if reference is None:
        raise ValueError(f"No reference found for build={build!r}")

    await samtools_faidx(reference)
    await bwa_index(reference)

    return reference

preprocess_sample(build, sample_id, run_bqsr=True) async

Pre-process a single sample's reads for variant calling.

Assembles reference and reads from storage, then runs: 1. BWA-MEM alignment 2. Coordinate sort (GATK SortSam) 3. Mark duplicates (GATK MarkDuplicates) 4. BQSR (optional, GATK BaseRecalibrator + ApplyBQSR)

Parameters:

Name Type Description Default
build str

Reference genome build identifier

required
sample_id str

Sample identifier used to query reads and known sites

required
run_bqsr bool

Whether to apply BQSR (default: True)

True

Returns:

Type Description
Alignment

Alignment asset with the preprocessed BAM file

Source code in src/stargazer/workflows/gatk_data_preprocessing.py
@gatk_env.task
async def preprocess_sample(
    build: str,
    sample_id: str,
    run_bqsr: bool = True,
) -> Alignment:
    """
    Pre-process a single sample's reads for variant calling.

    Assembles the reference and the sample's reads from storage, then runs:
    1. BWA-MEM alignment
    2. Coordinate sort (GATK SortSam)
    3. Mark duplicates (GATK MarkDuplicates)
    4. BQSR (optional, GATK BaseRecalibrator + ApplyBQSR)

    Args:
        build: Reference genome build identifier
        sample_id: Sample identifier used to query reads and known sites
        run_bqsr: Whether to apply BQSR (default: True)

    Returns:
        Alignment asset with the preprocessed BAM file
    """
    # Locate the reference FASTA for this build.
    ref_assets = await assemble(build=build, asset="reference")
    ref = next((a for a in ref_assets if isinstance(a, Reference)), None)
    if ref is None:
        raise ValueError(f"No reference found for build={build!r}")

    # Locate the reads; R2 is optional (single-end data has none).
    read_assets = await assemble(sample_id=sample_id, asset=["r1", "r2"])
    r1 = next((a for a in read_assets if isinstance(a, R1)), None)
    if r1 is None:
        raise ValueError(f"No R1 reads found for sample_id={sample_id!r}")
    r2 = next((a for a in read_assets if isinstance(a, R2)), None)

    # Alignment pipeline — each task calls fetch() internally.
    aligned = await bwa_mem(ref=ref, r1=r1, r2=r2)
    aligned = await sort_sam(alignment=aligned, sort_order="coordinate")
    aligned = await mark_duplicates(alignment=aligned)

    if run_bqsr:
        known_assets = await assemble(build=build, asset="known_sites")
        known_sites = [a for a in known_assets if isinstance(a, KnownSites)]
        if not known_sites:
            raise ValueError(
                f"run_bqsr=True but no known_sites found for build={build!r}"
            )

        report = await base_recalibrator(
            alignment=aligned,
            ref=ref,
            known_sites=known_sites,
        )
        aligned = await apply_bqsr(
            alignment=aligned,
            ref=ref,
            bqsr_report=report,
        )

    return aligned

GATK Best Practices: Germline Short Variant Discovery (SNPs + Indels)

Implements the full GATK pipeline from preprocessed BAMs:
  1. HaplotypeCaller — per-sample GVCF (parallel)
  2. joint_call_gvcfs — GenomicsDBImport + GenotypeGVCFs in one task
  3. VariantRecalibrator (INDEL) — build VQSR model
  4. ApplyVQSR INDEL — filter indels → final VCF
Prerequisites

Reference and sample alignments must already be in storage (run prepare_reference and preprocess_sample first). VQSR training resources (HapMap, omni, 1000G, mills, dbSNP) must be stored with build, resource_name, known, training, truth, and prior key-values, tagged with vqsr_mode=INDEL.

Reference

https://gatk.broadinstitute.org/hc/en-us/articles/360035535932-Germline-short-variant-discovery-SNPs-Indels

spec: docs/architecture/workflows.md

germline_short_variant_discovery(build, cohort_id='cohort') async

Germline short variant discovery from preprocessed BAMs.

Assembles all BQSR-applied alignments and reference from storage, then runs the full GATK Best Practices pipeline through VQSR filtering.

Expects preprocess_sample to have been run first — alignments must have bqsr_applied=true.

Parameters:

Name Type Description Default
build str

Reference genome build identifier (e.g. "GRCh38")

required
cohort_id str

Identifier for the cohort output (default: "cohort")

'cohort'

Returns:

Type Description
Variants

VQSR-filtered joint-genotyped Variants asset

Source code in src/stargazer/workflows/germline_short_variant_discovery.py
@gatk_env.task(cache="disable")
async def germline_short_variant_discovery(
    build: str,
    cohort_id: str = "cohort",
) -> Variants:
    """
    Germline short variant discovery from preprocessed BAMs.

    Assembles all BQSR-applied alignments and reference from storage, then runs
    the full GATK Best Practices pipeline through VQSR filtering.

    Expects preprocess_sample to have been run first — alignments must have
    bqsr_applied=true.

    Args:
        build: Reference genome build identifier (e.g. "GRCh38")
        cohort_id: Identifier for the cohort output (default: "cohort")

    Returns:
        VQSR-filtered joint-genotyped Variants asset

    Raises:
        ValueError: If the reference, BQSR-applied alignments, or INDEL VQSR
            resources cannot be found in storage for the given build.
    """
    # Assemble reference first so we can filter alignments by reference_cid.
    # NOTE(review): unlike preprocess_sample, assets are not type-filtered
    # here — assumes assemble(asset="reference") yields only Reference
    # objects; confirm against the assemble() contract.
    refs = await assemble(build=build, asset="reference")
    if not refs:
        raise ValueError(f"No reference found for build={build!r}")
    ref = refs[0]

    # Fetch alignments and VQSR training resources concurrently.
    alignments, indel_resources = await asyncio.gather(
        assemble(reference_cid=ref.cid, asset="alignment", bqsr_applied="true"),
        assemble(build=build, asset="known_sites", vqsr_mode="INDEL"),
    )

    if not alignments:
        raise ValueError(
            f"No BQSR-applied alignments found for build={build!r}. "
            "Run preprocess_sample first."
        )

    if not indel_resources:
        raise ValueError(f"No INDEL VQSR resources for build={build!r}")

    # 1. HaplotypeCaller — per-sample GVCFs in parallel.
    # asyncio.gather already returns a list, so no extra list() wrapper is
    # needed; a generator avoids materializing the coroutine list twice.
    gvcfs = await asyncio.gather(
        *(haplotype_caller(alignment=aln, ref=ref) for aln in alignments)
    )

    # 2. GenomicsDBImport + GenotypeGVCFs — joint calling
    raw_vcf = await joint_call_gvcfs(
        gvcfs=gvcfs, ref=ref, intervals=ref.contigs, cohort_id=cohort_id
    )

    # 3. VariantRecalibrator — INDEL model
    indel_model = await variant_recalibrator(
        vcf=raw_vcf, ref=ref, resources=indel_resources, mode="INDEL"
    )

    # 4. ApplyVQSR — INDEL
    return await apply_vqsr(vcf=raw_vcf, ref=ref, vqsr_model=indel_model)