Skip to content

Route builders

The four route builders return an APIRoute subclass to pass as route_class= to an APIRouter. Endpoints registered on that router gain .delay/.options (delayed) or .scheduler (scheduled) methods.

fastapi_gcp_tasks.delayed_route.DelayedRouteBuilder

DelayedRouteBuilder(
    *,
    base_url: str,
    queue_path: str,
    task_create_timeout: float = 10.0,
    pre_create_hook: DelayedTaskHook | None = None,
    client: CloudTasksClient | None = None,
    auto_create_queue: bool = True,
) -> type[APIRoute]

Returns a Mixin that should be used to override route_class.

It adds .delay and .options methods to the original endpoint.

Example:
  delayed_router = APIRouter(route_class=DelayedRouteBuilder(...), prefix="/delayed")

  class UserData(BaseModel):
      name: str

  @delayed_router.post("/on_user_create/{user_id}")
  @as_delayed_task  # optional: makes .delay visible to type checkers
  def on_user_create(user_id: str, data: UserData):
      # do work here
      # Return values are meaningless

  # Call .delay to trigger
  on_user_create.delay(user_id="007", data=UserData(name="Piyush"))

  app.include_router(delayed_router)
Source code in fastapi_gcp_tasks/delayed_route.py
def DelayedRouteBuilder(  # noqa: N802
    *,
    base_url: str,
    queue_path: str,
    task_create_timeout: float = 10.0,
    pre_create_hook: DelayedTaskHook | None = None,
    client: tasks_v2.CloudTasksClient | None = None,
    auto_create_queue: bool = True,
) -> type[APIRoute]:
    """
    Build an APIRoute subclass meant to be passed as ``route_class``.

    Endpoints registered through the returned class receive two extra
    attributes: ``.delay`` and ``.options``.

    The builder arguments act as defaults. They are overridden first by the
    endpoint's ``_delay_options`` attribute (if present) and then by the keyword
    arguments given to ``.options(...)``.

    When ``client`` is None a ``tasks_v2.CloudTasksClient`` is constructed right
    away. When ``auto_create_queue`` is true, ``ensure_queue`` is called for
    ``queue_path`` before the route class is returned.

    Example:
    -------
    ```
      delayed_router = APIRouter(route_class=DelayedRouteBuilder(...), prefix="/delayed")

      class UserData(BaseModel):
          name: str

      @delayed_router.post("/on_user_create/{user_id}")
      @as_delayed_task  # optional: makes .delay visible to type checkers
      def on_user_create(user_id: str, data: UserData):
          # do work here
          # Return values are meaningless

      # Call .delay to trigger
      on_user_create.delay(user_id="007", data=UserData(name="Piyush"))

      app.include_router(delayed_router)
    ```

    """
    default_client = tasks_v2.CloudTasksClient() if client is None else client
    default_hook: DelayedTaskHook = noop_hook if pre_create_hook is None else pre_create_hook

    if auto_create_queue:
        ensure_queue(client=default_client, path=queue_path)

    class TaskRouteMixin(APIRoute):
        def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
            # Build the stock handler first, then hang the task helpers on the
            # endpoint function so callers can reach them from the function.
            handler = super().get_route_handler()
            endpoint = self.endpoint
            endpoint.options = self.delay_options  # type: ignore[attr-defined]
            endpoint.delay = self.delay  # type: ignore[attr-defined]
            return handler

        def delay_options(self, **options: Unpack[DelayOptions]) -> Delayer:
            ensure_known_options(options, DelayOptions)

            # Layer the settings: endpoint-level defaults first, then the
            # keyword arguments of this call on top.
            merged: DelayOptions = {}
            for layer in (getattr(self.endpoint, "_delay_options", None), options):
                if layer:
                    merged.update(layer)
            setting = merged.get

            # Only a synchronous client is acceptable: with an async client
            # (e.g. supplied via task_default_options) create_task would hand
            # back a coroutine that nobody awaits, so reject it up front.
            chosen_client = merged["client"] if "client" in merged else default_client
            if isinstance(chosen_client, tasks_v2.CloudTasksClient):
                return Delayer(
                    route=self,
                    base_url=setting("base_url", base_url),
                    queue_path=setting("queue_path", queue_path),
                    task_create_timeout=setting("task_create_timeout", task_create_timeout),
                    client=chosen_client,
                    pre_create_hook=setting("pre_create_hook", default_hook),
                    countdown=setting("countdown", 0),
                    task_id=setting("task_id"),
                )

            raise TypeError(
                f"DelayedRouteBuilder requires a CloudTasksClient; got {type(chosen_client).__name__}. "
                "Use AsyncDelayedRouteBuilder for CloudTasksAsyncClient."
            )

        def delay(self, **kwargs: Any) -> tasks_v2.Task:
            # Shorthand for .options().delay(...) with no per-call overrides.
            delayer = self.delay_options()
            return delayer.delay(**kwargs)

    return TaskRouteMixin

fastapi_gcp_tasks.async_delayed_route.AsyncDelayedRouteBuilder

AsyncDelayedRouteBuilder(
    *,
    base_url: str,
    queue_path: str,
    task_create_timeout: float = 10.0,
    pre_create_hook: DelayedTaskHook | None = None,
    client: CloudTasksAsyncClient
    | AsyncCloudTasksClientFactory
    | None = None,
    auto_create_queue: bool = False,
) -> type[APIRoute]

Returns a Mixin that should be used to override route_class, with an awaitable .delay.

It adds awaitable .delay and sync .options methods to the original endpoint.

client may be a CloudTasksAsyncClient, a zero-argument factory returning one, or None (default client). grpc.aio channels bind to the event loop active at construction, so the client is resolved lazily on the first awaited .delay() inside the running loop; pass a factory if your client needs custom construction (e.g. the emulator).

Unlike DelayedRouteBuilder, auto_create_queue defaults to False so no unexpected RPCs run inside request handlers. Either pass auto_create_queue=True to lazily ensure the queue on the first .delay(), or call fastapi_gcp_tasks.utils.ensure_queue_async from your FastAPI lifespan (recommended).

Example:
  async_delayed_router = APIRouter(route_class=AsyncDelayedRouteBuilder(...), prefix="/delayed")

  class UserData(BaseModel):
      name: str

  @async_delayed_router.post("/on_user_create/{user_id}")
  @as_async_delayed_task  # optional: makes .delay visible to type checkers
  async def on_user_create(user_id: str, data: UserData):
      # do work here
      # Return values are meaningless

  # Await .delay to trigger
  await on_user_create.delay(user_id="007", data=UserData(name="Piyush"))

  app.include_router(async_delayed_router)
Source code in fastapi_gcp_tasks/async_delayed_route.py
def AsyncDelayedRouteBuilder(  # noqa: N802
    *,
    base_url: str,
    queue_path: str,
    task_create_timeout: float = 10.0,
    pre_create_hook: DelayedTaskHook | None = None,
    client: tasks_v2.CloudTasksAsyncClient | AsyncCloudTasksClientFactory | None = None,
    auto_create_queue: bool = False,
) -> type[APIRoute]:
    """
    Build an APIRoute subclass meant to be passed as ``route_class``, with an awaitable ``.delay``.

    Endpoints registered through the returned class receive an awaitable
    ``.delay`` and a synchronous ``.options``.

    ``client`` accepts a CloudTasksAsyncClient, a zero-argument factory that
    returns one, or None for the default client. Because grpc.aio channels bind
    to the event loop that is active when they are constructed, the client is
    resolved lazily on the first awaited ``.delay()`` inside the running loop.
    Supply a factory when the client needs custom construction (e.g. the
    emulator).

    In contrast to DelayedRouteBuilder, ``auto_create_queue`` is False by
    default so that no unexpected RPCs run inside request handlers. Either set
    ``auto_create_queue=True`` to ensure the queue lazily on the first
    ``.delay()``, or call ``fastapi_gcp_tasks.utils.ensure_queue_async`` from
    your FastAPI lifespan (recommended).

    Example:
    -------
    ```
      async_delayed_router = APIRouter(route_class=AsyncDelayedRouteBuilder(...), prefix="/delayed")

      class UserData(BaseModel):
          name: str

      @async_delayed_router.post("/on_user_create/{user_id}")
      @as_async_delayed_task  # optional: makes .delay visible to type checkers
      async def on_user_create(user_id: str, data: UserData):
          # do work here
          # Return values are meaningless

      # Await .delay to trigger
      await on_user_create.delay(user_id="007", data=UserData(name="Piyush"))

      app.include_router(async_delayed_router)
    ```

    """
    default_hook: DelayedTaskHook = noop_hook if pre_create_hook is None else pre_create_hook

    # Provider shared by every delayer of this builder that does not override the client.
    shared_provider = AsyncCloudTasksClientProvider(client=client, auto_create_queue=auto_create_queue)

    class AsyncTaskRouteMixin(APIRoute):
        def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
            # Build the stock handler first, then hang the task helpers on the
            # endpoint function so callers can reach them from the function.
            handler = super().get_route_handler()
            endpoint = self.endpoint
            endpoint.options = self.delay_options  # type: ignore[attr-defined]
            endpoint.delay = self.delay  # type: ignore[attr-defined]
            return handler

        def delay_options(self, **options: Unpack[AsyncDelayOptions]) -> AsyncDelayer:
            ensure_known_options(options, AsyncDelayOptions)

            # Layer the settings: endpoint-level defaults first, then the
            # keyword arguments of this call on top.
            merged: AsyncDelayOptions = {}
            for layer in (getattr(self.endpoint, "_delay_options", None), options):
                if layer:
                    merged.update(layer)
            setting = merged.get

            # A client override gets a dedicated one-off provider. Endpoint-level
            # defaults that carry a client take this path too, so a fresh
            # provider is then built on every call.
            if "client" in merged:
                provider = AsyncCloudTasksClientProvider(
                    client=merged["client"],
                    auto_create_queue=auto_create_queue,
                )
            else:
                provider = shared_provider

            return AsyncDelayer(
                route=self,
                base_url=setting("base_url", base_url),
                queue_path=setting("queue_path", queue_path),
                task_create_timeout=setting("task_create_timeout", task_create_timeout),
                client_provider=provider,
                pre_create_hook=setting("pre_create_hook", default_hook),
                countdown=setting("countdown", 0),
                task_id=setting("task_id"),
            )

        async def delay(self, **kwargs: Any) -> tasks_v2.Task:
            # Shorthand for awaiting .options().delay(...) with no per-call overrides.
            delayer = self.delay_options()
            return await delayer.delay(**kwargs)

    return AsyncTaskRouteMixin

fastapi_gcp_tasks.scheduled_route.ScheduledRouteBuilder

ScheduledRouteBuilder(
    *,
    base_url: str,
    location_path: str,
    job_create_timeout: float = 10.0,
    pre_create_hook: ScheduledHook | None = None,
    client: CloudSchedulerClient | None = None,
) -> type[APIRoute]

Returns a Mixin that should be used to override route_class.

It adds a .scheduler method to the original endpoint.

Example:
scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...), prefix="/scheduled")

@scheduled_router.get("/simple_scheduled_task")
@as_scheduled_task  # optional: makes .scheduler visible to type checkers
def simple_scheduled_task():
    # Do work here

simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()

app.include_router(scheduled_router)
Source code in fastapi_gcp_tasks/scheduled_route.py
def ScheduledRouteBuilder(  # noqa: N802
    *,
    base_url: str,
    location_path: str,
    job_create_timeout: float = 10.0,
    pre_create_hook: ScheduledHook | None = None,
    client: scheduler_v1.CloudSchedulerClient | None = None,
) -> type[APIRoute]:
    """
    Build an APIRoute subclass meant to be passed as ``route_class``.

    Endpoints registered through the returned class receive a ``.scheduler``
    attribute. The builder arguments act as defaults that keyword arguments to
    ``.scheduler(...)`` may override.

    When ``client`` is None a ``scheduler_v1.CloudSchedulerClient`` is
    constructed right away.

    Example:
    -------
    ```
    scheduled_router = APIRouter(route_class=ScheduledRouteBuilder(...), prefix="/scheduled")

    @scheduled_router.get("/simple_scheduled_task")
    @as_scheduled_task  # optional: makes .scheduler visible to type checkers
    def simple_scheduled_task():
        # Do work here

    simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()

    app.include_router(scheduled_router)
    ```

    """
    default_client = scheduler_v1.CloudSchedulerClient() if client is None else client
    default_hook: ScheduledHook = noop_hook if pre_create_hook is None else pre_create_hook

    class ScheduledRouteMixin(APIRoute):
        def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
            # Build the stock handler first, then expose the scheduler factory
            # on the endpoint function.
            handler = super().get_route_handler()
            self.endpoint.scheduler = self.scheduler_options  # type: ignore[attr-defined]
            return handler

        def scheduler_options(self, *, name: str, schedule: str, **options: Unpack[SchedulerOptions]) -> Scheduler:
            ensure_known_options(options, SchedulerOptions)
            setting = options.get

            # Only a synchronous client is acceptable: with an async client
            # create_job would hand back a coroutine that nobody awaits, so
            # reject it up front.
            chosen_client = options["client"] if "client" in options else default_client
            if isinstance(chosen_client, scheduler_v1.CloudSchedulerClient):
                return Scheduler(
                    route=self,
                    base_url=setting("base_url", base_url),
                    location_path=setting("location_path", location_path),
                    schedule=schedule,
                    client=chosen_client,
                    pre_create_hook=setting("pre_create_hook", default_hook),
                    name=name,
                    job_create_timeout=setting("job_create_timeout", job_create_timeout),
                    retry_config=setting("retry_config"),
                    time_zone=setting("time_zone", "UTC"),
                    force=setting("force", False),
                )

            raise TypeError(
                f"ScheduledRouteBuilder requires a CloudSchedulerClient; got {type(chosen_client).__name__}. "
                "Use AsyncScheduledRouteBuilder for CloudSchedulerAsyncClient."
            )

    return ScheduledRouteMixin

fastapi_gcp_tasks.async_scheduled_route.AsyncScheduledRouteBuilder

AsyncScheduledRouteBuilder(
    *,
    base_url: str,
    location_path: str,
    job_create_timeout: float = 10.0,
    pre_create_hook: ScheduledHook | None = None,
    client: CloudSchedulerAsyncClient
    | AsyncCloudSchedulerClientFactory
    | None = None,
) -> type[APIRoute]

Returns a Mixin that should be used to override route_class, with an awaitable scheduler.

It adds a .scheduler method to the original endpoint whose .schedule and .delete coroutines must be awaited. Unlike ScheduledRouteBuilder, .schedule() cannot run at module import time — await it from a FastAPI lifespan or a request handler.

client may be a CloudSchedulerAsyncClient, a zero-argument factory returning one, or None (default client). grpc.aio channels bind to the event loop active at construction, so the client is resolved lazily on the first awaited call; pass a factory if your client needs custom construction.

Example:
async_scheduled_router = APIRouter(route_class=AsyncScheduledRouteBuilder(...), prefix="/scheduled")

@async_scheduled_router.get("/simple_scheduled_task")
@as_async_scheduled_task  # optional: makes .scheduler visible to type checkers
async def simple_scheduled_task():
    # Do work here

@asynccontextmanager
async def lifespan(app: FastAPI):
    await simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
    yield

app.include_router(async_scheduled_router)
Source code in fastapi_gcp_tasks/async_scheduled_route.py
def AsyncScheduledRouteBuilder(  # noqa: N802
    *,
    base_url: str,
    location_path: str,
    job_create_timeout: float = 10.0,
    pre_create_hook: ScheduledHook | None = None,
    client: scheduler_v1.CloudSchedulerAsyncClient | AsyncCloudSchedulerClientFactory | None = None,
) -> type[APIRoute]:
    """
    Build an APIRoute subclass meant to be passed as ``route_class``, with an awaitable scheduler.

    Endpoints registered through the returned class receive a ``.scheduler``
    attribute whose ``.schedule`` and ``.delete`` coroutines must be awaited.
    In contrast to ScheduledRouteBuilder, ``.schedule()`` cannot run at module
    import time — await it from a FastAPI lifespan or a request handler.

    ``client`` accepts a CloudSchedulerAsyncClient, a zero-argument factory that
    returns one, or None for the default client. Because grpc.aio channels bind
    to the event loop that is active when they are constructed, the client is
    resolved lazily on the first awaited call. Supply a factory when the client
    needs custom construction.

    Example:
    -------
    ```
    async_scheduled_router = APIRouter(route_class=AsyncScheduledRouteBuilder(...), prefix="/scheduled")

    @async_scheduled_router.get("/simple_scheduled_task")
    @as_async_scheduled_task  # optional: makes .scheduler visible to type checkers
    async def simple_scheduled_task():
        # Do work here

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        await simple_scheduled_task.scheduler(name="simple_scheduled_task", schedule="* * * * *").schedule()
        yield

    app.include_router(async_scheduled_router)
    ```

    """
    default_hook: ScheduledHook = noop_hook if pre_create_hook is None else pre_create_hook

    # A single provider per builder, so all schedulers share one client and channel.
    shared_provider = AsyncClientProvider(client=client, client_cls=scheduler_v1.CloudSchedulerAsyncClient)

    class AsyncScheduledRouteMixin(APIRoute):
        def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
            # Build the stock handler first, then expose the scheduler factory
            # on the endpoint function.
            handler = super().get_route_handler()
            self.endpoint.scheduler = self.scheduler_options  # type: ignore[attr-defined]
            return handler

        def scheduler_options(
            self, *, name: str, schedule: str, **options: Unpack[AsyncSchedulerOptions]
        ) -> AsyncScheduler:
            ensure_known_options(options, AsyncSchedulerOptions)
            setting = options.get

            # A client passed for this call gets a dedicated one-off provider;
            # otherwise the builder-wide provider is reused.
            if "client" in options:
                provider = AsyncClientProvider(
                    client=options["client"],
                    client_cls=scheduler_v1.CloudSchedulerAsyncClient,
                )
            else:
                provider = shared_provider

            return AsyncScheduler(
                route=self,
                base_url=setting("base_url", base_url),
                location_path=setting("location_path", location_path),
                schedule=schedule,
                client_provider=provider,
                pre_create_hook=setting("pre_create_hook", default_hook),
                name=name,
                job_create_timeout=setting("job_create_timeout", job_create_timeout),
                retry_config=setting("retry_config"),
                time_zone=setting("time_zone", "UTC"),
                force=setting("force", False),
            )

    return AsyncScheduledRouteMixin

fastapi_gcp_tasks.decorators.task_default_options

task_default_options(
    **options: Unpack[DelayOptions],
) -> Callable[[F], F]
task_default_options(
    **options: Unpack[AsyncDelayOptions],
) -> Callable[[F], F]
task_default_options(**options: Any) -> Callable[[F], F]

Wrapper to set default options for a cloud task.

Source code in fastapi_gcp_tasks/decorators.py
def task_default_options(**options: Any) -> Callable[[F], F]:
    """
    Return a decorator that records default delay options on an endpoint.

    The keyword arguments are checked with ``ensure_known_options`` against
    ``DelayOptions`` as soon as this function is called. The decorator then
    stores them on the function as ``_delay_options`` and returns the very same
    function object.
    """
    ensure_known_options(options, DelayOptions)

    def decorate(fn: F) -> F:
        # The options live on the function object itself; the route builders
        # look up ``_delay_options`` when they construct a delayer for it.
        setattr(fn, "_delay_options", options)  # noqa: B010
        return fn

    return decorate