SyncJob

The SyncJob class handles incremental updates to previously crawled sites.

Overview

SyncJob efficiently updates your knowledge base by:

  1. Checking sitemap for new/updated URLs
  2. Using conditional requests (ETags, Last-Modified)
  3. Comparing content hashes for changes
  4. Marking deleted pages as tombstones

Usage

Basic Sync

Python
import asyncio
from ragcrawl.config import SyncConfig
from ragcrawl.core import SyncJob

config = SyncConfig(
    site_id="site_abc123",
    use_sitemap=True,
    use_conditional_requests=True,
)

job = SyncJob(config)
result = asyncio.run(job.run())

# Check what changed
print(f"Pages checked: {result.stats.pages_crawled}")
print(f"Changed: {result.stats.pages_changed}")
print(f"Unchanged: {result.stats.pages_unchanged}")
print(f"Deleted: {result.stats.pages_deleted}")

Sync with Age Filter

Only sync pages that haven't been checked recently:

Python
config = SyncConfig(
    site_id="site_abc123",
    max_age_hours=24,  # Only pages not synced in 24 hours
)

job = SyncJob(config)
result = asyncio.run(job.run())

Sync with Page Limit

Python
config = SyncConfig(
    site_id="site_abc123",
    max_pages=100,  # Stop after 100 pages
)

job = SyncJob(config)
result = asyncio.run(job.run())
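
The two limits compose. A sketch of a bounded nightly pass that only revisits stale pages, using the documented options together:

Python
config = SyncConfig(
    site_id="site_abc123",
    max_age_hours=24,  # skip pages synced within the last day
    max_pages=500,     # cap the work done per run
)

job = SyncJob(config)
result = asyncio.run(job.run())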

Sync Strategies

SyncJob applies multiple change-detection strategies, most efficient first:

1. Sitemap

If available, the sitemap provides:

  - A list of all current URLs
  - Last modification dates
  - Change frequency hints

Python
config = SyncConfig(
    site_id="site_abc123",
    use_sitemap=True,  # Default: True
)
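
Conceptually, this strategy compares each URL's <lastmod> entry against the time the page was last synced and refetches only the newer ones. A standalone illustration of that comparison (generic XML parsing, not ragcrawl's SitemapParser):

Python
from datetime import datetime, timezone
import xml.etree.ElementTree as ET

NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

sitemap_xml = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/a</loc><lastmod>2024-06-01T12:00:00+00:00</lastmod></url>
</urlset>"""

# Last-sync times as your storage layer might record them.
last_synced = {"https://example.com/a": datetime(2024, 5, 1, tzinfo=timezone.utc)}

root = ET.fromstring(sitemap_xml)
for url in root.findall("sm:url", NS):
    loc = url.findtext("sm:loc", namespaces=NS)
    lastmod = url.findtext("sm:lastmod", namespaces=NS)
    if loc in last_synced and lastmod:
        if datetime.fromisoformat(lastmod) > last_synced[loc]:
            print(f"needs refetch: {loc}")  # lastmod is newer than our last sync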

2. Conditional Requests

Uses HTTP headers to avoid downloading unchanged content:

Python
config = SyncConfig(
    site_id="site_abc123",
    use_conditional_requests=True,  # Default: True
)

Supports:

  - If-None-Match with ETags
  - If-Modified-Since with Last-Modified dates
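
At the HTTP level this means replaying the validators saved from the previous fetch and treating a 304 Not Modified as "unchanged". A minimal sketch with httpx, illustrating the mechanism rather than ragcrawl's internal fetcher:

Python
import httpx

# Validators saved from the previous fetch of this page.
stored_etag = '"abc123"'
stored_last_modified = "Tue, 04 Jun 2024 10:00:00 GMT"

response = httpx.get(
    "https://example.com/page",
    headers={
        "If-None-Match": stored_etag,
        "If-Modified-Since": stored_last_modified,
    },
)

if response.status_code == 304:
    print("unchanged; body was not re-downloaded")
else:
    print("changed; re-extract and re-index")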

3. Content Hash Comparison

As a fallback, compares content hashes:

Python
# This happens automatically when conditional requests
# don't indicate a change but content differs
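
A hash over the extracted content is the last line of defense, catching servers that ignore validators or return fresh timestamps for identical bodies. A minimal sketch with hashlib (ragcrawl's actual hashing and normalization may differ):

Python
import hashlib

def content_hash(text: str) -> str:
    # Hash the extracted text rather than the raw HTML, so cosmetic
    # byte-level noise (e.g. rotating ad markup) does not register as a change.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

stored_hash = content_hash("old extracted content")
new_hash = content_hash("new extracted content")

if new_hash != stored_hash:
    print("content changed; update the stored page")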

Configuration

See SyncConfig for all options.

Option                    Type   Default   Description
site_id                   str    required  Site to sync
max_pages                 int    None      Maximum pages to sync
max_age_hours             float  None      Only sync pages older than N hours
use_sitemap               bool   True      Use sitemap for discovery
use_conditional_requests  bool   True      Use ETags/Last-Modified
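
For reference, a configuration that sets every option in the table:

Python
config = SyncConfig(
    site_id="site_abc123",
    max_pages=1000,
    max_age_hours=12.0,
    use_sitemap=True,
    use_conditional_requests=True,
)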

API Reference

SyncJob

Python
SyncJob(config: SyncConfig)

Incremental sync job for detecting content changes.

Uses multiple strategies:

  1. Sitemap lastmod (if available)
  2. HTTP conditional requests (ETag/Last-Modified)
  3. Content hash diffing (fallback)

Initialize sync job.

PARAMETERS

config (SyncConfig): Sync configuration.

Source code in src/ragcrawl/core/sync_job.py
Python
def __init__(self, config: SyncConfig) -> None:
    """
    Initialize sync job.

    Args:
        config: Sync configuration.
    """
    self.config = config
    self.site_id = config.site_id
    self.run_id = generate_run_id()

    # Components
    self._storage: StorageBackend | None = None
    self._fetcher: Crawl4AIFetcher | None = None
    self._extractor: ContentExtractor | None = None
    self._sitemap_parser: SitemapParser | None = None
    self._change_detector: ChangeDetector | None = None
    self._revalidator: Revalidator | None = None

    # Tracking
    self._metrics = MetricsCollector()
    self._logger = CrawlLoggerAdapter(self.run_id, self.site_id)
    self._changed_pages: list[str] = []
    self._deleted_pages: list[str] = []

run (async)

Python
run() -> SyncResult

Execute the sync job.

RETURNS

SyncResult: Result with changed and deleted pages.

Source code in src/ragcrawl/core/sync_job.py
Python
async def run(self) -> SyncResult:
    """
    Execute the sync job.

    Returns:
        SyncResult with changed and deleted pages.
    """
    start_time = datetime.now()

    try:
        self._init_components()

        # Verify site exists
        site = self._storage.get_site(self.site_id)
        if not site:
            raise ValueError(f"Site not found: {self.site_id}")

        # Create sync run record
        crawl_run = CrawlRun(
            run_id=self.run_id,
            site_id=self.site_id,
            is_sync=True,
            config_snapshot=self.config.model_dump(
                exclude={"on_page", "on_change_detected", "on_deletion_detected", "on_error"}
            ),
        )
        crawl_run.mark_started()
        self._storage.save_run(crawl_run)

        # Get pages to check
        pages = await self._get_pages_to_check()

        logger.info("Starting sync", site_id=self.site_id, pages_to_check=len(pages))

        # Process pages
        for page in pages:
            await self._process_page(page)

            # Check limit
            if self.config.max_pages and self._metrics.metrics.pages_crawled >= self.config.max_pages:
                break

        # Finalize
        metrics = self._metrics.finalize()
        crawl_run.stats = CrawlStats(
            pages_crawled=metrics.pages_crawled,
            pages_changed=metrics.pages_changed,
            pages_unchanged=metrics.pages_unchanged,
            pages_deleted=metrics.pages_deleted,
            pages_failed=metrics.pages_failed,
        )
        crawl_run.mark_completed(partial=metrics.pages_failed > 0)
        self._storage.save_run(crawl_run)

        # Update site
        site.last_sync_at = datetime.now()
        self._storage.save_site(site)

        duration = (datetime.now() - start_time).total_seconds()

        return SyncResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=True,
            stats=crawl_run.stats,
            changed_pages=self._changed_pages,
            deleted_pages=self._deleted_pages,
            duration_seconds=duration,
        )

    except Exception as e:
        logger.error("Sync job failed", error=str(e))
        return SyncResult(
            run_id=self.run_id,
            site_id=self.site_id,
            success=False,
            error=str(e),
            duration_seconds=(datetime.now() - start_time).total_seconds(),
        )

    finally:
        if self._fetcher:
            await self._fetcher.close()
        if self._storage:
            self._storage.close()