Publishing Data

The arrakis.publish.Publisher class publishes timeseries data into Arrakis via Kafka.

Publisher Setup

A publisher is identified by a server-assigned publisher ID. Create a Publisher with that ID and register it to retrieve its channel assignments. The publication context is entered later, when blocks are actually published:

from arrakis import Publisher

publisher = Publisher("my_producer")
publisher.register()

Registration queries the server for the channels assigned to this publisher ID and their Kafka partition information. If the publisher ID is unknown to the server, a ValueError is raised.
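
One way to surface an unknown publisher ID early is to wrap registration in a small helper. The sketch below is only an illustration: register_or_exit is a hypothetical name, not part of arrakis, and it assumes nothing beyond the behavior described above (register() raising ValueError for an unknown ID).

```python
import sys


def register_or_exit(publisher, publisher_id):
    """Register the publisher, exiting with a readable message on failure.

    Hypothetical helper: `publisher` is any object with a register() method
    that raises ValueError when the server does not know the publisher ID.
    """
    try:
        publisher.register()
    except ValueError as err:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit(f"publisher ID {publisher_id!r} is not known to the server: {err}")
    return publisher
```

Called as register_or_exit(Publisher("my_producer"), "my_producer"), this stops a misconfigured producer before it reaches the publishing stage.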

Dynamic Publishers

Publishers that are configured as dynamic on the server can register their own channels by passing them to register():

from arrakis import Channel, Publisher

publisher = Publisher("my_dynamic_producer")
publisher.register(
    channels=[
        Channel("H1:FKE-NEW_CHANNEL", data_type="float64", sample_rate=1024),
    ]
)

Any channel that is new, or whose core metadata (sample rate or data type) differs from the server's current registration for this publisher, is partitioned dynamically by the server, which responds with the required Kafka partition information. Channels whose metadata already matches the server's state are left untouched.
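
The selection rule above (new channels, or channels whose sample rate or data type changed) can be sketched in plain Python. This is not the server's implementation: ChannelSpec and channels_needing_partition are hypothetical names used only to illustrate the comparison.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChannelSpec:
    """Hypothetical stand-in for a channel's core metadata."""
    name: str
    data_type: str
    sample_rate: int


def channels_needing_partition(requested, registered):
    """Return requested channels that are new or whose core metadata changed.

    `registered` maps channel name -> ChannelSpec as currently known for
    this publisher. Channels that already match are skipped.
    """
    needs = []
    for spec in requested:
        current = registered.get(spec.name)
        changed = current is not None and (
            (current.data_type, current.sample_rate)
            != (spec.data_type, spec.sample_rate)
        )
        if current is None or changed:
            needs.append(spec)
    return needs
```

Only the channels returned by this function would need new partition assignments; the rest keep their existing ones.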

If the publisher is not permitted to self-register (the server has not marked it dynamic), a pyarrow.flight.FlightUnauthorizedError is raised with a server-provided explanatory message.

After registration, inspect the assigned channels:

for name, channel in publisher.channels.items():
    print(f"{name}: {channel.sample_rate} Hz, stride={channel.stride} ns")

Publishing Blocks

Publishing uses a context manager that sets up the Kafka producer connection:

from arrakis import Channel, Publisher, SeriesBlock, Time
import numpy

publisher = Publisher("my_producer")
publisher.register()

metadata = {
    "H1:FKE-TEST_CHANNEL1": Channel(
        "H1:FKE-TEST_CHANNEL1",
        data_type=numpy.float64,
        sample_rate=4,
    ),
    "H1:FKE-TEST_CHANNEL2": Channel(
        "H1:FKE-TEST_CHANNEL2",
        data_type=numpy.int32,
        sample_rate=2,
    ),
}

with publisher:
    # one second of data: 4 samples at 4 Hz, 2 samples at 2 Hz
    series = {
        "H1:FKE-TEST_CHANNEL1": numpy.array(
            [0.1, 0.2, 0.3, 0.4], dtype=numpy.float64
        ),
        "H1:FKE-TEST_CHANNEL2": numpy.array(
            [1, 2], dtype=numpy.int32
        ),
    }
    block = SeriesBlock(
        1234567890 * Time.SECONDS,
        series,
        metadata,
    )
    publisher.publish(block)

Warning

Always use the context manager (with publisher:) or call publisher.enter() before publishing. Calling publish() without initializing the Kafka producer raises a RuntimeError.
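
The array lengths in the example above follow from the sample rate and the block duration: one second at 4 Hz is 4 samples, and one second at 2 Hz is 2 samples. A minimal sketch of that arithmetic, using a hypothetical helper name (samples_per_block is not an arrakis function):

```python
NS_PER_SECOND = 1_000_000_000


def samples_per_block(sample_rate_hz, duration_ns=NS_PER_SECOND):
    """Number of samples a channel contributes to a block of the given duration.

    Raises ValueError when the duration does not hold a whole number of samples.
    """
    total = sample_rate_hz * duration_ns
    if total % NS_PER_SECOND:
        raise ValueError(
            f"{duration_ns} ns does not hold a whole number of samples "
            f"at {sample_rate_hz} Hz"
        )
    return total // NS_PER_SECOND
```

Checking array lengths against this value before building a block catches size mismatches before the block is published.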

Time and Frequency Units

The Time and Freq enums help with unit conversions:

from arrakis import Time, Freq

# Time: convert to nanoseconds
timestamp_ns = 1234567890 * Time.SECONDS    # GPS seconds -> nanoseconds
timestamp_ns = 1234567890 * Time.s           # shorthand

# Freq: convert sample rate to stride in nanoseconds
stride_ns = 64 * Freq.Hz     # 64 Hz -> stride for one sample
stride_ns = 1 * Freq.kHz     # 1 kHz -> stride for one sample

Time members:

| Member | Value (ns) |
|---|---|
| SECONDS / s | 1,000,000,000 |
| MILLISECONDS / ms | 1,000,000 |
| MICROSECONDS / us | 1,000 |
| NANOSECONDS / ns | 1 |

Freq members:

| Member | Samples/s |
|---|---|
| Hz | 1 |
| kHz | 1,000 |
| MHz | 1,000,000 |
| GHz | 1,000,000,000 |

Multiplying a number by a Freq member converts it to a stride in nanoseconds: 64 * Freq.Hz gives the nanosecond period of a 64 Hz signal, which is 15,625,000 ns.
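
The arithmetic behind these conversions can be written out without the enums. The sketch below is plain Python, not the arrakis implementation; stride_ns and gps_seconds_to_ns are hypothetical names, and the integer division is an assumption about how rates that do not divide one second evenly would be handled.

```python
NS_PER_SECOND = 1_000_000_000


def gps_seconds_to_ns(gps_seconds):
    """Convert a GPS time in whole seconds to nanoseconds."""
    return gps_seconds * NS_PER_SECOND


def stride_ns(sample_rate_hz):
    """Nanoseconds between consecutive samples at the given rate.

    Uses integer division, so rates that do not divide one second evenly
    are truncated (an assumption of this sketch).
    """
    if sample_rate_hz <= 0:
        raise ValueError("sample rate must be positive")
    return NS_PER_SECOND // sample_rate_hz


print(stride_ns(64))     # 15625000
print(stride_ns(1000))   # 1000000
```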

Replay Publishing

Publishers can produce derived channels under a named replay context. Data is written to replay-namespaced Kafka topics and is only visible to clients querying with the matching replay_id.

Specify a replay_id when creating the publisher:

from arrakis import Publisher

publisher = Publisher("DERIVED_O4a", replay_id="O4a")
publisher.register()

The server validates that the publisher is authorized for this replay ID: it must appear in the publisher's allowed_replay_ids configuration. Publishers configured with live = false on the server must specify a replay_id; attempting to publish without one raises a pyarrow.flight.FlightUnauthorizedError.

Registration, entering the publication context, and publishing blocks all work identically to live publishing. The only difference is that data is written to arrakis-replay-{replay_id}-{partition_id} topics instead of arrakis-{partition_id}.

with publisher:
    publisher.publish(block)  # writes to replay-namespaced topic

Channels published this way are tagged with the replay_id in the server's metadata and are discoverable via arrakis.find(replay_id="O4a").
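
The topic naming scheme described in this section can be summarized in a few lines. This is only an illustration of the two patterns quoted above; topic_name is a hypothetical helper, and the partition ID value in the usage note is made up.

```python
def topic_name(partition_id, replay_id=None):
    """Build the Kafka topic name for a partition.

    Live data goes to arrakis-{partition_id}; replay data goes to
    arrakis-replay-{replay_id}-{partition_id}.
    """
    if replay_id is None:
        return f"arrakis-{partition_id}"
    return f"arrakis-replay-{replay_id}-{partition_id}"
```

For a hypothetical partition ID "H1_DERIVED", the live topic would be arrakis-H1_DERIVED and the O4a replay topic would be arrakis-replay-O4a-H1_DERIVED.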

Validation

The publisher validates each block before sending:

Channel metadata
    The channel metadata in the block must exactly match what the server assigned during registration (name, data type, sample rate).

Stride
    The block duration must match the publisher's expected stride. Blocks with the wrong duration are rejected with a ValueError.

Monotonic timestamps
    Blocks must be published with strictly increasing timestamps. Publishing a block with a timestamp equal to or earlier than the previous block raises a ValueError.
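
The three checks above can be mirrored on the producer side to catch bad blocks before they reach the publisher. The following is a sketch under stated assumptions, not the library's validation code: BlockValidator is a hypothetical class, channel metadata is reduced to (data_type, sample_rate) tuples, and raising ValueError for a metadata mismatch is an assumption (the text specifies ValueError only for the stride and timestamp checks).

```python
class BlockValidator:
    """Hypothetical pre-publish checks mirroring the three rules above."""

    def __init__(self, registered, stride_ns):
        # registered: channel name -> (data_type, sample_rate) from registration
        self.registered = dict(registered)
        self.stride_ns = stride_ns
        self.last_time_ns = None

    def check(self, time_ns, duration_ns, channels):
        """Raise ValueError if the block would violate any rule."""
        # Rule 1: metadata must match the registration exactly.
        for name, meta in channels.items():
            if self.registered.get(name) != meta:
                raise ValueError(f"metadata for {name!r} does not match registration")
        # Rule 2: block duration must equal the expected stride.
        if duration_ns != self.stride_ns:
            raise ValueError(f"block duration {duration_ns} ns != stride {self.stride_ns} ns")
        # Rule 3: timestamps must be strictly increasing.
        if self.last_time_ns is not None and time_ns <= self.last_time_ns:
            raise ValueError(f"timestamp {time_ns} is not after {self.last_time_ns}")
        # Only a block that passed every check advances the last timestamp.
        self.last_time_ns = time_ns
```

Because the last timestamp is updated only after all checks pass, a rejected block does not affect validation of the blocks that follow it.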