When dealing with real-time* data (or events), I often find the need to either monitor the number of data points within a given time frame, or use the data for some rolling statistical calculation.
Okay, but it’s not that difficult!
Indeed, it’s relatively easy to do with a normal dict keyed by timestamp: every time you want to perform your count or calculation, you iterate over all the keys and discard the ones that are too old.
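To make the plain-dict approach concrete, here is a minimal sketch of it. The names `purge_old`, `count_recent` and `WINDOW_SECONDS` are made up for this illustration, and the one-second window is an arbitrary assumption.

```python
import time

WINDOW_SECONDS = 1.0  # assumed window length, chosen only for illustration


def purge_old(events, now, max_age=WINDOW_SECONDS):
    """Drop every entry whose timestamp key is older than max_age seconds."""
    # Copy the keys first: deleting from a dict while iterating it raises RuntimeError.
    for timestamp in list(events):
        if now - timestamp > max_age:
            del events[timestamp]


def count_recent(events, now=None, max_age=WINDOW_SECONDS):
    """Purge stale entries, then return how many events remain in the window."""
    if now is None:
        now = time.time()
    purge_old(events, now, max_age)  # this loop runs on every single read
    return len(events)


events = {}
now = time.time()
events[now - 2.0] = 'stale'
events[now - 0.5] = 'recent'
events[now] = 'newest'
print(count_recent(events, now))  # 2
```

Note that every call to `count_recent` walks the whole dict, so the cost of a read grows with the number of stored events.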
But... hey! That adds a loop to your real-time path, a loop that can easily become very large. If the precision needed for removing old elements is coarser than 1 ms (0.001 sec), a thread can do the purging beforehand, allowing fast access to the data and preventing congestion and slowdowns.
All of a sudden this turns more complicated, and a relatively simple feature clogs up the readability of the code. The solution: introducing the TimedDict data structure.
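A rough sketch of the thread-based idea could look like the following. This is not how TimedDict is implemented; the class `PurgingWindow` and its parameters `max_age` and `interval` are hypothetical and only meant to show how much plumbing the idea needs.

```python
import threading
import time


class PurgingWindow:
    """Hypothetical sketch: timestamp -> value store purged by a background thread."""

    def __init__(self, max_age=1.0, interval=0.01):
        self._data = {}
        self._max_age = max_age      # seconds an entry is kept
        self._interval = interval    # seconds between purge sweeps
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._purge_loop, daemon=True)
        self._thread.start()

    def _purge_loop(self):
        # wait() acts as the sleep between sweeps and returns True once stop() is called.
        while not self._stop.wait(self._interval):
            cutoff = time.time() - self._max_age
            with self._lock:
                for timestamp in [t for t in self._data if t < cutoff]:
                    del self._data[timestamp]

    def add(self, timestamp, value):
        with self._lock:
            self._data[timestamp] = value

    def __len__(self):
        # No purge loop on the read path: take the lock and count.
        with self._lock:
            return len(self._data)

    def stop(self):
        self._stop.set()
        self._thread.join()


window = PurgingWindow(max_age=0.2, interval=0.01)
now = time.time()
window.add(now, 'event_1')
window.add(now + 0.05, 'event_2')
count_before = len(window)
time.sleep(0.5)  # long enough for both entries to exceed max_age
count_after = len(window)
window.stop()
print(count_before, count_after)
```

Reads no longer loop over the data, but they can still wait briefly on the lock while a sweep is running, and the locking, sweeping and shutdown logic now has to live somewhere in your code.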
Read more about the TimedDict on PyPI.
Example
The following code snippet illustrates the functionality of the TimedDict, from instantiation to stopping of the purge thread.
    import time
    from TimedDict.timeddict import TimedDict

    events_window = TimedDict()
    now = time.time()

    # Assign values like a normal dict like:
    events_window[now] = 'value_1'
    events_window[now + 1] = 'value_2'

    # ...or like:
    events_window.update({now + 2: {'values': {'value_3', 'value_4'}}})

    print('Raw data:')
    print(events_window)

    # NOTE:
    # As the TimedDict has a thread running that purges old elements, it's important to
    # use either protect(), or pause() followed by resume(), when iterating.

    # Automatic, by use of a context manager: the protect() approach
    with events_window.protect():
        print('\n- protect()')
        for event in events_window:
            print(event)

    # Manual: the pause() and resume() approach
    events_window.pause()
    print('\n- pause() followed by resume()')
    for event in events_window:
        print(event)
    events_window.resume()

    # TTL example
    print('\nLength of the TimedDict: {}'.format(len(events_window)))
    print(events_window)
    time.sleep(1.1)
    print(events_window)
    time.sleep(1)
    print(events_window)
    time.sleep(1)
    print(events_window)

    # Gracefully stop the purge thread
    events_window.stop()
Output
    Raw data:
    {1534608053.6948583: 'value_1', 1534608054.6948583: 'value_2', 1534608055.6948583: {'values': {'value_4', 'value_3'}}}

    - protect()
    1534608053.6948583
    1534608054.6948583
    1534608055.6948583

    - pause() followed by resume()
    1534608053.6948583
    1534608054.6948583
    1534608055.6948583

    Length of the TimedDict: 3
    {1534608053.6948583: 'value_1', 1534608054.6948583: 'value_2', 1534608055.6948583: {'values': {'value_4', 'value_3'}}}
    {1534608054.6948583: 'value_2', 1534608055.6948583: {'values': {'value_4', 'value_3'}}}
    {1534608055.6948583: {'values': {'value_4', 'value_3'}}}
    {}
* Soft real-time, that is, as a deterministic execution time is not guaranteed when using a garbage-collected language.