Skip to content

Storage API

ragcrawl supports pluggable storage backends for persisting crawl data.

Overview

| Backend | Description | Use Case |
|---|---|---|
| DuckDB | Local file-based SQL database | Default, local development |
| DynamoDB | AWS managed NoSQL database | Cloud deployments, scalability |

Storage Backend Interface

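All backends implement the StorageBackend interface (an abstract base class; see the Module Reference below). The snippet here is a condensed sketch of its core methods: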

Python
from ragcrawl.storage import StorageBackend

class StorageBackend(Protocol):
    """Condensed overview of the storage interface.

    Signatures are abbreviated here. The module reference further down this
    page lists the full method set (for example delete_site, get_latest_run,
    count_pages, and the bulk-save methods) and optional parameters such as
    limit/offset.

    NOTE(review): this sketch declares a Protocol, while the module reference
    reports the base as ABC — confirm which one reflects the current code.
    """

    # Lifecycle: set up the backend (create tables, etc.), close connections,
    # and report whether the backend is healthy/available.
    def initialize(self) -> None: ...
    def close(self) -> None: ...
    def health_check(self) -> bool: ...

    # Sites
    def save_site(self, site: Site) -> None: ...
    def get_site(self, site_id: str) -> Site | None: ...
    def list_sites(self) -> list[Site]: ...

    # Crawl Runs (listed per site)
    def save_run(self, run: CrawlRun) -> None: ...
    def get_run(self, run_id: str) -> CrawlRun | None: ...
    def list_runs(self, site_id: str) -> list[CrawlRun]: ...

    # Pages (get_page_by_url looks a page up by its normalized URL)
    def save_page(self, page: Page) -> None: ...
    def get_page(self, page_id: str) -> Page | None: ...
    def get_page_by_url(self, site_id: str, url: str) -> Page | None: ...
    def list_pages(self, site_id: str) -> list[Page]: ...

    # Versions (listed per page)
    def save_version(self, version: PageVersion) -> None: ...
    def get_version(self, version_id: str) -> PageVersion | None: ...
    def list_versions(self, page_id: str) -> list[PageVersion]: ...

    # Frontier (items are fetched per crawl run)
    def save_frontier_item(self, item: FrontierItem) -> None: ...
    def get_frontier_items(self, run_id: str) -> list[FrontierItem]: ...

Quick Start

Using DuckDB (Default)

Python
from ragcrawl.config.storage_config import StorageConfig, DuckDBConfig
from ragcrawl.storage import create_storage_backend

# DuckDB is the local, file-based backend; `path` points at its database file.
config = StorageConfig(
    backend=DuckDBConfig(path="./crawler.duckdb")
)

# The factory builds the backend described by the config.
backend = create_storage_backend(config)
# Sets the backend up (creates tables, etc.); the factory does not do this.
backend.initialize()

# Use the backend
sites = backend.list_sites()
# Call backend.close() when finished to close any connections.

Using DynamoDB

Python
from ragcrawl.config.storage_config import StorageConfig, DynamoDBConfig
from ragcrawl.storage import create_storage_backend

config = StorageConfig(
    backend=DynamoDBConfig(
        # NOTE(review): presumably prepended to the DynamoDB table names — confirm.
        table_prefix="ragcrawl_",
        # NOTE(review): presumably the AWS region to connect to — confirm.
        region="us-west-2",
    )
)

# When DynamoDB is the selected storage type, the factory health-checks it.
# If it is unavailable the factory falls back to DuckDB, unless
# config.fail_if_unavailable is set, in which case it raises RuntimeError.
backend = create_storage_backend(config)
backend.initialize()

Factory Function

Use create_storage_backend() to create backends:

Python
from ragcrawl.storage import create_storage_backend

# Selects the backend from the config: DynamoDB when it is the configured
# storage type and its health check passes; otherwise DuckDB (or a
# RuntimeError if fail_if_unavailable is set).
backend = create_storage_backend(storage_config)

Context Manager

Backends support the context manager protocol:

Python
# The `with` block binds the backend; initialize() is still called explicitly.
with create_storage_backend(config) as backend:
    backend.initialize()
    sites = backend.list_sites()
# Automatically closed

Module Reference

Storage backend protocol and factory.

StorageBackend

Bases: ABC

Abstract base class for storage backends.

All backends must implement this interface to ensure feature parity.

save_site abstractmethod

Python
save_site(site: Site) -> None

Save or update a site.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_site(self, site: Site) -> None:
    """Save or update a site.

    Args:
        site: The site to save or update.
    """
    ...

get_site abstractmethod

Python
get_site(site_id: str) -> Site | None

Get a site by ID.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_site(self, site_id: str) -> Site | None:
    """Get a site by ID.

    Args:
        site_id: ID of the site to fetch.

    Returns:
        The site, or None (presumably when no site has this ID — TODO confirm).
    """
    ...

list_sites abstractmethod

Python
list_sites() -> list[Site]

List all sites.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def list_sites(self) -> list[Site]:
    """List all sites.

    Unlike list_runs and list_pages, this method takes no limit/offset
    parameters.

    Returns:
        A list of sites.
    """
    ...

delete_site abstractmethod

Python
delete_site(site_id: str) -> bool

Delete a site and all associated data.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def delete_site(self, site_id: str) -> bool:
    """Delete a site and all associated data.

    Args:
        site_id: ID of the site to delete.

    Returns:
        A bool. NOTE(review): its meaning is not stated at the interface
        level — presumably True when a site was actually deleted; confirm
        against the concrete backends.
    """
    ...

save_run abstractmethod

Python
save_run(run: CrawlRun) -> None

Save or update a crawl run.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_run(self, run: CrawlRun) -> None:
    """Save or update a crawl run.

    Args:
        run: The crawl run to save or update.
    """
    ...

get_run abstractmethod

Python
get_run(run_id: str) -> CrawlRun | None

Get a crawl run by ID.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_run(self, run_id: str) -> CrawlRun | None:
    """Get a crawl run by ID.

    Args:
        run_id: ID of the crawl run to fetch.

    Returns:
        The crawl run, or None (presumably when no run has this ID —
        TODO confirm).
    """
    ...

list_runs abstractmethod

Python
list_runs(
    site_id: str, limit: int = 100, offset: int = 0
) -> list[CrawlRun]

List crawl runs for a site.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def list_runs(
    self,
    site_id: str,
    limit: int = 100,
    offset: int = 0,
) -> list[CrawlRun]:
    """List crawl runs for a site.

    Args:
        site_id: ID of the site whose runs are listed.
        limit: Defaults to 100. Presumably the maximum number of runs
            returned — TODO confirm.
        offset: Defaults to 0. Presumably the number of runs to skip, for
            paging — TODO confirm.

    Returns:
        A list of crawl runs. NOTE(review): ordering is not specified here.
    """
    ...

get_latest_run abstractmethod

Python
get_latest_run(site_id: str) -> CrawlRun | None

Get the latest crawl run for a site.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_latest_run(self, site_id: str) -> CrawlRun | None:
    """Get the latest crawl run for a site.

    Args:
        site_id: ID of the site.

    Returns:
        The latest crawl run, or None (presumably when the site has no
        runs — TODO confirm). NOTE(review): which field defines "latest"
        is not specified at the interface level.
    """
    ...

save_page abstractmethod

Python
save_page(page: Page) -> None

Save or update a page.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_page(self, page: Page) -> None:
    """Save or update a page.

    Args:
        page: The page to save or update.
    """
    ...

get_page abstractmethod

Python
get_page(page_id: str) -> Page | None

Get a page by ID.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_page(self, page_id: str) -> Page | None:
    """Get a page by ID.

    Args:
        page_id: ID of the page to fetch.

    Returns:
        The page, or None (presumably when no page has this ID —
        TODO confirm).
    """
    ...

get_page_by_url abstractmethod

Python
get_page_by_url(site_id: str, url: str) -> Page | None

Get a page by normalized URL.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_page_by_url(self, site_id: str, url: str) -> Page | None:
    """Get a page by normalized URL.

    Args:
        site_id: ID of the site the page belongs to.
        url: URL to look up. NOTE(review): the summary says "normalized
            URL"; whether callers must normalize it first or the backend
            does so is not visible here — confirm.

    Returns:
        The page, or None (presumably when no page matches — TODO confirm).
    """
    ...

list_pages abstractmethod

Python
list_pages(
    site_id: str,
    limit: int = 1000,
    offset: int = 0,
    include_tombstones: bool = False,
) -> list[Page]

List pages for a site.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def list_pages(
    self,
    site_id: str,
    limit: int = 1000,
    offset: int = 0,
    include_tombstones: bool = False,
) -> list[Page]:
    """List pages for a site.

    Args:
        site_id: ID of the site whose pages are listed.
        limit: Defaults to 1000. Presumably the maximum number of pages
            returned — TODO confirm.
        offset: Defaults to 0. Presumably the number of pages to skip, for
            paging — TODO confirm.
        include_tombstones: Defaults to False. NOTE(review): "tombstones"
            presumably refers to pages marked as deleted — confirm.

    Returns:
        A list of pages.
    """
    ...

get_pages_needing_recrawl abstractmethod

Python
get_pages_needing_recrawl(
    site_id: str,
    max_age_hours: float | None = None,
    limit: int = 1000,
) -> list[Page]

Get pages that need to be re-crawled.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_pages_needing_recrawl(
    self,
    site_id: str,
    max_age_hours: float | None = None,
    limit: int = 1000,
) -> list[Page]:
    """Get pages that need to be re-crawled.

    Args:
        site_id: ID of the site whose pages are examined.
        max_age_hours: Defaults to None. NOTE(review): presumably an age
            threshold in hours; how None is interpreted is not specified
            at the interface level — confirm.
        limit: Defaults to 1000. Presumably the maximum number of pages
            returned — TODO confirm.

    Returns:
        A list of pages.
    """
    ...

count_pages abstractmethod

Python
count_pages(
    site_id: str, include_tombstones: bool = False
) -> int

Count pages for a site.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def count_pages(self, site_id: str, include_tombstones: bool = False) -> int:
    """Count pages for a site.

    Args:
        site_id: ID of the site whose pages are counted.
        include_tombstones: Defaults to False. NOTE(review): "tombstones"
            presumably refers to pages marked as deleted — confirm.

    Returns:
        The page count.
    """
    ...

save_version abstractmethod

Python
save_version(version: PageVersion) -> None

Save a page version.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_version(self, version: PageVersion) -> None:
    """Save a page version.

    Args:
        version: The page version to save.
    """
    ...

get_version abstractmethod

Python
get_version(version_id: str) -> PageVersion | None

Get a page version by ID.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_version(self, version_id: str) -> PageVersion | None:
    """Get a page version by ID.

    Args:
        version_id: ID of the page version to fetch.

    Returns:
        The page version, or None (presumably when no version has this ID —
        TODO confirm).
    """
    ...

get_current_version abstractmethod

Python
get_current_version(page_id: str) -> PageVersion | None

Get the current version for a page.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_current_version(self, page_id: str) -> PageVersion | None:
    """Get the current version for a page.

    Args:
        page_id: ID of the page.

    Returns:
        The current page version, or None (presumably when the page has no
        version — TODO confirm). NOTE(review): how "current" is determined
        is not specified at the interface level.
    """
    ...

list_versions abstractmethod

Python
list_versions(
    page_id: str, limit: int = 100
) -> list[PageVersion]

List versions for a page.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def list_versions(
    self,
    page_id: str,
    limit: int = 100,
) -> list[PageVersion]:
    """List versions for a page.

    Args:
        page_id: ID of the page whose versions are listed.
        limit: Defaults to 100. Presumably the maximum number of versions
            returned — TODO confirm.

    Returns:
        A list of page versions. NOTE(review): ordering is not specified
        here.
    """
    ...

save_frontier_item abstractmethod

Python
save_frontier_item(item: FrontierItem) -> None

Save a frontier item.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_frontier_item(self, item: FrontierItem) -> None:
    """Save a frontier item.

    Args:
        item: The frontier item to save.
    """
    ...

get_frontier_items abstractmethod

Python
get_frontier_items(
    run_id: str,
    status: str | None = None,
    limit: int = 1000,
) -> list[FrontierItem]

Get frontier items for a run.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def get_frontier_items(
    self,
    run_id: str,
    status: str | None = None,
    limit: int = 1000,
) -> list[FrontierItem]:
    """Get frontier items for a run.

    Args:
        run_id: ID of the crawl run whose frontier items are fetched.
        status: Defaults to None. NOTE(review): presumably filters items by
            status, with None meaning no filter — confirm. The set of valid
            status strings is not visible here.
        limit: Defaults to 1000. Presumably the maximum number of items
            returned — TODO confirm.

    Returns:
        A list of frontier items.
    """
    ...

update_frontier_status abstractmethod

Python
update_frontier_status(
    item_id: str, status: str, error: str | None = None
) -> None

Update frontier item status.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def update_frontier_status(
    self,
    item_id: str,
    status: str,
    error: str | None = None,
) -> None:
    """Update frontier item status.

    Args:
        item_id: ID of the frontier item to update.
        status: The new status. The set of valid status strings is not
            visible here.
        error: Defaults to None. NOTE(review): presumably an error message
            recorded alongside the status — confirm.
    """
    ...

clear_frontier abstractmethod

Python
clear_frontier(run_id: str) -> int

Clear all frontier items for a run. Returns count deleted.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def clear_frontier(self, run_id: str) -> int:
    """Clear all frontier items for a run. Returns count deleted.

    Args:
        run_id: ID of the crawl run whose frontier items are cleared.

    Returns:
        The number of frontier items deleted.
    """
    ...

save_pages_bulk abstractmethod

Python
save_pages_bulk(pages: list[Page]) -> int

Bulk save pages. Returns count saved.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_pages_bulk(self, pages: list[Page]) -> int:
    """Bulk save pages. Returns count saved.

    Args:
        pages: The pages to save.

    Returns:
        The number of pages saved.
    """
    ...

save_versions_bulk abstractmethod

Python
save_versions_bulk(versions: list[PageVersion]) -> int

Bulk save versions. Returns count saved.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def save_versions_bulk(self, versions: list[PageVersion]) -> int:
    """Bulk save versions. Returns count saved.

    Args:
        versions: The page versions to save.

    Returns:
        The number of versions saved.
    """
    ...

initialize abstractmethod

Python
initialize() -> None

Initialize the storage backend (create tables, etc.).

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def initialize(self) -> None:
    """Initialize the storage backend (create tables, etc.).

    create_storage_backend does not call this; callers invoke it themselves
    on the backend it returns, as the usage examples do.
    NOTE(review): whether repeated calls are safe is not specified here.
    """
    ...

close abstractmethod

Python
close() -> None

Close any connections.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def close(self) -> None:
    """Close any connections.

    NOTE(review): the usage docs state that the context-manager form closes
    the backend automatically — presumably through this method; confirm.
    """
    ...

health_check abstractmethod

Python
health_check() -> bool

Check if the backend is healthy/available.

Source code in src/ragcrawl/storage/backend.py
Python
@abstractmethod
def health_check(self) -> bool:
    """Check if the backend is healthy/available.

    Returns:
        True if the backend is healthy/available. create_storage_backend
        treats a falsy result from the DynamoDB backend as "unavailable".
    """
    ...

create_storage_backend

Python
create_storage_backend(
    config: StorageConfig,
) -> StorageBackend

Create a storage backend from configuration.

Falls back to DuckDB if the configured backend is unavailable and fail_if_unavailable is False.

| Parameter | Type | Description |
|---|---|---|
| config | StorageConfig | Storage configuration. |

| Returns | Description |
|---|---|
| StorageBackend | A StorageBackend instance. |

| Raises | Description |
|---|---|
| RuntimeError | If backend unavailable and fail_if_unavailable is True. |

Source code in src/ragcrawl/storage/backend.py
Python
def create_storage_backend(config: StorageConfig) -> StorageBackend:
    """
    Create a storage backend from configuration.

    Falls back to DuckDB if the configured backend is unavailable
    and fail_if_unavailable is False.

    When the storage type is DynamoDB, the backend is constructed and
    health-checked first. Any failure on that path (import error, mismatched
    config type, failed health check, ...) is treated as "unavailable".

    Args:
        config: Storage configuration.

    Returns:
        A StorageBackend instance.

    Raises:
        RuntimeError: If backend unavailable and fail_if_unavailable is True.
    """
    if config.storage_type == StorageType.DYNAMODB:
        try:
            # Imported inside the try so that an import failure is handled
            # like any other "DynamoDB unavailable" condition.
            from ragcrawl.storage.dynamodb.backend import DynamoDBBackend

            # Explicit check rather than `assert`: assertions are removed
            # under `python -O`, which would let a mismatched config through
            # to the DynamoDB backend constructor.
            if not isinstance(config.backend, DynamoDBConfig):
                raise TypeError(
                    "storage_type is DYNAMODB but config.backend is "
                    f"{type(config.backend).__name__}, expected DynamoDBConfig"
                )

            backend = DynamoDBBackend(config.backend)

            if not backend.health_check():
                raise RuntimeError("DynamoDB health check failed")

            logger.info("Using DynamoDB storage backend")
            return backend

        except Exception as e:
            # Deliberately broad: every failure above means DynamoDB cannot be
            # used, and the caller's config decides between raising and
            # falling back.
            if config.fail_if_unavailable:
                raise RuntimeError(f"DynamoDB unavailable: {e}") from e

            logger.warning(
                "DynamoDB unavailable, falling back to DuckDB",
                error=str(e),
            )

    # Default to DuckDB
    from ragcrawl.storage.duckdb.backend import DuckDBBackend

    if isinstance(config.backend, DuckDBConfig):
        db_config = config.backend
    else:
        # Fallback config: the supplied backend config is not a DuckDB one
        # (e.g. after a DynamoDB fallback), so use DuckDBConfig defaults.
        db_config = DuckDBConfig()

    logger.info("Using DuckDB storage backend", path=str(db_config.path))
    return DuckDBBackend(db_config)