from reactivity.async_primitives import *

The module reactivity.async_primitives contains the async counterparts of HMR's core reactive primitives. The dependency model is the same as the sync API, but actual work is scheduled onto an async runtime instead of running to completion inline.

What Lives Here

  • AsyncEffect / async_effect for async side effects
  • AsyncDerived / async_derived for cached async computations
  • TaskFactory / default_task_factory for integrating with an event loop or task supervisor

Mental Model

  • async_effect behaves like effect, except each trigger schedules an awaitable task instead of finishing immediately on the current stack.
  • async_derived behaves like derived, except you read it with await value().
  • Dirty async derived values are still lazy. If nothing is pulling them, they stay dirty until the next read.
  • If an effect or some other non-derived subscriber is pulling an async derived value, invalidation can eagerly schedule a rerun.
  • Internally, the current reactive context is forked before the coroutine runs, so dependency tracking still works across the async boundary.
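The lazy-versus-pulled distinction in the list above can be sketched with plain asyncio. This is a toy model, not the library's AsyncDerived: the class ToyLazyValue, its pulled flag (standing in for "an effect subscribes to this value"), and the demo function are all hypothetical names made up for illustration.

```python
import asyncio


class ToyLazyValue:
    """Toy model of lazy-vs-pulled invalidation (hypothetical, not AsyncDerived)."""

    def __init__(self, fn):
        self._fn = fn
        self.dirty = True
        self.value = None
        self.pulled = False  # stands in for "some effect is subscribed"
        self.runs = 0

    async def read(self):
        # Recompute only when dirty; otherwise serve the cached value.
        if self.dirty:
            self.runs += 1
            self.value = await self._fn()
            self.dirty = False
        return self.value

    def invalidate(self):
        self.dirty = True
        if self.pulled:
            # Someone is pulling: schedule the rerun right away.
            return asyncio.ensure_future(self.read())
        return None  # Nobody is pulling: stay dirty until the next read.


async def demo():
    source = {"n": 1}

    async def doubled():
        await asyncio.sleep(0)
        return source["n"] * 2

    lazy = ToyLazyValue(doubled)
    await lazy.read()  # first read computes the value

    source["n"] = 2
    stayed_lazy = lazy.invalidate() is None  # nothing scheduled
    await asyncio.sleep(0.01)
    runs_while_unpulled = lazy.runs  # still one run

    lazy.pulled = True
    source["n"] = 3
    task = lazy.invalidate()  # a rerun is scheduled eagerly
    await task
    return stayed_lazy, runs_while_unpulled, lazy.runs, lazy.value


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, 1, 2, 6)
```

The toy skips dependency tracking and context forking entirely; it only shows why an unobserved value costs nothing after invalidation while an observed one is refreshed without an explicit read.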

Runtime Support

default_task_factory is intentionally small in scope. In the current implementation it:

  • uses asyncio.ensure_future(...) directly on emscripten, before any async-library detection
  • uses asyncio.ensure_future(...) when sniffio reports asyncio
  • spawns a Trio system task when sniffio reports trio
  • raises AsyncLibraryNotFoundError for other async libraries

With the default task factory, create async primitives inside an active asyncio / trio runtime. If your application already owns task lifetime through asyncio.TaskGroup or a Trio nursery, pass a custom task_factory.

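A TaskFactory receives a zero-argument async function and must schedule it, returning an awaitable for the eventual result. The returned object can be awaited more than once, because concurrent callers share one in-flight task; the Trio branch of default_task_factory builds a multi-await wrapper for exactly this reason. Returning a bare coroutine object is therefore not enough.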
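As a minimal sketch of that contract using only asyncio: the factory below takes a zero-argument async function, schedules it, and returns the resulting task. The names tracking_task_factory and _background_tasks are hypothetical and not part of reactivity; the strong-reference set is an extra precaution added here, not something the library is known to do.

```python
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
from typing import Any, TypeVar

T = TypeVar("T")

# Hypothetical bookkeeping: hold strong references so scheduled tasks
# are not garbage-collected before they finish.
_background_tasks: set = set()


def tracking_task_factory(fn: Callable[[], Coroutine[Any, Any, T]]) -> Awaitable[T]:
    """Schedule fn() on the running asyncio loop and return the task."""
    task = asyncio.ensure_future(fn())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def demo() -> int:
    async def compute() -> int:
        await asyncio.sleep(0)
        return 42

    task = tracking_task_factory(compute)
    first = await task
    second = await task  # a finished asyncio task can be awaited again
    return first + second


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 84
```

A function with this shape could presumably be passed as task_factory=tracking_task_factory, provided it is called while an asyncio event loop is running.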

Basic Example

from anyio import sleep
from reactivity import signal, async_derived, async_effect

count = signal(1)

async def main():
    @async_derived
    async def doubled():
        await sleep(0.01)
        return count.get() * 2

    @async_effect
    async def printer():
        print(await doubled())

    await sleep(0.03)
    count.set(2)
    await sleep(0.03)

In IPython or a notebook, run await main(). In a script, use the pattern from the release notes: from anyio import run; run(main).

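Creating printer schedules an initial run by default. Later, count.set(2) marks doubled dirty. Because printer is pulling doubled, the derived value is recomputed and, since its result changed, notifies printer to run again. The example therefore prints 2 and then 4.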

Because the work is asynchronous, you usually observe the rerun only after control returns to the event loop.
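The same ordering can be seen in plain asyncio, independent of this library. In the sketch below (the names rerun and demo are illustrative only), a task scheduled with ensure_future does not start until the current coroutine yields to the event loop.

```python
import asyncio


async def demo():
    log = []

    async def rerun():
        log.append("rerun")

    task = asyncio.ensure_future(rerun())  # scheduled, not yet started
    log.append("after schedule")
    await asyncio.sleep(0)  # yield to the event loop once
    log.append("after yield")
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['after schedule', 'rerun', 'after yield']
```

This is why the basic example sleeps after count.set(2): the sleep gives the scheduled work a chance to run before main() returns.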

Custom Task Ownership

from asyncio import TaskGroup, sleep
from reactivity import async_effect


async def main():
    async with TaskGroup() as tg:

        @async_effect(
            task_factory=lambda fn: tg.create_task(fn()),
            call_immediately=False,
        )
        async def printer():
            await sleep(0.01)
            print("tick")

        await printer()  # manually trigger the first run
        await sleep(0.02)

Use this pattern when tasks must belong to a specific supervisor instead of the library's default scheduler.

Important Semantics

  • AsyncEffect(..., call_immediately=False) defers the first run until you explicitly call the effect (for example await printer()) or otherwise trigger it.
  • AsyncDerived(..., check_equality=True) keeps the same equality-based notification behavior as sync derived; equal recomputations do not notify subscribers.
  • Tracking happens when you call an AsyncDerived, not when the returned awaitable is later awaited. In normal code, await total() is fine because the call and await happen together.
  • Concurrent callers share the same in-flight recomputation. If a recompute is already running, later callers await the same task instead of starting duplicate work.
  • Before recomputing an AsyncDerived, the implementation synchronizes dirty derived dependencies first, including mixed sync/async derived chains.
  • dispose() removes subscriptions so future dependency changes stop auto-triggering the computation. It does not cancel tasks that were already scheduled.
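The shared in-flight recomputation described above can be illustrated with a small asyncio-only toy. ToySharedRecompute and demo are hypothetical names; the class mirrors only the idea of reusing one running task and ignores dependency tracking, equality checks, and invalidation during a run.

```python
import asyncio


class ToySharedRecompute:
    """Toy cache where concurrent readers await one shared task (hypothetical)."""

    def __init__(self, fn):
        self._fn = fn
        self._dirty = True
        self._value = None
        self._task = None  # the in-flight recomputation, if any
        self.runs = 0

    def invalidate(self):
        self._dirty = True  # lazy: nothing runs until the next read

    async def _recompute(self):
        self.runs += 1
        self._value = await self._fn()
        self._dirty = False

    async def get(self):
        if self._dirty:
            if self._task is None:
                # First caller starts the work; later callers reuse it.
                self._task = asyncio.ensure_future(self._recompute())
            try:
                await self._task
            finally:
                self._task = None
        return self._value


async def demo():
    async def slow_answer():
        await asyncio.sleep(0.01)
        return 42

    cache = ToySharedRecompute(slow_answer)
    a, b = await asyncio.gather(cache.get(), cache.get())
    runs_after_first_read = cache.runs  # two readers, one run

    cache.invalidate()
    runs_after_invalidate = cache.runs  # invalidation alone runs nothing

    c = await cache.get()  # the next read recomputes
    return (a, b, c), runs_after_first_read, runs_after_invalidate, cache.runs


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ((42, 42, 42), 1, 1, 2)
```

Awaiting the same task from several callers is also why a task factory's return value needs to tolerate repeated awaits.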

API Reference

AsyncFunction

AsyncFunction[T] = Callable[[], Coroutine[Any, Any, T]]

TaskFactory

Bases: Protocol


Source code in reactivity/async_primitives.py
class TaskFactory(Protocol):
    def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...

__call__

__call__[T](func: AsyncFunction[T]) -> Awaitable[T]
Source code in reactivity/async_primitives.py
def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...

AsyncEffect

Bases: Effect[Awaitable[T]]


Inheritance: reactivity.primitives.BaseComputation --> reactivity.primitives.Effect --> reactivity.async_primitives.AsyncEffect

Source code in reactivity/async_primitives.py
class AsyncEffect[T](Effect[Awaitable[T]]):
    def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        self.start = task_factory
        Effect.__init__(self, fn, call_immediately, context=context)

    async def _run_in_context(self):
        self.context.fork()
        with self._enter():
            return await self._fn()

    def trigger(self):
        return self.start(self._run_in_context)

start instance-attribute

start = task_factory

__init__

__init__(
    fn: Callable[[], Awaitable[T]],
    call_immediately=True,
    *,
    context: Context | None = None,
    task_factory: TaskFactory = default_task_factory,
)
Source code in reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    self.start = task_factory
    Effect.__init__(self, fn, call_immediately, context=context)

_run_in_context async

_run_in_context()
Source code in reactivity/async_primitives.py
async def _run_in_context(self):
    self.context.fork()
    with self._enter():
        return await self._fn()

trigger

trigger()
Source code in reactivity/async_primitives.py
def trigger(self):
    return self.start(self._run_in_context)

AsyncDerived

Bases: BaseDerived[Awaitable[T]]


Inheritance: reactivity.primitives.Subscribable and reactivity.primitives.BaseComputation --> reactivity.primitives.BaseDerived --> reactivity.async_primitives.AsyncDerived

Source code in reactivity/async_primitives.py
class AsyncDerived[T](BaseDerived[Awaitable[T]]):
    UNSET: T = object()  # type: ignore

    def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        super().__init__(context=context)
        self.fn = fn
        self._check_equality = check_equality
        self._value = self.UNSET
        self.start: TaskFactory = task_factory
        self._call_task: Awaitable[None] | None = None
        self._sync_dirty_deps_task: Awaitable[None] | None = None

    async def _run_in_context(self):
        self.context.fork()
        with self._enter():
            return await self.fn()

    async def recompute(self):
        try:
            value = await self._run_in_context()
        finally:
            if self._call_task is not None:
                self.dirty = False  # If invalidated before this run completes, stay dirty.
        if self._check_equality and _equal(value, self._value):
            return
        if self._value is self.UNSET:
            self._value = value
            # do not notify on first set
        else:
            self._value = value
            self.notify()

    async def __sync_dirty_deps(self):
        try:
            current_computations = self.context.leaf.current_computations
            for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
                if isinstance(dep, BaseDerived) and dep not in current_computations:
                    if isinstance(dep, AsyncDerived):
                        await dep._sync_dirty_deps()
                        if dep.dirty:
                            await dep()
                    else:
                        await __class__.__sync_dirty_deps(dep)  # type: ignore
                        if dep.dirty:
                            dep()
        finally:
            self._sync_dirty_deps_task = None

    def _sync_dirty_deps(self):
        if self._sync_dirty_deps_task is not None:
            return self._sync_dirty_deps_task
        task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
        return task

    async def _call_async(self):
        await self._sync_dirty_deps()
        try:
            if self.dirty:
                if self._call_task is not None:
                    await self._call_task
                else:
                    task = self._call_task = self.start(self.recompute)
                    await task
            return self._value
        finally:
            self._call_task = None

    def __call__(self):
        self.track()
        return self.start(self._call_async)

    def trigger(self):
        self.dirty = True
        self._call_task = None
        if _pulled(self):
            return self()

    def invalidate(self):
        self.trigger()

UNSET class-attribute instance-attribute

UNSET: T = object()

fn instance-attribute

fn = fn

_check_equality instance-attribute

_check_equality = check_equality

_value instance-attribute

_value = UNSET

start instance-attribute

start: TaskFactory = task_factory

_call_task instance-attribute

_call_task: Awaitable[None] | None = None

_sync_dirty_deps_task instance-attribute

_sync_dirty_deps_task: Awaitable[None] | None = None

__init__

__init__(
    fn: Callable[[], Awaitable[T]],
    check_equality=True,
    *,
    context: Context | None = None,
    task_factory: TaskFactory = default_task_factory,
)
Source code in reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    super().__init__(context=context)
    self.fn = fn
    self._check_equality = check_equality
    self._value = self.UNSET
    self.start: TaskFactory = task_factory
    self._call_task: Awaitable[None] | None = None
    self._sync_dirty_deps_task: Awaitable[None] | None = None

_run_in_context async

_run_in_context()
Source code in reactivity/async_primitives.py
async def _run_in_context(self):
    self.context.fork()
    with self._enter():
        return await self.fn()

recompute async

recompute()
Source code in reactivity/async_primitives.py
async def recompute(self):
    try:
        value = await self._run_in_context()
    finally:
        if self._call_task is not None:
            self.dirty = False  # If invalidated before this run completes, stay dirty.
    if self._check_equality and _equal(value, self._value):
        return
    if self._value is self.UNSET:
        self._value = value
        # do not notify on first set
    else:
        self._value = value
        self.notify()

__sync_dirty_deps async

__sync_dirty_deps()
Source code in reactivity/async_primitives.py
async def __sync_dirty_deps(self):
    try:
        current_computations = self.context.leaf.current_computations
        for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
            if isinstance(dep, BaseDerived) and dep not in current_computations:
                if isinstance(dep, AsyncDerived):
                    await dep._sync_dirty_deps()
                    if dep.dirty:
                        await dep()
                else:
                    await __class__.__sync_dirty_deps(dep)  # type: ignore
                    if dep.dirty:
                        dep()
    finally:
        self._sync_dirty_deps_task = None

_sync_dirty_deps

_sync_dirty_deps()
Source code in reactivity/async_primitives.py
def _sync_dirty_deps(self):
    if self._sync_dirty_deps_task is not None:
        return self._sync_dirty_deps_task
    task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
    return task

_call_async async

_call_async()
Source code in reactivity/async_primitives.py
async def _call_async(self):
    await self._sync_dirty_deps()
    try:
        if self.dirty:
            if self._call_task is not None:
                await self._call_task
            else:
                task = self._call_task = self.start(self.recompute)
                await task
        return self._value
    finally:
        self._call_task = None

__call__

__call__()
Source code in reactivity/async_primitives.py
def __call__(self):
    self.track()
    return self.start(self._call_async)

trigger

trigger()
Source code in reactivity/async_primitives.py
def trigger(self):
    self.dirty = True
    self._call_task = None
    if _pulled(self):
        return self()

invalidate

invalidate()
Source code in reactivity/async_primitives.py
def invalidate(self):
    self.trigger()

default_task_factory

default_task_factory[T](
    async_function: AsyncFunction[T],
) -> Awaitable[T]
Source code in reactivity/async_primitives.py
def default_task_factory[T](async_function: AsyncFunction[T]) -> Awaitable[T]:
    if platform == "emscripten":
        from asyncio import ensure_future

        return ensure_future(async_function())

    from sniffio import AsyncLibraryNotFoundError, current_async_library

    match current_async_library():
        case "asyncio":
            from asyncio import ensure_future

            return ensure_future(async_function())

        case "trio":
            from trio import Event
            from trio.lowlevel import spawn_system_task

            evt = Event()
            res: T
            exc: BaseException | None = None

            @spawn_system_task
            async def _():
                nonlocal res, exc
                try:
                    res = await async_function()
                except BaseException as e:
                    exc = e
                finally:
                    evt.set()

            class Future:  # An awaitable that can be awaited multiple times
                def __await__(self):
                    if not evt.is_set():
                        yield from evt.wait().__await__()
                    if exc is not None:
                        raise exc
                    return res

            return Future()

        case _ as other:
            raise AsyncLibraryNotFoundError(f"Only asyncio and trio are supported, not {other}")