Source code for civicrm_py.core.client

"""CiviClient - Main entry point for CiviCRM API v4.

Provides both async and sync context-managed access to CiviCRM.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Self

from civicrm_py.core.auth import AuthProvider
from civicrm_py.core.config import CiviSettings
from civicrm_py.core.serialization import APIRequest, APIResponse
from civicrm_py.http.transport import AsyncTransport, SyncTransport

if TYPE_CHECKING:
    from types import TracebackType


[docs] class CiviClient: """Async client for CiviCRM API v4. Usage: async with CiviClient(settings) as client: response = await client.request("Contact", "get", {"limit": 10}) Or with environment variables: async with CiviClient.from_env() as client: response = await client.request("Contact", "get") """
[docs] def __init__( self, settings: CiviSettings | None = None, *, base_url: str | None = None, api_key: str | None = None, site_key: str | None = None, timeout: int = 30, verify_ssl: bool = True, debug: bool = False, max_retries: int = 3, ) -> None: """Initialize CiviClient. Either provide a CiviSettings instance or individual parameters. If neither settings nor base_url is provided, will attempt to load from environment variables. Args: settings: Pre-configured CiviSettings instance. base_url: CiviCRM API base URL. api_key: API key for authentication. site_key: Optional site key. timeout: Request timeout in seconds. verify_ssl: Whether to verify SSL certificates. debug: Enable debug logging. max_retries: Maximum retry attempts. """ if settings is not None: self._settings = settings elif base_url is not None: self._settings = CiviSettings( base_url=base_url, api_key=api_key, site_key=site_key, timeout=timeout, verify_ssl=verify_ssl, debug=debug, max_retries=max_retries, ) else: self._settings = CiviSettings.from_env() self._auth = AuthProvider.from_settings(self._settings) self._transport: AsyncTransport | None = None
[docs] @classmethod def from_env(cls) -> CiviClient: """Create client from environment variables. Returns: CiviClient configured from environment. """ return cls(settings=CiviSettings.from_env())
@property def settings(self) -> CiviSettings: """Get client settings.""" return self._settings async def _get_transport(self) -> AsyncTransport: """Get or create async transport.""" if self._transport is None: self._transport = AsyncTransport(self._settings, self._auth) return self._transport
[docs] async def request( self, entity: str, action: str, params: APIRequest | dict[str, Any] | None = None, ) -> APIResponse[dict[str, Any]]: """Make API request. Args: entity: CiviCRM entity name (e.g., 'Contact', 'Activity'). action: API action (e.g., 'get', 'create', 'delete'). params: Request parameters. Returns: API response with values and metadata. Raises: CiviConnectionError: On network error. CiviTimeoutError: On request timeout. CiviAuthError: On authentication failure. CiviAPIError: On API error response. """ transport = await self._get_transport() return await transport.request(entity, action, params)
[docs] async def get( self, entity: str, *, select: list[str] | None = None, where: list[list[Any]] | None = None, order_by: dict[str, str] | None = None, limit: int | None = None, offset: int | None = None, ) -> APIResponse[dict[str, Any]]: """Get entities with optional filtering. Args: entity: Entity name. select: Fields to return. where: Filter conditions. order_by: Sort order. limit: Max records. offset: Skip records. Returns: API response. """ params = APIRequest( select=select, where=where, orderBy=order_by, limit=limit, offset=offset, ) return await self.request(entity, "get", params)
[docs] async def create( self, entity: str, values: dict[str, Any], ) -> APIResponse[dict[str, Any]]: """Create a new entity. Args: entity: Entity name. values: Field values for the new entity. Returns: API response with created entity. """ params = APIRequest(values=values) return await self.request(entity, "create", params)
[docs] async def update( self, entity: str, values: dict[str, Any], where: list[list[Any]], ) -> APIResponse[dict[str, Any]]: """Update existing entities. Args: entity: Entity name. values: Field values to update. where: Filter to select entities to update. Returns: API response with updated entities. """ params = APIRequest(values=values, where=where) return await self.request(entity, "update", params)
[docs] async def delete( self, entity: str, where: list[list[Any]], ) -> APIResponse[dict[str, Any]]: """Delete entities. Args: entity: Entity name. where: Filter to select entities to delete. Returns: API response. """ params = APIRequest(where=where) return await self.request(entity, "delete", params)
[docs] async def get_fields( self, entity: str, ) -> APIResponse[dict[str, Any]]: """Get field metadata for an entity. Args: entity: Entity name. Returns: API response with field definitions. """ return await self.request(entity, "getFields", {})
[docs] async def close(self) -> None: """Close the client and release resources.""" if self._transport is not None: await self._transport.close() self._transport = None
[docs] async def __aenter__(self) -> Self: """Enter async context.""" return self
[docs] async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """Exit async context.""" await self.close()
[docs] class SyncCiviClient: """Sync client for CiviCRM API v4. Usage: with SyncCiviClient(settings) as client: response = client.request("Contact", "get", {"limit": 10}) """
[docs] def __init__( self, settings: CiviSettings | None = None, *, base_url: str | None = None, api_key: str | None = None, site_key: str | None = None, timeout: int = 30, verify_ssl: bool = True, debug: bool = False, max_retries: int = 3, ) -> None: """Initialize SyncCiviClient. Args: settings: Pre-configured CiviSettings instance. base_url: CiviCRM API base URL. api_key: API key for authentication. site_key: Optional site key. timeout: Request timeout in seconds. verify_ssl: Whether to verify SSL certificates. debug: Enable debug logging. max_retries: Maximum retry attempts. """ if settings is not None: self._settings = settings elif base_url is not None: self._settings = CiviSettings( base_url=base_url, api_key=api_key, site_key=site_key, timeout=timeout, verify_ssl=verify_ssl, debug=debug, max_retries=max_retries, ) else: self._settings = CiviSettings.from_env() self._auth = AuthProvider.from_settings(self._settings) self._transport: SyncTransport | None = None
[docs] @classmethod def from_env(cls) -> SyncCiviClient: """Create client from environment variables.""" return cls(settings=CiviSettings.from_env())
@property def settings(self) -> CiviSettings: """Get client settings.""" return self._settings def _get_transport(self) -> SyncTransport: """Get or create sync transport.""" if self._transport is None: self._transport = SyncTransport(self._settings, self._auth) return self._transport
[docs] def request( self, entity: str, action: str, params: APIRequest | dict[str, Any] | None = None, ) -> APIResponse[dict[str, Any]]: """Make API request.""" transport = self._get_transport() return transport.request(entity, action, params)
[docs] def get( self, entity: str, *, select: list[str] | None = None, where: list[list[Any]] | None = None, order_by: dict[str, str] | None = None, limit: int | None = None, offset: int | None = None, ) -> APIResponse[dict[str, Any]]: """Get entities with optional filtering.""" params = APIRequest( select=select, where=where, orderBy=order_by, limit=limit, offset=offset, ) return self.request(entity, "get", params)
[docs] def create( self, entity: str, values: dict[str, Any], ) -> APIResponse[dict[str, Any]]: """Create a new entity.""" params = APIRequest(values=values) return self.request(entity, "create", params)
[docs] def update( self, entity: str, values: dict[str, Any], where: list[list[Any]], ) -> APIResponse[dict[str, Any]]: """Update existing entities.""" params = APIRequest(values=values, where=where) return self.request(entity, "update", params)
[docs] def delete( self, entity: str, where: list[list[Any]], ) -> APIResponse[dict[str, Any]]: """Delete entities.""" params = APIRequest(where=where) return self.request(entity, "delete", params)
[docs] def get_fields( self, entity: str, ) -> APIResponse[dict[str, Any]]: """Get field metadata for an entity.""" return self.request(entity, "getFields", {})
[docs] def close(self) -> None: """Close the client and release resources.""" if self._transport is not None: self._transport.close() self._transport = None
[docs] def __enter__(self) -> Self: """Enter sync context.""" return self
[docs] def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """Exit sync context.""" self.close()
__all__ = [ "CiviClient", "SyncCiviClient", ]