
CrawlJob

The CrawlJob class is the main entry point for crawling websites.

Overview

CrawlJob orchestrates the entire crawling process:

  1. Initializes the storage backend
  2. Creates or retrieves the site record
  3. Manages the URL frontier
  4. Coordinates fetching, extraction, and storage
  5. Tracks statistics and handles errors

Usage

Basic Crawl

Python
import asyncio
from ragcrawl.config import CrawlerConfig
from ragcrawl.core import CrawlJob

config = CrawlerConfig(
    seeds=["https://docs.example.com"],
    max_pages=100,
    max_depth=5,
)

job = CrawlJob(config)
result = asyncio.run(job.run())

# Access results
print(f"Pages crawled: {result.stats.pages_crawled}")
print(f"Pages failed: {result.stats.pages_failed}")

for doc in result.documents:
    print(f"- {doc.title}: {doc.source_url}")

With Callbacks

Python
from ragcrawl.hooks import CrawlCallbacks

class MyCallbacks(CrawlCallbacks):
    def on_page_crawled(self, page, version):
        print(f"Crawled: {page.url}")

    def on_page_error(self, url, error):
        print(f"Error: {url} - {error}")

job = CrawlJob(config, callbacks=MyCallbacks())
result = asyncio.run(job.run())
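
Callbacks are also a convenient place to accumulate state. A minimal sketch, using only the two hooks shown above, that collects failed URLs for later inspection:

Python
class CollectingCallbacks(CrawlCallbacks):
    def __init__(self):
        self.failed_urls: list[str] = []

    def on_page_error(self, url, error):
        # Record the failure instead of printing it.
        self.failed_urls.append(url)

callbacks = CollectingCallbacks()
result = asyncio.run(CrawlJob(config, callbacks=callbacks).run())
print(f"{len(callbacks.failed_urls)} URLs failed")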

Graceful Stop

Python
import asyncio
import signal

async def main():
    job = CrawlJob(config)
    loop = asyncio.get_running_loop()
    # Register the handler on the running loop; create_task requires one.
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(job.stop()))
    return await job.run()

result = asyncio.run(main())

Note that loop.add_signal_handler is Unix-only; on Windows, catch KeyboardInterrupt around asyncio.run() instead.
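
The same stop() method can also be driven by a wall-clock budget. A sketch, assuming stop() makes run() wind down and return a normal (partial) CrawlResult:

Python
async def run_with_budget(config, budget_seconds: float = 300.0):
    job = CrawlJob(config)
    task = asyncio.create_task(job.run())
    try:
        # shield() keeps the timeout from cancelling the crawl outright.
        return await asyncio.wait_for(asyncio.shield(task), timeout=budget_seconds)
    except asyncio.TimeoutError:
        await job.stop()   # ask the job to finish gracefully
        return await task  # collect the partial result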

Configuration

See CrawlerConfig for all options.

Key options:

Option            Type       Description
seeds             list[str]  Starting URLs
max_pages         int        Maximum pages to crawl
max_depth         int        Maximum link depth
delay_seconds     float      Delay between requests
include_patterns  list[str]  URL patterns to include
exclude_patterns  list[str]  URL patterns to exclude
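
Putting these together, a typical docs-site configuration might look like this sketch (the glob-style pattern syntax is an assumption; see CrawlerConfig for the exact matching rules):

Python
config = CrawlerConfig(
    seeds=["https://docs.example.com"],
    max_pages=500,
    max_depth=3,
    delay_seconds=1.0,  # be polite to the host
    include_patterns=["https://docs.example.com/*"],  # glob-style assumed
    exclude_patterns=["*/changelog/*"],
)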

API Reference

CrawlJob

Python
CrawlJob(config: CrawlerConfig)

Main crawl job orchestrator.

Coordinates the frontier, fetcher, extractor, and storage to perform a complete crawl.

Initialize a crawl job.

PARAMETERS

config (CrawlerConfig): Crawler configuration.

Source code in src/ragcrawl/core/crawl_job.py
Python
def __init__(self, config: CrawlerConfig) -> None:
    """
    Initialize a crawl job.

    Args:
        config: Crawler configuration.
    """
    self.config = config

    # Generate IDs
    self.site_id = config.site_id or generate_site_id(config.seeds)
    self.run_id = generate_run_id()

    # Initialize components (lazy)
    self._storage: StorageBackend | None = None
    self._fetcher: Crawl4AIFetcher | None = None
    self._robots: RobotsChecker | None = None
    self._frontier: Frontier | None = None
    self._scheduler: DomainScheduler | None = None
    self._extractor: ContentExtractor | None = None
    self._quality_gate: QualityGate | None = None
    self._link_filter: LinkFilter | None = None

    # Tracking
    self._metrics = MetricsCollector()
    self._logger = CrawlLoggerAdapter(self.run_id, self.site_id)
    self._crawl_run: CrawlRun | None = None
    self._documents: list[Document] = []

run async

Python
run() -> CrawlResult

Execute the crawl job.

RETURNS

CrawlResult: CrawlResult with statistics and documents.

Source code in src/ragcrawl/core/crawl_job.py
Python
async def run(self) -> CrawlResult:
    """
    Execute the crawl job.

    Returns:
        CrawlResult with statistics and documents.
    """
    start_time = datetime.now()

    try:
        # Initialize
        self._init_components()

        # Create/update site record
        await self._save_site()

        # Create crawl run record
        self._crawl_run = CrawlRun(
            run_id=self.run_id,
            site_id=self.site_id,
            config_snapshot=self.config.model_dump(exclude={"on_page", "on_error", "on_change_detected", "redaction_hook"}),
            seeds=self.config.seeds,
        )
        self._crawl_run.mark_started()
        self._storage.save_run(self._crawl_run)

        self._logger.run_started(
            self.config.seeds,
            {"max_pages": self.config.max_pages, "max_depth": self.config.max_depth},
        )

        # Add seeds to frontier
        await self._frontier.add_seeds(self.config.seeds)

        # Main crawl loop
        await self._crawl_loop()

        # Finalize
        metrics = self._metrics.finalize()
        self._crawl_run.stats = CrawlStats(
            pages_discovered=metrics.pages_discovered,
            pages_crawled=metrics.pages_crawled,
            pages_failed=metrics.pages_failed,
            pages_skipped=metrics.pages_skipped,
            pages_changed=metrics.pages_changed,
            pages_new=metrics.pages_new,
            total_bytes_downloaded=metrics.total_bytes,
            total_fetch_time_ms=metrics.total_fetch_time_ms,
            total_extraction_time_ms=metrics.total_extraction_time_ms,
            avg_fetch_latency_ms=metrics.avg_fetch_latency_ms,
            status_codes=dict(metrics.status_codes),
            errors_by_type=dict(metrics.errors_by_type),
        )
        self._crawl_run.frontier_size = self._frontier.size
        self._crawl_run.max_depth_reached = self._frontier.max_depth_reached

        partial = metrics.pages_failed > 0
        self._crawl_run.mark_completed(partial=partial)
        self._storage.save_run(self._crawl_run)

        duration = (datetime.now() - start_time).total_seconds()
        self._logger.run_completed(metrics.to_dict(), duration)

        return CrawlResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=True,
            stats=self._crawl_run.stats,
            documents=self._documents,
            duration_seconds=duration,
        )

    except Exception as e:
        logger.error("Crawl job failed", error=str(e))

        if self._crawl_run:
            self._crawl_run.mark_failed(str(e))
            self._storage.save_run(self._crawl_run)

        self._logger.run_failed(str(e))

        return CrawlResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=False,
            error=str(e),
            duration_seconds=(datetime.now() - start_time).total_seconds(),
        )

    finally:
        # Cleanup
        if self._fetcher:
            await self._fetcher.close()
        if self._storage:
            self._storage.close()
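
Note that run() catches exceptions itself and reports failure through the result rather than raising, so callers should check success explicitly:

Python
result = asyncio.run(CrawlJob(config).run())
if not result.success:
    raise RuntimeError(f"Crawl {result.run_id} failed: {result.error}")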