stream

MultiStreamReader

MultiStreamReader(readers)

Bases: StreamReader

A composite StreamReader that merges multiple readers.

Used when a single stream request spans data from multiple sources (e.g. different Kafka broker clusters).

Parameters:

Name | Type | Description | Default
readers | Sequence[StreamReader] | StreamReader instances to merge. | required

Source code in arrakis/stream.py
def __init__(self, readers: Sequence[StreamReader]):
    self._readers = readers
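
The merging behaviour is not shown in the excerpt above, so the following is only a minimal sketch of how a composite reader could combine its children. `ListReader` and `MergedReader` are hypothetical stand-ins written for this illustration, not arrakis classes, and plain strings take the place of RecordBatches.

```python
from collections import deque
from typing import Any, Generator, Sequence


class ListReader:
    """Hypothetical in-memory reader, used only for this illustration."""

    def __init__(self, stream: str, batches: list):
        self._stream = stream
        self._queue = deque(batches)

    def read(self) -> Generator[tuple, None, None]:
        # Drain whatever is queued right now, without blocking.
        while self._queue:
            yield self._stream, self._queue.popleft()

    def done(self) -> bool:
        return not self._queue


class MergedReader:
    """Sketch of a composite reader; NOT the arrakis implementation."""

    def __init__(self, readers: Sequence[Any]):
        self._readers = readers

    def read(self) -> Generator[tuple, None, None]:
        # Forward everything each child currently has available.
        for reader in self._readers:
            yield from reader.read()

    def done(self) -> bool:
        # The composite is finished only when every child is finished.
        return all(reader.done() for reader in self._readers)


merged = MergedReader(
    [
        ListReader("cluster-a", ["a0", "a1"]),
        ListReader("cluster-b", ["b0"]),
    ]
)
merged_items = list(merged.read())
```

Here each child is drained in turn. A real implementation might interleave sources differently or merge the `streams` dictionaries as well.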

StreamReader

Bases: AbstractContextManager, Protocol

Non-blocking Arrow RecordBatch stream reader

May produce multiple independent streams. Streams should be buffered in a queue, and the read method should return all RecordBatches currently in the queue.
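
As a rough illustration of the buffering contract described above, the sketch below keeps incoming batches in a standard-library `queue.Queue` and has `read` return only what is already buffered. `QueueBackedReader` and its `push` method are assumptions made for this example and are not part of arrakis.

```python
import queue


class QueueBackedReader:
    """Hypothetical reader; a producer thread would call push()."""

    def __init__(self, stream):
        self._stream = stream
        self._queue = queue.Queue()

    def push(self, batch):
        # Called by whatever receives data from the source.
        self._queue.put(batch)

    def read(self):
        # Yield everything queued so far, then stop instead of waiting.
        while True:
            try:
                batch = self._queue.get_nowait()
            except queue.Empty:
                return
            yield self._stream, batch


reader = QueueBackedReader("demo")
first_read = list(reader.read())  # nothing buffered yet, returns immediately
reader.push("batch-0")
reader.push("batch-1")
second_read = list(reader.read())  # both buffered batches, in arrival order
```

Using `get_nowait` rather than `get` is what keeps `read` non-blocking: an empty queue ends the generator instead of stalling the caller.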

streams property

streams

stream channels dictionary

A StreamReader may produce multiple independent streams. This property contains one key per stream name, whose value is the list of channel names in that stream.
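
For orientation, a value of this property might look like the dictionary below. The stream and channel names are invented for illustration, and the exact mapping type is an assumption based on the description above.

```python
# Hypothetical contents of `reader.streams`: stream name -> channel names.
streams = {
    "stream-a": ["H1:CHANNEL_ONE", "H1:CHANNEL_TWO"],
    "stream-b": ["L1:CHANNEL_ONE"],
}

# Flatten to every channel served by the reader.
all_channels = sorted(ch for chans in streams.values() for ch in chans)

# Invert the mapping to find which stream carries a given channel.
stream_for_channel = {ch: name for name, chans in streams.items() for ch in chans}
```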

close

close()

close the stream

Source code in arrakis/stream.py
def close(self):
    """close the stream"""
    pass

done

done()

return True if the collector is done

Source code in arrakis/stream.py
def done(self) -> bool:
    """return True if the collector is done"""
    return False

enter

enter()

open the streams

Source code in arrakis/stream.py
def enter(self) -> None:
    """open the streams"""
    pass

read

read(*, convert_blocks: Literal[False] = False) -> Generator[tuple[str, RecordBatch], None, None]
read(*, convert_blocks: Literal[True] = True) -> Generator[tuple[str, SeriesBlock], None, None]
read(*, convert_blocks=False)

Read all available elements.

Parameters:

Name | Type | Description | Default
convert_blocks | bool | Convert RecordBatches to SeriesBlocks | False

Yields:

Type Description
tuple of (stream name, RecordBatch | SeriesBlock)
Source code in arrakis/stream.py
def read(
    self,
    *,
    convert_blocks: bool = False,
) -> Generator[tuple[str, RecordBatch | SeriesBlock], None, None]:
    """Read all available elements.

    Parameters
    ----------
    convert_blocks : bool
        Convert RecordBatches to SeriesBlocks

    Yields
    ------
    tuple of (stream name, RecordBatch | SeriesBlock)

    """
    ...
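
Because `read` only returns what is already available, a consumer typically polls it until `done()` reports completion. The sketch below shows one such loop under stated assumptions: `ScriptedReader` is a hypothetical stand-in that releases a scripted group of items on each poll, `collect` is a helper invented for this example, and strings take the place of RecordBatches.

```python
import time
from collections import defaultdict


class ScriptedReader:
    """Hypothetical stand-in: each poll releases the next scripted group."""

    def __init__(self, polls):
        self._polls = list(polls)

    def done(self):
        return not self._polls

    def read(self, *, convert_blocks=False):
        if self._polls:
            yield from self._polls.pop(0)


def collect(reader, poll_interval=0.0):
    """Group everything the reader produces by stream name."""
    by_stream = defaultdict(list)
    while not reader.done():
        for name, batch in reader.read():
            by_stream[name].append(batch)
        # read() does not block, so pause briefly instead of spinning.
        time.sleep(poll_interval)
    return dict(by_stream)


reader = ScriptedReader(
    [
        [("a", "a0"), ("b", "b0")],  # first poll: two streams have data
        [],                          # second poll: nothing available
        [("a", "a1")],               # third poll: one more batch
    ]
)
collected = collect(reader)
```

An empty poll is a normal outcome rather than an error, which is why the loop checks `done()` instead of stopping at the first empty read.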

unpack

unpack()

synchronously unpack all batch streams into individual elements

Source code in arrakis/stream.py
def unpack(self) -> Generator[Any, None, None]:
    """synchronously unpack all batch streams into individual elements"""
    while not self.done():
        for _, batch in self.read(convert_blocks=False):
            yield from batch.to_pylist()
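
To show what `unpack` yields, the sketch below runs the same loop against stand-ins. `FakeBatch` mimics only the `to_pylist()` method of a pyarrow RecordBatch (a list of row dictionaries), and `FiniteReader` is a hypothetical reader that finishes once its queue is empty. Neither is part of arrakis, and the row fields are invented.

```python
from collections import deque


class FakeBatch:
    """Stand-in for a RecordBatch; only to_pylist() is mimicked."""

    def __init__(self, rows):
        self._rows = rows

    def to_pylist(self):
        return list(self._rows)


class FiniteReader:
    """Hypothetical reader that is done once its queue is drained."""

    def __init__(self, batches):
        self._queue = deque(batches)

    def done(self):
        return not self._queue

    def read(self, *, convert_blocks=False):
        while self._queue:
            yield "demo", self._queue.popleft()

    def unpack(self):
        # Same loop as the method documented above.
        while not self.done():
            for _, batch in self.read(convert_blocks=False):
                yield from batch.to_pylist()


reader = FiniteReader(
    [
        FakeBatch([{"time": 0, "value": 1.0}]),
        FakeBatch([{"time": 1, "value": 2.0}, {"time": 2, "value": 3.0}]),
    ]
)
rows = list(reader.unpack())
```

The stream name is discarded, so rows from all streams arrive in a single flat sequence. With a reader whose `done()` never becomes true, the loop keeps polling indefinitely.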