PStream: fluent, async, iteration

class pstream.Stream(initial=None)[source]
class Enumeration(count, element)
property count

Alias for field number 0

property element

Alias for field number 1

__init__(initial=None)[source]
Parameters

initial – An optional initial value for the stream. Must be either an iterator or an iterable. If initial is None, the stream will be initialized as empty.

Raises

ValueError if initial is neither an iterator nor an iterable.

chain(*iterables)[source]

Returns a stream that links an arbitrary number of iterators to this iterator, in a chain.

Parameters

iterables – Zero or more iterable objects. These iterables will be chained to the stream in a left-to-right fashion.

Returns

Stream

Example

>>> got = Stream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
collect()[source]

Evaluates the stream, consuming it and returning a list of the final output.

Returns

list

Raises

errors.InfiniteCollectionError

Example

>>> stream = Stream([1, 2, 3, 4]).map(lambda x: x * 2)
>>> got = stream.collect()
>>> assert got == [2, 4, 6, 8]
count()[source]

Evaluates the stream, consuming it and returning a count of the number of elements in the stream.

Returns

int

Raises

errors.InfiniteCollectionError

Example

>>> count = Stream(range(100)).filter(lambda x: x % 2 == 0).count()
>>> assert count == 50
distinct()[source]

Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to each element. Ordering of elements in the stream is maintained.

This functor incurs an additional allocation in the form of a hashset in order to keep track of the elements in the stream.

Returns

Stream

Example

>>> numbers = [1, 2, 2, 3, 2, 1, 4, 5, 6, 1]
>>> got = Stream(numbers).distinct().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
distinct_with(key)[source]

Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to each item generated by the provided key(element). Ordering of elements in the stream is maintained.

This functor incurs an additional allocation in the form of a hashset in order to keep track of the elements in the stream.

Parameters

key – A function such that key(element) -> T where T must be hashable.

Returns

Stream

Example

>>> import hashlib
>>>
>>> people = ['Bob', 'Alice', 'Eve', 'Alice', 'Alice', 'Eve', 'Achmed']
>>> fingerprinter = lambda x: hashlib.sha256(x.encode('UTF-8')).digest()
>>> got = Stream(people).distinct_with(fingerprinter).collect()
>>> assert got == ['Bob', 'Alice', 'Eve', 'Achmed']
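The hash-set bookkeeping described for distinct and distinct_with can be pictured with a plain generator. This is a minimal sketch using only the standard library. The helper name distinct_by is hypothetical and not part of pstream, and the library's actual implementation may differ.

```python
def distinct_by(iterable, key=lambda element: element):
    """Yield elements whose key has not been seen before, preserving order.

    Hypothetical helper for illustration; not part of pstream.
    """
    seen = set()  # the extra allocation: one entry per distinct key
    for element in iterable:
        fingerprint = key(element)  # must be hashable
        if fingerprint not in seen:
            seen.add(fingerprint)
            yield element


numbers = [1, 2, 2, 3, 2, 1, 4, 5, 6, 1]
unique_numbers = list(distinct_by(numbers))

people = ['Bob', 'Alice', 'Eve', 'Alice', 'Alice', 'Eve', 'Achmed']
# With key=len, 'Eve' is dropped because 'Bob' already claimed length 3.
unique_by_length = list(distinct_by(people, key=len))
```

Memory use in this sketch grows with the number of distinct keys rather than with the total number of elements.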
enumerate()[source]

Returns a stream that yields the current count and the element during iteration.

Returns

Stream

Example

>>> got = Stream(range(1, 10)).enumerate().collect()
>>> assert got == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]

The constructed tuple is the namedtuple, Stream.Enumeration, which provides the names count and element.

Example

>>> def count(enumeration):
...     print(enumeration.count, enumeration.element)
>>> got = Stream(range(1, 5)).enumerate().inspect(count).map(lambda e: e.element).collect()
0 1
1 2
2 3
3 4
>>> assert got == [1, 2, 3, 4]
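To show how a namedtuple with the fields count and element behaves, here is a standard-library sketch. The Enumeration class below is a local stand-in built with collections.namedtuple, assumed to mirror Stream.Enumeration only in its two field names.

```python
from collections import namedtuple

# Local stand-in with the same field names as Stream.Enumeration.
Enumeration = namedtuple('Enumeration', ['count', 'element'])

pairs = [Enumeration(i, e) for i, e in enumerate(range(1, 5))]

first = pairs[0]
by_name = (first.count, first.element)   # access by field name
by_index = (first[0], first[1])          # field numbers 0 and 1
position, value = pairs[2]               # ordinary tuple unpacking
```

Because a namedtuple is still a tuple, it compares equal to a plain tuple with the same contents, which is why the assertion against [(0, 1), (1, 2), ...] in the first example holds.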
filter(predicate)[source]

Returns a stream that filters each element using predicate. Only elements for which predicate returns True are passed through the stream.

Parameters

predicate – A function such that predicate(element) -> bool.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter(odds).collect()
>>> assert got == [1, 3, 5, 7, 9]
filter_false(predicate)[source]

Returns a stream that filters each element using predicate. Only elements for which predicate returns False are passed through the stream.

Parameters

predicate – A function such that predicate(element) -> bool.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = Stream(numbers).filter_false(odds).collect()
>>> assert got == [2, 4, 6, 8]
flatten()[source]

Returns a stream that flattens one level of nesting in a stream of elements that are themselves iterators.

Returns

Stream

Example

>>> # Flatten a two dimensional array to a one dimensional array.
>>> two_dimensional = [[1, 2, 3], [4, 5, 6]]
>>> got = Stream(two_dimensional).flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
>>> # Flatten a three dimensional array to a two dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().collect()
>>> assert got == [[1, 2, 3], [4, 5, 6]]
>>> # Flatten a three dimensional array to a one dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = Stream(three_dimensional).flatten().flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
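Removing exactly one level of nesting per call matches what itertools.chain.from_iterable does in the standard library. The sketch below uses that function as an analogy. The helper name flatten_once is hypothetical, and nothing here is claimed about how pstream implements flatten.

```python
from itertools import chain


def flatten_once(iterable):
    """Hypothetical helper: lazily remove one level of nesting."""
    return chain.from_iterable(iterable)


three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]

# One pass removes one level; the inner lists are left intact.
two_dimensional = list(flatten_once(three_dimensional))

# Two passes are needed to reach a flat sequence.
one_dimensional = list(flatten_once(flatten_once(three_dimensional)))
```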
for_each(f)[source]

Evaluates the stream, consuming it and calling f for each element in the stream.

Note that while other stream consumers, such as Stream.collect() and Stream.count(), raise an errors.InfiniteCollectionError when called on an infinite stream (see the documentation for Stream.repeat() and Stream.repeat_with()), for_each does not.

This makes the following…

>>> Stream().repeat_with(input).for_each(print)  

…roughly equivalent to:

>>> while True:  
...   print(input())  
Parameters

f – A function such that f(element). Any value returned is ignored.

Example

>>> Stream(range(1, 5)).for_each(print)
1
2
3
4
group_by(key)[source]

Returns a stream that groups elements together using the provided key function.

The ordering of the groups is non-deterministic.

Parameters

key – A function such that key(element) -> T where T will be used to group elements together.

Returns

Stream

Example

>>> # Group people by how long their names are.
>>> names = ['Alice', 'Bob', 'Eve', 'Chris', 'Arjuna', 'Zack']
>>> got = Stream(names).group_by(len).collect()
>>> len(got) == 4
True
>>> ['Alice', 'Chris'] in got
True
>>> ['Bob', 'Eve'] in got
True
>>> ['Arjuna'] in got
True
>>> ['Zack'] in got
True
Example

>>> # Group the numbers [0, 10) by evens and odds.
>>> got = Stream(range(10)).group_by(lambda x: x % 2).collect()
>>> len(got) == 2
True
>>> [1, 3, 5, 7, 9] in got
True
>>> [0, 2, 4, 6, 8] in got
True
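Grouping by a key function can be sketched with a dictionary of lists. This is a standard-library illustration only. The helper name group_by_key is hypothetical, and unlike the behaviour documented above, a dict in Python 3.7+ returns groups in first-seen order, so the sketch should not be read as a statement about pstream's ordering.

```python
from collections import defaultdict


def group_by_key(iterable, key):
    """Hypothetical helper: collect elements into lists keyed by key(element)."""
    groups = defaultdict(list)
    for element in iterable:
        groups[key(element)].append(element)
    return list(groups.values())


names = ['Alice', 'Bob', 'Eve', 'Chris', 'Arjuna', 'Zack']
by_length = group_by_key(names, len)

parity = group_by_key(range(10), lambda x: x % 2)
```

Code that consumes the result of Stream.group_by should test membership, as the examples above do, rather than rely on the position of a group.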
inspect(f)[source]

Returns a stream that calls the function, f, with a reference to each element before yielding it.

Parameters

f – A function such that f(element). Any value returned is ignored.

Returns

Stream

Example

>>> def log(number):
...     if number % 2 != 0:
...         print("WARNING: {} is not even!".format(number))
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).inspect(log).collect()
WARNING: 1 is not even!
WARNING: 3 is not even!
WARNING: 5 is not even!
WARNING: 7 is not even!
WARNING: 9 is not even!
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
map(f)[source]

Returns a stream that maps each value using f.

Parameters

f – A function such that f(A) -> B.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> double = lambda x: x * 2
>>> got = Stream(numbers).map(double).collect()
>>> assert got == [2, 4, 6, 8, 10, 12, 14, 16, 18]
pool(size)[source]

Returns a stream that will collect up to size elements into a list before yielding.

Parameters

size – int. Must be greater than 0.

Returns

Stream

Example

>>> got = Stream([1, 2, 3, 4, 5]).pool(3).collect()
>>> assert got == [[1, 2, 3], [4, 5]]

Note that pool effectively behaves as the inverse to Stream.flatten() by gradually introducing higher levels of dimensionality.

Example

>>> one = [1, 2, 3, 4, 5, 6, 7, 8]
>>> two = Stream(one).pool(2).collect()
>>> assert two == [[1, 2], [3, 4], [5, 6], [7, 8]]
>>> three = Stream(two).pool(2).collect()
>>> assert three == [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
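Collecting up to size elements at a time can be sketched with itertools.islice. The helper name pool_into is hypothetical and the code is a standard-library analogy, not pstream's implementation. It assumes size is a positive int, as the parameter description requires.

```python
from itertools import islice


def pool_into(iterable, size):
    """Hypothetical helper: yield lists of up to `size` elements."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:  # the source is exhausted
            return
        yield batch


uneven = list(pool_into([1, 2, 3, 4, 5], 3))   # last batch is shorter

one = [1, 2, 3, 4, 5, 6, 7, 8]
two = list(pool_into(one, 2))
three = list(pool_into(two, 2))                # pooling again adds a level
```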
reduce(f, accumulator)[source]

Evaluates the stream, consuming it and applying the function f to each item in the stream, producing a single value.

After f has been applied to every item in the stream, the updated accumulator is returned.

Parameters
  • f – A function such that f(accumulator: T, element) -> T.

  • accumulator – The initial value provided to f.

Returns

T such that f(accumulator: T, element) -> T.

Example

>>> def stringify(accumulator, element):
...    return accumulator + str(element)
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reduce(stringify, '')
>>> assert got == '123456789'
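The same fold can be written with functools.reduce from the standard library, which also calls f(accumulator, element) and accepts an initial value as its third argument. The sketch below is offered as a comparison for readers who know that function; it does not describe how pstream implements reduce.

```python
from functools import reduce


def stringify(accumulator, element):
    return accumulator + str(element)


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# functools.reduce takes (function, iterable, initial value).
joined = reduce(stringify, numbers, '')

# The accumulator's type is set by the initial value, here an int.
total = reduce(lambda accumulator, element: accumulator + element, numbers, 0)

# With no elements, the initial accumulator is returned unchanged.
untouched = reduce(stringify, [], 'start')
```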
repeat(element)[source]

Returns a stream that repeats an element endlessly.

Parameters

element – Any object. This exact object will be yielded repeatedly.

Returns

Stream

Example

>>> got = Stream().repeat(1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]

A call to repeat wipes out any previous step in the iterator.

Example

>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat(1)

Unless a limiting step, such as Stream.take_while() or Stream.take(), has been set up after a call to repeat, the consumers Stream.collect() and Stream.count() will raise an errors.InfiniteCollectionError.

Example

>>> try:
...     Stream().repeat(1).collect()
... except InfiniteCollectionError as error:
...     print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
repeat_with(f)[source]

Returns a stream that yields the output of f endlessly.

Parameters

f – A function such that f() -> T.

Returns

Stream

Example

>>> got = Stream().repeat_with(lambda: 1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]

A call to repeat_with wipes out any previous step in the iterator.

Example

>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = Stream(range(10)).enumerate().chain(range(10, 20)).repeat_with(lambda: 1)

Unless a limiting step, such as Stream.take_while() or Stream.take(), has been set up after a call to repeat_with, the consumers Stream.collect() and Stream.count() will raise an errors.InfiniteCollectionError.

Example

>>> try:
...     Stream().repeat_with(lambda: 1).collect()
... except InfiniteCollectionError as error:
...     print(error)
Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
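Why an endless source needs a limiting step before it can be collected is easy to see with plain generators. The sketch below uses only the standard library. The helper name repeat_calls is hypothetical, and itertools.islice stands in for the role that take plays on a stream.

```python
from itertools import count, islice, repeat


def repeat_calls(f):
    """Hypothetical helper: yield f() forever."""
    while True:
        yield f()


# list(repeat_calls(...)) with no limit would never return,
# so the source is bounded before it is materialised.
counter = count(1)
first_five = list(islice(repeat_calls(lambda: next(counter)), 5))

# itertools.repeat yields the very same object each time.
marker = object()
same_object = list(islice(repeat(marker), 3))
```

Note that f is called once per element requested, so a function with side effects runs only as many times as the limiting step allows.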
reverse()[source]

Returns a stream whose elements are reversed.

Note that calling reverse is itself lazy. When the stream is evaluated, however, the reversal incurs an internal collection at that step. This is due to the reliance on Python’s builtin reversed function, which requires an object that is indexable.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).reverse().collect()
>>> assert got == [9, 8, 7, 6, 5, 4, 3, 2, 1]
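The distinction between a lazy call and a buffering evaluation can be sketched with a generator. This is a standard-library illustration under the assumption that the step behaves as described above. The helper names lazy_reversed and tracked are hypothetical.

```python
def lazy_reversed(iterable):
    """Hypothetical helper: reverse an arbitrary iterable lazily."""
    # A generator body does not run until the first next() call,
    # so building the pipeline consumes nothing.
    buffered = list(iterable)  # the internal collection happens here
    yield from reversed(buffered)


pulled = []  # records every element drawn from the upstream source


def tracked(iterable):
    for element in iterable:
        pulled.append(element)
        yield element


pipeline = lazy_reversed(tracked([1, 2, 3]))
pulled_before = list(pulled)        # nothing consumed yet
first = next(pipeline)              # forces the whole upstream into a list
pulled_after_first = list(pulled)
rest = list(pipeline)
```

A sorting step can be sketched the same way, with sorted in place of reversed. In both cases the buffer holds every upstream element at once.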
skip(n)[source]

Returns a stream that skips over n number of elements.

Parameters

n – int

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip(3).collect()
>>> assert got == [4, 5, 6, 7, 8, 9]
skip_while(predicate)[source]

Returns a stream that rejects elements while predicate returns True.

skip_while is the complement to Stream.take_while().

Parameters

predicate – A function such that predicate(element) -> bool.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).skip_while(lambda x: x < 5).collect()
>>> assert got == [5, 6, 7, 8, 9]
sort()[source]

Returns a stream whose elements are sorted.

Note that calling sort is itself lazy. When the stream is evaluated, however, the sort incurs an internal collection at that step.

Returns

Stream

Example

>>> arr = [12, 233, 4567, 344523, 7, 567, 34, 5678, 456, 23, 4, 7, 63, 45, 345]
>>> got = Stream(arr).sort().collect()
>>> assert got == [4, 7, 7, 12, 23, 34, 45, 63, 233, 345, 456, 567, 4567, 5678, 344523]
sort_with(key)[source]

Returns a stream whose elements are sorted using the provided key selection function.

Note that calling sort_with is itself lazy. When the stream is evaluated, however, the sort incurs an internal collection at that step.

Parameters

key – A function such that key(element) -> T where T is the type used for comparison.

Returns

Stream

Example

>>> arr = ['12', '233', '4567', '344523', '7', '567', '34', '5678', '456', '23', '4', '7', '63', '45', '345']
>>> got = Stream(arr).sort_with(len).collect()
>>> assert got == ['7', '4', '7', '12', '34', '23', '63', '45', '233', '567', '456', '345', '4567', '5678', '344523']
step_by(step)[source]

Returns a stream which steps over items by a custom amount. Regardless of the step, the first item in the stream is always returned.

Parameters

step – int. Must be greater than or equal to one.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).step_by(3).collect()
>>> assert got == [1, 4, 7]
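Stepping that always keeps the first item corresponds to slicing with a start of 0 and a stride of step. The sketch below shows this with itertools.islice. The helper name step_over is hypothetical, and the code is an analogy rather than pstream's implementation.

```python
from itertools import islice


def step_over(iterable, step):
    """Hypothetical helper: keep items at positions 0, step, 2*step, ..."""
    return islice(iterable, 0, None, step)


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

every_third = list(step_over(numbers, 3))
every_item = list(step_over(numbers, 1))      # a step of one keeps everything
only_first = list(step_over(numbers, 100))    # a large step still keeps item 0
```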
take(n)[source]

Returns a stream that only iterates over the first n elements.

Parameters

n – int

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take(6).collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
take_while(predicate)[source]

Returns a stream that only accepts elements while predicate returns True.

take_while is the complement to Stream.skip_while().

Parameters

predicate – A function such that predicate(element) -> bool.

Returns

Stream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = Stream(numbers).take_while(lambda x: x < 5).collect()
>>> assert got == [1, 2, 3, 4]
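The complementary relationship between take_while and skip_while, and how both differ from filter, can be sketched with itertools.takewhile and itertools.dropwhile. This assumes the stream methods follow the same semantics as those standard-library functions, which the examples above are consistent with but do not prove.

```python
from itertools import dropwhile, takewhile

# The trailing 1 satisfies the predicate but comes after it first failed.
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 1]


def below_five(x):
    return x < 5


head = list(takewhile(below_five, numbers))   # stops at the first failure
tail = list(dropwhile(below_five, numbers))   # starts at the first failure
filtered = [x for x in numbers if below_five(x)]  # tests every element
```

Under that assumption, head followed by tail reproduces the original sequence, whereas a filter would also keep the trailing 1.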
tee(*receivers)[source]

Returns a stream whose elements will be appended to objects in receivers.

tee behaves similarly to the Unix tee command.

Parameters

receivers – Zero or more objects that must support an append method.

Returns

Stream

Example

>>> a = list()
>>> b = list()
>>> got = Stream([1, 2, 3, 4]).tee(a, b).map(lambda x: x * 2).collect()
>>> assert got == [2, 4, 6, 8]
>>> assert a == [1, 2, 3, 4]
>>> assert b == [1, 2, 3, 4]
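Since receivers only need an append method, any object that provides one should qualify, not just lists. The sketch below illustrates the idea with a plain generator and a collections.deque. The helper name tee_into is hypothetical, and the code is not pstream's implementation.

```python
from collections import deque


def tee_into(iterable, *receivers):
    """Hypothetical helper: append each element to every receiver, then yield it."""
    for element in iterable:
        for receiver in receivers:
            receiver.append(element)
        yield element


a = []
b = deque(maxlen=2)  # a bounded receiver keeps only the most recent items

doubled = [x * 2 for x in tee_into([1, 2, 3, 4], a, b)]
```

In this sketch the receivers see each element as it passes the tee step, before any later step transforms it.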
zip(*iterables)[source]

Returns a stream that iterates over one or more iterators simultaneously.

Parameters

iterables – Zero or more iterable objects.

Returns

Stream

Example

>>> got = Stream([0, 1, 2]).zip([3, 4, 5]).collect()
>>> assert got == [(0, 3), (1, 4), (2, 5)]
