block

Series block representation of timeseries data.

Series dataclass

Series(time_ns, data, channel)

Bases: Generic[ChannelLike]

Single-channel timeseries data for a given timestamp.

Parameters:

    time_ns : int, required
        The timestamp associated with this data, in nanoseconds.
    data : ndarray, required
        The timeseries data.
    channel : Channel, required
        Channel metadata associated with this timeseries.

Properties:

    data_type
        Data type of the data array's elements.
    dt
        The time separation in seconds between successive samples.
    dtype
        Data type of the data array's elements.
    duration (cached)
        Series duration in seconds.
    duration_ns (cached)
        Series duration in nanoseconds.
    has_nulls
        Whether the timeseries data contains any null values.
    name
        Channel name.
    sample_rate
        Data rate for this series in samples per second (Hz).
    t0
        Timestamp associated with this data, in seconds.
    time (cached)
        Timestamp associated with this data, in seconds.
    times (cached)
        The array of times corresponding to all data points in the series.
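The derived properties follow from the timestamp, the data length, and the channel's sample rate. A minimal sketch of those relationships, using hypothetical values and plain numpy rather than arrakis itself:

```python
import numpy

# Hypothetical values standing in for a Series' fields
time_ns = 1_000_000_000          # t0 = 1.0 s
data = numpy.zeros(16)           # 16 samples
sample_rate = 16.0               # Hz, taken from the channel metadata

t0 = time_ns / 1e9               # timestamp in seconds
dt = 1.0 / sample_rate           # seconds between successive samples
duration = len(data) * dt        # series duration in seconds
times = t0 + numpy.arange(len(data)) * dt

assert dt == 0.0625
assert duration == 1.0
assert times[0] == t0
```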

SeriesBlock dataclass

SeriesBlock(time_ns, data, channels=dict())

Bases: Generic[ChannelLike]

Series block containing timeseries for channels for a given timestamp.

Parameters:

    time_ns : int, required
        The timestamp associated with this data, in nanoseconds.
    data : dict[str, ndarray], required
        Mapping between channel names and their timeseries data.
    channels : dict[str, Channel], default dict()
        Channel metadata associated with this data block.

Properties:

    duration (cached)
        Duration of this block, in seconds.
    duration_ns
        Duration of this block, in nanoseconds.
    t0
        Timestamp associated with this block, in seconds.
    time (cached)
        Timestamp associated with this block, in seconds.

create_gaps

create_gaps(channels)

Add channels with all null values (gaps).

Parameters:

    channels : Iterable[Channel], required
        The channels to create gaps for. Any channels currently present
        will be ignored.

Returns:

    SeriesBlock
        The block with additional gaps present.

Source code in arrakis/block.py
def create_gaps(self, channels: Iterable[ChannelLike]) -> SeriesBlock:
    """Add channels with all null values (gaps).

    Parameters
    ----------
    channels : Iterable[Channel]
        The channels to create gaps for. Any channels currently present
        will be ignored.

    Returns
    -------
    SeriesBlock
        The block with additional gaps present.

    """
    series_dict = self.data
    channel_dict = self.channels
    for channel in channels:
        if channel in channel_dict:
            continue
        size = int(channel.sample_rate * self.duration_ns) // Time.s
        series_dict[channel.name] = numpy.ma.masked_all(size, dtype=channel.dtype)
        channel_dict[channel.name] = channel

    return type(self)(self.time_ns, series_dict, channel_dict)
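The gap arrays created above are fully-masked numpy arrays sized to match the block. A small sketch of what one gap channel looks like, with hypothetical sample rate and duration:

```python
import numpy

# Hypothetical channel metadata
sample_rate = 16             # Hz
duration_ns = 1_000_000_000  # a 1-second block

# Same sizing logic as create_gaps: samples = rate * duration in seconds
size = int(sample_rate * duration_ns) // 1_000_000_000
gap = numpy.ma.masked_all(size, dtype=numpy.float64)

assert size == 16
assert gap.mask.all()        # every sample is null
```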

filter

filter(channels=None)

Filter a block, keeping only a subset of its channels.

Parameters:

    channels : list[str], optional
        If specified, keep only these channels. If not specified, the
        block is returned unchanged.

Returns:

    SeriesBlock
        The filtered block.

Source code in arrakis/block.py
def filter(self, channels: list[str] | None = None) -> SeriesBlock:
    """Filter a block based on criteria.

    FIXME: more info needed

    Parameters
    ----------
    channels : list[str], optional
        If specified, keep only these channels.

    Returns
    -------
    SeriesBlock
        The filtered series.

    """
    if not channels:
        return self

    data = {channel: self.data[channel] for channel in channels}
    if self.channels:
        channel_dict = {channel: self.channels[channel] for channel in channels}
    else:
        channel_dict = self.channels

    return type(self)(self.time_ns, data, channel_dict)

from_column_batch classmethod

from_column_batch(batch, channels)

Create a series block from a record batch.

Parameters:

    batch : pyarrow.RecordBatch, required
        A record batch, with a 'time' column with the timestamp and
        channel columns with all channels to publish.
    channels : dict[str, Channel], required
        Channel metadata. The metadata for the channels defined in the
        batch will be extracted from this dictionary, so this dictionary
        may include metadata for additional channels not included in
        the batch.

Returns:

    SeriesBlock
        The block representation of the record batch.

Source code in arrakis/block.py
@classmethod
def from_column_batch(
    cls,
    batch: pyarrow.RecordBatch,
    channels: dict[str, ChannelLike],
) -> SeriesBlock:
    """Create a series block from a record batch.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp
        and channel columns with all channels to publish.
    channels : dict[str, Channel]
        Channel metadata.  The metadata for the channels defined
        in the batch will be extracted from this dictionary, so
        this dictionary may include metadata for additional
        channels not included in the batch.

    Returns
    -------
    SeriesBlock
        The block representation of the record batch.

    """
    time = batch.column("time")[0].as_py()
    fields: list[pyarrow.Field] = list(batch.schema)
    channel_names = [field.name for field in fields[1:]]
    series_dict = {
        channel: _arrow_to_numpy_array(
            pyarrow.compute.list_flatten(batch.column(channel))
        )
        for channel in channel_names
    }
    channel_dict = {channel: channels[channel] for channel in channel_names}
    return cls(time, series_dict, channel_dict)

from_row_batch classmethod

from_row_batch(batch, index_to_channel)

Create a series block from a record batch.

Parameters:

    batch : pyarrow.RecordBatch, required
        A record batch, with a 'time' column with the timestamp, a
        'channel' column with the channel name, and a 'data' column
        containing the timeseries.
    index_to_channel : dict[int, Channel], required
        Mapping from the id (channel index) back to the channel. The
        channel name is not encoded in the record batch in order to
        save space; an index value is sent instead. This mapping is
        specific to the partitioning of the channels.

Returns:

    SeriesBlock
        The block representation of the record batch.

Source code in arrakis/block.py
@classmethod
def from_row_batch(
    cls,
    batch: pyarrow.RecordBatch,
    index_to_channel: dict[int, ChannelLike],
) -> SeriesBlock:
    """Create a series block from a record batch.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp, a
        'channel' column with the channel name, and a 'data' column
        containing the timeseries.
    index_to_channel : dict[int, Channel]
        Mapping from the id (channel index) back to the channel.
        The channel name is not encoded in the record batch in order
        to save space; an index value is sent instead.  This mapping
        is specific to the partitioning of the channels.

    Returns
    -------
    SeriesBlock
        The block representation of the record batch.

    """
    time = batch.column("time")[0].as_py()
    channel_indexes = batch.column("id").to_pylist()
    data = batch.column("data")
    series_dict = {}
    channel_dict = {}
    for idx, channel_index in enumerate(channel_indexes):
        channel = index_to_channel[channel_index]
        channel_name = channel.name
        series_dict[channel_name] = _arrow_to_numpy_array(data[idx].values)
        channel_dict[channel_name] = channel
    return cls(time, series_dict, channel_dict)

to_column_batch

to_column_batch(schema=None)

Create a column-based record batch from a series block.

Parameters:

    schema : pyarrow.Schema, optional
        Pre-defined schema to use for the record batch. If provided,
        the schema must be compatible with the block's data (matching
        channel names and types). If not provided, a schema will be
        generated from the block's data.

Returns:

    pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp and
        channel columns with all channels to publish.

Raises:

    pyarrow.lib.ArrowInvalid
        If the provided schema is incompatible with the block's data,
        such as missing channels, mismatched channel names, or
        incompatible data types.

Source code in arrakis/block.py
def to_column_batch(
    self, schema: pyarrow.Schema | None = None
) -> pyarrow.RecordBatch:
    """Create a row-based record batch from a series block.

    Parameters
    ----------
    schema : pyarrow.Schema, optional
        Pre-defined schema to use for the record batch. If provided, the schema
        must be compatible with the block's data (matching channel names and types).
        If not provided, a schema will be generated from the block's data.

    Returns
    -------
    pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp
        and channel columns with all channels to publish.

    Raises
    ------
    pyarrow.lib.ArrowInvalid
        If the provided schema is incompatible with the block's data, such as
        missing channels, mismatched channel names, or incompatible data types.

    """
    if schema is None:
        schema = self._generate_column_schema()
    channels = [field.name for field in schema][1:]

    return pyarrow.RecordBatch.from_arrays(
        [
            pyarrow.array([self.time_ns], type=schema.field("time").type),
            *[
                _numpy_to_arrow_list_array(
                    self.data[channel], schema.field(channel).type
                )
                for channel in channels
            ],
        ],
        schema=schema,
    )

to_row_batches

to_row_batches(partitions)

Create row-based record batches from a series block.

Parameters:

    partitions : dict[str, PartitionInfo], required
        Mapping between channel names and partition information
        (partition names and indices).

Yields:

    pyarrow.RecordBatch
        Record batches, one per partition. The record batches have a
        'time' column with the timestamp, an 'id' column with the
        channel index, and a 'data' column containing the timeseries.

Source code in arrakis/block.py
def to_row_batches(
    self, partitions: dict[str, PartitionInfo]
) -> Iterator[tuple[str, pyarrow.RecordBatch]]:
    """Create column-based record batches from a series block.

    Parameters
    ----------
    partitions : dict[str, PartitionInfo]
        Mapping between channel names and partition information
        (partition names and indices).

    Yields
    -------
    pyarrow.RecordBatch
        Record batches, one per partition. The record batches have a
        'time' column with the timestamp, an 'id' column with the
        channel index, and a 'data' column containing the timeseries.

    """
    # group channels by partitions
    index_by_part = defaultdict(list)
    for channel in self.keys():
        if channel in partitions:
            partition_info = partitions[channel]
            index_by_part[partition_info.id].append(partition_info)

    # generate column-based record batches
    for partition_id, partition_infos in index_by_part.items():
        # all channels have the same data type
        dtype = self.channels[partition_infos[0].name].data_type
        schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
    series: list[pyarrow.Array] = [
            pyarrow.array(self.data[info.name]) for info in partition_infos
        ]
        ids = [entry.index for entry in partition_infos]
        yield (
            partition_id,
            pyarrow.RecordBatch.from_arrays(
                [
                    pyarrow.array(
                        numpy.full(len(partition_infos), self.time_ns),
                        type=schema.field("time").type,
                    ),
                    pyarrow.array(ids, type=schema.field("id").type),
                    pyarrow.array(series, type=schema.field("data").type),
                ],
                schema=schema,
            ),
        )
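The grouping step at the top of to_row_batches can be sketched with plain dictionaries; the channel names and partition ids below are hypothetical:

```python
from collections import defaultdict

# Hypothetical mapping: channel name -> (partition id, index in partition)
partitions = {
    "chan:a": ("part-0", 0),
    "chan:b": ("part-0", 1),
    "chan:c": ("part-1", 0),
}

# Group channels by partition, as to_row_batches does before
# building one record batch per partition
index_by_part = defaultdict(list)
for channel, (part_id, index) in partitions.items():
    index_by_part[part_id].append((channel, index))

assert sorted(index_by_part) == ["part-0", "part-1"]
assert len(index_by_part["part-0"]) == 2
```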

combine_blocks

combine_blocks(*blocks)

Combine multiple SeriesBlocks from the same time into a single SeriesBlock.

Each block must contain a distinct set of channels, and the time properties of each block must agree; otherwise an AssertionError will be thrown.

Parameters:

    *blocks : SeriesBlock
        The blocks to combine.

Returns:

    SeriesBlock
        The combined block.

Source code in arrakis/block.py
def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Combine multiple SeriesBlocks from the same time into a single SeriesBlock

    Each block must contain a distinct set of channels, and the time
    properties of each block must agree, otherwise an AssertionError
    will be thrown.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.

    """
    time_ns = blocks[0].time_ns
    duration_ns = blocks[0].duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    channel_dict: dict[str, Channel] = {}
    for block in blocks:
        assert block.time_ns == time_ns, "all block times must agree"
        assert block.duration_ns == duration_ns, "all block durations must agree"
        for channel, series in block.items():
            assert channel not in series_dict, (
                f"channel {channel} has already been included from another block"
            )
            series_dict[channel] = series.data
            channel_dict[channel] = series.channel
    return SeriesBlock(time_ns, series_dict, channel_dict)
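The merge performed above amounts to a dict union guarded by the time and distinct-channel checks. A plain-dict sketch, with hypothetical blocks:

```python
# Hypothetical blocks sharing the same timestamp but distinct channels
block_a = {"time_ns": 1_000_000_000, "data": {"chan:a": [1, 2]}}
block_b = {"time_ns": 1_000_000_000, "data": {"chan:b": [3, 4]}}

assert block_a["time_ns"] == block_b["time_ns"]             # times must agree
assert not block_a["data"].keys() & block_b["data"].keys()  # disjoint channels

# The combined block carries the union of the channel data
combined = {**block_a["data"], **block_b["data"]}
assert sorted(combined) == ["chan:a", "chan:b"]
```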

concatenate_blocks

concatenate_blocks(*blocks)

Join a sequence of timeseries blocks into a single block.

If the SeriesBlock arguments are not sequential in time, an AssertionError will be thrown.

Parameters:

    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

Returns:

    SeriesBlock
        The combined timeseries block.

Source code in arrakis/block.py
def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    If the SeriesBlock arguments are not sequential in time an
    AssertionError will be thrown.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block.

    """
    channel_dict = blocks[0].channels
    channel_set = set(channel_dict)
    start_time_ns = end_time_ns = blocks[0].time_ns
    duration_ns = 0
    for block in blocks:
        assert set(block.data.keys()) == channel_set, (
            "all blocks must contain the same channel sets"
        )
        assert block.time_ns == end_time_ns, (
            f"block start time ({block.time_ns}) does not match "
            f"concatenated block end time ({end_time_ns})"
        )
        duration_ns += block.duration_ns
        end_time_ns += block.duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    for channel in channel_set:
        series_dict[str(channel)] = numpy.concatenate(
            [block[str(channel)].data for block in blocks]
        )
    return SeriesBlock(start_time_ns, series_dict, channel_dict)

time_as_ns

time_as_ns(time)

Convert a timestamp from seconds to nanoseconds.

Parameters:

    time : float, required
        The timestamp to convert, in seconds.

Returns:

    int
        The converted timestamp, in nanoseconds.

Source code in arrakis/block.py
def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds.

    """
    seconds = int(time) * Time.s
    nanoseconds = int((time % 1) * Time.s)
    return seconds + nanoseconds
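A standalone sketch of the seconds-to-nanoseconds conversion, using a literal constant in place of arrakis's Time.s; the fractional part must be scaled before truncation so sub-second precision survives:

```python
NS_PER_S = 1_000_000_000  # stand-in for Time.s (nanoseconds per second)

def time_as_ns(time: float) -> int:
    # Whole seconds, scaled to nanoseconds
    seconds = int(time) * NS_PER_S
    # Fractional part, scaled *before* truncating to int
    nanoseconds = int((time % 1) * NS_PER_S)
    return seconds + nanoseconds

assert time_as_ns(2.25) == 2_250_000_000
assert time_as_ns(3.0) == 3_000_000_000
```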