Skip to content

from reactivity.collections import *

ReactiveMappingProxy

Bases: MutableMapping[K, V]

Source code in reactivity/collections.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ReactiveMappingProxy[K, V](MutableMapping[K, V]):
    def _signal(self, value=False):
        return Signal(value, context=self.context)  # False for unset

    def __init__(self, initial: MutableMapping[K, V], check_equality=True, *, context: Context | None = None):
        self.context = context or default_context
        self._check_equality = check_equality
        self._data = initial
        self._keys = defaultdict(self._signal, {k: self._signal(True) for k in tuple(initial)})  # in subclasses, self._signal() may mutate `initial`
        self._iter = Subscribable()

    def __getitem__(self, key: K):
        if self._keys[key].get():
            return self._data[key]
        raise KeyError(key)

    def __setitem__(self, key: K, value: V):
        if self._keys[key]._value:
            should_notify = not self._check_equality or not _equal(self._data[key], value)
            self._data[key] = value
            if should_notify:
                self._keys[key].notify()
        else:
            self._data[key] = value
            with self.context.batch(force_flush=False):
                self._keys[key].set(True)
                self._iter.notify()

    def __delitem__(self, key: K):
        if not self._keys[key]._value:
            raise KeyError(key)
        del self._data[key]
        with self.context.batch(force_flush=False):
            self._keys[key].set(False)
            self._iter.notify()

    def __iter__(self):
        self._iter.track()
        for key in self._keys:
            if self._keys[key]._value:
                yield key

    def __len__(self):
        self._iter.track()
        return len(self._data)

    def __repr__(self):
        return repr({**self})

context instance-attribute

context = context or default_context

_check_equality instance-attribute

_check_equality = check_equality

_data instance-attribute

_data = initial

_keys instance-attribute

_keys = defaultdict(
    _signal, {k: (_signal(True)) for k in (tuple(initial))}
)

_iter instance-attribute

_iter = Subscribable()

_signal

_signal(value=False)
Source code in reactivity/collections.py
11
12
def _signal(self, value=False):
    return Signal(value, context=self.context)  # False for unset

__init__

__init__(
    initial: MutableMapping[K, V],
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
14
15
16
17
18
19
def __init__(self, initial: MutableMapping[K, V], check_equality=True, *, context: Context | None = None):
    self.context = context or default_context
    self._check_equality = check_equality
    self._data = initial
    self._keys = defaultdict(self._signal, {k: self._signal(True) for k in tuple(initial)})  # in subclasses, self._signal() may mutate `initial`
    self._iter = Subscribable()

__getitem__

__getitem__(key: K)
Source code in reactivity/collections.py
21
22
23
24
def __getitem__(self, key: K):
    if self._keys[key].get():
        return self._data[key]
    raise KeyError(key)

__setitem__

__setitem__(key: K, value: V)
Source code in reactivity/collections.py
26
27
28
29
30
31
32
33
34
35
36
def __setitem__(self, key: K, value: V):
    if self._keys[key]._value:
        should_notify = not self._check_equality or not _equal(self._data[key], value)
        self._data[key] = value
        if should_notify:
            self._keys[key].notify()
    else:
        self._data[key] = value
        with self.context.batch(force_flush=False):
            self._keys[key].set(True)
            self._iter.notify()

__delitem__

__delitem__(key: K)
Source code in reactivity/collections.py
38
39
40
41
42
43
44
def __delitem__(self, key: K):
    if not self._keys[key]._value:
        raise KeyError(key)
    del self._data[key]
    with self.context.batch(force_flush=False):
        self._keys[key].set(False)
        self._iter.notify()

__iter__

__iter__()
Source code in reactivity/collections.py
46
47
48
49
50
def __iter__(self):
    self._iter.track()
    for key in self._keys:
        if self._keys[key]._value:
            yield key

__len__

__len__()
Source code in reactivity/collections.py
52
53
54
def __len__(self):
    self._iter.track()
    return len(self._data)

__repr__

__repr__()
Source code in reactivity/collections.py
56
57
def __repr__(self):
    return repr({**self})

ReactiveMapping

Bases: ReactiveMappingProxy[K, V]

Source code in reactivity/collections.py
60
61
62
class ReactiveMapping[K, V](ReactiveMappingProxy[K, V]):
    def __init__(self, initial: Mapping[K, V] | None = None, check_equality=True, *, context: Context | None = None):
        super().__init__({**initial} if initial is not None else {}, check_equality, context=context)

__init__

__init__(
    initial: Mapping[K, V] | None = None,
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
61
62
def __init__(self, initial: Mapping[K, V] | None = None, check_equality=True, *, context: Context | None = None):
    super().__init__({**initial} if initial is not None else {}, check_equality, context=context)

ReactiveSetProxy

Bases: MutableSet[T]

Source code in reactivity/collections.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class ReactiveSetProxy[T](MutableSet[T]):
    def _signal(self, value=False):
        return Signal(value, self._check_equality, context=self.context)  # False for unset

    def __init__(self, initial: MutableSet[T], check_equality=True, *, context: Context | None = None):
        self.context = context or default_context
        self._check_equality = check_equality
        self._data = initial
        self._items = defaultdict(self._signal, {k: self._signal(True) for k in tuple(initial)})
        self._iter = Subscribable()

    def __contains__(self, value):
        return self._items[value].get()

    def add(self, value):
        with self.context.batch(force_flush=False):
            if self._items[value].set(True):
                self._data.add(value)
                self._iter.notify()

    def discard(self, value):
        if value in self._items and (signal := self._items[value]) and signal._value:
            self._data.remove(value)
            with self.context.batch(force_flush=False):
                signal.set(False)
                self._iter.notify()

    def remove(self, value):
        if value in self._items and (signal := self._items[value]) and signal._value:
            self._data.remove(value)
            with self.context.batch(force_flush=False):
                signal.set(False)
                self._iter.notify()
        else:
            raise KeyError(value)

    def __iter__(self):
        self._iter.track()
        for item in self._items:
            if self._items[item]._value:
                yield item

    def __len__(self):
        self._iter.track()
        return len(self._data)

    def __repr__(self):
        return repr({*self})

context instance-attribute

context = context or default_context

_check_equality instance-attribute

_check_equality = check_equality

_data instance-attribute

_data = initial

_items instance-attribute

_items = defaultdict(
    _signal, {k: (_signal(True)) for k in (tuple(initial))}
)

_iter instance-attribute

_iter = Subscribable()

_signal

_signal(value=False)
Source code in reactivity/collections.py
66
67
def _signal(self, value=False):
    return Signal(value, self._check_equality, context=self.context)  # False for unset

__init__

__init__(
    initial: MutableSet[T],
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
69
70
71
72
73
74
def __init__(self, initial: MutableSet[T], check_equality=True, *, context: Context | None = None):
    self.context = context or default_context
    self._check_equality = check_equality
    self._data = initial
    self._items = defaultdict(self._signal, {k: self._signal(True) for k in tuple(initial)})
    self._iter = Subscribable()

__contains__

__contains__(value)
Source code in reactivity/collections.py
76
77
def __contains__(self, value):
    return self._items[value].get()

add

add(value)
Source code in reactivity/collections.py
79
80
81
82
83
def add(self, value):
    with self.context.batch(force_flush=False):
        if self._items[value].set(True):
            self._data.add(value)
            self._iter.notify()

discard

discard(value)
Source code in reactivity/collections.py
85
86
87
88
89
90
def discard(self, value):
    if value in self._items and (signal := self._items[value]) and signal._value:
        self._data.remove(value)
        with self.context.batch(force_flush=False):
            signal.set(False)
            self._iter.notify()

remove

remove(value)
Source code in reactivity/collections.py
92
93
94
95
96
97
98
99
def remove(self, value):
    if value in self._items and (signal := self._items[value]) and signal._value:
        self._data.remove(value)
        with self.context.batch(force_flush=False):
            signal.set(False)
            self._iter.notify()
    else:
        raise KeyError(value)

__iter__

__iter__()
Source code in reactivity/collections.py
101
102
103
104
105
def __iter__(self):
    self._iter.track()
    for item in self._items:
        if self._items[item]._value:
            yield item

__len__

__len__()
Source code in reactivity/collections.py
107
108
109
def __len__(self):
    self._iter.track()
    return len(self._data)

__repr__

__repr__()
Source code in reactivity/collections.py
111
112
def __repr__(self):
    return repr({*self})

ReactiveSet

Bases: ReactiveSetProxy[T]

Source code in reactivity/collections.py
115
116
117
class ReactiveSet[T](ReactiveSetProxy[T]):
    def __init__(self, initial: Set[T] | None = None, check_equality=True, *, context: Context | None = None):
        super().__init__({*initial} if initial is not None else set(), check_equality, context=context)

__init__

__init__(
    initial: Set[T] | None = None,
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
116
117
def __init__(self, initial: Set[T] | None = None, check_equality=True, *, context: Context | None = None):
    super().__init__({*initial} if initial is not None else set(), check_equality, context=context)

ReactiveSequenceProxy

Bases: MutableSequence[T]

Source code in reactivity/collections.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
class ReactiveSequenceProxy[T](MutableSequence[T]):
    def _signal(self):
        return Subscribable(context=self.context)

    def __init__(self, initial: MutableSequence[T], check_equality=True, *, context: Context | None = None):
        self.context = context or default_context
        self._check_equality = check_equality
        self._data = initial
        self._keys = keys = defaultdict(self._signal)  # positive and negative index signals
        self._iter = Subscribable()
        self._length = len(initial)

        for index in range(-len(initial), len(initial)):
            keys[index] = self._signal()

    @overload
    def __getitem__(self, key: int) -> T: ...
    @overload
    def __getitem__(self, key: slice) -> list[T]: ...

    def __getitem__(self, key: int | slice):
        if isinstance(key, slice):
            start, stop, step = key.indices(self._length)
            if step != 1:
                raise NotImplementedError  # TODO
            for i in range(start, stop):
                self._keys[i].track()
            if not self._check_equality:
                self._iter.track()
                return self._data[start:stop]
            # The following implementation is inefficient but works. TODO: refactor this
            return _weak_derived(lambda: (self._iter.track(), self._data[slice(*key.indices(self._length))])[1])()

        else:
            # Handle integer indices
            self._keys[key].track()
            if -self._length <= key < self._length:
                return self._data[key]
            raise IndexError(key)

    def _replace(self, range_slice: slice, target: Iterable[T]):
        start, stop, step = range_slice.indices(self._length)
        if step != 1:
            raise NotImplementedError  # TODO

        target = [*target]
        assert start <= stop
        delta = len(target) - (stop - start)

        with self.context.batch(force_flush=False):
            if delta > 0:
                if not self._check_equality:
                    for i in range(start, self._length + delta):
                        self._keys[i].notify()
                    for i in range(stop + delta):
                        self._keys[i - self._length - delta].notify()
                else:
                    for i in range(start, self._length + delta):
                        if i < self._length:
                            if i - start < len(target):
                                if _equal(self._data[i], target[i - start]):
                                    continue
                            else:
                                if _equal(self._data[i], self._data[i - delta]):
                                    continue
                        self._keys[i].notify()
                    for i in range(stop + delta):
                        if i >= delta:
                            if i >= start:
                                if _equal(self._data[i - self._length - delta], target[i - start]):
                                    continue
                            else:
                                if _equal(self._data[i - self._length - delta], self._data[i - self._length]):
                                    continue
                        self._keys[i - self._length - delta].notify()

            elif delta < 0:
                if not self._check_equality:
                    for i in range(start, self._length):
                        self._keys[i].notify()
                    for i in range(stop):
                        self._keys[i - self._length].notify()
                else:
                    for i in range(start, self._length):
                        if i < self._length + delta:
                            if i - start < len(target):
                                if _equal(self._data[i], target[i - start]):
                                    continue
                            else:
                                if _equal(self._data[i], self._data[i - delta]):
                                    continue
                        self._keys[i].notify()
                    for i in range(stop):
                        if i >= -delta:
                            if 0 <= i - start < len(target):
                                if _equal(self._data[i - self._length], target[i - start]):
                                    continue
                            else:
                                if _equal(self._data[i - self._length], self._data[i - self._length + delta]):
                                    continue
                        self._keys[i - self._length].notify()

            else:
                if not self._check_equality:
                    for i in range(start, stop):
                        self._data[i] = target[i - start]
                        self._keys[i].notify()
                        self._keys[i - self._length].notify()
                else:
                    for i in range(start, stop):
                        original = self._data[i]
                        if not _equal(original, target[i - start]):
                            self._data[i] = target[i - start]
                            self._keys[i].notify()
                            self._keys[i - self._length].notify()

            if delta:
                self._length += delta
                self._iter.notify()
            self._data[start:stop] = target

    def __len__(self):
        self._iter.track()
        return self._length

    def __setitem__(self, key, value):
        if isinstance(key, slice):
            self._replace(key, value)
        else:
            if key < 0:
                key += self._length
            if not 0 <= key < self._length:
                raise IndexError(key)
            self._replace(slice(key, key + 1), [value])

    def __delitem__(self, key):
        if isinstance(key, slice):
            self._replace(key, [])
        else:
            if key < 0:
                key += self._length
            if not 0 <= key < self._length:
                raise IndexError(key)
            self._replace(slice(key, key + 1), [])

    def insert(self, index, value):
        if index < 0:
            index += self._length
        if index < 0:
            index = 0
        if index > self._length:
            index = self._length
        self._replace(slice(index, index), [value])

    def append(self, value):
        self._replace(slice(self._length, self._length), [value])

    def extend(self, values):
        self._replace(slice(self._length, self._length), values)

    def pop(self, index=-1):
        if index < 0:
            index += self._length
        if not 0 <= index < self._length:
            raise IndexError(index)
        value = self._data[index]
        self._replace(slice(index, index + 1), [])
        return value

    def remove(self, value):
        for i in range(self._length):
            if self._data[i] == value:
                self._replace(slice(i, i + 1), [])
                return
        raise ValueError(value)

    def clear(self):
        self._replace(slice(0, self._length), [])

    def reverse(self):
        self._replace(slice(0, self._length), reversed(self._data))

    def sort(self, *, key=None, reverse=False):
        self._replace(slice(0, self._length), sorted(self._data, key=key, reverse=reverse))  # type: ignore

    def __repr__(self):
        return repr([*self])

    def __eq__(self, value):
        return [*self] == value

context instance-attribute

context = context or default_context

_check_equality instance-attribute

_check_equality = check_equality

_data instance-attribute

_data = initial

_keys instance-attribute

_keys = defaultdict(_signal)

_iter instance-attribute

_iter = Subscribable()

_length instance-attribute

_length = len(initial)

_signal

_signal()
Source code in reactivity/collections.py
130
131
def _signal(self):
    return Subscribable(context=self.context)

__init__

__init__(
    initial: MutableSequence[T],
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
133
134
135
136
137
138
139
140
141
142
def __init__(self, initial: MutableSequence[T], check_equality=True, *, context: Context | None = None):
    self.context = context or default_context
    self._check_equality = check_equality
    self._data = initial
    self._keys = keys = defaultdict(self._signal)  # positive and negative index signals
    self._iter = Subscribable()
    self._length = len(initial)

    for index in range(-len(initial), len(initial)):
        keys[index] = self._signal()

__getitem__

__getitem__(key: int) -> T
__getitem__(key: slice) -> list[T]
Source code in reactivity/collections.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def __getitem__(self, key: int | slice):
    if isinstance(key, slice):
        start, stop, step = key.indices(self._length)
        if step != 1:
            raise NotImplementedError  # TODO
        for i in range(start, stop):
            self._keys[i].track()
        if not self._check_equality:
            self._iter.track()
            return self._data[start:stop]
        # The following implementation is inefficient but works. TODO: refactor this
        return _weak_derived(lambda: (self._iter.track(), self._data[slice(*key.indices(self._length))])[1])()

    else:
        # Handle integer indices
        self._keys[key].track()
        if -self._length <= key < self._length:
            return self._data[key]
        raise IndexError(key)

_replace

_replace(range_slice: slice, target: Iterable[T])
Source code in reactivity/collections.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def _replace(self, range_slice: slice, target: Iterable[T]):
    start, stop, step = range_slice.indices(self._length)
    if step != 1:
        raise NotImplementedError  # TODO

    target = [*target]
    assert start <= stop
    delta = len(target) - (stop - start)

    with self.context.batch(force_flush=False):
        if delta > 0:
            if not self._check_equality:
                for i in range(start, self._length + delta):
                    self._keys[i].notify()
                for i in range(stop + delta):
                    self._keys[i - self._length - delta].notify()
            else:
                for i in range(start, self._length + delta):
                    if i < self._length:
                        if i - start < len(target):
                            if _equal(self._data[i], target[i - start]):
                                continue
                        else:
                            if _equal(self._data[i], self._data[i - delta]):
                                continue
                    self._keys[i].notify()
                for i in range(stop + delta):
                    if i >= delta:
                        if i >= start:
                            if _equal(self._data[i - self._length - delta], target[i - start]):
                                continue
                        else:
                            if _equal(self._data[i - self._length - delta], self._data[i - self._length]):
                                continue
                    self._keys[i - self._length - delta].notify()

        elif delta < 0:
            if not self._check_equality:
                for i in range(start, self._length):
                    self._keys[i].notify()
                for i in range(stop):
                    self._keys[i - self._length].notify()
            else:
                for i in range(start, self._length):
                    if i < self._length + delta:
                        if i - start < len(target):
                            if _equal(self._data[i], target[i - start]):
                                continue
                        else:
                            if _equal(self._data[i], self._data[i - delta]):
                                continue
                    self._keys[i].notify()
                for i in range(stop):
                    if i >= -delta:
                        if 0 <= i - start < len(target):
                            if _equal(self._data[i - self._length], target[i - start]):
                                continue
                        else:
                            if _equal(self._data[i - self._length], self._data[i - self._length + delta]):
                                continue
                    self._keys[i - self._length].notify()

        else:
            if not self._check_equality:
                for i in range(start, stop):
                    self._data[i] = target[i - start]
                    self._keys[i].notify()
                    self._keys[i - self._length].notify()
            else:
                for i in range(start, stop):
                    original = self._data[i]
                    if not _equal(original, target[i - start]):
                        self._data[i] = target[i - start]
                        self._keys[i].notify()
                        self._keys[i - self._length].notify()

        if delta:
            self._length += delta
            self._iter.notify()
        self._data[start:stop] = target

__len__

__len__()
Source code in reactivity/collections.py
250
251
252
def __len__(self):
    self._iter.track()
    return self._length

__setitem__

__setitem__(key, value)
Source code in reactivity/collections.py
254
255
256
257
258
259
260
261
262
def __setitem__(self, key, value):
    if isinstance(key, slice):
        self._replace(key, value)
    else:
        if key < 0:
            key += self._length
        if not 0 <= key < self._length:
            raise IndexError(key)
        self._replace(slice(key, key + 1), [value])

__delitem__

__delitem__(key)
Source code in reactivity/collections.py
264
265
266
267
268
269
270
271
272
def __delitem__(self, key):
    if isinstance(key, slice):
        self._replace(key, [])
    else:
        if key < 0:
            key += self._length
        if not 0 <= key < self._length:
            raise IndexError(key)
        self._replace(slice(key, key + 1), [])

insert

insert(index, value)
Source code in reactivity/collections.py
274
275
276
277
278
279
280
281
def insert(self, index, value):
    if index < 0:
        index += self._length
    if index < 0:
        index = 0
    if index > self._length:
        index = self._length
    self._replace(slice(index, index), [value])

append

append(value)
Source code in reactivity/collections.py
283
284
def append(self, value):
    self._replace(slice(self._length, self._length), [value])

extend

extend(values)
Source code in reactivity/collections.py
286
287
def extend(self, values):
    self._replace(slice(self._length, self._length), values)

pop

pop(index=-1)
Source code in reactivity/collections.py
289
290
291
292
293
294
295
296
def pop(self, index=-1):
    if index < 0:
        index += self._length
    if not 0 <= index < self._length:
        raise IndexError(index)
    value = self._data[index]
    self._replace(slice(index, index + 1), [])
    return value

remove

remove(value)
Source code in reactivity/collections.py
298
299
300
301
302
303
def remove(self, value):
    for i in range(self._length):
        if self._data[i] == value:
            self._replace(slice(i, i + 1), [])
            return
    raise ValueError(value)

clear

clear()
Source code in reactivity/collections.py
305
306
def clear(self):
    self._replace(slice(0, self._length), [])

reverse

reverse()
Source code in reactivity/collections.py
308
309
def reverse(self):
    self._replace(slice(0, self._length), reversed(self._data))

sort

sort(*, key=None, reverse=False)
Source code in reactivity/collections.py
311
312
def sort(self, *, key=None, reverse=False):
    self._replace(slice(0, self._length), sorted(self._data, key=key, reverse=reverse))  # type: ignore

__repr__

__repr__()
Source code in reactivity/collections.py
314
315
def __repr__(self):
    return repr([*self])

__eq__

__eq__(value)
Source code in reactivity/collections.py
317
318
def __eq__(self, value):
    return [*self] == value

ReactiveSequence

Bases: ReactiveSequenceProxy[T]

Source code in reactivity/collections.py
321
322
323
class ReactiveSequence[T](ReactiveSequenceProxy[T]):
    def __init__(self, initial: Sequence[T] | None = None, check_equality=True, *, context: Context | None = None):
        super().__init__([*initial] if initial is not None else [], check_equality, context=context)

__init__

__init__(
    initial: Sequence[T] | None = None,
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
322
323
def __init__(self, initial: Sequence[T] | None = None, check_equality=True, *, context: Context | None = None):
    super().__init__([*initial] if initial is not None else [], check_equality, context=context)

_weak_derived

_weak_derived[T](
    fn: Callable[[], T],
    check_equality=True,
    *,
    context: Context | None = None,
)
Source code in reactivity/collections.py
120
121
122
123
124
125
126
def _weak_derived[T](fn: Callable[[], T], check_equality=True, *, context: Context | None = None):
    d = Derived(fn, check_equality, context=context)
    s = d.subscribers = ReactiveSetProxy(d.subscribers)  # type: ignore
    e = Effect(lambda: not s and d.dispose(), False)  # when `subscribers` is empty, gc it
    s._iter.subscribers.add(e)
    e.dependencies.add(s._iter)
    return d

reactive_object_proxy

reactive_object_proxy[T](
    initial: T,
    check_equality=True,
    *,
    context: Context | None = None,
) -> T
Source code in reactivity/collections.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def reactive_object_proxy[T](initial: T, check_equality=True, *, context: Context | None = None) -> T:
    context = context or default_context

    names = ReactiveMappingProxy(initial.__dict__, check_equality, context=context)  # TODO: support classes with `__slots__`
    _iter = names._iter
    _keys: defaultdict[str, Signal[bool | None]] = names._keys  # type: ignore
    # true for instance attributes, false for non-existent attributes, None for class attributes
    # only instance attributes are visible in `__dict__`
    # TODO: accessing non-data descriptors should be treated as getting `Derived` instead of `Signal`
    CLASS_ATTR = None  # sentinel for class attributes

    cls = initial.__class__
    meta: type[type[T]] = type(cls)

    from inspect import isclass, ismethod

    class Proxy(cls, metaclass=meta):
        def __getattribute__(self, key):
            if key == "__dict__":
                return names
            if _keys[key].get():
                res = getattr(initial, key)
                if ismethod(res):
                    return res.__func__.__get__(self)
                return res
            return super().__getattribute__(key)

        def __setattr__(self, key: str, value):
            if _keys[key]._value is not False:
                should_notify = not check_equality or not _equal(getattr(initial, key), value)
                setattr(initial, key, value)
                if should_notify:
                    _keys[key].notify()
            else:
                setattr(initial, key, value)
                with context.batch(force_flush=False):
                    _keys[key].set(True if key in initial.__dict__ else CLASS_ATTR)  # non-instance attributes are tracked but not visible in `__dict__`
                    _iter.notify()

        def __delattr__(self, key):
            if not _keys[key]._value:
                raise AttributeError(key)
            delattr(initial, key)
            with context.batch(force_flush=False):
                _keys[key].set(False)
                _iter.notify()

        def __dir__(self):
            _iter.track()
            return dir(initial)

        if isclass(initial):
            __new__ = meta.__new__

            def __call__(self, *args, **kwargs):
                # TODO: refactor this because making a new class whenever constructing a new instance is wasteful
                return reactive(initial(*args, **kwargs), check_equality, context=context)  # type: ignore

            # it seems that __str__ and __repr__ are not looked up on the class, so we have to define them here
            # note that this do loses reactivity but probably nobody needs reactive stringifying of classes themselves

            def __str__(self):
                return str(initial)

            def __repr__(self):
                return repr(initial)

        else:

            def __init__(self, *args, **kwargs):
                nonlocal bypassed
                if bypassed:
                    bypassed = False
                    return
                super().__init__(*args, **kwargs)

    bypassed = True

    update_wrapper(Proxy, cls, updated=())

    if isclass(initial):
        return Proxy(initial.__name__, (initial,), {**initial.__dict__})  # type: ignore

    return Proxy()  # type: ignore

reactive

reactive[K, V](
    value: MutableMapping[K, V],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveMappingProxy[K, V]
reactive[K, V](
    value: Mapping[K, V],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveMapping[K, V]
reactive[T](
    value: MutableSet[T],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveSetProxy[T]
reactive[T](
    value: Set[T],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveSet[T]
reactive[T](
    value: MutableSequence[T],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveSequenceProxy[T]
reactive[T](
    value: Sequence[T],
    check_equality=True,
    *,
    context: Context | None = None,
) -> ReactiveSequence[T]
reactive[T](
    value: T,
    check_equality=True,
    *,
    context: Context | None = None,
) -> T
Source code in reactivity/collections.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def reactive(value: Mapping | Set | Sequence | Any, check_equality=True, *, context: Context | None = None):
    match value:
        case MutableMapping():
            return ReactiveMappingProxy(value, check_equality, context=context)
        case Mapping():
            return ReactiveMapping(value, check_equality, context=context)
        case MutableSet():
            return ReactiveSetProxy(value, check_equality, context=context)
        case Set():
            return ReactiveSet(value, check_equality, context=context)
        case MutableSequence():
            return ReactiveSequenceProxy(value, check_equality, context=context)
        case Sequence():
            return ReactiveSequence(value, check_equality, context=context)
        case _:
            return reactive_object_proxy(value, check_equality, context=context)