PStream: fluent, async, iteration

class pstream.AsyncStream(*args, **kwds)[source]

The API for an AsyncStream has a 1:1 correspondence with the API for a Stream.

The difference is that an AsyncStream may accept asynchronous iterators/iterables and asynchronous functions at all opportunities (with the notable exception of AsyncStream.sort_with() which cannot take in an asynchronous key function).

However, it is not mandatory that the provided iterators and functions be all asynchronous. The two runtime characteristics may be mixed and matched. The correct implementation for all combinations is selected at the time of `building`the stream, rather than being dispatched during the stream’s evaluation. The result is that at the time of evaluatiom, each step in your stream is exactly as precise, and as minimal, as you would expect from a manual implementation.

>>> async def consult(element):
...     # Ask some remote microservice whether
...     # or not the given element is valid.
...     asyncio.sleep(1)
...     return True
>>> def double(element):
...     return element * 2
>>> # Mix-and-match async and sync at your leisure.
>>> await AsyncStream([1, 2, 3, 4]).filter(consult).map(double).collect()
__init__(initial: Collection[T] = None)[source]
Parameters

initial – An optional initial value for the stream. Must be either an iterator, an iterable, or an asynchronous iterator (supports an __anext__ method). If initial is None then the stream with be initialized to be empty.

Raises

ValueError if initial is neither an iterator nor, an iterable, nor an asynchronous iterator.

chain(*iterables: Collection[T])[source]

Returns a stream that links an arbitrary number of iterators to this iterator, in a chain.

Parameters

iterables – Zero or more iterable objects. These iterables will be chained to the stream in a left-to-right fashion.

Returns

AsyncStream

Example

>>> got = await AsyncStream([1, 2, 3]).chain([4, 5, 6], [7, 8, 9]).collect()
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
async collect() → List[T][source]

Evaluates the stream, consuming it and returning a list of the final output.

Returns

list

Raises

errors.InfiniteCollectionError

Example

>>> stream = AsyncStream([1, 2, 3, 4]).map(lambda x: x * 2)
>>> got = await stream.collect()
>>> assert got == [2, 4, 6, 8]
async count()[source]

Evaluates the stream, consuming it and returning a count of the number of elements in the stream.

Returns

int

Raises

errors.InfiniteCollectionError

Example

>>> count = await AsyncStream(range(100)).filter(lambda x: x % 2 is 0).count()
>>> assert count == 50
distinct()[source]

Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to each element. Ordering of elements in the stream is maintained.

This functor incurs an additional allocation in the form of a hashset in order to keep track of the elements in the stream.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 2, 3, 2, 1, 4, 5, 6, 1]
>>> got = await AsyncStream(numbers).distinct().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
distinct_with(key: Callable[[T], U])[source]

Returns a stream of distinct elements. Distinction is computed by applying the builtin hash function to each item generated by the provided key(element). Ordering of elements in the stream is maintained.

This functor incurs an additional allocation in the form of a hashset in order to keep track of the elements in the stream.

Parameters

key – A function such that key(element) -> T where T must be hashable. key may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> import hashlib
>>>
>>> people = ['Bob', 'Alice', 'Eve', 'Alice', 'Alice', 'Eve', 'Achmed']
>>> fingerprinter = lambda x: hashlib.sha256(x.encode('UTF-8')).digest()
>>> got = await AsyncStream(people).distinct_with(fingerprinter).collect()
>>> assert got == ['Bob', 'Alice', 'Eve', 'Achmed']
enumerate()[source]

Returns a stream that yields the current count and the element during iteration.

Returns

AsyncStream

Example

>>> got = await AsyncStream(range(1, 10)).enumerate().collect()
>>> assert got == [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]

The constructed tuple is the namedtuple, Stream.Enumeration, which provides the names count and element.

Example

>>> def count(enumeration):
...     print(enumeration.count, enumeration.element)
>>> got = await AsyncStream(range(1, 5)).enumerate().inspect(count).map(lambda e: e.element).collect()
0 1
1 2
2 3
3 4
>>> assert got == [1, 2, 3, 4]
filter(predicate: Callable[[T], bool])[source]

Returns a stream that filters each element using predicate. Only elements for which predicate returns True are passed through the stream.

Parameters

predicate – A function such that predicate(element) -> bool. predicate may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = await AsyncStream(numbers).filter(odds).collect()
>>> assert got == [1, 3, 5, 7, 9]
filter_false(predicate: Callable[[T], bool])[source]

Returns a stream that filters each element using predicate. Only elements for which predicate returns False are passed through the stream.

Parameters

predicate – A function such that predicate(element) -> bool. predicate may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> odds = lambda x: x % 2 != 0
>>> got = await AsyncStream(numbers).filter_false(odds).collect()
>>> assert got == [2, 4, 6, 8]
flatten()[source]

Returns a stream that flattens one level of nesting in a stream of elements that are themselves iterators.

Returns

AsyncStream

Example

>>> # Flatten a two dimensional array to a one dimensional array.
>>> two_dimensional = [[1, 2, 3], [4, 5, 6]]
>>> got = await AsyncStream(two_dimensional).flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
>>> # Flatten a three dimensional array to a two dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = await AsyncStream(three_dimensional).flatten().collect()
>>> assert got == [[1, 2, 3], [4, 5, 6]]
>>> # Flatten a three dimensional array to a one dimensional array.
>>> three_dimensional = [[[1, 2, 3]], [[4, 5, 6]]]
>>> got = await AsyncStream(three_dimensional).flatten().flatten().collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
async for_each(f: Callable[[T], None])[source]

Evaluates the stream, consuming it and calling f for each element in the stream.

Note that while other stream consumers, such as Stream.collect() and Stream.count(), will raise an an errors.InfiniteCollectionError if called on an infinite stream (see the documentation regarding Stream.repeat() and Stream.repeat_with()), for_each will not.

This makes the following…

>>> await AsyncStream().repeat_with(input).for_each(print)  

…roughly equivalent to:

>>> while True:  
...   print(input())  
Parameters

f – A function such that f(element). Any value returned is ignored. f may be either asynchronous or synchronous.

Example

>>> await AsyncStream(range(1, 5)).for_each(print)
1
2
3
4
group_by(key: Callable[[T], U])[source]

Returns a stream that groups elements together using the provided key function.

The ordering of the groups is non-deterministic.

Parameters

key – A function such that f(element) -> T where T will be used to group elements together. key may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> # Group people by how long their names are.
>>> names = ['Alice', 'Bob', 'Eve', 'Chris', 'Arjuna', 'Zack']
>>> got = await AsyncStream(names).group_by(len).collect()
>>> len(got) == 4
True
>>> ['Alice', 'Chris'] in got
True
>>> ['Bob', 'Eve'] in got
True
>>> ['Arjuna'] in got
True
>>> ['Zack'] in got
True
Example

>>> # Group the numbers [0, 10) by evens and odds.
>>> got = await AsyncStream(range(10)).group_by(lambda x: x % 2).collect()
>>> len(got) == 2
True
>>> [1, 3, 5, 7, 9] in got
True
>>> [0, 2, 4, 6, 8] in got
True
inspect(f: Callable[[T], None])[source]

Returns a stream that calls the function, f, with a reference to each element before yielding it.

Parameters

f – A function such that f(element). Any value returned is ignored. f may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> def log(number):
...     if number % 2 != 0:
...         print("WARNING: {} is not even!".format(number))
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).inspect(log).collect()
WARNING: 1 is not even!
WARNING: 3 is not even!
WARNING: 5 is not even!
WARNING: 7 is not even!
WARNING: 9 is not even!
>>> assert got == [1, 2, 3, 4, 5, 6, 7, 8, 9]
map(f: Callable[[T], U])[source]

Returns a stream that maps each value using f.

Parameters

f – A function such that f(A) -> B. f may be either asynchronous or synchronous.

Returns

AsyncStream

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> double = lambda x: x * 2
>>> got = await AsyncStream(numbers).map(double).collect()
>>> assert got == [2, 4, 6, 8, 10, 12, 14, 16, 18]
pool(size: int)[source]

Returns a stream that will collect up to size elements into a list before yielding.

Parameters

sizeint. Must be greater than 0.

Returns

AsyncStream

Example

>>> got = await AsyncStream([1, 2, 3, 4, 5]).pool(3).collect()
>>> assert got == [[1, 2, 3], [4, 5]]

Note that pool effectively behaves as the inverse to Stream.flatten() by gradually introducing higher levels of dimensionality.

Example

>>> one = [1, 2, 3, 4, 5, 6, 7, 8]
>>> two = await AsyncStream(one).pool(2).collect()
>>> assert two == [[1, 2], [3, 4], [5, 6], [7, 8]]
>>> three = await AsyncStream(two).pool(2).collect()
>>> assert three == [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
async reduce(f: Callable[[T], U], accumulator: U) → U[source]

Evaluates the stream, consuming it and applying the function f to each item in the stream, producing a single value.

After f has been applied to every item in the stream, the updated accumulator is returned.

Parameters
  • f – A function such that f(accumulator: T, element) -> T. f may be either asynchronous or synchronous.

  • accumulator – The initial value provided to f.

Returns

T such that f(accumulator: T, element) -> T.

Example:

>>> def stringify(accumulator, element):
...    return accumulator + str(element)
>>>
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).reduce(stringify, '')
>>> assert got == '123456789'
repeat(element: T)[source]

Returns a stream that repeats an element endlessly.

Parameters

element – Any object. This exact object will be yieled repeatedly.

Returns

AsyncStream

Example

>>> got = await AsyncStream().repeat(1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]

A call to repeat wipes out any previous step in the iterator.

Example

>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = await AsyncStream(range(10)).enumerate().chain(range(10, 20)).repeat(1)

Unless a limiting step, such as AsyncStream.take_while() or AsyncStream.take(), has been setup after a call to repeat, the consumers AsyncStream.collect() and AsyncStream.count() will throw an errors.InfiniteCollectionError.

Example

>>> try:
...     await AsyncStream().repeat(1).collect()
... except InfiniteCollectionError as error:
...     print(error)
AsyncStream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
repeat_with(f: Callable[], T])[source]

Returns a stream that yields the output of f endlessly.

Parameters

f – A function such that f() -> T. f may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> got = await AsyncStream().repeat_with(lambda: 1).take(5).collect()
>>> assert got == [1, 1, 1, 1, 1]

A call to repeat wipes out any previous step in the iterator.

Example

>>> # The initial range, enumeration, and chain are completely lost
>>> # and the stream returns 1 indefinitely.
>>> s = await AsyncStream(range(10)).enumerate().chain(range(10, 20)).repeat_with(lambda: 1)

Unless a limiting step, such as AsyncStream.take_while() or AsyncStream.take(), has been setup after a call to repeat, the consumers AsyncStream.collect() and AsyncStream.count() will throw an errors.InfiniteCollectionError.

Example

>>> try:
...     await AsyncStream().repeat_with(lambda: 1).collect()
... except InfiniteCollectionError as error:
...     print(error)
AsyncStream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect
reverse()[source]

Returns a stream whose elements are reversed.

Note that calling reverse itself remains lazy, however at time of collecting the stream a reversal will incur an internal collection at that particular step. This is due to the reliance of Python’s builtin reversed function which itself requires an object that is indexable.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).reverse().collect()
>>> assert got == [9, 8, 7, 6, 5, 4, 3, 2, 1]
skip(n: int)[source]

Returns a stream that skips over n number of elements.

Parameters

nint

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).skip(3).collect()
>>> assert got == [4, 5, 6, 7, 8, 9]
skip_while(predicate: Callable[[T], bool])[source]

Returns a stream that rejects elements while predicate returns True.

skip_while is the complement to AsyncStream.take_while().

Parameters

predicate – A function such that f(element) -> bool. predicate may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).skip_while(lambda x: x < 5).collect()
>>> assert got == [5, 6, 7, 8, 9]
sort()[source]

Returns a stream whose elements are sorted.

Note that calling sort itself remains lazy, however at time of collecting the stream a sort will incur an internal collection at that particular step.

Returns

AsyncStream

Example

>>> arr = [12, 233, 4567, 344523, 7, 567, 34, 5678, 456, 23, 4, 7, 63, 45, 345]
>>> got = await AsyncStream(arr).sort().collect()
>>> assert got == [4, 7, 7, 12, 23, 34, 45, 63, 233, 345, 456, 567, 4567, 5678, 344523]
sort_with(key: Callable[[T], U])[source]

Returns a stream whose elements are sorted using the provided key selection function.

Note that calling sort_with itself remains lazy, however at time of collecting the stream a sort will incur an internal collection at that particular step.

Parameters

key – A function such that key(element) -> T where T is the type used for comparison. key MAY NOT

be asynchronous! This is due to a limitation in the builtin sorted function which does not support asynchronous key functions.

Returns

AsyncStream

Example

>>> arr = ['12', '233', '4567', '344523', '7', '567', '34', '5678', '456', '23', '4', '7', '63', '45', '345']
>>> got = await AsyncStream(arr).sort_with(len).collect()
>>> assert got == ['7', '4', '7', '12', '34', '23', '63', '45', '233', '567', '456', '345', '4567', '5678', '344523']
step_by(step: int)[source]

Returns a stream which steps over items by a custom amount. Regardless of the step, the first item in the stream is always returned.

Parameters

stepint. Must be greater than or equal to one.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).step_by(3).collect()
>>> assert got == [1, 4, 7]
take(n: int)[source]

Returns a stream that only iterates over the first n elements.

Parameters

nint

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).take(6).collect()
>>> assert got == [1, 2, 3, 4, 5, 6]
take_while(predicate: Callable[[T], bool])[source]

Returns a stream that only accepts elements while predicate returns True.

take_while is the complement to AsyncStream.skip_while().

Parameters

predicate – A function such that predicate(element) -> bool. predicate may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> got = await AsyncStream(numbers).take_while(lambda x: x < 5).collect()
>>> assert got == [1, 2, 3, 4]
tee(*receivers)[source]

Returns a stream whose elements will be appended to objects in receivers.

tee behaves similarly to the Unix tee command.

Parameters

receivers – Zero or more objects that must support an append method. The append method may be either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> a = list()
>>> b = list()
>>> got = await AsyncStream([1, 2, 3, 4]).tee(a, b).map(lambda x: x * 2).collect()
>>> assert got == [2, 4, 6 ,8]
>>> assert a == [1, 2, 3, 4]
>>> assert b == [1, 2, 3, 4]
zip(*iterables: Collection[T])[source]

Returns a stream that iterates over one or more iterators simultaneously.

Parameters

iterables – Zero or more iterable objects. The iterables may be heterogeneous of either asynchronous or synchronous.

Returns

AsyncStream

Example

>>> got = Stream([0, 1, 2]).zip([3, 4, 5]).collect()
>>> assert got == [(0, 3), (1, 4), (2, 5)]

Indices and tables