Under Construction
This site is currently under construction. This page is generated by AI and has not undergone thorough review by a human. There may be hallucination!
In fact, for now only /, /reactive, and the llms.txt are carefully crafted by myself.
Advanced Reactivity¶
HMR uses push-pull reactivity: signals push notifications when updated, but derived values are lazy and only recompute when read (pulled) or when a hard puller (like an effect) forces evaluation. This creates a dependency graph where data flows from sources to consumers.
Core primitives¶
from reactivity import signal, state, effect, derived, memoized, batch, new_context, reactive
signal/state: observableseffect: side effectsderived: lazy computedmemoized: cached with hard pullbatch: group updatesnew_context: isolated environmentreactive: reactive containers
Patterns¶
1. Stable resources¶
Keep DB clients, ML models, sockets in modules you rarely edit or behind factories.
2. Derived for computed values¶
s = signal(1)
square = derived(lambda: s.get() ** 2) # lazy
3. Effect for side effects¶
@effect
def logger():
print(square())
4. Memoized for expensive calls¶
@memoized
def expensive():
return sum(range(1_000_000))
5. Batch updates¶
a = signal(0)
b = signal(0)
with batch():
a.set(a.get() + 1)
b.set(b.get() + 2)
6. Context isolation¶
ctx = new_context()
s = signal(0, context=ctx)
@effect(context=ctx)
def _():
print(s.get())
7. Reactive containers¶
reactive() creates reactive dicts, sets, lists, or object proxies with per-key/index tracking.
Async¶
async_effect and async_derived are the async counterparts of effect and derived.
The existing sync mental model still applies, but the async variants have a few usage differences that are worth stating explicitly:
async_derivedis cached and lazy, just likederived, but you read it withawait value()async_effectreruns side effects by scheduling work onto the current async runtime- The default task factory supports
asyncioandtrio; passtask_factory=when you need task ownership to live in a specificTaskGroupor nursery
from anyio import sleep
from reactivity import signal, async_derived, async_effect
s = signal(1)
async def main():
@async_derived
async def doubled():
await sleep(0.01)
return s.get() * 2
@async_effect
async def printer():
print(await doubled())
await sleep(0.03)
s.set(2)
await sleep(0.03)
Run it with await main() in IPython / notebooks, or anyio.run(main) in a script. When s.set(...) is called, reruns are scheduled asynchronously, so you observe the update after control returns to the event loop.
If you need explicit task ownership, provide a custom task_factory:
from asyncio import TaskGroup, sleep
from reactivity import async_effect
async def main():
async with TaskGroup() as tg:
@async_effect(task_factory=lambda fn: tg.create_task(fn()), call_immediately=False)
async def printer():
await sleep(0.01)
print("tick")
await printer() # manually trigger the first run
await sleep(0.02)
See the async primitives reference for the full API surface.
Best practices¶
- Keep heavy init in stable modules or factories
- Prefer derived/memoized over manual caching
- Batch related updates
- Use
new_context()for isolated subsystems - Restart for C extensions or global state changes
- Think of your code as a dependency graph: signals as sources, effects as consumers, derived as transformations