from reactivity.context import *
Reactivity Contexts define the execution context that reactive computations run inside.
For simple programs you usually rely on the default context implicitly. This module becomes more interesting when you want:
- isolated reactive subsystems that should not subscribe to one another
- shorthand constructors like `ctx.signal(...)`, `ctx.effect(...)`, `ctx.derived(...)`
- async dependency tracking that survives across coroutine scheduling boundaries
In particular, async primitives use the context's async execution state to keep dependency tracking coherent after an `await`. That is why async reactivity in HMR takes more than "just checking the call stack".
For a higher-level introduction, see Advanced Reactivity and Async Reactive Primitives.
default_context
module-attribute
# Module-level context built by new_context(); per the introduction above, simple programs rely on it implicitly.
default_context = new_context()
Context
Bases: NamedTuple
Source code in reactivity/context.py (lines 13–91)
|
class Context(NamedTuple):
    """Execution context that reactive computations run inside.

    Bundles the list of computations currently being evaluated, the list of
    batches, and a ``ContextVar`` slot that can hold a forked copy of the
    context (see ``fork`` and ``leaf``).
    """

    # Computations being evaluated right now; ``enter`` appends on entry and pops on exit.
    current_computations: list[BaseComputation]
    # Batches; ``schedule_callbacks`` always targets the last element.
    batches: list[Batch]
    # Slot that ``fork`` writes a copied Context into and ``leaf`` reads back.
    async_execution_context: ContextVar[Context | None]

    def schedule_callbacks(self, callbacks: Iterable[BaseComputation]):
        """Add *callbacks* to the ``callbacks`` collection of the last batch.

        Raises:
            IndexError: If ``batches`` is empty.
        """
        self.batches[-1].callbacks.update(callbacks)

    @contextmanager
    def enter(self, computation: BaseComputation):
        """Run a ``with`` body with *computation* pushed onto ``current_computations``.

        Before the body runs, the computation's dependencies are snapshotted
        and ``computation.dispose()`` is called (presumably this drops the
        existing subscriptions - confirm in ``BaseComputation``).

        After the body:

        * If it raised and the dependencies now present are a subset of the
          snapshot, the snapshot is re-subscribed; the exception is always
          re-raised.
        * If it finished with no dependencies and the computation's
          ``reactivity_loss_strategy`` is not ``"ignore"``: with strategy
          ``"restore"`` and a non-empty snapshot the snapshot is re-subscribed;
          otherwise a ``RuntimeWarning`` is emitted.
        * In every case the computation is popped from ``current_computations``.
        """
        old_dependencies = {*computation.dependencies}
        computation.dispose()
        self.current_computations.append(computation)
        try:
            yield
        except BaseException:
            # For backward compatibility, we restore old dependencies only if some dependencies are lost after an exception.
            # This behavior may be configurable in the future.
            if computation.dependencies.issubset(old_dependencies):
                for dep in old_dependencies:
                    dep.subscribers.add(computation)
                computation.dependencies.update(old_dependencies)
            # Must stay at ``except`` level: a generator-based context manager
            # that does not re-raise would swallow the caller's exception.
            raise
        else:
            if not computation.dependencies and (strategy := computation.reactivity_loss_strategy) != "ignore":
                if strategy == "restore" and old_dependencies:
                    for dep in old_dependencies:
                        dep.subscribers.add(computation)
                    computation.dependencies.update(old_dependencies)
                    return  # restored, so no warning; ``finally`` still runs
                # Imported lazily: only needed on this warning path.
                from pathlib import Path
                from sysconfig import get_path
                from warnings import warn

                msg = "lost all its dependencies" if old_dependencies else "has no dependencies"
                # skip_file_prefixes makes the warning point at a frame outside
                # this package's directory and outside the standard library.
                warn(
                    f"{computation} {msg} and will never be auto-triggered.",
                    RuntimeWarning,
                    skip_file_prefixes=(str(Path(__file__).parent), str(Path(get_path("stdlib")).resolve())),
                )
        finally:
            last = self.current_computations.pop()
            assert last is computation  # sanity check

    @property
    def batch(self):
        """``Batch`` with ``context=self`` pre-bound via ``partial``."""
        return partial(Batch, context=self)

    @property
    def signal(self):
        """``Signal`` with ``context=self`` pre-bound via ``partial``."""
        return partial(Signal, context=self)

    @property
    def effect(self):
        """``Effect`` with ``context=self`` pre-bound via ``partial``."""
        return partial(Effect, context=self)

    @property
    def derived(self):
        """``Derived`` with ``context=self`` pre-bound via ``partial``."""
        return partial(Derived, context=self)

    @property
    def async_effect(self):
        """``AsyncEffect`` with ``context=self`` pre-bound via ``partial``."""
        return partial(AsyncEffect, context=self)

    @property
    def async_derived(self):
        """``AsyncDerived`` with ``context=self`` pre-bound via ``partial``."""
        return partial(AsyncDerived, context=self)

    @contextmanager
    def untrack(self):
        """Empty ``current_computations`` for the duration of a ``with`` body.

        The previous contents are written back on exit, including when the
        body raises. The list object itself is kept (cleared and refilled in
        place), and anything appended during the body is discarded.
        """
        computations = self.current_computations[:]
        self.current_computations.clear()
        try:
            yield
        finally:
            self.current_computations[:] = computations

    @property
    def leaf(self):
        """The context stored in ``async_execution_context``, or ``self`` if that value is falsy."""
        return self.async_execution_context.get() or self

    def fork(self):
        """Store a copy of this context in ``async_execution_context``.

        The copy holds shallow copies of ``current_computations`` and
        ``batches`` and shares the same ``ContextVar``. Returns ``None``; the
        token produced by ``ContextVar.set`` is discarded.
        """
        self.async_execution_context.set(Context(self.current_computations[:], self.batches[:], self.async_execution_context))
|
current_computations
instance-attribute
current_computations: list[BaseComputation]
batches
instance-attribute
batches: list[Batch]
async_execution_context
instance-attribute
async_execution_context: ContextVar[Context | None]
schedule_callbacks
schedule_callbacks(callbacks: Iterable[BaseComputation])
Source code in reactivity/context.py
|
def schedule_callbacks(self, callbacks: Iterable[BaseComputation]):
    """Add *callbacks* to the ``callbacks`` collection of the last batch.

    Raises:
        IndexError: If ``self.batches`` is empty.
    """
    self.batches[-1].callbacks.update(callbacks)
|
enter
enter(computation: BaseComputation)
Source code in reactivity/context.py (lines 21–51)
|
@contextmanager
def enter(self, computation: BaseComputation):
    """Run a ``with`` body with *computation* pushed onto ``current_computations``.

    Before the body runs, the computation's dependencies are snapshotted and
    ``computation.dispose()`` is called (presumably this drops the existing
    subscriptions - confirm in ``BaseComputation``).

    After the body:

    * If it raised and the dependencies now present are a subset of the
      snapshot, the snapshot is re-subscribed; the exception is always
      re-raised.
    * If it finished with no dependencies and the computation's
      ``reactivity_loss_strategy`` is not ``"ignore"``: with strategy
      ``"restore"`` and a non-empty snapshot the snapshot is re-subscribed;
      otherwise a ``RuntimeWarning`` is emitted.
    * In every case the computation is popped from ``current_computations``.
    """
    old_dependencies = {*computation.dependencies}
    computation.dispose()
    self.current_computations.append(computation)
    try:
        yield
    except BaseException:
        # For backward compatibility, we restore old dependencies only if some dependencies are lost after an exception.
        # This behavior may be configurable in the future.
        if computation.dependencies.issubset(old_dependencies):
            for dep in old_dependencies:
                dep.subscribers.add(computation)
            computation.dependencies.update(old_dependencies)
        # Must stay at ``except`` level: a generator-based context manager
        # that does not re-raise would swallow the caller's exception.
        raise
    else:
        if not computation.dependencies and (strategy := computation.reactivity_loss_strategy) != "ignore":
            if strategy == "restore" and old_dependencies:
                for dep in old_dependencies:
                    dep.subscribers.add(computation)
                computation.dependencies.update(old_dependencies)
                return  # restored, so no warning; ``finally`` still runs
            # Imported lazily: only needed on this warning path.
            from pathlib import Path
            from sysconfig import get_path
            from warnings import warn

            msg = "lost all its dependencies" if old_dependencies else "has no dependencies"
            # skip_file_prefixes makes the warning point at a frame outside
            # this package's directory and outside the standard library.
            warn(
                f"{computation} {msg} and will never be auto-triggered.",
                RuntimeWarning,
                skip_file_prefixes=(str(Path(__file__).parent), str(Path(get_path("stdlib")).resolve())),
            )
    finally:
        last = self.current_computations.pop()
        assert last is computation  # sanity check
|
untrack
Source code in reactivity/context.py
|
@contextmanager
def untrack(self):
    """Empty ``current_computations`` for the duration of a ``with`` body.

    The previous contents are written back on exit, including when the body
    raises. The list object itself is kept (cleared and refilled in place),
    and anything appended during the body is discarded.
    """
    computations = self.current_computations[:]
    self.current_computations.clear()
    try:
        yield
    finally:
        self.current_computations[:] = computations
|
fork
Source code in reactivity/context.py
|
def fork(self):
    """Store a copy of this context in ``async_execution_context``.

    The copy holds shallow copies of ``current_computations`` and ``batches``
    and shares the same ``ContextVar``. Returns ``None``; the token produced
    by ``ContextVar.set`` is discarded.
    """
    self.async_execution_context.set(Context(self.current_computations[:], self.batches[:], self.async_execution_context))
|
new_context
Source code in reactivity/context.py
|
def new_context():
    """Build a ``Context`` with empty computation and batch lists.

    Each call creates its own ``ContextVar`` (named ``"current context"``,
    default ``None``) for the ``async_execution_context`` field.
    """
    return Context([], [], async_execution_context=ContextVar("current context", default=None))
|