# MIT License
#
# Copyright (c) 2020 Christopher Henderson, chris@chenderson.org
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import absolute_import
import functools
import itertools
from builtins import object
from builtins import map
from builtins import filter
from builtins import enumerate
from builtins import zip
from builtins import reversed
from builtins import sorted
from collections import namedtuple, defaultdict
from pstream._sync.util import not_infinite
try:
# Py3
from collections.abc import Iterator, Iterable
except ImportError: # pragma: no cover
# Py2
from collections import Iterator, Iterable
from pstream.errors import InfiniteCollectionError
[docs]class Stream(object):
[docs] def __init__(self, initial=None):
"""
:param initial: An optional initial value for the stream. `Must` be either an iterator or an iterable.
If `initial` is `None` then the stream with be initialized to be empty.
:Raises: :class:`ValueError` if `initial` is neither an iterator nor an iterable.
"""
if initial is None:
initial = []
if isinstance(initial, Iterator):
self._stream = initial
elif isinstance(initial, Iterable):
self._stream = (x for x in initial)
else:
raise ValueError(
'pstream.Stream can only accept either an iterator or an iterable. Got {}'.format(type(initial)))
self._infinite = False
[docs] def chain(self, *iterables):
"""
Returns a stream that links an arbitrary number of iterators to this iterator, in a chain.
:param iterables: Zero or more iterable objects. These iterables will be chained to the stream
in a left-to-right fashion.
:Returns: :class:`Stream`
:Example:
>>> got = Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
self._stream = itertools.chain(self._stream, *iterables)
return self
[docs] @not_infinite
def count(self):
"""
Evaluates the stream, consuming it and returning a count of the number of elements in the stream.
:Returns: :class:`int`
:Raises: :class:`errors.InfiniteCollectionError`
:Example:
>>> count = Stream(range(100)).filter(lambda x: x % 2 is 0).count()
>>> assert count == 50
"""
count = 0
for _ in self:
count += 1
return count
[docs] @not_infinite
def collect(self):
"""
Evaluates the stream, consuming it and returning a list of the final output.
:Returns: :class:`list`
:Raises: :class:`errors.InfiniteCollectionError`
:Example:
>>> stream = Stream([1, 2, 3, 4]).map(lambda x: x * 2)
>>> got = stream.collect()
>>> assert got == [2, 4, 6, 8]
"""
return [_ for _ in self]
[docs] def distinct(self):
"""
Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function
to each element. Ordering of elements in the stream is maintained.
This functor incurs an additional allocation in the form of a hashset in order to keep track of
the elements in the stream.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 2, 3, 2, 1, 4, 5, 6, 1]
>>> got = Stream(numbers).distinct().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
"""
seen = set()
stream = self._stream
def inner():
for x in stream:
if x in seen:
continue
seen.add(x)
yield x
self._stream = inner()
return self
[docs] def distinct_with(self, key):
"""
Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function
to each item generated by the provided `key(element)`. Ordering of elements in the stream is maintained.
This functor incurs an additional allocation in the form of a hashset in order to keep track of
the elements in the stream.
:param key: A function such that `key(element) -> T` where `T` must be hashable.
:Returns: :class:`Stream`
:Example:
>>> import hashlib
>>>
>>> people = ['Bob', 'Alice', 'Eve', 'Alice', 'Alice', 'Eve', 'Achmed']
>>> fingerprinter = lambda x: hashlib.sha256(x.encode('UTF-8')).digest()
>>> got = Stream(people).distinct_with(fingerprinter).collect()
>>> assert got == ['Bob', 'Alice', 'Eve', 'Achmed']
"""
seen = set()
stream = self._stream
def inner():
for x in stream:
h = key(x)
if h in seen:
continue
seen.add(h)
yield x
self._stream = inner()
return self
Enumeration = namedtuple('Enumeration', ['count', 'element'])
[docs] def enumerate(self):
"""
Returns a stream that yields the current count and the element during iteration.
:Returns: :class:`Stream`
:Example:
>>> got = Stream(range(1, 10)).enumerate().collect()
>>> assert got == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]
The constructed tuple is the namedtuple, :class:`Stream.Enumeration`, which provides
the names `count` and `element`.
:Example:
>>> def count(enumeration):
... print(enumeration.count, enumeration.element)
>>> got = Stream(range(1, 5)).enumerate().inspect(count).map(lambda e: e.element).collect()
0 1
1 2
2 3
3 4
>>> assert got == [1, 2, 3, 4]
"""
self._stream = enumerate(self._stream)
return self.map(lambda enumeration: Stream.Enumeration(*enumeration))
[docs] def filter(self, predicate):
"""
Returns a stream that filters each element using `predicate`. Only elements for which `predicate`
returns `True` are passed through the stream.
:param predicate: A function such that `predicate(element) -> bool`.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter(odds).collect()
>>> assert got == [1, 3, 5, 7, 9]
"""
self._stream = filter(predicate, self._stream)
return self
[docs] def filter_false(self, predicate):
"""
Returns a stream that filters each element using `predicate`. Only elements for which `predicate`
returns `False` are passed through the stream.
:param predicate: A function such that `predicate(element) -> bool`.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter_false(odds).collect()
>>> assert got == [2, 4, 6, 8]
"""
return self.filter(lambda x: not predicate(x))
[docs] def flatten(self):
"""
Returns a stream that flattens one level of nesting in a stream of elements that are themselves iterators.
:Returns: :class:`Stream`
:Example:
>>> # Flatten a two dimensional array to a one dimensional array.
>>> two_dimensional = [[1, 2, 3], [4, 5, 6]]
>>> got = Stream(two_dimensional).flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
>>> # Flatten a three dimensional array to a two dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().collect()
>>> assert got == [[1, 2, 3], [4, 5, 6]]
>>> # Flatten a three dimensional array to a one dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
"""
self._stream = (x for stream in self._stream for x in stream)
return self
[docs] @not_infinite
def for_each(self, f):
"""
Evaluates the stream, consuming it and calling `f` for each element in the stream.
Note that while other stream consumers, such as :meth:`Stream.collect` and :meth:`Stream.count`, will raise an
an :class:`errors.InfiniteCollectionError` if called on an infinite stream (see the documentation
regarding :meth:`Stream.repeat` and :meth:`Stream.repeat_with`), `for_each` will not.
This makes the following...
>>> Stream().repeat_with(input).for_each(print) # doctest: +SKIP
...roughly equivalent to:
>>> while True: # doctest: +SKIP
... print(input()) # doctest: +SKIP
:param f: A function such that `f(element)`. Any value returned is ignored.
:Example:
>>> Stream(range(1, 5)).for_each(print)
1
2
3
4
"""
for x in self:
f(x)
[docs] @not_infinite
def group_by(self, key):
"""
Returns a stream that groups elements together using the provided `key` function.
The ordering of the groups is non-deterministic.
:param key: A function such that `f(element) -> T` where `T` will be used to group elements together.
:Returns: :class:`Stream`
:Example:
>>> # Group people by how long their names are.
>>> names = ['Alice', 'Bob', 'Eve', 'Chris', 'Arjuna', 'Zack']
>>> got = Stream(names).group_by(len).collect()
>>> len(got) == 4
True
>>> ['Alice', 'Chris'] in got
True
>>> ['Bob', 'Eve'] in got
True
>>> ['Arjuna'] in got
True
>>> ['Zack'] in got
True
:Example:
>>> # Group the numbers [0, 10) by evens and odds.
>>> got = Stream(range(10)).group_by(lambda x: x % 2).collect()
>>> len(got) == 2
True
>>> [1, 3, 5, 7, 9] in got
True
>>> [0, 2, 4, 6, 8] in got
True
"""
stream = self._stream
def inner():
m = defaultdict(list)
for element in stream:
m[key(element)].append(element)
for grouping in m.values():
yield grouping
self._stream = inner()
return self
[docs] def inspect(self, f):
"""
Returns a stream that calls the function, `f`, with a reference to each element before yielding it.
:param f: A function such that `f(element)`. Any value returned is ignored.
:Returns: :class:`Stream`
:Example:
>>> def log(number):
... if number % 2 != 0:
... print("WARNING: {} is not even!".format(number))
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).inspect(log).collect()
WARNING: 1 is not even!
WARNING: 3 is not even!
WARNING: 5 is not even!
WARNING: 7 is not even!
WARNING: 9 is not even!
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
stream = self._stream
def inner():
for x in stream:
f(x)
yield x
self._stream = inner()
return self
[docs] def map(self, f):
"""
Returns a stream that maps each value using `f`.
:param f: A function such that `f(A) -> B`.
:Returns: :class:`Stream`
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> double = lambda x: x * 2
>>> got = Stream(numbers).map(double).collect()
>>> assert got == [2, 4, 6, 8, 10, 12, 14, 16, 18]
"""
self._stream = map(f, self._stream)
return self
[docs] @not_infinite
def reduce(self, f, accumulator):
"""
Evaluates the stream, consuming it and applying the function `f` to each item in the stream,
producing a single value.
After `f` has been applied to every item in the stream, the updated `accumulator` is returned.
:param f: A function such that `f(accumulator: T, element) -> T`.
:param accumulator: The initial value provided to `f`.
:Returns: `T` such that `f(accumulator: T, element) -> T`.
Example:
>>> def stringify(accumulator, element):
... return accumulator + str(element)
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reduce(stringify, '')
>>> assert got == '123456789'
"""
return functools.reduce(f, self, accumulator)
[docs] @not_infinite
def reverse(self):
"""
Returns a stream whose elements are reversed.
Note that calling `reverse` itself remains lazy, however at time of collecting the stream a reversal
will incur an internal collection at that particular step. This is due to the reliance of Python's builtin
`reversed` function which itself requires an object that is indexable.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reverse().collect()
>>> assert got == [9, 8, 7, 6, 5, 4, 3, 2, 1]
"""
stream = self._stream
def inner():
return reversed([x for x in stream])
self._stream = inner()
return self
[docs] def skip(self, n):
"""
Returns a stream that skips over `n` number of elements.
:param n: :class:`int`
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip(3).collect()
>>> assert got == [4, 5, 6, 7, 8, 9]
"""
stream = self._stream
def inner():
for _ in range(n):
next(stream)
for x in stream:
yield x
self._stream = inner()
return self
[docs] def skip_while(self, predicate):
"""
Returns a stream that rejects elements while `predicate` returns `True`.
`skip_while` is the complement to :meth:`Stream.take_while`.
:param predicate: A function such that `f(element) -> bool`.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip_while(lambda x: x < 5).collect()
>>> assert got == [5, 6, 7, 8, 9]
"""
self._stream = itertools.dropwhile(predicate, self._stream)
return self
[docs] @not_infinite
def sort(self):
"""
Returns a stream whose elements are sorted.
Note that calling `sort` itself remains lazy, however at time of collecting the stream a sort
will incur an internal collection at that particular step.
:Returns: :class:`Stream`
:Example:
>>> arr = [12, 233, 4567, 344523, 7, 567, 34, 5678, 456, 23, 4, 7, 63, 45, 345]
>>> got = Stream(arr).sort().collect()
>>> assert got == [4, 7, 7, 12, 23, 34, 45, 63, 233, 345, 456, 567, 4567, 5678, 344523]
"""
return self.sort_with(None)
[docs] @not_infinite
def sort_with(self, key):
"""
Returns a stream whose elements are sorted using the provided key selection function.
Note that calling `sort_with` itself remains lazy, however at time of collecting the stream a sort
will incur an internal collection at that particular step.
:param key: A function such that `key(element) -> T` where `T` is the type used for comparison.
:Returns: :class:`Stream`
:Example:
>>> arr = ['12', '233', '4567', '344523', '7', '567', '34', '5678', '456', '23', '4', '7', '63', '45', '345']
>>> got = Stream(arr).sort_with(len).collect()
>>> assert got == ['7', '4', '7', '12', '34', '23', '63', '45', '233', '567', '456', '345', '4567', '5678', '344523']
"""
stream = self._stream
def inner():
return sorted(stream, key=key)
self._stream = inner()
return self
[docs] def step_by(self, step):
"""
Returns a stream which steps over items by a custom amount. Regardless of the step, the first item
in the stream is always returned.
:param step: :class:`int`. `Must` be greater than or equal to one.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).step_by(3).collect()
>>> assert got == [1, 4, 7]
"""
self._stream = itertools.islice(self._stream, 0, None, step)
return self
[docs] def take(self, n):
"""
Returns a stream that only iterates over the first `n` elements.
:param n: :class:`int`
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take(6).collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
"""
stream = self._stream
def inner():
for _ in range(n):
try:
yield next(stream)
except StopIteration:
break
self._stream = inner()
self._infinite = False
return self
[docs] def take_while(self, predicate):
"""
Returns a stream that only accepts elements while `predicate` returns `True`.
`take_while` is the complement to :meth:`Stream.skip_while`.
:param predicate: A function such that `predicate(element) -> bool`.
:Returns: :class:`Stream`
:Example:
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take_while(lambda x: x < 5).collect()
>>> assert got == [1, 2, 3, 4]
"""
self._stream = itertools.takewhile(predicate, self._stream)
self._infinite = False
return self
[docs] def tee(self, *receivers):
"""
Returns a stream whose elements will be appended to objects in `receivers`.
`tee` behaves similarly to the `Unix tee command <https://man7.org/linux/man-pages/man1/tee.1.html>`_.
:param receivers: Zero or more objects that `must` support an `append` method.
:Returns: :class:`Stream`
:Example:
>>> a = list()
>>> b = list()
>>> got = Stream([1, 2, 3, 4]).tee(a, b).map(lambda x: x * 2).collect()
>>> assert got == [2, 4, 6 ,8]
>>> assert a == [1, 2, 3, 4]
>>> assert b == [1, 2, 3, 4]
"""
stream = self._stream
def inner():
for element in stream:
for other in receivers:
other.append(element)
yield element
self._stream = inner()
return self
[docs] def zip(self, *iterables):
"""
Returns a stream that iterates over one or more iterators simultaneously.
:param iterables: Zero or more iterable objects.
:Returns: :class:`Stream`
:Example:
>>> got = Stream([0, 1, 2]).zip([3, 4, 5]).collect()
>>> assert got == [(0, 3), (1, 4), (2, 5)]
"""
self._stream = zip(self._stream, *iterables)
return self
[docs] def pool(self, size):
"""
Returns a stream that will collect up to `size` elements into a list before yielding.
:param size: :class:`int`. `Must` be greater than 0.
:Returns: :class:`Stream`
:Example:
>>> got = Stream([1, 2, 3, 4, 5]).pool(3).collect()
>>> assert got == [[1, 2, 3], [4, 5]]
Note that `pool` effectively behaves as the inverse to :meth:`Stream.flatten` by gradually
introducing higher levels of dimensionality.
:Example:
>>> one = [1, 2, 3, 4, 5, 6, 7, 8]
>>> two = Stream(one).pool(2).collect()
>>> assert two == [[1, 2], [3, 4], [5, 6], [7, 8]]
>>> three = Stream(two).pool(2).collect()
>>> assert three == [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
"""
if size <= 0:
raise ValueError("pstream.Stream.pool sizes must be greater than 0. Received {}.".format(size))
stream = self._stream
def inner():
pool = list()
for x in stream:
pool.append(x)
if len(pool) == size:
yield pool
pool = list()
if len(pool) != 0:
yield pool
self._stream = inner()
return self
[docs] def repeat(self, element):
"""
Returns a stream that repeats an element endlessly.
:param element: Any object. This exact object will be yieled repeatedly.
:Returns: :class:`Stream`
:Example:
>>> got = Stream().repeat(1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]
A call to `repeat` wipes out any previous step in the iterator.
:Example:
>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat(1)
Unless a limiting step, such as :meth:`Stream.take_while` or :meth:`Stream.take`, has been setup after
a call to `repeat`, the consumers :meth:`Stream.collect` and :meth:`Stream.count`
will throw an :class:`errors.InfiniteCollectionError`.
:Example:
>>> try:
... Stream().repeat(1).collect()
... except InfiniteCollectionError as error:
... print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
"""
def inner():
while True:
yield element
self._stream = inner()
self._infinite = True
return self
[docs] def repeat_with(self, f):
"""
Returns a stream that yields the output of `f` endlessly.
:param f: A function such that `f() -> T`.
:Returns: :class:`Stream`
:Example:
>>> got = Stream().repeat_with(lambda: 1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]
A call to `repeat` wipes out any previous step in the iterator.
:Example:
>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat_with(lambda: 1)
Unless a limiting step, such as :meth:`Stream.take_while` or :meth:`Stream.take`, has been setup after
a call to `repeat`, the consumers :meth:`Stream.collect` and :meth:`Stream.count`
will throw an :class:`errors.InfiniteCollectionError`.
:Example:
>>> try:
... Stream().repeat_with(lambda: 1).collect()
... except InfiniteCollectionError as error:
... print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
"""
def inner():
while True:
yield f()
self._stream = inner()
self._infinite = True
return self
def __iter__(self):
return (x for x in self._stream)
def __next__(self):
return next(self._stream)
if __name__ == "__main__": # pragma: no cover
import doctest
doctest.testmod()