Skip to content

Delayers & schedulers

These are the objects returned by .options() and .scheduler() on task endpoints. You rarely construct them directly.

fastapi_gcp_tasks.delayer

BaseDelayer

BaseDelayer(
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
)

Bases: Requester

Shared logic to build Cloud Tasks requests from FastAPI routes.

Attributes:

Name Type Description
queue_path (str) The path to the Cloud Tasks queue.

countdown (int): The delay in seconds before the task is executed. task_create_timeout (float): Timeout for creating the task. task_id (str): The unique identifier for the task. method (tasks_v2.HttpMethod): The HTTP method for the task. pre_create_hook (DelayedTaskHook): Hook to be called before creating the task.

Source code in fastapi_gcp_tasks/delayer.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
) -> None:
    super().__init__(route=route, base_url=base_url)
    self.queue_path = queue_path
    self.countdown = countdown
    self.task_create_timeout = task_create_timeout

    self.task_id = task_id
    self.method = _task_method(route.methods)
    self.pre_create_hook = pre_create_hook

Delayer

Delayer(
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    client: CloudTasksClient,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
)

Bases: BaseDelayer

A class to delay HTTP requests as tasks on Google Cloud Tasks.

See BaseDelayer for the shared attributes.

Attributes:

Name Type Description
client (tasks_v2.CloudTasksClient) The Cloud Tasks client.
Source code in fastapi_gcp_tasks/delayer.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    client: tasks_v2.CloudTasksClient,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
) -> None:
    super().__init__(
        route=route,
        base_url=base_url,
        queue_path=queue_path,
        pre_create_hook=pre_create_hook,
        task_create_timeout=task_create_timeout,
        countdown=countdown,
        task_id=task_id,
    )
    self.client = client

delay

delay(**kwargs: Any) -> tasks_v2.Task

Delay a task on Cloud Tasks.

Source code in fastapi_gcp_tasks/delayer.py
def delay(self, **kwargs: Any) -> tasks_v2.Task:
    """Delay a task on Cloud Tasks."""
    request = self._build_create_task_request(values=kwargs)
    return self.client.create_task(request=request, timeout=self.task_create_timeout)

fastapi_gcp_tasks.async_delayer

AsyncCloudTasksClientProvider

AsyncCloudTasksClientProvider(
    *,
    client: CloudTasksAsyncClient
    | AsyncCloudTasksClientFactory
    | None,
    auto_create_queue: bool = False,
)

Bases: AsyncClientProvider[CloudTasksAsyncClient]

Lazily resolves and caches a CloudTasksAsyncClient inside the running event loop.

If auto_create_queue is True, each queue path requested via get_for_queue is ensured exactly once before the client is handed out; a failed ensure is retried on the next call while the already-resolved client stays cached.

Attributes:

Name Type Description
auto_create_queue (bool) Whether to ensure queues exist on first use.
Source code in fastapi_gcp_tasks/async_delayer.py
def __init__(
    self,
    *,
    client: tasks_v2.CloudTasksAsyncClient | AsyncCloudTasksClientFactory | None,
    auto_create_queue: bool = False,
) -> None:
    super().__init__(client=client, client_cls=tasks_v2.CloudTasksAsyncClient)
    self.auto_create_queue = auto_create_queue
    self._ensured_queues: set[str] = set()
    self._ensure_lock = asyncio.Lock()

get_for_queue async

get_for_queue(
    *, queue_path: str
) -> tasks_v2.CloudTasksAsyncClient

Return the cached client, ensuring the given queue exists first if configured.

Source code in fastapi_gcp_tasks/async_delayer.py
async def get_for_queue(self, *, queue_path: str) -> tasks_v2.CloudTasksAsyncClient:
    """Return the cached client, ensuring the given queue exists first if configured."""
    client = await self.get()
    if self.auto_create_queue and queue_path not in self._ensured_queues:
        async with self._ensure_lock:
            if queue_path not in self._ensured_queues:
                await ensure_queue_async(client=client, path=queue_path)
                self._ensured_queues.add(queue_path)
    return client

AsyncDelayer

AsyncDelayer(
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    client_provider: AsyncCloudTasksClientProvider,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
)

Bases: BaseDelayer

A class to delay HTTP requests as tasks on Google Cloud Tasks, using an async client.

See BaseDelayer for the shared attributes.

Attributes:

Name Type Description
client_provider (AsyncCloudTasksClientProvider) Lazy provider for the async client.
Source code in fastapi_gcp_tasks/async_delayer.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    queue_path: str,
    client_provider: AsyncCloudTasksClientProvider,
    pre_create_hook: DelayedTaskHook,
    task_create_timeout: float = 10.0,
    countdown: int = 0,
    task_id: str | None = None,
) -> None:
    super().__init__(
        route=route,
        base_url=base_url,
        queue_path=queue_path,
        pre_create_hook=pre_create_hook,
        task_create_timeout=task_create_timeout,
        countdown=countdown,
        task_id=task_id,
    )
    self.client_provider = client_provider

delay async

delay(**kwargs: Any) -> tasks_v2.Task

Delay a task on Cloud Tasks without blocking the event loop.

Source code in fastapi_gcp_tasks/async_delayer.py
async def delay(self, **kwargs: Any) -> tasks_v2.Task:
    """Delay a task on Cloud Tasks without blocking the event loop."""
    client = await self.client_provider.get_for_queue(queue_path=self.queue_path)
    request = self._build_create_task_request(values=kwargs)
    return await client.create_task(request=request, timeout=self.task_create_timeout)

fastapi_gcp_tasks.scheduler

BaseScheduler

BaseScheduler(
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    pre_create_hook: ScheduledHook,
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
)

Bases: Requester

Shared logic to build Cloud Scheduler job requests from FastAPI routes.

Attributes:

Name Type Description
retry_config (scheduler_v1.RetryConfig) Configuration for retrying failed jobs.

job_id (str): The unique identifier for the job. time_zone (str): The time zone for the job schedule. location_path (str): The location path for the job. cron_schedule (str): The cron schedule for the job. job_create_timeout (float): Timeout for creating the job. method (scheduler_v1.HttpMethod): The HTTP method for the job. pre_create_hook (ScheduledHook): Hook to be called before creating the job. force (bool): Whether to force create the job if it already exists.

Source code in fastapi_gcp_tasks/scheduler.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    pre_create_hook: ScheduledHook,
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: scheduler_v1.RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
) -> None:
    super().__init__(route=route, base_url=base_url)
    if not name:
        name = route.unique_id

    if retry_config is None:
        retry_config = scheduler_v1.RetryConfig(
            retry_count=5,
            max_retry_duration=duration_pb2.Duration(seconds=0),
            min_backoff_duration=duration_pb2.Duration(seconds=5),
            max_backoff_duration=duration_pb2.Duration(seconds=120),
            max_doublings=5,
        )

    self.retry_config = retry_config
    # Path helpers are staticmethods shared by the sync and async clients,
    # so we don't need a client instance here.
    location_parts = scheduler_v1.CloudSchedulerClient.parse_common_location_path(location_path)

    self.job_id = scheduler_v1.CloudSchedulerClient.job_path(job=name, **location_parts)
    self.time_zone = time_zone

    self.location_path = location_path
    self.cron_schedule = schedule
    self.job_create_timeout = job_create_timeout

    self.method = _scheduler_method(route.methods)
    self.pre_create_hook = pre_create_hook
    self.force = force

Scheduler

Scheduler(
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    client: CloudSchedulerClient,
    pre_create_hook: ScheduledHook,
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
)

Bases: BaseScheduler

A class to schedule HTTP requests as jobs on Google Cloud Scheduler.

See BaseScheduler for the shared attributes.

Attributes:

Name Type Description
client (scheduler_v1.CloudSchedulerClient) The Cloud Scheduler client.
Source code in fastapi_gcp_tasks/scheduler.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    client: scheduler_v1.CloudSchedulerClient,
    pre_create_hook: ScheduledHook,
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: scheduler_v1.RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
) -> None:
    super().__init__(
        route=route,
        base_url=base_url,
        location_path=location_path,
        schedule=schedule,
        pre_create_hook=pre_create_hook,
        name=name,
        job_create_timeout=job_create_timeout,
        retry_config=retry_config,
        time_zone=time_zone,
        force=force,
    )
    self.client = client

schedule

schedule(**kwargs: Any) -> None

Schedule a job on Cloud Scheduler.

Source code in fastapi_gcp_tasks/scheduler.py
def schedule(self, **kwargs: Any) -> None:
    """Schedule a job on Cloud Scheduler."""
    request = self._build_create_job_request(values=kwargs)

    if self.force or self._has_changed(request=request):
        # Delete and create job
        self.delete()
        self.client.create_job(request=request, timeout=self.job_create_timeout)

delete

delete() -> bool | Exception

Delete the job from the scheduler if it exists.

Source code in fastapi_gcp_tasks/scheduler.py
def delete(self) -> bool | Exception:
    """Delete the job from the scheduler if it exists."""
    # We return true or exception because you could have the delete code on multiple instances
    try:
        self.client.delete_job(name=self.job_id, timeout=self.job_create_timeout)
        return True
    # TODO: replace this with a more specific exception. we may also just raise the exception here?
    except Exception as ex:  # noqa: BLE001
        return ex

fastapi_gcp_tasks.async_scheduler

AsyncScheduler

AsyncScheduler(
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    pre_create_hook: ScheduledHook,
    client_provider: AsyncClientProvider[
        CloudSchedulerAsyncClient
    ],
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
)

Bases: BaseScheduler

A class to schedule HTTP requests as jobs on Google Cloud Scheduler, using an async client.

See BaseScheduler for the shared attributes.

Attributes:

Name Type Description
client_provider (AsyncClientProvider) Lazy provider for the async client. Share one

provider across schedulers so they reuse the same client and gRPC channel.

Source code in fastapi_gcp_tasks/async_scheduler.py
def __init__(
    self,
    *,
    route: APIRoute,
    base_url: str,
    location_path: str,
    schedule: str,
    pre_create_hook: ScheduledHook,
    client_provider: AsyncClientProvider[scheduler_v1.CloudSchedulerAsyncClient],
    name: str = "",
    job_create_timeout: float = 10.0,
    retry_config: scheduler_v1.RetryConfig | None = None,
    time_zone: str = "UTC",
    force: bool = False,
) -> None:
    super().__init__(
        route=route,
        base_url=base_url,
        location_path=location_path,
        schedule=schedule,
        pre_create_hook=pre_create_hook,
        name=name,
        job_create_timeout=job_create_timeout,
        retry_config=retry_config,
        time_zone=time_zone,
        force=force,
    )
    self.client_provider = client_provider

schedule async

schedule(**kwargs: Any) -> None

Schedule a job on Cloud Scheduler without blocking the event loop.

Source code in fastapi_gcp_tasks/async_scheduler.py
async def schedule(self, **kwargs: Any) -> None:
    """Schedule a job on Cloud Scheduler without blocking the event loop."""
    request = self._build_create_job_request(values=kwargs)

    if self.force or await self._has_changed(request=request):
        # Delete and create job
        await self.delete()
        client = await self.client_provider.get()
        await client.create_job(request=request, timeout=self.job_create_timeout)

delete async

delete() -> bool | Exception

Delete the job from the scheduler if it exists.

Source code in fastapi_gcp_tasks/async_scheduler.py
async def delete(self) -> bool | Exception:
    """Delete the job from the scheduler if it exists."""
    # We return true or exception because you could have the delete code on multiple instances
    try:
        client = await self.client_provider.get()
        await client.delete_job(name=self.job_id, timeout=self.job_create_timeout)
        return True
    # TODO: replace this with a more specific exception. we may also just raise the exception here?
    except Exception as ex:  # noqa: BLE001
        return ex

fastapi_gcp_tasks.async_clients

AsyncClientProvider

AsyncClientProvider(
    *,
    client: ClientT | Callable[[], ClientT] | None,
    client_cls: type[ClientT],
)

Bases: Generic[ClientT]

Lazily resolves and caches an async gRPC client inside the running event loop.

grpc.aio channels bind to the event loop active at construction, so the client (or client factory) is only resolved on first use, from within the loop that will await the RPCs. One provider should be shared per route builder so all calls reuse the same client and channel.

Source code in fastapi_gcp_tasks/async_clients.py
def __init__(
    self,
    *,
    client: "ClientT | Callable[[], ClientT] | None",
    client_cls: type[ClientT],
) -> None:
    self._client_or_factory = client
    self._client_cls = client_cls
    self._client: ClientT | None = None
    self._lock = asyncio.Lock()

get async

get() -> ClientT

Return the cached client, resolving it on first call.

Source code in fastapi_gcp_tasks/async_clients.py
async def get(self) -> ClientT:
    """Return the cached client, resolving it on first call."""
    if self._client is not None:
        return self._client
    async with self._lock:
        if self._client is None:
            self._client = self._resolve()
        return self._client