Skip to content

channel

Channel information.

Channel dataclass

Channel(name, data_type, sample_rate, time=None, publisher=None, partition_id=None, partition_index=None, expected_latency=None)

Metadata associated with a channel.

Channels have the form {domain}:*.

Parameters:

Name Type Description Default
name str

The name associated with this channel.

required
data_type dtype

The data type associated with this channel.

required
sample_rate float

The sampling rate associated with this channel.

required
time int

The timestamp when this metadata became active.

None
publisher str

The publisher associated with this channel.

None
partition_id str

The partition ID associated with this channel.

None
partition_index int | None

Partition index for the channel. It is unique within the partition and allows the use of an integer value to identify the channel instead of a string.

None
expected_latency int | None

Expected publication latency for this channel's data, in seconds.

None

from_field classmethod

from_field(field)

Create a Channel from Arrow Flight field metadata.

Parameters:

Name Type Description Default
field field

The channel field containing relevant metadata.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@classmethod
def from_field(cls, field: pyarrow.field) -> Channel:
    """Create a Channel from Arrow Flight field metadata.

    Parameters
    ----------
    field : pyarrow.field
        The channel field containing relevant metadata.

    Returns
    -------
    Channel
        The newly created channel.

    """
    data_type = numpy.dtype(_list_dtype_to_str(field.type))
    sample_rate = float(field.metadata[b"rate"].decode())
    return cls(field.name, data_type, sample_rate)

from_json classmethod

from_json(payload)

Create a Channel from its JSON representation.

Parameters:

Name Type Description Default
payload str

The JSON-serialized channel.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@classmethod
def from_json(cls, payload: str) -> Channel:
    """Create a Channel from its JSON representation.

    Parameters
    ----------
    payload : str
        The JSON-serialized channel.

    Returns
    -------
    Channel
        The newly created channel.

    """
    obj = json.loads(payload)
    obj["data_type"] = numpy.dtype(obj["data_type"])
    return cls(**obj)

to_json

to_json(time=None)

Serialize channel metadata to JSON.

Parameters:

Name Type Description Default
time int

If specified, the timestamp when this metadata became active.

None
Source code in arrakis/channel.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def to_json(self, time: int | None = None) -> str:
    """Serialize channel metadata to JSON.

    Parameters
    ----------
    time : int, optional
        If specified, the timestamp when this metadata became active.

    """
    # generate dict from dataclass and adjust fields
    # to be JSON compatible. In addition, store the
    # channel name, as well as updating the timestamp
    # if passed in.
    obj = asdict(self)
    obj["data_type"] = numpy.dtype(self.data_type).name
    if time is not None:
        obj["time"] = time
    obj = {k: v for k, v in obj.items() if v is not None}
    return json.dumps(obj)

ParsedChannelName dataclass

ParsedChannelName(domain, subsystem, subsystem_delimiter, rest)

A parsed channel name

Channel names have the following structure:

<domain>:<subsystem>[-_]<rest>

parse classmethod

parse(name)

Parse a channel name into it's constituent parts

Source code in arrakis/channel.py
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def parse(cls, name: str) -> ParsedChannelName:
    """Parse a channel name into it's constituent parts"""
    if rem := CHANNEL_NAME_RE.match(name):
        return cls(
            rem["domain"],
            rem["subsystem"],
            rem["delimiter"],
            rem["rest"],
        )
    msg = "Invalid channel name structure."
    raise ValueError(msg)

PartitionInfo dataclass

PartitionInfo(name, id, index)

Partition metadata associated with a channel.

Parameters:

Name Type Description Default
name str

The name associated with this channel.

required
id str

The partition ID associated with this channel.

required
index int

Partition index for the channel. It is unique within the partition and allows the use of an integer value to identify the channel instead of a string.

required

from_channel classmethod

from_channel(channel)

Extract partition information from a Channel.

Parameters:

Name Type Description Default
channel Channel

The channel containing relevant metadata.

required

Returns:

Type Description
PartitionInfo

The newly created partition information.

Source code in arrakis/channel.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@classmethod
def from_channel(cls, channel: Channel) -> PartitionInfo:
    """Extract partition information from a Channel.

    Parameters
    ----------
    channel : Channel
        The channel containing relevant metadata.

    Returns
    -------
    PartitionInfo
        The newly created partition information.

    """
    assert channel.partition_id is not None
    assert channel.partition_index is not None
    return cls(
        name=channel.name,
        id=channel.partition_id,
        index=channel.partition_index,
    )