Skip to content

Deprecated

This API is considered legacy due to bad design decisions. A better API will be introduced in the next minor version (v0.8.0). You can continue using this API for now, but it is recommended to switch to the new API once it is available.

from reactivity.hmr.api import *

LifecycleMixin

Bases: BaseReloader

Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
class LifecycleMixin(BaseReloader):
    """Mixin providing the start-up and tear-down steps shared by the reloader APIs.

    ``run_with_hooks`` registers ``self.entry_module`` as ``sys.modules["__main__"]``
    and creates the effect around ``self.run_entry_file``; ``clean_up`` disposes
    that effect and puts the previous ``__main__`` module back.
    """

    def run_with_hooks(self):
        """Install the entry module as ``__main__`` and create the entry-file effect.

        The module that was registered as ``__main__`` beforehand is remembered
        on ``self._original_main_module`` so that ``clean_up`` can restore it.
        The pre-reload hooks are called before the effect is created and the
        post-reload hooks afterwards; the effect is stored on ``self.effect``.
        """
        self._original_main_module = sys.modules["__main__"]
        sys.modules["__main__"] = self.entry_module
        call_pre_reload_hooks()
        self.effect = HMR_CONTEXT.effect(self.run_entry_file)
        call_post_reload_hooks()

    def clean_up(self):
        """Dispose the effect and ``entry_module.load``, then restore ``__main__``.

        The original ``__main__`` module is restored in a ``finally`` clause, so
        a failing dispose/invalidate call cannot leave the entry module
        registered as ``__main__``. Any such exception still propagates.
        """
        try:
            self.effect.dispose()
            self.entry_module.load.dispose()
            self.entry_module.load.invalidate()
        finally:
            sys.modules["__main__"] = self._original_main_module

run_with_hooks

run_with_hooks()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def run_with_hooks(self):
    """Install the entry module as ``__main__`` and create the entry-file effect.

    The previously registered ``__main__`` module is saved on
    ``self._original_main_module``; the effect created through
    ``HMR_CONTEXT.effect`` is stored on ``self.effect``. The pre-reload hooks
    are called before the effect is created, the post-reload hooks after.
    """
    modules = sys.modules
    previous_main = modules["__main__"]
    self._original_main_module = previous_main
    modules["__main__"] = self.entry_module

    call_pre_reload_hooks()
    entry_effect = HMR_CONTEXT.effect(self.run_entry_file)
    self.effect = entry_effect
    call_post_reload_hooks()

clean_up

clean_up()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def clean_up(self):
    """Dispose the effect and ``entry_module.load``, then restore ``__main__``.

    The original ``__main__`` module is restored in a ``finally`` clause, so a
    failing dispose/invalidate call cannot leave the entry module registered
    as ``__main__``. Any such exception still propagates to the caller.
    """
    try:
        self.effect.dispose()
        self.entry_module.load.dispose()
        self.entry_module.load.invalidate()
    finally:
        sys.modules["__main__"] = self._original_main_module

SyncReloaderAPI

Bases: SyncReloader, LifecycleMixin

Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
class SyncReloaderAPI(SyncReloader, LifecycleMixin):
    """Reloader usable as a synchronous or an asynchronous context manager.

    Entering runs the entry file with hooks and starts ``start_watching`` in a
    worker thread; exiting stops watching, waits for the watcher and cleans up.
    """

    def __enter__(self):
        """Run the entry file with hooks and start watching on a new thread.

        Returns:
            The zero-argument ``super()`` proxy of this instance (not ``self``).
        """
        from threading import Thread

        self.run_with_hooks()
        self.thread = Thread(target=self.start_watching)
        self.thread.start()
        return super()

    def __exit__(self, *_):
        """Stop watching, join the watcher thread and clean up.

        ``clean_up`` runs in a ``finally`` clause so the effect is disposed and
        ``__main__`` is restored even if joining the thread is interrupted.
        """
        self.stop_watching()
        try:
            self.thread.join()
        finally:
            self.clean_up()

    async def __aenter__(self):
        """Run the entry file with hooks in a worker thread and start watching.

        ``start_watching`` is also run through ``to_thread``; the resulting
        future is kept on ``self.future`` so ``__aexit__`` can await it.

        Returns:
            The zero-argument ``super()`` proxy of this instance (not ``self``).
        """
        from asyncio import ensure_future, sleep, to_thread

        await to_thread(self.run_with_hooks)
        self.future = ensure_future(to_thread(self.start_watching))
        # Yield to the event loop once; presumably so the watcher gets scheduled
        # before the caller's ``async with`` body runs.
        await sleep(0)
        return super()

    async def __aexit__(self, *_):
        """Stop watching, await the watcher future and clean up.

        ``clean_up`` runs in a ``finally`` clause so it is not skipped when
        awaiting the future raises (for example a watcher error or cancellation).
        """
        self.stop_watching()
        try:
            await self.future
        finally:
            self.clean_up()

__enter__

__enter__()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def __enter__(self):
    """Run the entry file with hooks, then start watching on a new thread.

    Returns:
        The zero-argument ``super()`` proxy of this instance (not ``self``).
    """
    from threading import Thread

    self.run_with_hooks()

    watcher = Thread(target=self.start_watching)
    self.thread = watcher  # kept on the instance; ``__exit__`` joins it
    watcher.start()

    return super()

__exit__

__exit__(*_)
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def __exit__(self, *_):
    """Stop watching, join the watcher thread and clean up.

    ``clean_up`` runs in a ``finally`` clause so that it is still executed
    when joining the thread raises (for example on an interrupt). The
    exception information passed by the ``with`` statement is ignored.
    """
    self.stop_watching()
    try:
        self.thread.join()
    finally:
        self.clean_up()

__aenter__ async

__aenter__()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
async def __aenter__(self):
    """Run the entry file with hooks in a worker thread, then start watching.

    ``start_watching`` is run through ``to_thread`` as well; the scheduled
    future is stored on ``self.future``.

    Returns:
        The zero-argument ``super()`` proxy of this instance (not ``self``).
    """
    from asyncio import ensure_future, sleep, to_thread

    await to_thread(self.run_with_hooks)

    watching = to_thread(self.start_watching)
    self.future = ensure_future(watching)

    # Hand control back to the event loop once before returning.
    await sleep(0)
    return super()

__aexit__ async

__aexit__(*_)
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
async def __aexit__(self, *_):
    """Stop watching, await the watcher future and clean up.

    ``clean_up`` runs in a ``finally`` clause so that it is not skipped when
    awaiting ``self.future`` raises (for example a watcher error or a
    cancellation). The exception still propagates to the caller.
    """
    self.stop_watching()
    try:
        await self.future
    finally:
        self.clean_up()

AsyncReloaderAPI

Bases: AsyncReloader, LifecycleMixin

Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
class AsyncReloaderAPI(AsyncReloader, LifecycleMixin):
    """Async reloader usable as a synchronous or an asynchronous context manager.

    Entering runs the entry file with hooks and starts the ``start_watching``
    coroutine (in a dedicated thread for ``with``, as a future for
    ``async with``); exiting stops watching, waits for it and cleans up.
    """

    def __enter__(self):
        """Run the entry file with hooks and start watching in a new thread.

        The thread runs ``start_watching`` via ``asyncio.run``. This method
        blocks until the coroutine has begun executing in that thread.

        Returns:
            The zero-argument ``super()`` proxy of this instance (not ``self``).
        """
        from asyncio import run
        from threading import Event, Thread

        self.run_with_hooks()

        started = Event()

        async def task():
            # Signal the entering thread first, then watch until stopped.
            started.set()
            await self.start_watching()

        self.thread = Thread(target=lambda: run(task()))
        self.thread.start()
        started.wait()
        return super()

    def __exit__(self, *_):
        """Stop watching, join the watcher thread and clean up.

        ``clean_up`` runs in a ``finally`` clause so the effect is disposed and
        ``__main__`` is restored even if joining the thread is interrupted.
        """
        self.stop_watching()
        try:
            self.thread.join()
        finally:
            self.clean_up()

    async def __aenter__(self):
        """Run the entry file with hooks in a worker thread and start watching.

        ``start_watching`` is scheduled on the current event loop; the future
        is kept on ``self.future`` so ``__aexit__`` can await it.

        Returns:
            The zero-argument ``super()`` proxy of this instance (not ``self``).
        """
        from asyncio import ensure_future, sleep, to_thread

        await to_thread(self.run_with_hooks)
        self.future = ensure_future(self.start_watching())
        # Yield to the event loop once; presumably so the watcher gets scheduled
        # before the caller's ``async with`` body runs.
        await sleep(0)
        return super()

    async def __aexit__(self, *_):
        """Stop watching, await the watcher future and clean up.

        ``clean_up`` runs in a ``finally`` clause so it is not skipped when
        awaiting the future raises (for example a watcher error or cancellation).
        """
        self.stop_watching()
        try:
            await self.future
        finally:
            self.clean_up()

__enter__

__enter__()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def __enter__(self):
    """Run the entry file with hooks, then start watching in a new thread.

    The thread drives ``start_watching`` through ``asyncio.run``; this method
    waits until that coroutine has begun executing before it returns.

    Returns:
        The zero-argument ``super()`` proxy of this instance (not ``self``).
    """
    from asyncio import run
    from threading import Event, Thread

    self.run_with_hooks()

    loop_running = Event()

    async def watch():
        # Unblock the entering thread, then watch until stopped.
        loop_running.set()
        await self.start_watching()

    worker = Thread(target=lambda: run(watch()))
    self.thread = worker  # kept on the instance; ``__exit__`` joins it
    worker.start()

    loop_running.wait()
    return super()

__exit__

__exit__(*_)
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
def __exit__(self, *_):
    """Stop watching, join the watcher thread and clean up.

    ``clean_up`` runs in a ``finally`` clause so that it is still executed
    when joining the thread raises (for example on an interrupt). The
    exception information passed by the ``with`` statement is ignored.
    """
    self.stop_watching()
    try:
        self.thread.join()
    finally:
        self.clean_up()

__aenter__ async

__aenter__()
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
async def __aenter__(self):
    """Run the entry file with hooks in a worker thread, then start watching.

    The ``start_watching`` coroutine is scheduled on the current event loop
    and the resulting future is stored on ``self.future``.

    Returns:
        The zero-argument ``super()`` proxy of this instance (not ``self``).
    """
    from asyncio import ensure_future, sleep, to_thread

    await to_thread(self.run_with_hooks)

    watching = self.start_watching()
    self.future = ensure_future(watching)

    # Hand control back to the event loop once before returning.
    await sleep(0)
    return super()

__aexit__ async

__aexit__(*_)
Source code in .venv/lib/python3.12/site-packages/reactivity/hmr/api.py
async def __aexit__(self, *_):
    """Stop watching, await the watcher future and clean up.

    ``clean_up`` runs in a ``finally`` clause so that it is not skipped when
    awaiting ``self.future`` raises (for example a watcher error or a
    cancellation). The exception still propagates to the caller.
    """
    self.stop_watching()
    try:
        await self.future
    finally:
        self.clean_up()