from reactivity.async_primitives import *
The module reactivity.async_primitives contains the async counterparts of HMR's core reactive primitives. The dependency model is the same as the sync API, but actual work is scheduled onto an async runtime instead of running to completion inline.
What Lives Here
- AsyncEffect / async_effect for async side effects
- AsyncDerived / async_derived for cached async computations
- TaskFactory / default_task_factory for integrating with an event loop or task supervisor
Mental Model
- async_effect behaves like effect, except each trigger schedules an awaitable task instead of finishing immediately on the current stack.
- async_derived behaves like derived, except you read it with await value().
- Dirty async derived values are still lazy. If nothing is pulling them, they stay dirty until the next read.
- If an effect or some other non-derived subscriber is pulling an async derived value, invalidation can eagerly schedule a rerun.
- Internally, the current reactive context is forked before the coroutine runs, so dependency tracking still works across the async boundary.
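The laziness rule in the list above can be pictured with a small stand-alone model. This is only a sketch: LazyAsyncValue and its methods are hypothetical names invented for illustration, it uses plain asyncio, and it does not reproduce the library's dependency tracking or its eager rescheduling for pulled values.

```python
import asyncio


class LazyAsyncValue:
    """Toy pull-based cache (hypothetical, not the library's AsyncDerived)."""

    def __init__(self, fn):
        self._fn = fn
        self._value = None
        self.dirty = True  # nothing has been computed yet
        self.runs = 0      # how many times fn actually ran

    def invalidate(self):
        # Only flips the flag; no work is scheduled here.
        self.dirty = True

    async def get(self):
        if self.dirty:
            self.runs += 1
            self._value = await self._fn()
            self.dirty = False
        return self._value


async def demo():
    source = {"count": 1}

    async def doubled():
        await asyncio.sleep(0)
        return source["count"] * 2

    value = LazyAsyncValue(doubled)
    first = await value.get()

    # Two invalidations with no reader in between trigger no recomputation.
    source["count"] = 2
    value.invalidate()
    source["count"] = 3
    value.invalidate()
    runs_while_idle = value.runs

    second = await value.get()
    return first, runs_while_idle, second, value.runs


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the two invalidations cost nothing until the next read, which is the behavior the list describes for a dirty async derived value that nobody is pulling.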
Runtime Support
default_task_factory is intentionally small in scope. In the current implementation it:
- uses asyncio.ensure_future(...) on asyncio
- spawns a Trio system task on trio
- falls back to asyncio.ensure_future(...) on emscripten
- raises AsyncLibraryNotFoundError for other async libraries
With the default task factory, create async primitives inside an active asyncio / trio runtime. If your application already owns task lifetime through asyncio.TaskGroup or a Trio nursery, pass a custom task_factory.
A TaskFactory receives a zero-argument async function and must schedule it, returning an awaitable for the eventual result.
Basic Example
from anyio import sleep
from reactivity import signal, async_derived, async_effect

count = signal(1)

async def main():
    @async_derived
    async def doubled():
        await sleep(0.01)
        return count.get() * 2

    @async_effect
    async def printer():
        print(await doubled())

    await sleep(0.03)
    count.set(2)
    await sleep(0.03)
In IPython or a notebook, run await main(). In a script, use the pattern from the release notes: from anyio import run; run(main).
Creating printer schedules an initial run by default. Later, count.set(2) marks doubled dirty and schedules the effect again, so this example prints 2 and then 4.
Because the work is asynchronous, you usually observe the rerun only after control returns to the event loop.
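The timing point can be seen with plain asyncio, independent of this library. The sketch below is an assumption-level illustration: it uses only asyncio.ensure_future, and the function name rerun is hypothetical. The scheduled coroutine does not start until the caller yields to the event loop.

```python
import asyncio


async def demo():
    log = []

    async def rerun():
        log.append("rerun")

    task = asyncio.ensure_future(rerun())  # scheduled, but not started yet
    log.append("scheduled")
    before_yield = list(log)

    await asyncio.sleep(0)  # hand control back to the event loop
    await task
    return before_yield, log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```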
Custom Task Ownership
from asyncio import TaskGroup, sleep
from reactivity import async_effect

async def main():
    async with TaskGroup() as tg:
        @async_effect(
            task_factory=lambda fn: tg.create_task(fn()),
            call_immediately=False,
        )
        async def printer():
            await sleep(0.01)
            print("tick")

        await printer()  # manually trigger the first run
        await sleep(0.02)
Use this pattern when tasks must belong to a specific supervisor instead of the library's default scheduler.
Important Semantics
- AsyncEffect(..., call_immediately=False) defers the first run until you explicitly await or otherwise trigger it.
- AsyncDerived(..., check_equality=True) keeps the same equality-based notification behavior as sync derived; equal recomputations do not notify subscribers.
- Tracking happens when you call an AsyncDerived, not when the returned awaitable is later awaited. In normal code, await total() is fine because the call and await happen together.
- Concurrent callers share the same in-flight recomputation. If a recompute is already running, later callers await the same task instead of starting duplicate work.
- Before recomputing an AsyncDerived, the implementation synchronizes dirty derived dependencies first, including mixed sync/async derived chains.
- dispose() removes subscriptions so future dependency changes stop auto-triggering the computation. It does not cancel tasks that were already scheduled.
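The sharing of an in-flight recomputation can be sketched in isolation. The class below is a hypothetical stand-in written for this example with plain asyncio; it is not the library's AsyncDerived and it omits dirtiness and dependency tracking entirely.

```python
import asyncio


class SharedRecompute:
    """Toy model: concurrent callers await one in-flight task."""

    def __init__(self, fn):
        self._fn = fn
        self._task = None
        self.runs = 0

    async def _run(self):
        self.runs += 1
        try:
            return await self._fn()
        finally:
            self._task = None  # allow a fresh run next time

    def __call__(self):
        if self._task is None:
            self._task = asyncio.ensure_future(self._run())
        return self._task


async def demo():
    async def slow():
        await asyncio.sleep(0.01)
        return 7

    shared = SharedRecompute(slow)

    async def reader():
        return await shared()

    # Three readers start while the first computation is still pending.
    results = await asyncio.gather(reader(), reader(), reader())
    return results, shared.runs


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

All three readers receive the same value while the wrapped function runs once.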
API Reference
AsyncFunction
AsyncFunction[T] = Callable[[], Coroutine[Any, Any, T]]
TaskFactory
Bases: Protocol
Source code in reactivity/async_primitives.py
class TaskFactory(Protocol):
    def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...
__call__
__call__[T](func: AsyncFunction[T]) -> Awaitable[T]
Source code in reactivity/async_primitives.py
def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...
AsyncEffect
Bases: Effect[Awaitable[T]]
Inheritance diagram: BaseComputation -> Effect -> AsyncEffect
Source code in reactivity/async_primitives.py
class AsyncEffect[T](Effect[Awaitable[T]]):
    def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        self.start = task_factory
        Effect.__init__(self, fn, call_immediately, context=context)

    async def _run_in_context(self):
        self.context.fork()
        with self._enter():
            return await self._fn()

    def trigger(self):
        return self.start(self._run_in_context)
__init__
__init__(
fn: Callable[[], Awaitable[T]],
call_immediately=True,
*,
context: Context | None = None,
task_factory: TaskFactory = default_task_factory,
)
Source code in reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    self.start = task_factory
    Effect.__init__(self, fn, call_immediately, context=context)
_run_in_context
async
Source code in reactivity/async_primitives.py
async def _run_in_context(self):
    self.context.fork()
    with self._enter():
        return await self._fn()
trigger
Source code in reactivity/async_primitives.py
def trigger(self):
    return self.start(self._run_in_context)
AsyncDerived
Bases: BaseDerived[Awaitable[T]]
Inheritance diagram: Subscribable and BaseComputation -> BaseDerived -> AsyncDerived
Source code in reactivity/async_primitives.py
class AsyncDerived[T](BaseDerived[Awaitable[T]]):
    UNSET: T = object()  # type: ignore

    def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        super().__init__(context=context)
        self.fn = fn
        self._check_equality = check_equality
        self._value = self.UNSET
        self.start: TaskFactory = task_factory
        self._call_task: Awaitable[None] | None = None
        self._sync_dirty_deps_task: Awaitable[None] | None = None

    async def _run_in_context(self):
        self.context.fork()
        with self._enter():
            return await self.fn()

    async def recompute(self):
        try:
            value = await self._run_in_context()
        finally:
            if self._call_task is not None:
                self.dirty = False  # If invalidated before this run completes, stay dirty.
        if self._check_equality and _equal(value, self._value):
            return
        if self._value is self.UNSET:
            self._value = value
            # do not notify on first set
        else:
            self._value = value
            self.notify()

    async def __sync_dirty_deps(self):
        try:
            current_computations = self.context.leaf.current_computations
            for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
                if isinstance(dep, BaseDerived) and dep not in current_computations:
                    if isinstance(dep, AsyncDerived):
                        await dep._sync_dirty_deps()
                        if dep.dirty:
                            await dep()
                    else:
                        await __class__.__sync_dirty_deps(dep)  # type: ignore
                        if dep.dirty:
                            dep()
        finally:
            self._sync_dirty_deps_task = None

    def _sync_dirty_deps(self):
        if self._sync_dirty_deps_task is not None:
            return self._sync_dirty_deps_task
        task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
        return task

    async def _call_async(self):
        await self._sync_dirty_deps()
        try:
            if self.dirty:
                if self._call_task is not None:
                    await self._call_task
                else:
                    task = self._call_task = self.start(self.recompute)
                    await task
            return self._value
        finally:
            self._call_task = None

    def __call__(self):
        self.track()
        return self.start(self._call_async)

    def trigger(self):
        self.dirty = True
        self._call_task = None
        if _pulled(self):
            return self()

    def invalidate(self):
        self.trigger()
UNSET
class-attribute
instance-attribute
_check_equality
instance-attribute
_check_equality = check_equality
_value
instance-attribute
start
instance-attribute
start: TaskFactory = task_factory
_call_task
instance-attribute
_call_task: Awaitable[None] | None = None
_sync_dirty_deps_task
instance-attribute
_sync_dirty_deps_task: Awaitable[None] | None = None
__init__
__init__(
fn: Callable[[], Awaitable[T]],
check_equality=True,
*,
context: Context | None = None,
task_factory: TaskFactory = default_task_factory,
)
Source code in reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    super().__init__(context=context)
    self.fn = fn
    self._check_equality = check_equality
    self._value = self.UNSET
    self.start: TaskFactory = task_factory
    self._call_task: Awaitable[None] | None = None
    self._sync_dirty_deps_task: Awaitable[None] | None = None
_run_in_context
async
Source code in reactivity/async_primitives.py
async def _run_in_context(self):
    self.context.fork()
    with self._enter():
        return await self.fn()
recompute
async
Source code in reactivity/async_primitives.py
async def recompute(self):
    try:
        value = await self._run_in_context()
    finally:
        if self._call_task is not None:
            self.dirty = False  # If invalidated before this run completes, stay dirty.
    if self._check_equality and _equal(value, self._value):
        return
    if self._value is self.UNSET:
        self._value = value
        # do not notify on first set
    else:
        self._value = value
        self.notify()
__sync_dirty_deps
async
Source code in reactivity/async_primitives.py
async def __sync_dirty_deps(self):
    try:
        current_computations = self.context.leaf.current_computations
        for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
            if isinstance(dep, BaseDerived) and dep not in current_computations:
                if isinstance(dep, AsyncDerived):
                    await dep._sync_dirty_deps()
                    if dep.dirty:
                        await dep()
                else:
                    await __class__.__sync_dirty_deps(dep)  # type: ignore
                    if dep.dirty:
                        dep()
    finally:
        self._sync_dirty_deps_task = None
_sync_dirty_deps
Source code in reactivity/async_primitives.py
def _sync_dirty_deps(self):
    if self._sync_dirty_deps_task is not None:
        return self._sync_dirty_deps_task
    task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
    return task
_call_async
async
Source code in reactivity/async_primitives.py
async def _call_async(self):
    await self._sync_dirty_deps()
    try:
        if self.dirty:
            if self._call_task is not None:
                await self._call_task
            else:
                task = self._call_task = self.start(self.recompute)
                await task
        return self._value
    finally:
        self._call_task = None
__call__
Source code in reactivity/async_primitives.py
def __call__(self):
    self.track()
    return self.start(self._call_async)
trigger
Source code in reactivity/async_primitives.py
def trigger(self):
    self.dirty = True
    self._call_task = None
    if _pulled(self):
        return self()
invalidate
Source code in reactivity/async_primitives.py
def invalidate(self):
    self.trigger()
default_task_factory
default_task_factory[T](
async_function: AsyncFunction[T],
) -> Awaitable[T]
Source code in reactivity/async_primitives.py
def default_task_factory[T](async_function: AsyncFunction[T]) -> Awaitable[T]:
    if platform == "emscripten":
        from asyncio import ensure_future

        return ensure_future(async_function())

    from sniffio import AsyncLibraryNotFoundError, current_async_library

    match current_async_library():
        case "asyncio":
            from asyncio import ensure_future

            return ensure_future(async_function())

        case "trio":
            from trio import Event
            from trio.lowlevel import spawn_system_task

            evt = Event()
            res: T
            exc: BaseException | None = None

            @spawn_system_task
            async def _():
                nonlocal res, exc
                try:
                    res = await async_function()
                except BaseException as e:
                    exc = e
                finally:
                    evt.set()

            class Future:  # An awaitable that can be awaited multiple times
                def __await__(self):
                    if not evt.is_set():
                        yield from evt.wait().__await__()
                    if exc is not None:
                        raise exc
                    return res

            return Future()

        case _ as other:
            raise AsyncLibraryNotFoundError(f"Only asyncio and trio are supported, not {other}")
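The Future helper in the trio branch exists because a bare coroutine object can be awaited only once, while several callers may need the same result. The sketch below shows the same idea with asyncio.Event so that it runs on the standard library alone. ResultBox and its methods are hypothetical names for this illustration and are not part of the module.

```python
import asyncio


class ResultBox:
    """Awaitable that can be awaited any number of times (illustrative)."""

    def __init__(self):
        self._done = asyncio.Event()
        self._result = None
        self._exc = None

    def set_result(self, value):
        self._result = value
        self._done.set()

    def set_exception(self, exc):
        self._exc = exc
        self._done.set()

    def __await__(self):
        # Wait only if the producer has not finished yet.
        if not self._done.is_set():
            yield from self._done.wait().__await__()
        if self._exc is not None:
            raise self._exc
        return self._result


async def demo():
    box = ResultBox()

    async def producer():
        await asyncio.sleep(0.01)
        box.set_result("ready")

    task = asyncio.ensure_future(producer())
    first = await box   # suspends until the producer sets the result
    second = await box  # returns immediately with the stored result
    await task
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```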