Skip to content

from reactivity.context import *

Reactivity Contexts define the execution context that reactive computations run inside.

For simple programs you usually rely on the default context implicitly. This module becomes more interesting when you want:

  • isolated reactive subsystems that should not subscribe to one another
  • shorthand constructors like ctx.signal(...), ctx.effect(...), ctx.derived(...)
  • async dependency tracking that survives across coroutine scheduling boundaries

In particular, async primitives use the context's async execution state to keep tracking coherent after await. That is why async reactivity in HMR is more than "just check the call stack".

For a higher-level introduction, see Advanced Reactivity and Async Reactive Primitives.

default_context module-attribute

default_context = new_context()

Context

Bases: NamedTuple

Source code in reactivity/context.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class Context(NamedTuple):
    """Execution context that reactive computations run inside.

    Bundles the stack of computations currently being entered, the list of
    batches, and a context variable that may hold a forked ``Context``
    (see ``fork`` and ``leaf``).
    """

    # Stack of computations; ``enter`` pushes on entry and pops on exit.
    current_computations: list[BaseComputation]
    # Batches; ``schedule_callbacks`` adds to the last one.
    batches: list[Batch]
    # Holds the Context stored by ``fork`` (or None).
    async_execution_context: ContextVar[Context | None]

    def schedule_callbacks(self, callbacks: Iterable[BaseComputation]):
        """Add ``callbacks`` to the ``callbacks`` collection of the last batch.

        Raises ``IndexError`` if ``self.batches`` is empty.
        """
        self.batches[-1].callbacks.update(callbacks)

    @contextmanager
    def enter(self, computation: BaseComputation):
        """Run the ``with`` body with ``computation`` on top of the stack.

        Before the body runs, the computation's dependencies are snapshotted
        and ``computation.dispose()`` is called. Afterwards:

        * If the body raised and the dependencies now recorded are a subset
          of the snapshot, the snapshot is re-subscribed and merged back;
          the exception is always re-raised.
        * If the body finished with no dependencies recorded and
          ``computation.reactivity_loss_strategy`` is not ``"ignore"``, the
          snapshot is restored when the strategy is ``"restore"`` and the
          snapshot is non-empty; otherwise a ``RuntimeWarning`` is emitted.

        In every case the computation is popped from
        ``current_computations`` on exit.
        """
        # Copy the dependency set so it survives the dispose() call below.
        old_dependencies = {*computation.dependencies}
        # NOTE(review): dispose() is defined elsewhere; presumably it detaches the
        # computation from its current dependencies — confirm.
        computation.dispose()
        self.current_computations.append(computation)
        try:
            yield
        except BaseException:
            # For backward compatibility, we restore old dependencies only if some dependencies are lost after an exception.
            # This behavior may be configurable in the future.
            # The check holds when no dependency outside the old set was recorded.
            if computation.dependencies.issubset(old_dependencies):
                for dep in old_dependencies:
                    dep.subscribers.add(computation)
                computation.dependencies.update(old_dependencies)
            raise
        else:
            # The body succeeded but recorded no dependencies at all.
            if not computation.dependencies and (strategy := computation.reactivity_loss_strategy) != "ignore":
                if strategy == "restore" and old_dependencies:
                    # Re-subscribe to the snapshot instead of warning.
                    for dep in old_dependencies:
                        dep.subscribers.add(computation)
                    computation.dependencies.update(old_dependencies)
                    # Ends the generator early; the ``finally`` clause still runs.
                    return
                # Imported lazily: only needed on this warning path.
                from pathlib import Path
                from sysconfig import get_path
                from warnings import warn

                msg = "lost all its dependencies" if old_dependencies else "has no dependencies"
                # skip_file_prefixes skips frames from this package's directory and
                # the stdlib when attributing the warning.
                warn(f"{computation} {msg} and will never be auto-triggered.", RuntimeWarning, skip_file_prefixes=(str(Path(__file__).parent), str(Path(get_path("stdlib")).resolve())))
        finally:
            last = self.current_computations.pop()
            assert last is computation  # sanity check

    @property
    def batch(self):
        """Return ``Batch`` pre-bound to this context via ``partial``."""
        return partial(Batch, context=self)

    @property
    def signal(self):
        """Return ``Signal`` pre-bound to this context via ``partial``."""
        return partial(Signal, context=self)

    @property
    def effect(self):
        """Return ``Effect`` pre-bound to this context via ``partial``."""
        return partial(Effect, context=self)

    @property
    def derived(self):
        """Return ``Derived`` pre-bound to this context via ``partial``."""
        return partial(Derived, context=self)

    @property
    def async_effect(self):
        """Return ``AsyncEffect`` pre-bound to this context via ``partial``."""
        return partial(AsyncEffect, context=self)

    @property
    def async_derived(self):
        """Return ``AsyncDerived`` pre-bound to this context via ``partial``."""
        return partial(AsyncDerived, context=self)

    @contextmanager
    def untrack(self):
        """Run the ``with`` body with an empty ``current_computations`` stack.

        The previous contents are put back into the same list object on exit,
        whether or not the body raised.
        """
        computations = self.current_computations[:]
        self.current_computations.clear()
        try:
            yield
        finally:
            # Slice assignment restores in place, keeping the list's identity.
            self.current_computations[:] = computations

    @property
    def leaf(self):
        """Return the Context stored in ``async_execution_context`` if it is truthy, else ``self``."""
        return self.async_execution_context.get() or self

    def fork(self):
        """Store a new Context in ``async_execution_context``.

        The new Context gets shallow copies of ``current_computations`` and
        ``batches`` and shares this context's ``ContextVar``.
        """
        self.async_execution_context.set(Context(self.current_computations[:], self.batches[:], self.async_execution_context))

current_computations instance-attribute

current_computations: list[BaseComputation]

batches instance-attribute

batches: list[Batch]

async_execution_context instance-attribute

async_execution_context: ContextVar[Context | None]

batch property

batch

signal property

signal

effect property

effect

derived property

derived

async_effect property

async_effect

async_derived property

async_derived

leaf property

leaf

schedule_callbacks

schedule_callbacks(callbacks: Iterable[BaseComputation])
Source code in reactivity/context.py
18
19
def schedule_callbacks(self, callbacks: Iterable[BaseComputation]):
    """Add ``callbacks`` to the ``callbacks`` collection of the last batch.

    Raises:
        IndexError: if ``self.batches`` is empty.
    """
    latest_batch = self.batches[-1]
    latest_batch.callbacks.update(callbacks)

enter

enter(computation: BaseComputation)
Source code in reactivity/context.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@contextmanager
def enter(self, computation: BaseComputation):
    """Run the ``with`` body with ``computation`` on top of the stack.

    Before the body runs, the computation's dependencies are snapshotted
    and ``computation.dispose()`` is called. Afterwards:

    * If the body raised and the dependencies now recorded are a subset
      of the snapshot, the snapshot is re-subscribed and merged back;
      the exception is always re-raised.
    * If the body finished with no dependencies recorded and
      ``computation.reactivity_loss_strategy`` is not ``"ignore"``, the
      snapshot is restored when the strategy is ``"restore"`` and the
      snapshot is non-empty; otherwise a ``RuntimeWarning`` is emitted.

    In every case the computation is popped from
    ``current_computations`` on exit.
    """
    # Copy the dependency set so it survives the dispose() call below.
    old_dependencies = {*computation.dependencies}
    # NOTE(review): dispose() is defined elsewhere; presumably it detaches the
    # computation from its current dependencies — confirm.
    computation.dispose()
    self.current_computations.append(computation)
    try:
        yield
    except BaseException:
        # For backward compatibility, we restore old dependencies only if some dependencies are lost after an exception.
        # This behavior may be configurable in the future.
        # The check holds when no dependency outside the old set was recorded.
        if computation.dependencies.issubset(old_dependencies):
            for dep in old_dependencies:
                dep.subscribers.add(computation)
            computation.dependencies.update(old_dependencies)
        raise
    else:
        # The body succeeded but recorded no dependencies at all.
        if not computation.dependencies and (strategy := computation.reactivity_loss_strategy) != "ignore":
            if strategy == "restore" and old_dependencies:
                # Re-subscribe to the snapshot instead of warning.
                for dep in old_dependencies:
                    dep.subscribers.add(computation)
                computation.dependencies.update(old_dependencies)
                # Ends the generator early; the ``finally`` clause still runs.
                return
            # Imported lazily: only needed on this warning path.
            from pathlib import Path
            from sysconfig import get_path
            from warnings import warn

            msg = "lost all its dependencies" if old_dependencies else "has no dependencies"
            # skip_file_prefixes skips frames from this package's directory and
            # the stdlib when attributing the warning.
            warn(f"{computation} {msg} and will never be auto-triggered.", RuntimeWarning, skip_file_prefixes=(str(Path(__file__).parent), str(Path(get_path("stdlib")).resolve())))
    finally:
        last = self.current_computations.pop()
        assert last is computation  # sanity check

untrack

untrack()
Source code in reactivity/context.py
77
78
79
80
81
82
83
84
@contextmanager
def untrack(self):
    """Run the ``with`` body with an empty ``current_computations`` stack.

    The stack is emptied in place for the duration of the body and its
    previous contents are written back into the same list object on exit,
    whether or not the body raised.
    """
    saved = list(self.current_computations)
    del self.current_computations[:]
    try:
        yield
    finally:
        # Restore in place so the list object keeps its identity.
        self.current_computations[:] = saved

fork

fork()
Source code in reactivity/context.py
90
91
def fork(self):
    """Store a new Context in ``async_execution_context``.

    The new Context gets shallow copies of ``current_computations`` and
    ``batches`` and shares this context's ``ContextVar``.
    """
    snapshot = Context(self.current_computations[:], self.batches[:], self.async_execution_context)
    self.async_execution_context.set(snapshot)

new_context

new_context()
Source code in reactivity/context.py
94
95
def new_context():
    """Create a Context with empty computation and batch lists.

    The context gets its own ``ContextVar`` named "current context" whose
    default value is None.
    """
    async_state = ContextVar("current context", default=None)
    return Context([], [], async_execution_context=async_state)