Publishing Data
The arrakis.publish.Publisher class publishes timeseries data into Arrakis via Kafka.
Publisher Setup
A publisher is identified by a server-assigned publisher ID. Create a
Publisher with that ID and register it to retrieve its channel assignments
(entering the publication context is covered under Publishing Blocks):
```python
from arrakis import Publisher

publisher = Publisher("my_producer")
publisher.register()
```
Registration queries the server for the channels assigned to this publisher ID
and their Kafka partition information. If the publisher ID is unknown to the
server, a ValueError is raised.
Dynamic Publishers
Publishers that are configured as dynamic on the server can register their
own channels by passing them to register():
```python
from arrakis import Channel, Publisher

publisher = Publisher("my_dynamic_producer")
publisher.register(
    channels=[
        Channel("H1:FKE-NEW_CHANNEL", data_type="float64", sample_rate=1024),
    ]
)
```
Any channel that is new, or whose core metadata (sample rate or data type) differs from the server's current registration for this publisher, is partitioned dynamically by the server, which responds with the required Kafka partition information. Channels whose metadata already matches the server's state are left untouched.
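The rule above is a per-channel comparison against the server's current registration. The following is a minimal sketch of that comparison. ChannelSpec and channels_to_partition are hypothetical names, not part of arrakis, and the server's state is modeled as a plain dict:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChannelSpec:
    """Hypothetical stand-in for a channel's name and core metadata."""

    name: str
    data_type: str
    sample_rate: int


def channels_to_partition(requested, registered):
    """Return the requested channels that are new or whose core metadata changed.

    `registered` maps channel name -> ChannelSpec, modeling the server's
    current registration for this publisher.
    """
    to_partition = []
    for spec in requested:
        current = registered.get(spec.name)
        if current is None:
            to_partition.append(spec)  # new channel
        elif (current.data_type, current.sample_rate) != (spec.data_type, spec.sample_rate):
            to_partition.append(spec)  # core metadata changed
        # otherwise the channel already matches and is left untouched
    return to_partition


registered = {"H1:FKE-OLD": ChannelSpec("H1:FKE-OLD", "float64", 256)}
requested = [
    ChannelSpec("H1:FKE-OLD", "float64", 256),  # unchanged
    ChannelSpec("H1:FKE-NEW_CHANNEL", "float64", 1024),  # new
]
print([c.name for c in channels_to_partition(requested, registered)])
# -> ['H1:FKE-NEW_CHANNEL']
```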
If the publisher is not permitted to self-register (the server has not marked
it dynamic), a pyarrow.flight.FlightUnauthorizedError is raised with a
server-provided explanatory message.
After registration, inspect the assigned channels:
```python
for name, channel in publisher.channels.items():
    print(f"{name}: {channel.sample_rate} Hz, stride={channel.stride} ns")
```
Publishing Blocks
Publishing uses a context manager that sets up the Kafka producer connection:
```python
from arrakis import Channel, Publisher, SeriesBlock, Time
import numpy

publisher = Publisher("my_producer")
publisher.register()

# Channel metadata must match what the server assigned at registration.
metadata = {
    "H1:FKE-TEST_CHANNEL1": Channel(
        "H1:FKE-TEST_CHANNEL1",
        data_type=numpy.float64,
        sample_rate=4,
    ),
    "H1:FKE-TEST_CHANNEL2": Channel(
        "H1:FKE-TEST_CHANNEL2",
        data_type=numpy.int32,
        sample_rate=2,
    ),
}

with publisher:
    # one second of data: 4 samples at 4 Hz, 2 samples at 2 Hz
    series = {
        "H1:FKE-TEST_CHANNEL1": numpy.array(
            [0.1, 0.2, 0.3, 0.4], dtype=numpy.float64
        ),
        "H1:FKE-TEST_CHANNEL2": numpy.array([1, 2], dtype=numpy.int32),
    }
    block = SeriesBlock(
        1234567890 * Time.SECONDS,  # block start time in GPS nanoseconds
        series,
        metadata,
    )
    publisher.publish(block)
```
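The sample counts in the comment follow from block duration times sample rate. The sketch below computes them with expected_samples, a hypothetical helper that is not an arrakis function. It assumes each block spans a whole number of samples at every channel's rate:

```python
NS_PER_SECOND = 1_000_000_000  # nanoseconds in one second


def expected_samples(duration_ns: int, sample_rate: int) -> int:
    """Samples needed to cover duration_ns at sample_rate Hz (hypothetical helper)."""
    samples, remainder = divmod(duration_ns * sample_rate, NS_PER_SECOND)
    if remainder:
        raise ValueError("duration does not span a whole number of samples")
    return samples


rates = {"H1:FKE-TEST_CHANNEL1": 4, "H1:FKE-TEST_CHANNEL2": 2}
counts = {name: expected_samples(NS_PER_SECOND, rate) for name, rate in rates.items()}
print(counts)
# -> {'H1:FKE-TEST_CHANNEL1': 4, 'H1:FKE-TEST_CHANNEL2': 2}
```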
Warning: Always use the context manager (with publisher:) or call
publisher.enter() before publishing. Calling publish() without initializing
the Kafka producer raises a RuntimeError.
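The guard described in the warning follows a common Python pattern: the producer handle exists only between entering and exiting the context. GuardedPublisher below is a hypothetical illustration of that pattern, with a list standing in for the Kafka producer. It is not the arrakis implementation:

```python
class GuardedPublisher:
    """Hypothetical illustration of an enter-before-publish guard."""

    def __init__(self):
        self._producer = None  # no producer until the context is entered

    def enter(self):
        self._producer = []  # stand-in for creating a Kafka producer

    def close(self):
        self._producer = None  # stand-in for flushing and closing it

    def __enter__(self):
        self.enter()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not suppress exceptions

    def publish(self, block):
        if self._producer is None:
            raise RuntimeError("publisher not entered; use 'with publisher:'")
        self._producer.append(block)


pub = GuardedPublisher()
try:
    pub.publish("block")  # fails: context not entered
except RuntimeError as err:
    print(err)

with pub:
    pub.publish("block")  # succeeds inside the context
```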
Time and Frequency Units
The Time and Freq enums help with unit
conversions:
```python
from arrakis import Time, Freq

# Time: convert to nanoseconds
timestamp_ns = 1234567890 * Time.SECONDS  # GPS seconds -> nanoseconds
timestamp_ns = 1234567890 * Time.s  # shorthand

# Freq: convert a sample rate to its stride (sample period) in nanoseconds
stride_ns = 64 * Freq.Hz  # 64 Hz -> nanoseconds between samples
stride_ns = 1 * Freq.kHz  # 1 kHz -> nanoseconds between samples
```
Time members:
| Member | Value (ns) |
|---|---|
| SECONDS / s | 1,000,000,000 |
| MILLISECONDS / ms | 1,000,000 |
| MICROSECONDS / us | 1,000 |
| NANOSECONDS / ns | 1 |
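The Time values are nanoseconds per unit, and each short name is an alias of the long one. The sketch below uses TimeUnit, a hypothetical IntEnum that mirrors the table and stands in for arrakis.Time. It shows conversion in both directions:

```python
from enum import IntEnum


class TimeUnit(IntEnum):
    """Hypothetical stand-in mirroring the Time table (nanoseconds per unit)."""

    SECONDS = 1_000_000_000
    s = 1_000_000_000  # equal value, so `s` becomes an alias of SECONDS
    MILLISECONDS = 1_000_000
    ms = 1_000_000
    MICROSECONDS = 1_000
    us = 1_000
    NANOSECONDS = 1
    ns = 1


gps_seconds = 1234567890
timestamp_ns = gps_seconds * TimeUnit.SECONDS  # seconds -> nanoseconds
print(timestamp_ns)                # -> 1234567890000000000
print(timestamp_ns // TimeUnit.s)  # -> 1234567890 (back to whole seconds)
print(250 * TimeUnit.ms)           # -> 250000000
```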
Freq members:
| Member | Samples/s |
|---|---|
| Hz | 1 |
| kHz | 1,000 |
| MHz | 1,000,000 |
| GHz | 1,000,000,000 |
Multiplying a number by a Freq member converts that frequency to a stride in
nanoseconds. For example, 64 * Freq.Hz gives the nanosecond period of a 64 Hz
signal (1,000,000,000 / 64 = 15,625,000 ns).
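The stride is the sample period: one second divided by the sample rate. The minimal sketch below uses stride_ns, a hypothetical helper, to reproduce the arithmetic for the two rates used earlier. Rejecting rates that do not divide one second evenly is a design choice of this sketch, not documented arrakis behavior:

```python
NS_PER_SECOND = 1_000_000_000  # nanoseconds in one second


def stride_ns(rate_hz: int) -> int:
    """Nanoseconds between consecutive samples at rate_hz (hypothetical helper).

    Uses exact integer division and rejects rates that do not divide one
    second evenly, rather than silently rounding.
    """
    stride, remainder = divmod(NS_PER_SECOND, rate_hz)
    if remainder:
        raise ValueError(f"{rate_hz} Hz has no whole-nanosecond stride")
    return stride


print(stride_ns(64))     # -> 15625000
print(stride_ns(1_000))  # -> 1000000
```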
Replay Publishing
Publishers can produce derived channels under a named replay context. Data
is written to replay-namespaced Kafka topics and is only visible to clients
querying with the matching replay_id.
Specify a replay_id when creating the publisher:
```python
from arrakis import Publisher

publisher = Publisher("DERIVED_O4a", replay_id="O4a")
publisher.register()
```
The server validates that the publisher is authorized for this replay ID
(it must appear in the publisher's allowed_replay_ids configuration).
Publishers configured with live = false on the server must specify a
replay_id. Attempting to publish without one raises a
pyarrow.flight.FlightUnauthorizedError.
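The two replay rules can be read as a short decision procedure. The sketch below encodes them with PublisherConfig and check_replay, both hypothetical. PermissionError stands in for pyarrow.flight.FlightUnauthorizedError, and in arrakis these checks are made by the server:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PublisherConfig:
    """Hypothetical model of the server-side settings for one publisher."""

    live: bool = True
    allowed_replay_ids: frozenset = frozenset()


def check_replay(config: PublisherConfig, replay_id: Optional[str]) -> None:
    """Raise PermissionError when a (config, replay_id) pair breaks the replay rules.

    PermissionError stands in for pyarrow.flight.FlightUnauthorizedError.
    """
    if replay_id is None:
        # Live publishing: only allowed for publishers configured as live.
        if not config.live:
            raise PermissionError("publisher is not live; a replay_id is required")
        return
    # Replay publishing: the ID must be in the publisher's allowed list.
    if replay_id not in config.allowed_replay_ids:
        raise PermissionError(f"replay_id {replay_id!r} is not allowed")


derived = PublisherConfig(live=False, allowed_replay_ids=frozenset({"O4a"}))
check_replay(derived, "O4a")  # passes silently
```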
Registration, entering the publication context, and publishing blocks all
work identically to live publishing. The only difference is that data is
written to arrakis-replay-{replay_id}-{partition_id} topics instead of
arrakis-{partition_id}.
```python
with publisher:
    # block: a SeriesBlock built as shown under Publishing Blocks
    publisher.publish(block)  # writes to the replay-namespaced topic
```
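The topic naming rule described above can be sketched as topic_for, a hypothetical helper. The partition IDs here are made-up strings; the real ones come from the server at registration:

```python
from typing import Optional


def topic_for(partition_id: str, replay_id: Optional[str] = None) -> str:
    """Kafka topic for a partition, live or replay-namespaced (hypothetical helper)."""
    if replay_id is None:
        return f"arrakis-{partition_id}"
    return f"arrakis-replay-{replay_id}-{partition_id}"


print(topic_for("p0"))         # -> arrakis-p0
print(topic_for("p0", "O4a"))  # -> arrakis-replay-O4a-p0
```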
Channels published this way are tagged with the replay_id in the server's
metadata and are discoverable via arrakis.find(replay_id="O4a").
Validation
The publisher validates each block before sending:
- Channel metadata: The channel metadata in the block must exactly match what the server assigned during registration (name, data type, sample rate).
- Stride: The block duration must match the publisher's expected stride. Blocks with the wrong duration are rejected with a ValueError.
- Monotonic timestamps: Blocks must be published with strictly increasing timestamps. Publishing a block with a timestamp equal to or earlier than the previous block raises a ValueError.
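The three checks above can be sketched as a small stateful validator. BlockValidator is hypothetical, not the arrakis implementation. It models metadata as (data_type, sample_rate) tuples keyed by channel name and assumes every channel in a block must be registered with identical metadata:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

NS_PER_SECOND = 1_000_000_000

# Hypothetical simplified metadata: channel name -> (data_type, sample_rate).
Metadata = Dict[str, Tuple[str, int]]


@dataclass
class BlockValidator:
    """Hypothetical sketch of the three publish-time checks."""

    registered: Metadata
    stride_ns: int
    last_time_ns: Optional[int] = None

    def validate(self, time_ns: int, duration_ns: int, metadata: Metadata) -> None:
        # 1. Channel metadata must match the registration exactly.
        for name, meta in metadata.items():
            if self.registered.get(name) != meta:
                raise ValueError(f"metadata for {name} does not match registration")
        # 2. Block duration must equal the expected stride.
        if duration_ns != self.stride_ns:
            raise ValueError(f"duration {duration_ns} ns != stride {self.stride_ns} ns")
        # 3. Timestamps must be strictly increasing.
        if self.last_time_ns is not None and time_ns <= self.last_time_ns:
            raise ValueError("timestamp not later than the previous block")
        # Record the timestamp only after all checks pass.
        self.last_time_ns = time_ns


meta = {"H1:FKE-TEST_CHANNEL1": ("float64", 4)}
validator = BlockValidator(registered=meta, stride_ns=NS_PER_SECOND)
t0 = 1234567890 * NS_PER_SECOND
validator.validate(t0, NS_PER_SECOND, meta)                  # accepted
validator.validate(t0 + NS_PER_SECOND, NS_PER_SECOND, meta)  # accepted
```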