Skip to content

from reactivity.async_primitives import *

AsyncFunction

AsyncFunction[T] = Callable[[], Coroutine[Any, Any, T]]

TaskFactory

Bases: Protocol


              flowchart TD
              reactivity.async_primitives.TaskFactory[TaskFactory]

              

              click reactivity.async_primitives.TaskFactory href "" "reactivity.async_primitives.TaskFactory"
            
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
class TaskFactory(Protocol):
    """Structural type for a callable that starts a zero-argument async function.

    ``AsyncEffect`` and ``AsyncDerived`` keep such a callable as ``self.start``
    and use it to launch their coroutines.
    """

    # Takes the async function positionally (positional-only) and returns an
    # awaitable typed with the same result type ``T``.
    def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...

__call__

__call__[T](func: AsyncFunction[T]) -> Awaitable[T]
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
# Protocol stub (no body): accepts one positional-only zero-argument async
# function and returns an awaitable typed with the same result type ``T``.
def __call__[T](self, func: AsyncFunction[T], /) -> Awaitable[T]: ...

AsyncEffect

Bases: Effect[Awaitable[T]]


              flowchart TD
              reactivity.async_primitives.AsyncEffect[AsyncEffect]
              reactivity.primitives.Effect[Effect]
              reactivity.primitives.BaseComputation[BaseComputation]

                              reactivity.primitives.Effect --> reactivity.async_primitives.AsyncEffect
                                reactivity.primitives.BaseComputation --> reactivity.primitives.Effect
                



              click reactivity.async_primitives.AsyncEffect href "" "reactivity.async_primitives.AsyncEffect"
              click reactivity.primitives.Effect href "" "reactivity.primitives.Effect"
              click reactivity.primitives.BaseComputation href "" "reactivity.primitives.BaseComputation"
            
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
class AsyncEffect[T](Effect[Awaitable[T]]):
    """Effect whose function returns an awaitable.

    Every ``trigger`` hands ``_run_in_context`` to the task factory stored in
    ``self.start`` and returns whatever awaitable the factory produces.
    """

    def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        """Store the task factory, then delegate to ``Effect.__init__``.

        Args:
            fn: Zero-argument callable returning an awaitable.
            call_immediately: Forwarded unchanged to ``Effect.__init__``.
            context: Forwarded to ``Effect.__init__`` as a keyword argument.
            task_factory: Callable used by ``trigger`` to start each run.
        """
        # Assigned before the base initializer runs so that `trigger` already has
        # `self.start` if Effect.__init__ calls it — presumably when
        # `call_immediately` is true; confirm against Effect.
        self.start = task_factory
        Effect.__init__(self, fn, call_immediately, context=context)

    async def _run_in_context(self):
        """Call ``self.context.fork()``, then await ``self._fn()`` inside ``self._enter()``.

        Returns the awaited result. ``self._fn`` is not assigned in this class;
        presumably Effect.__init__ sets it from ``fn`` — confirm.
        """
        self.context.fork()
        with self._enter():
            return await self._fn()

    def trigger(self):
        """Start one run through the task factory and return its awaitable."""
        return self.start(self._run_in_context)

start instance-attribute

start = task_factory

__init__

__init__(
    fn: Callable[[], Awaitable[T]],
    call_immediately=True,
    *,
    context: Context | None = None,
    task_factory: TaskFactory = default_task_factory,
)
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], call_immediately=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    """Store the task factory, then delegate to ``Effect.__init__``.

    Args:
        fn: Zero-argument callable returning an awaitable.
        call_immediately: Forwarded unchanged to ``Effect.__init__``.
        context: Forwarded to ``Effect.__init__`` as a keyword argument.
        task_factory: Callable used by ``trigger`` to start each run.
    """
    # Assigned first so `trigger` already has `self.start` if the base
    # initializer calls it — presumably when `call_immediately` is true;
    # confirm against Effect.
    self.start = task_factory
    Effect.__init__(self, fn, call_immediately, context=context)

_run_in_context async

_run_in_context()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
async def _run_in_context(self):
    """Call ``self.context.fork()``, then await ``self._fn()`` inside ``self._enter()``.

    Returns the awaited result.
    """
    # NOTE(review): what fork() and _enter() do is defined outside this listing.
    self.context.fork()
    with self._enter():
        return await self._fn()

trigger

trigger()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def trigger(self):
    """Hand ``_run_in_context`` to the stored task factory and return the resulting awaitable."""
    launch = self.start
    return launch(self._run_in_context)

AsyncDerived

Bases: BaseDerived[Awaitable[T]]


              flowchart TD
              reactivity.async_primitives.AsyncDerived[AsyncDerived]
              reactivity.primitives.BaseDerived[BaseDerived]
              reactivity.primitives.Subscribable[Subscribable]
              reactivity.primitives.BaseComputation[BaseComputation]

                              reactivity.primitives.BaseDerived --> reactivity.async_primitives.AsyncDerived
                                reactivity.primitives.Subscribable --> reactivity.primitives.BaseDerived
                
                reactivity.primitives.BaseComputation --> reactivity.primitives.BaseDerived
                



              click reactivity.async_primitives.AsyncDerived href "" "reactivity.async_primitives.AsyncDerived"
              click reactivity.primitives.BaseDerived href "" "reactivity.primitives.BaseDerived"
              click reactivity.primitives.Subscribable href "" "reactivity.primitives.Subscribable"
              click reactivity.primitives.BaseComputation href "" "reactivity.primitives.BaseComputation"
            
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
class AsyncDerived[T](BaseDerived[Awaitable[T]]):
    """Derived value computed by an awaitable-returning function.

    The latest result is cached in ``_value``. Calling the instance returns an
    awaitable (produced by the task factory in ``self.start``) that resolves to
    the cached value, recomputing first when the instance is dirty.
    """

    # Sentinel stored in `_value` until the first result has been recorded.
    UNSET: T = object()  # type: ignore

    def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
        """Initialise the cache and task bookkeeping.

        Args:
            fn: Zero-argument callable returning an awaitable; awaited by
                ``_run_in_context``.
            check_equality: When true, ``recompute`` leaves the cached value
                untouched (and does not notify) if ``_equal`` reports the new
                result equal to the cached one.
            context: Forwarded to the base initializer.
            task_factory: Callable used to start every coroutine of this instance.
        """
        super().__init__(context=context)
        self.fn = fn
        self._check_equality = check_equality
        self._value = self.UNSET
        self.start: TaskFactory = task_factory
        # Awaitable of the recompute started by `_call_async`, if any.
        self._call_task: Awaitable[None] | None = None
        # Awaitable of the dependency sync started by `_sync_dirty_deps`, if any.
        self._sync_dirty_deps_task: Awaitable[None] | None = None

    async def _run_in_context(self):
        """Call ``self.context.fork()``, then await ``self.fn()`` inside ``self._enter()``."""
        self.context.fork()
        with self._enter():
            return await self.fn()

    async def recompute(self):
        """Run the wrapped function once and record its result.

        The first result is stored silently; later results are stored and
        followed by ``self.notify()``. With ``check_equality`` enabled, a result
        that ``_equal`` considers equal to the cached value is dropped.
        """
        try:
            value = await self._run_in_context()
        finally:
            # Runs even when the function raised. `trigger` resets `_call_task`
            # to None; when it is None here the dirty flag is left untouched.
            if self._call_task is not None:
                self.dirty = False  # If invalidated before this run completes, stay dirty.
        if self._check_equality and _equal(value, self._value):
            return
        if self._value is self.UNSET:
            self._value = value
            # do not notify on first set
        else:
            self._value = value
            self.notify()

    async def __sync_dirty_deps(self):
        """Walk the dependencies and re-evaluate the dirty derived ones.

        Dependencies that are not ``BaseDerived`` instances, or that are listed
        in the context leaf's ``current_computations``, are skipped.
        """
        try:
            current_computations = self.context.leaf.current_computations
            for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
                if isinstance(dep, BaseDerived) and dep not in current_computations:
                    if isinstance(dep, AsyncDerived):
                        # Async dependency: sync its own dependencies, then await it if still dirty.
                        await dep._sync_dirty_deps()
                        if dep.dirty:
                            await dep()
                    else:
                        # Other derived: run this same coroutine function with `dep` as
                        # `self` (its `finally` then assigns `_sync_dirty_deps_task` on
                        # `dep`), and call it synchronously if still dirty.
                        await __class__.__sync_dirty_deps(dep)  # type: ignore
                        if dep.dirty:
                            dep()
        finally:
            # Allow the next `_sync_dirty_deps()` call to start a fresh pass.
            self._sync_dirty_deps_task = None

    def _sync_dirty_deps(self):
        """Return the stored dependency-sync awaitable, starting one if none is stored."""
        if self._sync_dirty_deps_task is not None:
            return self._sync_dirty_deps_task
        task = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
        return task

    async def _call_async(self):
        """Sync dependencies, recompute if dirty, and return the cached value."""
        # Awaited outside the `try`, so a failure here skips the `finally` below.
        await self._sync_dirty_deps()
        try:
            if self.dirty:
                if self._call_task is not None:
                    # A recompute is already registered: wait for it instead of starting another.
                    await self._call_task
                else:
                    task = self._call_task = self.start(self.recompute)
                    await task
            return self._value
        finally:
            self._call_task = None

    def __call__(self):
        """Call ``self.track()``, then start ``_call_async`` and return its awaitable."""
        self.track()
        return self.start(self._call_async)

    def trigger(self):
        """Mark the instance dirty and forget any registered recompute.

        Returns the awaitable from calling the instance when ``_pulled(self)``
        is true, otherwise ``None``.
        """
        self.dirty = True
        self._call_task = None
        if _pulled(self):
            return self()

    def invalidate(self):
        """Call ``trigger`` and discard its return value."""
        self.trigger()

UNSET class-attribute instance-attribute

UNSET: T = object()

fn instance-attribute

fn = fn

_check_equality instance-attribute

_check_equality = check_equality

_value instance-attribute

_value = UNSET

start instance-attribute

start: TaskFactory = task_factory

_call_task instance-attribute

_call_task: Awaitable[None] | None = None

_sync_dirty_deps_task instance-attribute

_sync_dirty_deps_task: Awaitable[None] | None = None

__init__

__init__(
    fn: Callable[[], Awaitable[T]],
    check_equality=True,
    *,
    context: Context | None = None,
    task_factory: TaskFactory = default_task_factory,
)
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def __init__(self, fn: Callable[[], Awaitable[T]], check_equality=True, *, context: Context | None = None, task_factory: TaskFactory = default_task_factory):
    """Set up the cached value, the task factory and the task bookkeeping.

    Args:
        fn: Zero-argument callable returning an awaitable.
        check_equality: Stored as ``_check_equality``.
        context: Forwarded to the base initializer.
        task_factory: Stored as ``start``.
    """
    super().__init__(context=context)

    # Caller-supplied pieces; the cache starts out holding the UNSET sentinel.
    self.fn, self._check_equality, self._value = fn, check_equality, self.UNSET
    self.start: TaskFactory = task_factory

    # Nothing is in flight yet.
    self._call_task: Awaitable[None] | None = None
    self._sync_dirty_deps_task: Awaitable[None] | None = None

_run_in_context async

_run_in_context()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
async def _run_in_context(self):
    """Call ``self.context.fork()``, then await ``self.fn()`` inside ``self._enter()``.

    Returns the awaited result.
    """
    # NOTE(review): what fork() and _enter() do is defined outside this listing.
    self.context.fork()
    with self._enter():
        return await self.fn()

recompute async

recompute()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
async def recompute(self):
    """Run the wrapped function once and record its result.

    The first result is stored silently; later results are stored and followed
    by ``self.notify()``. With ``_check_equality`` enabled, a result that
    ``_equal`` considers equal to the cached value is dropped.
    """
    try:
        value = await self._run_in_context()
    finally:
        # Runs even when the function raised. `trigger` resets `_call_task` to
        # None; when it is None here the dirty flag is left untouched.
        if self._call_task is not None:
            self.dirty = False  # If invalidated before this run completes, stay dirty.
    if self._check_equality and _equal(value, self._value):
        return
    if self._value is self.UNSET:
        self._value = value
        # do not notify on first set
    else:
        self._value = value
        self.notify()

__sync_dirty_deps async

__sync_dirty_deps()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
async def __sync_dirty_deps(self):
    """Walk the dependencies and re-evaluate the dirty derived ones.

    Dependencies that are not ``BaseDerived`` instances, or that are listed in
    the context leaf's ``current_computations``, are skipped.
    """
    try:
        current_computations = self.context.leaf.current_computations
        for dep in tuple(self.dependencies):  # note: I don't know why but `self.dependencies` may shrink during iteration
            if isinstance(dep, BaseDerived) and dep not in current_computations:
                if isinstance(dep, AsyncDerived):
                    # Async dependency: sync its own dependencies, then await it if still dirty.
                    await dep._sync_dirty_deps()
                    if dep.dirty:
                        await dep()
                else:
                    # Other derived: run this same coroutine function with `dep` as
                    # `self` (its `finally` then assigns `_sync_dirty_deps_task` on
                    # `dep`), and call it synchronously if still dirty.
                    await __class__.__sync_dirty_deps(dep)  # type: ignore
                    if dep.dirty:
                        dep()
    finally:
        # Allow the next `_sync_dirty_deps()` call to start a fresh pass.
        self._sync_dirty_deps_task = None

_sync_dirty_deps

_sync_dirty_deps()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def _sync_dirty_deps(self):
    """Return the stored dependency-sync awaitable, starting and storing one if none exists."""
    pending = self._sync_dirty_deps_task
    if pending is None:
        # Nothing stored: launch the private sync coroutine and remember its awaitable.
        pending = self._sync_dirty_deps_task = self.start(self.__sync_dirty_deps)
    return pending

_call_async async

_call_async()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
async def _call_async(self):
    """Sync dependencies, recompute if dirty, and return the cached value."""
    # Awaited outside the `try`, so a failure here skips the `finally` below.
    await self._sync_dirty_deps()
    try:
        if self.dirty:
            if self._call_task is not None:
                # A recompute is already registered: wait for it instead of starting another.
                await self._call_task
            else:
                task = self._call_task = self.start(self.recompute)
                await task
        return self._value
    finally:
        # Cleared on every exit from the `try`, including when an awaited task raised.
        self._call_task = None

__call__

__call__()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def __call__(self):
    """Call ``self.track()``, then start ``_call_async`` via the task factory and return its awaitable."""
    self.track()
    pending = self.start(self._call_async)
    return pending

trigger

trigger()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def trigger(self):
    """Mark the instance dirty and drop any registered recompute.

    Returns the awaitable from calling the instance when ``_pulled(self)`` is
    true, otherwise ``None``.
    """
    self.dirty, self._call_task = True, None
    return self() if _pulled(self) else None

invalidate

invalidate()
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def invalidate(self):
    """Delegate to ``trigger``; whatever it returns is discarded."""
    _ = self.trigger()

default_task_factory

default_task_factory[T](
    async_function: AsyncFunction[T],
) -> Awaitable[T]
Source code in .venv/lib/python3.12/site-packages/reactivity/async_primitives.py
def default_task_factory[T](async_function: AsyncFunction[T]) -> Awaitable[T]:
    if platform == "emscripten":
        from asyncio import ensure_future

        return ensure_future(async_function())

    from sniffio import AsyncLibraryNotFoundError, current_async_library

    match current_async_library():
        case "asyncio":
            from asyncio import ensure_future

            return ensure_future(async_function())

        case "trio":
            from trio import Event
            from trio.lowlevel import spawn_system_task

            evt = Event()
            res: T
            exc: BaseException | None = None

            @spawn_system_task
            async def _():
                nonlocal res, exc
                try:
                    res = await async_function()
                except BaseException as e:
                    exc = e
                finally:
                    evt.set()

            class Future:  # An awaitable that can be awaited multiple times
                def __await__(self):
                    if not evt.is_set():
                        yield from evt.wait().__await__()
                    if exc is not None:
                        raise exc
                    return res

            return Future()

        case _ as other:
            raise AsyncLibraryNotFoundError(f"Only asyncio and trio are supported, not {other}")