Iterator utility classes and functions
Provides a wrapper class, TimeoutIterator, that adds timeout support to ordinary iterators.
Installation:
pip install iterators
See the help of TimeoutIterator for the full list of features, and check the tests for more examples of how to use it. The example tests below show basic usage.
Example:
- TimeoutIterator works like a normal iterator:

    def iter_simple():
        yield 1
        yield 2

    def test_normal_iteration(self):
        i = iter_simple()
        it = TimeoutIterator(i)
        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), 2)
        # Once exhausted, every further call keeps raising StopIteration.
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
- When a timeout is needed, use it like this:

    def iter_with_sleep():
        yield 1
        time.sleep(0.6)
        yield 2
        time.sleep(0.4)
        yield 3

    def test_fixed_timeout(self):
        i = iter_with_sleep()
        it = TimeoutIterator(i, timeout=0.5)
        self.assertEqual(next(it), 1)
        # 2 takes 0.6 s to arrive, longer than the 0.5 s timeout,
        # so the sentinel is returned first.
        self.assertEqual(next(it), it.get_sentinel())
        self.assertEqual(next(it), 2)
        self.assertEqual(next(it), 3)
        self.assertRaises(StopIteration, next, it)
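The timeout behaviour shown above can be pictured with a small stand-in. The sketch below is not the library's implementation. It is a hypothetical class, `SketchTimeoutIterator`, that drains the wrapped iterator on a background thread and passes values through a `queue.Queue`, so that `next()` can stop waiting after `timeout` seconds and return a sentinel instead. Every name in it other than `get_sentinel` and `timeout` is an assumption made for illustration, and eagerly draining the source is a simplification.

```python
import queue
import threading
import time


class SketchTimeoutIterator:
    """Illustrative stand-in only, NOT the real TimeoutIterator."""

    _DONE = object()  # internal marker: the wrapped iterator has finished

    def __init__(self, iterator, timeout=None):
        self._timeout = timeout
        self._sentinel = object()  # returned when a wait times out
        self._buffer = queue.Queue()
        self._finished = False
        # A daemon thread pulls from the wrapped iterator, so the
        # consumer only ever waits on the queue, never on the source.
        worker = threading.Thread(
            target=self._drain, args=(iterator,), daemon=True
        )
        worker.start()

    def _drain(self, iterator):
        try:
            for item in iterator:
                self._buffer.put(item)
        finally:
            # Always signal completion, even if the source raised.
            self._buffer.put(self._DONE)

    def get_sentinel(self):
        return self._sentinel

    def __iter__(self):
        return self

    def __next__(self):
        if self._finished:
            raise StopIteration
        try:
            item = self._buffer.get(timeout=self._timeout)
        except queue.Empty:
            return self._sentinel  # nothing arrived within the timeout
        if item is self._DONE:
            self._finished = True
            raise StopIteration
        return item


def slow_numbers():
    yield 1
    time.sleep(0.6)
    yield 2


it = SketchTimeoutIterator(slow_numbers(), timeout=0.4)
first = next(it)   # available immediately
second = next(it)  # 2 is not ready within 0.4 s
third = next(it)   # 2 arrives during this second wait
print(first, second is it.get_sentinel(), third)
```

Returning a sentinel rather than raising keeps the iterator usable after a timeout: the caller can simply call `next()` again to keep waiting for the same item.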
- Dynamic timeout adjustment:

    def iter_with_sleep():
        yield 1
        time.sleep(0.6)
        yield 2
        time.sleep(0.4)
        yield 3

    def test_timeout_update(self):
        i = iter_with_sleep()
        it = TimeoutIterator(i, timeout=0.5)
        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), it.get_sentinel())
        # Shorten the timeout: the 0.4 s gap before 3 now exceeds it.
        it.set_timeout(0.3)
        self.assertEqual(next(it), 2)
        self.assertEqual(next(it), it.get_sentinel())
        self.assertEqual(next(it), 3)
        self.assertRaises(StopIteration, next, it)
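When a timeout iterator is consumed in a loop, timed-out waits have to be told apart from real values. The following is a minimal sketch of one way to do that, assuming only that the wrapper exposes `get_sentinel()` as in the tests above and that the sentinel can be compared by identity. `iter_values` and `FakeTimeoutIterator` are hypothetical names; the latter stands in for the real class so the snippet runs without the package installed.

```python
def iter_values(timeout_iterator, on_timeout=None):
    """Yield only real items; call on_timeout() for each timed-out wait."""
    sentinel = timeout_iterator.get_sentinel()
    for item in timeout_iterator:
        if item is sentinel:
            if on_timeout is not None:
                on_timeout()
            continue
        yield item


class FakeTimeoutIterator:
    """Hypothetical stand-in that replays a fixed script of events."""

    def __init__(self, events):
        self._sentinel = object()
        # A None entry in `events` marks a wait that timed out.
        self._events = iter(events)

    def get_sentinel(self):
        return self._sentinel

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the script propagates when it runs out.
        event = next(self._events)
        return self._sentinel if event is None else event


timeouts = []
fake = FakeTimeoutIterator([1, None, 2, None, 3])
values = list(iter_values(fake, on_timeout=lambda: timeouts.append("timeout")))
print(values)         # [1, 2, 3]
print(len(timeouts))  # 2
```

The `on_timeout` callback is a natural place to log, check a cancellation flag, or adjust the timeout before the next wait.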
Run unit tests locally:
python -m unittest discover tests