Bulk Operations

Process large numbers of records efficiently.

Batch Updates

Update contacts in batches to avoid timeouts:

async def batch_update_contacts(
    client,
    contact_ids: list[int],
    updates: dict,
    batch_size: int = 50,
):
    """Apply the same field updates to many contacts, one batch at a time.

    Each batch is sent as a single ``client.update("Contact", ...)`` call
    filtered with ``id IN <batch>``; batches are awaited sequentially and a
    progress line is printed after each one.

    Args:
        client: Object exposing an awaitable ``update(entity, values=, where=)``.
        contact_ids: IDs of the contacts to update. An empty list is a no-op.
        updates: Field values passed unchanged as ``values`` to every call.
        batch_size: Maximum number of IDs per request; must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # A negative step would make range() empty and silently skip every
    # update; a zero step fails with an unrelated-looking range() error.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    total = len(contact_ids)
    for start in range(0, total, batch_size):
        batch = contact_ids[start:start + batch_size]

        await client.update(
            "Contact",
            values=updates,
            where=[["id", "IN", batch]],
        )

        # The last batch may be short, so cap the running count at the total.
        done = min(start + batch_size, total)
        print(f"Updated {done} of {total}")

Page Through Results

Process all matching records with pagination:

async def process_all_contacts(client, process_func):
    """Fetch every non-deleted contact page by page and process each one.

    Pages of up to 100 contacts are requested with ``client.get`` until an
    empty page comes back. ``process_func`` is awaited once per contact, in
    the order the records are returned, and a running total is printed after
    each page.

    Args:
        client: Object exposing an awaitable ``get(entity, select=, where=,
            limit=, offset=)`` whose result has a ``values`` sequence.
        process_func: Async callable invoked with each contact record.
    """
    # NOTE(review): no ordering is requested, so stable paging relies on the
    # server returning rows in a consistent order — confirm. Likewise, if
    # process_func changes which contacts match the filter (e.g. deletes
    # them), offset-based paging can skip records.
    page_size = 100
    processed = 0

    while True:
        response = await client.get(
            "Contact",
            select=["id", "display_name", "email_primary.email"],
            where=[["is_deleted", "=", False]],
            limit=page_size,
            offset=processed,
        )

        contacts = response.values
        if not contacts:
            break

        for contact in contacts:
            await process_func(contact)

        # Advance by what was actually returned, not by page_size: the
        # progress count stays accurate on a short final page and nothing is
        # skipped if a page comes back smaller than requested.
        processed += len(contacts)
        print(f"Processed {processed} contacts")

Import with Error Handling

Track successes and failures during import:

from dataclasses import dataclass, field

from civicrm_py import CiviAPIError

@dataclass
class ImportResult:
    """Running tally of an import: success counters plus collected failures."""

    # Number of records created successfully.
    created: int = 0
    # Number of records updated. NOTE(review): nothing visible here
    # increments this — presumably reserved for an upsert path; confirm.
    updated: int = 0
    # One entry per failed record; import_contacts appends dicts of the form
    # {"record": <input dict>, "error": <message string>}.
    errors: list = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass errors=None explicitly. Only None is
        # replaced, so a caller-supplied list (even an empty one) is kept
        # as-is and stays aliased to the caller's object.
        if self.errors is None:
            self.errors = []

async def import_contacts(client, records: list[dict]) -> ImportResult:
    """Create one Contact per record, collecting failures instead of aborting.

    Records are created sequentially with ``client.create("Contact", ...)``.
    A ``CiviAPIError`` raised for a record is captured in the result and the
    import moves on to the next record; any other exception propagates.

    Args:
        client: Object exposing an awaitable ``create(entity, values=)``.
        records: Field dictionaries, each passed as ``values`` for one contact.

    Returns:
        An ``ImportResult`` whose ``created`` counts the successful creates
        and whose ``errors`` holds ``{"record", "error"}`` entries for the
        failures. ``updated`` is not modified by this function.
    """
    result = ImportResult()

    for record in records:
        try:
            await client.create("Contact", values=record)
        except CiviAPIError as e:
            # Keep the offending input next to the message so failed rows can
            # be inspected or retried later.
            result.errors.append({"record": record, "error": str(e)})
        else:
            result.created += 1

    return result