
Core API

The core module contains the main entry points for crawling and syncing.

Overview

Class      Description
CrawlJob   Executes full website crawls
SyncJob    Performs incremental sync operations

Quick Start

Crawling

Python
import asyncio
from ragcrawl.config import CrawlerConfig
from ragcrawl.core import CrawlJob

config = CrawlerConfig(
    seeds=["https://docs.example.com"],
    max_pages=100,
)

job = CrawlJob(config)
result = asyncio.run(job.run())

print(f"Crawled {result.stats.pages_crawled} pages")

Syncing

Python
import asyncio
from ragcrawl.config import SyncConfig
from ragcrawl.core import SyncJob

config = SyncConfig(
    site_id="site_abc123",
    use_sitemap=True,
)

job = SyncJob(config)
result = asyncio.run(job.run())

print(f"Updated {result.stats.pages_changed} pages")

Module Reference

Core crawling logic for ragcrawl.

CrawlJob

Python
CrawlJob(config: CrawlerConfig)

Main crawl job orchestrator.

Coordinates the frontier, fetcher, extractor, and storage to perform a complete crawl.

Initialize a crawl job.

PARAMETER   TYPE            DESCRIPTION
config      CrawlerConfig   Crawler configuration.

Source code in src/ragcrawl/core/crawl_job.py
Python
def __init__(self, config: CrawlerConfig) -> None:
    """
    Initialize a crawl job.

    Args:
        config: Crawler configuration.
    """
    self.config = config

    # Generate IDs
    self.site_id = config.site_id or generate_site_id(config.seeds)
    self.run_id = generate_run_id()

    # Initialize components (lazy)
    self._storage: StorageBackend | None = None
    self._fetcher: Crawl4AIFetcher | None = None
    self._robots: RobotsChecker | None = None
    self._frontier: Frontier | None = None
    self._scheduler: DomainScheduler | None = None
    self._extractor: ContentExtractor | None = None
    self._quality_gate: QualityGate | None = None
    self._link_filter: LinkFilter | None = None

    # Tracking
    self._metrics = MetricsCollector()
    self._logger = CrawlLoggerAdapter(self.run_id, self.site_id)
    self._crawl_run: CrawlRun | None = None
    self._documents: list[Document] = []

run (async)

Python
run() -> CrawlResult

Execute the crawl job.

RETURNS       DESCRIPTION
CrawlResult   CrawlResult with statistics and documents.
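
A quick sketch of consuming the result, using only fields visible in the source below (success, error, stats, documents, duration_seconds):

Python
import asyncio
from ragcrawl.config import CrawlerConfig
from ragcrawl.core import CrawlJob

config = CrawlerConfig(seeds=["https://docs.example.com"], max_pages=100)
result = asyncio.run(CrawlJob(config).run())

if result.success:
    print(
        f"{result.stats.pages_crawled} pages crawled, "
        f"{result.stats.pages_failed} failed in {result.duration_seconds:.1f}s"
    )
    print(f"{len(result.documents)} documents extracted")
else:
    # run() catches exceptions and reports them through the error field
    print(f"Crawl failed: {result.error}")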

Source code in src/ragcrawl/core/crawl_job.py
Python
async def run(self) -> CrawlResult:
    """
    Execute the crawl job.

    Returns:
        CrawlResult with statistics and documents.
    """
    start_time = datetime.now()

    try:
        # Initialize
        self._init_components()

        # Create/update site record
        await self._save_site()

        # Create crawl run record
        self._crawl_run = CrawlRun(
            run_id=self.run_id,
            site_id=self.site_id,
            config_snapshot=self.config.model_dump(exclude={"on_page", "on_error", "on_change_detected", "redaction_hook"}),
            seeds=self.config.seeds,
        )
        self._crawl_run.mark_started()
        self._storage.save_run(self._crawl_run)

        self._logger.run_started(
            self.config.seeds,
            {"max_pages": self.config.max_pages, "max_depth": self.config.max_depth},
        )

        # Add seeds to frontier
        await self._frontier.add_seeds(self.config.seeds)

        # Main crawl loop
        await self._crawl_loop()

        # Finalize
        metrics = self._metrics.finalize()
        self._crawl_run.stats = CrawlStats(
            pages_discovered=metrics.pages_discovered,
            pages_crawled=metrics.pages_crawled,
            pages_failed=metrics.pages_failed,
            pages_skipped=metrics.pages_skipped,
            pages_changed=metrics.pages_changed,
            pages_new=metrics.pages_new,
            total_bytes_downloaded=metrics.total_bytes,
            total_fetch_time_ms=metrics.total_fetch_time_ms,
            total_extraction_time_ms=metrics.total_extraction_time_ms,
            avg_fetch_latency_ms=metrics.avg_fetch_latency_ms,
            status_codes=dict(metrics.status_codes),
            errors_by_type=dict(metrics.errors_by_type),
        )
        self._crawl_run.frontier_size = self._frontier.size
        self._crawl_run.max_depth_reached = self._frontier.max_depth_reached

        partial = metrics.pages_failed > 0
        self._crawl_run.mark_completed(partial=partial)
        self._storage.save_run(self._crawl_run)

        duration = (datetime.now() - start_time).total_seconds()
        self._logger.run_completed(metrics.to_dict(), duration)

        return CrawlResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=True,
            stats=self._crawl_run.stats,
            documents=self._documents,
            duration_seconds=duration,
        )

    except Exception as e:
        logger.error("Crawl job failed", error=str(e))

        if self._crawl_run:
            self._crawl_run.mark_failed(str(e))
            self._storage.save_run(self._crawl_run)

        self._logger.run_failed(str(e))

        return CrawlResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=False,
            error=str(e),
            duration_seconds=(datetime.now() - start_time).total_seconds(),
        )

    finally:
        # Cleanup
        if self._fetcher:
            await self._fetcher.close()
        if self._storage:
            self._storage.close()

SyncJob

Python
SyncJob(config: SyncConfig)

Incremental sync job for detecting content changes.

Uses multiple strategies:

1. Sitemap lastmod (if available)
2. HTTP conditional requests (ETag / Last-Modified)
3. Content hash diffing (fallback)
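
The quick start above only sets use_sitemap; a slightly fuller configuration sketch follows. Field names come from the examples and source on this page; the comment on fallback order is an assumption based on the strategy list above.

Python
from ragcrawl.config import SyncConfig

config = SyncConfig(
    site_id="site_abc123",  # must refer to a previously crawled site
    use_sitemap=True,       # prefer sitemap lastmod; conditional requests and hashing assumed as fallbacks
    max_pages=500,          # run() stops checking pages once this limit is reached
)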

Initialize sync job.

PARAMETER   TYPE         DESCRIPTION
config      SyncConfig   Sync configuration.

Source code in src/ragcrawl/core/sync_job.py
Python
def __init__(self, config: SyncConfig) -> None:
    """
    Initialize sync job.

    Args:
        config: Sync configuration.
    """
    self.config = config
    self.site_id = config.site_id
    self.run_id = generate_run_id()

    # Components
    self._storage: StorageBackend | None = None
    self._fetcher: Crawl4AIFetcher | None = None
    self._extractor: ContentExtractor | None = None
    self._sitemap_parser: SitemapParser | None = None
    self._change_detector: ChangeDetector | None = None
    self._revalidator: Revalidator | None = None

    # Tracking
    self._metrics = MetricsCollector()
    self._logger = CrawlLoggerAdapter(self.run_id, self.site_id)
    self._changed_pages: list[str] = []
    self._deleted_pages: list[str] = []

run (async)

Python
run() -> SyncResult

Execute the sync job.

RETURNS      DESCRIPTION
SyncResult   SyncResult with changed and deleted pages.
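
A sketch of acting on the result. changed_pages and deleted_pages are plain string lists per the source below; whether the entries are URLs or page IDs is not specified here, so treat that detail as an assumption.

Python
import asyncio
from ragcrawl.config import SyncConfig
from ragcrawl.core import SyncJob

config = SyncConfig(site_id="site_abc123", use_sitemap=True)
result = asyncio.run(SyncJob(config).run())

if result.success:
    for page in result.changed_pages:
        print("re-index:", page)  # e.g. re-embed the updated content
    for page in result.deleted_pages:
        print("remove:", page)    # e.g. drop the page from your index
else:
    print(f"Sync failed: {result.error}")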

Source code in src/ragcrawl/core/sync_job.py
Python
async def run(self) -> SyncResult:
    """
    Execute the sync job.

    Returns:
        SyncResult with changed and deleted pages.
    """
    start_time = datetime.now()

    try:
        self._init_components()

        # Verify site exists
        site = self._storage.get_site(self.site_id)
        if not site:
            raise ValueError(f"Site not found: {self.site_id}")

        # Create sync run record
        crawl_run = CrawlRun(
            run_id=self.run_id,
            site_id=self.site_id,
            is_sync=True,
            config_snapshot=self.config.model_dump(
                exclude={"on_page", "on_change_detected", "on_deletion_detected", "on_error"}
            ),
        )
        crawl_run.mark_started()
        self._storage.save_run(crawl_run)

        # Get pages to check
        pages = await self._get_pages_to_check()

        logger.info("Starting sync", site_id=self.site_id, pages_to_check=len(pages))

        # Process pages
        for page in pages:
            await self._process_page(page)

            # Check limit
            if self.config.max_pages and self._metrics.metrics.pages_crawled >= self.config.max_pages:
                break

        # Finalize
        metrics = self._metrics.finalize()
        crawl_run.stats = CrawlStats(
            pages_crawled=metrics.pages_crawled,
            pages_changed=metrics.pages_changed,
            pages_unchanged=metrics.pages_unchanged,
            pages_deleted=metrics.pages_deleted,
            pages_failed=metrics.pages_failed,
        )
        crawl_run.mark_completed(partial=metrics.pages_failed > 0)
        self._storage.save_run(crawl_run)

        # Update site
        site.last_sync_at = datetime.now()
        self._storage.save_site(site)

        duration = (datetime.now() - start_time).total_seconds()

        return SyncResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=True,
            stats=crawl_run.stats,
            changed_pages=self._changed_pages,
            deleted_pages=self._deleted_pages,
            duration_seconds=duration,
        )

    except Exception as e:
        logger.error("Sync job failed", error=str(e))
        return SyncResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=False,
            error=str(e),
            duration_seconds=(datetime.now() - start_time).total_seconds(),
        )

    finally:
        if self._fetcher:
            await self._fetcher.close()
        if self._storage:
            self._storage.close()