PStream: fluent, async, iteration
-
class pstream.Stream(initial=None)
class Enumeration(count, element)
The namedtuple yielded by Stream.enumerate().
-
property count
Alias for field number 0.
-
property element
Alias for field number 1.
-
__init__(initial=None)
- Parameters
initial – An optional initial value for the stream. Must be either an iterator or an iterable. If initial is None, the stream is initialized empty.
- Raises
ValueError – if initial is neither an iterator nor an iterable.
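The validation rule above can be sketched with the abstract base classes in collections.abc. This is a minimal sketch, assuming "iterable" means isinstance(x, Iterable) is true; to_iterator is a hypothetical helper, not part of pstream's API.

```python
from collections.abc import Iterable, Iterator


def to_iterator(initial=None):
    """Hypothetical helper: normalize a Stream's initial value to an iterator."""
    if initial is None:
        # No initial value: behave like an empty stream.
        return iter(())
    if isinstance(initial, Iterator):
        # Already an iterator: use it as-is so it is consumed lazily.
        return initial
    if isinstance(initial, Iterable):
        # Any other iterable (list, range, dict, ...): ask it for a fresh iterator.
        return iter(initial)
    raise ValueError(
        "initial must be an iterator or an iterable, not {}".format(type(initial).__name__)
    )


print(list(to_iterator([1, 2, 3])))  # [1, 2, 3]
print(list(to_iterator()))           # []
```

One caveat of this particular check: isinstance(x, Iterable) does not recognize classes that are iterable only through the older __getitem__ protocol, and the documentation does not say whether pstream accepts those.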
-
chain(*iterables)
Returns a stream that yields the elements of this stream followed by the elements of each of iterables, in order.
- Parameters
iterables – Zero or more iterable objects. These iterables are chained to the stream from left to right.
- Returns
Stream
- Example
>>> got = Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
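Assuming chain has the same left-to-right semantics as the standard library's itertools.chain (the documentation does not say how pstream implements it), the example above corresponds to this sketch:

```python
from itertools import chain

# The stream's own elements come first, then each extra iterable, left to right.
head = iter([1, 2, 3])
chained = chain(head, [4, 5, 6], [7, 8, 9])

# Nothing has been consumed yet; list() drives the whole chain.
result = list(chained)
print(result)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```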
-
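collect()
Evaluates the stream, consuming it and returning a list of the final output.
- Returns
list
- Raises
errors.InfiniteCollectionError – if the stream repeats infinitely (see Stream.repeat() and Stream.repeat_with()) and no limiting step such as Stream.take() or Stream.take_while() follows.
- Example
>>> stream = Stream([1, 2, 3, 4]).map(lambda x: x * 2)
>>> got = stream.collect()
>>> assert got == [2, 4, 6, 8]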
-
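count()
Evaluates the stream, consuming it and returning the number of elements in the stream.
- Returns
int
- Raises
errors.InfiniteCollectionError – if the stream repeats infinitely (see Stream.repeat() and Stream.repeat_with()) and no limiting step such as Stream.take() or Stream.take_while() follows.
- Example
>>> count = Stream(range(100)).filter(lambda x: x % 2 == 0).count()
>>> assert count == 50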
-
distinct()
Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to each element, so elements must be hashable. The order of elements in the stream is preserved.
This functor incurs an additional allocation in the form of a hash set that keeps track of the elements already seen.
- Returns
Stream
- Example
>>> numbers = [1, 2, 2, 3, 2, 1, 4, 5, 6, 1]
>>> got = Stream(numbers).distinct().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
-
distinct_with(key)
Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to the value returned by key(element). The order of elements in the stream is preserved.
This functor incurs an additional allocation in the form of a hash set that keeps track of what has already been seen.
- Parameters
key – A function such that key(element) -> T, where T must be hashable.
- Returns
Stream
- Example
>>> import hashlib
>>>
>>> people = ['Bob', 'Alice', 'Eve', 'Alice', 'Alice', 'Eve', 'Achmed']
>>> fingerprinter = lambda x: hashlib.sha256(x.encode('UTF-8')).digest()
>>> got = Stream(people).distinct_with(fingerprinter).collect()
>>> assert got == ['Bob', 'Alice', 'Eve', 'Achmed']
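One way to realize this ordered, set-backed de-duplication is a generator that remembers which keys it has already seen. This is a sketch assuming pstream keeps such a set; the two functions are standalone, hypothetical helpers that mirror the method names, and the sketch also shows distinct as distinct_with keyed on the element itself.

```python
from collections.abc import Hashable
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def distinct_with(items: Iterable[T], key: Callable[[T], Hashable]) -> Iterator[T]:
    """Lazily yield the first element seen for each distinct key(element)."""
    seen = set()  # the extra allocation: one entry per distinct key
    for item in items:
        marker = key(item)
        if marker not in seen:
            seen.add(marker)
            yield item


def distinct(items: Iterable[T]) -> Iterator[T]:
    """distinct viewed as distinct_with keyed on the element itself."""
    return distinct_with(items, lambda item: item)


print(list(distinct([1, 2, 2, 3, 2, 1, 4, 5, 6, 1])))        # [1, 2, 3, 4, 5, 6]
print(list(distinct_with(["Bob", "bob", "Eve"], str.lower)))  # ['Bob', 'Eve']
```

A Python set compares by equality after hashing, so two different keys that happen to share a hash value are still treated as distinct.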
-
enumerate()
Returns a stream that yields each element paired with its zero-based count.
- Returns
Stream
- Example
>>> got = Stream(range(1, 10)).enumerate().collect()
>>> assert got == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]
Each pair is an instance of the namedtuple Stream.Enumeration, which provides the field names count and element.
- Example
>>> def count(enumeration):
...     print(enumeration.count, enumeration.element)
>>> got = Stream(range(1, 5)).enumerate().inspect(count).map(lambda e: e.element).collect()
0 1
1 2
2 3
3 4
>>> assert got == [1, 2, 3, 4]
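Because Enumeration is documented as a namedtuple whose fields 0 and 1 are aliased as count and element, its behavior can be reproduced with collections.namedtuple. The sketch below is an assumption about how the pairs are built, not pstream's source; enumerate_stream is a hypothetical name.

```python
from collections import namedtuple

# Hypothetical stand-in for Stream.Enumeration: field 0 is count, field 1 is element.
Enumeration = namedtuple("Enumeration", ["count", "element"])


def enumerate_stream(items):
    """Lazily pair each element with its zero-based position."""
    for count, element in enumerate(items):
        yield Enumeration(count, element)


pairs = list(enumerate_stream(range(1, 4)))
print(pairs[0].count, pairs[0].element)  # 0 1
# namedtuples are tuples, so they still compare equal to plain tuples.
print(pairs == [(0, 1), (1, 2), (2, 3)])  # True
```

This tuple equality is why the first example above can compare the collected namedtuples against plain (count, element) tuples.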
-
filter(predicate)
Returns a stream that filters each element using predicate. Only elements for which predicate returns True are passed through the stream.
- Parameters
predicate – A function such that predicate(element) -> bool.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter(odds).collect()
>>> assert got == [1, 3, 5, 7, 9]
-
filter_false(predicate)
Returns a stream that filters each element using predicate. Only elements for which predicate returns False are passed through the stream.
- Parameters
predicate – A function such that predicate(element) -> bool.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter_false(odds).collect()
>>> assert got == [2, 4, 6, 8]
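filter and filter_false mirror the builtin filter and itertools.filterfalse. Assuming pstream's versions behave the same way (the documentation only specifies the True/False rule), the two results partition the input between them:

```python
from itertools import filterfalse


def is_odd(x):
    return x % 2 != 0


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
kept = list(filter(is_odd, numbers))          # predicate returned True
dropped = list(filterfalse(is_odd, numbers))  # predicate returned False

print(kept)     # [1, 3, 5, 7, 9]
print(dropped)  # [2, 4, 6, 8]
# Between them, the two results account for every input element exactly once.
print(sorted(kept + dropped) == numbers)  # True
```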
-
flatten()
Returns a stream that flattens one level of nesting in a stream whose elements are themselves iterables.
- Returns
Stream
- Example
>>> # Flatten a two dimensional array to a one dimensional array.
>>> two_dimensional = [[1, 2, 3], [4, 5, 6]]
>>> got = Stream(two_dimensional).flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
>>> # Flatten a three dimensional array to a two dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().collect()
>>> assert got == [[1, 2, 3], [4, 5, 6]]
>>> # Flatten a three dimensional array to a one dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
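Removing exactly one level of nesting is what the standard library's itertools.chain.from_iterable does, so the examples above can be reproduced with this sketch. It assumes pstream's flatten has the same semantics; flatten here is a hypothetical standalone function.

```python
from itertools import chain


def flatten(items):
    """Remove exactly one level of nesting, lazily."""
    return chain.from_iterable(items)


three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
once = list(flatten(three_dimensional))
twice = list(flatten(flatten(three_dimensional)))
print(once)   # [[1, 2, 3], [4, 5, 6]]
print(twice)  # [1, 2, 3, 4, 5, 6]
```

A consequence of this one-level rule, at least in the sketch: a str element is itself iterable, so flattening ['ab', 'cd'] yields the individual characters.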
-
for_each(f)
Evaluates the stream, consuming it and calling f for each element in the stream.
Note that while other stream consumers, such as Stream.collect() and Stream.count(), will raise an errors.InfiniteCollectionError if called on an infinite stream (see the documentation for Stream.repeat() and Stream.repeat_with()), for_each will not. This makes the following…
>>> Stream().repeat_with(input).for_each(print)
…roughly equivalent to:
>>> while True:
...     print(input())
- Parameters
f – A function such that f(element). Any value returned is ignored.
- Example
>>> Stream(range(1, 5)).for_each(print)
1
2
3
4
-
group_by(key)
Returns a stream that groups elements into lists using the provided key function.
The ordering of the groups is non-deterministic.
- Parameters
key – A function such that key(element) -> T, where T is used to group elements together.
- Returns
Stream
- Example
>>> # Group people by how long their names are.
>>> names = ['Alice', 'Bob', 'Eve', 'Chris', 'Arjuna', 'Zack']
>>> got = Stream(names).group_by(len).collect()
>>> len(got) == 4
True
>>> ['Alice', 'Chris'] in got
True
>>> ['Bob', 'Eve'] in got
True
>>> ['Arjuna'] in got
True
>>> ['Zack'] in got
True
- Example
>>> # Group the numbers [0, 10) by evens and odds.
>>> got = Stream(range(10)).group_by(lambda x: x % 2).collect()
>>> len(got) == 2
True
>>> [1, 3, 5, 7, 9] in got
True
>>> [0, 2, 4, 6, 8] in got
True
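Since 'Alice' and 'Chris' land in the same group even though they are not adjacent, any implementation has to read the whole input before it can emit a complete group. The sketch below shows that with a dictionary of lists; group_by here is a hypothetical standalone function, not pstream's code. It happens to produce first-seen order because Python dicts preserve insertion order, but that is a property of the sketch only, and pstream documents its group order as non-deterministic.

```python
from collections import defaultdict


def group_by(items, key):
    """Collect elements into lists keyed by key(element).

    The whole input is consumed before any group is returned, because a
    later element may still belong to an earlier group.
    """
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    # Callers should not rely on the order of the groups.
    return iter(groups.values())


names = ["Alice", "Bob", "Eve", "Chris", "Arjuna", "Zack"]
got = list(group_by(names, len))
print(sorted(got))  # [['Alice', 'Chris'], ['Arjuna'], ['Bob', 'Eve'], ['Zack']]
```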
-
inspect(f)
Returns a stream that calls the function f with a reference to each element before yielding it.
- Parameters
f – A function such that f(element). Any value returned is ignored.
- Returns
Stream
- Example
>>> def log(number):
...     if number % 2 != 0:
...         print("WARNING: {} is not even!".format(number))
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).inspect(log).collect()
WARNING: 1 is not even!
WARNING: 3 is not even!
WARNING: 5 is not even!
WARNING: 7 is not even!
WARNING: 9 is not even!
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
-
map(f)
Returns a stream that maps each element using f.
- Parameters
f – A function such that f(A) -> B.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> double = lambda x: x * 2
>>> got = Stream(numbers).map(double).collect()
>>> assert got == [2, 4, 6, 8, 10, 12, 14, 16, 18]
-
pool(size)
Returns a stream that collects up to size consecutive elements into a list before yielding it. Every list holds exactly size elements except possibly the last, which holds whatever remains.
- Parameters
size – int. Must be greater than 0.
- Returns
Stream
- Example
>>> got = Stream([1, 2, 3, 4, 5]).pool(3).collect()
>>> assert got == [[1, 2, 3], [4, 5]]
Note that pool effectively behaves as the inverse of Stream.flatten(), adding one level of nesting each time it is applied.
- Example
>>> one = [1, 2, 3, 4, 5, 6, 7, 8]
>>> two = Stream(one).pool(2).collect()
>>> assert two == [[1, 2], [3, 4], [5, 6], [7, 8]]
>>> three = Stream(two).pool(2).collect()
>>> assert three == [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
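Chunking of this kind can be sketched by repeatedly taking up to size elements with itertools.islice. This assumes pstream's pool behaves as the examples show; pool and _chunks are hypothetical standalone functions. The sketch also checks the inverse relationship with flattening.

```python
from itertools import chain, islice


def pool(items, size):
    """Return an iterator of lists holding up to `size` consecutive elements."""
    if size < 1:
        # Validated eagerly, before any element is read.
        raise ValueError("size must be greater than 0")
    return _chunks(iter(items), size)


def _chunks(iterator, size):
    while True:
        chunk = list(islice(iterator, size))  # take the next `size` elements
        if not chunk:  # the input is exhausted
            return
        yield chunk


print(list(pool([1, 2, 3, 4, 5], 3)))  # [[1, 2, 3], [4, 5]]

# Flattening undoes one level of pooling.
one = [1, 2, 3, 4, 5, 6, 7, 8]
print(list(chain.from_iterable(pool(one, 2))) == one)  # True
```

On Python 3.12 and later, itertools.batched performs the same grouping but yields tuples rather than lists.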
-
reduce(f, accumulator)
Evaluates the stream, consuming it and folding every element into a single value with the function f.
Each call f(accumulator, element) returns the new accumulator; after f has been applied to every element, the final accumulator is returned.
- Parameters
f – A function such that f(accumulator: T, element) -> T.
accumulator – The initial value provided to f.
- Returns
T, the final accumulator.
- Example
>>> def stringify(accumulator, element):
...     return accumulator + str(element)
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reduce(stringify, '')
>>> assert got == '123456789'
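As described, Stream.reduce(f, accumulator) is a left fold, the same computation as functools.reduce with an explicit initial value. This equivalence is an assumption drawn from the description above, not pstream's code. Note that functools.reduce takes the iterable before the initial value:

```python
from functools import reduce


def stringify(accumulator, element):
    return accumulator + str(element)


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# functools.reduce(function, iterable, initial) calls f(accumulator, element),
# the same argument order the documentation describes for Stream.reduce.
print(reduce(stringify, numbers, ""))  # 123456789

# The same fold, written as an explicit loop.
accumulator = ""
for element in numbers:
    accumulator = stringify(accumulator, element)
print(accumulator)  # 123456789
```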
-
repeat(element)
Returns a stream that repeats an element endlessly.
- Parameters
element – Any object. This exact object is yielded repeatedly.
- Returns
Stream
- Example
>>> got = Stream().repeat(1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]
A call to repeat discards every previous step in the stream.
- Example
>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream yields 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat(1)
Unless a limiting step, such as Stream.take_while() or Stream.take(), has been set up after a call to repeat, the consumers Stream.collect() and Stream.count() will raise an errors.InfiniteCollectionError.
- Example
>>> try:
...     Stream().repeat(1).collect()
... except InfiniteCollectionError as error:
...     print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
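The behavior above implies that the stream knows, without evaluating anything, whether an unbounded repeat is still in effect. A toy model of that bookkeeping follows. MiniStream and its flag are hypothetical, the exception class merely stands in for errors.InfiniteCollectionError, and pstream's actual mechanism is not shown in this documentation. Only take is modeled as a limiting step here.

```python
from itertools import islice, repeat


class InfiniteCollectionError(Exception):
    """Hypothetical stand-in for pstream's errors.InfiniteCollectionError."""


class MiniStream:
    """Hypothetical toy stream that tracks whether it is still unbounded."""

    def __init__(self, iterable=(), infinite=False):
        self._iterator = iter(iterable)
        self._infinite = infinite

    def repeat(self, element):
        # Like Stream.repeat: discard every previous step and repeat forever.
        return MiniStream(repeat(element), infinite=True)

    def take(self, n):
        # A limiting step makes even an infinite stream finite.
        return MiniStream(islice(self._iterator, n), infinite=False)

    def collect(self):
        # Refuse up front instead of looping forever.
        if self._infinite:
            raise InfiniteCollectionError("collect was called on an infinite stream")
        return list(self._iterator)


print(MiniStream().repeat(1).take(5).collect())  # [1, 1, 1, 1, 1]
try:
    MiniStream().repeat(1).collect()
except InfiniteCollectionError as error:
    print(error)  # collect was called on an infinite stream
```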
-
repeat_with(f)
Returns a stream that yields the output of f endlessly.
- Parameters
f – A function such that f() -> T.
- Returns
Stream
- Example
>>> got = Stream().repeat_with(lambda: 1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]
A call to repeat_with discards every previous step in the stream.
- Example
>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream yields 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat_with(lambda: 1)
Unless a limiting step, such as Stream.take_while() or Stream.take(), has been set up after a call to repeat_with, the consumers Stream.collect() and Stream.count() will raise an errors.InfiniteCollectionError.
- Example
>>> try:
...     Stream().repeat_with(lambda: 1).collect()
... except InfiniteCollectionError as error:
...     print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
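The practical difference between repeat and repeat_with: repeat yields one exact object over and over, as its parameter description states, while repeat_with produces each element by calling f. The sketch uses itertools.repeat and a hypothetical repeat_with generator to show why that matters for mutable values; it assumes pstream's methods behave the same way.

```python
from itertools import islice, repeat


def repeat_with(f):
    """Hypothetical generator: call f() afresh for every element, forever."""
    while True:
        yield f()


# repeat yields the *same* object each time ...
same = list(islice(repeat([]), 3))
# ... while repeat_with builds a new object on every call to f.
fresh = list(islice(repeat_with(list), 3))

print(same[0] is same[1])    # True
print(fresh[0] is fresh[1])  # False
```

Mutating same[0] would therefore be visible through every element, whereas each element of fresh is independent.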
-
reverse()
Returns a stream whose elements are reversed.
Calling reverse is itself lazy; however, when the stream is evaluated, the reversal step must first collect every preceding element into an internal list. This is because the step relies on Python's builtin reversed function, which requires a sequence (an object supporting len() and indexing) rather than an arbitrary iterator.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reverse().collect()
>>> assert got == [9, 8, 7, 6, 5, 4, 3, 2, 1]
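The "lazy to build, eager to evaluate" behavior described above can be observed with a small generator. This is a sketch under the assumption that the reversal step buffers its input into a list; reverse and source are hypothetical names.

```python
def reverse(items):
    """Lazy to create, but reads the entire input on the first next() call."""
    buffered = list(items)  # reversed() needs a sequence, not a bare iterator
    yield from reversed(buffered)


calls = []


def source():
    for n in [1, 2, 3]:
        calls.append(n)  # record every element actually pulled from the input
        yield n


stream = reverse(source())
print(calls)         # []  (nothing consumed yet)
print(next(stream))  # 3
print(calls)         # [1, 2, 3]  (the whole input was read to produce one element)
```

Calling reversed() directly on an iterator such as iter([1, 2, 3]) raises TypeError, which is why the buffering step is unavoidable. This also means reverse cannot work on an unbounded stream.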
-
skip(n)
Returns a stream that skips over the first n elements.
- Parameters
n – int
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip(3).collect()
>>> assert got == [4, 5, 6, 7, 8, 9]
-
skip_while(predicate)
Returns a stream that rejects elements while predicate returns True. Once predicate returns False for an element, that element and every element after it are passed through.
skip_while is the complement to Stream.take_while().
- Parameters
predicate – A function such that predicate(element) -> bool.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip_while(lambda x: x < 5).collect()
>>> assert got == [5, 6, 7, 8, 9]
-
sort()
Returns a stream whose elements are sorted.
Calling sort is itself lazy; however, when the stream is evaluated, the sort step must first collect every preceding element internally before it can yield anything.
- Returns
Stream
- Example
>>> arr = [12, 233, 4567, 344523, 7, 567, 34, 5678, 456, 23, 4, 7, 63, 45, 345]
>>> got = Stream(arr).sort().collect()
>>> assert got == [4, 7, 7, 12, 23, 34, 45, 63, 233, 345, 456, 567, 4567, 5678, 344523]
-
sort_with(key)
Returns a stream whose elements are sorted using the provided key selection function.
Calling sort_with is itself lazy; however, when the stream is evaluated, the sort step must first collect every preceding element internally before it can yield anything. As the example shows, elements with equal keys keep their original relative order.
- Parameters
key – A function such that key(element) -> T, where T is the type used for comparison.
- Returns
Stream
- Example
>>> arr = ['12', '233', '4567', '344523', '7', '567', '34', '5678', '456', '23', '4', '7', '63', '45', '345']
>>> got = Stream(arr).sort_with(len).collect()
>>> assert got == ['7', '4', '7', '12', '34', '23', '63', '45', '233', '567', '456', '345', '4567', '5678', '344523']
-
step_by(step)
Returns a stream that yields every step-th element, starting with the first. Regardless of step, the first element in the stream is always yielded.
- Parameters
step – int. Must be greater than or equal to one.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).step_by(3).collect()
>>> assert got == [1, 4, 7]
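Yielding the first element and then every step-th one is exactly what slicing with a stride does, and itertools.islice supports a stride on any iterator. The sketch assumes step_by matches that behavior; step_by here is a hypothetical standalone function.

```python
from itertools import islice


def step_by(items, step):
    """Yield the first element, then every step-th element after it."""
    if step < 1:
        raise ValueError("step must be greater than or equal to one")
    # islice(iterable, start, stop, step): start at index 0, never stop early.
    return islice(items, 0, None, step)


print(list(step_by([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)))  # [1, 4, 7]
print(list(step_by([1, 2, 3], 1)))                    # [1, 2, 3]
```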
-
take(n)
Returns a stream that only iterates over the first n elements.
- Parameters
n – int
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take(6).collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
-
take_while(predicate)
Returns a stream that accepts elements only while predicate returns True; iteration stops at the first element for which predicate returns False.
take_while is the complement to Stream.skip_while().
- Parameters
predicate – A function such that predicate(element) -> bool.
- Returns
Stream
- Example
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take_while(lambda x: x < 5).collect()
>>> assert got == [1, 2, 3, 4]
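The complement relationship between take_while and skip_while matches the standard library pair itertools.takewhile and itertools.dropwhile. Assuming pstream follows the same semantics, concatenating the two results for the same predicate reconstructs the input:

```python
from itertools import dropwhile, takewhile


def below_five(x):
    return x < 5


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
head = list(takewhile(below_five, numbers))  # like take_while
tail = list(dropwhile(below_five, numbers))  # like skip_while

print(head)  # [1, 2, 3, 4]
print(tail)  # [5, 6, 7, 8, 9]
print(head + tail == numbers)  # True

# Both stop testing at the first failure; later matches are not filtered out.
print(list(dropwhile(below_five, [1, 7, 2])))  # [7, 2]
```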
-
tee(*receivers)
Returns a stream that appends each of its elements to every object in receivers as the stream is evaluated.
tee behaves similarly to the Unix tee command.
- Parameters
receivers – Zero or more objects that must support an append method.
- Returns
Stream
- Example
>>> a = list()
>>> b = list()
>>> got = Stream([1, 2, 3, 4]).tee(a, b).map(lambda x: x * 2).collect()
>>> assert got == [2, 4, 6, 8]
>>> assert a == [1, 2, 3, 4]
>>> assert b == [1, 2, 3, 4]
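Despite the shared name, this is not itertools.tee, which splits one iterator into several independent iterators. A pass-through generator that copies each element into every receiver captures the documented behavior; tee here is a hypothetical standalone function, and the assumption is that pstream appends each element as it flows by.

```python
def tee(items, *receivers):
    """Append each element to every receiver as it passes through, then yield it."""
    for item in items:
        for receiver in receivers:
            receiver.append(item)  # any object with an append method works
        yield item


a, b = [], []
got = [x * 2 for x in tee([1, 2, 3, 4], a, b)]
print(got)   # [2, 4, 6, 8]
print(a, b)  # [1, 2, 3, 4] [1, 2, 3, 4]
```

Because the sketch is lazy, the receivers stay empty until the stream is actually consumed.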
-
class