Skip to content

client

Client-based API access.

Client

Client(url=None)

Retrieve channel information and timeseries data from Arrakis.

Parameters:

Name Type Description Default
url str

The URL to connect to. If the URL is not set, connect to a default server or one set by ARRAKIS_SERVER.

None
Source code in arrakis/client.py
74
75
76
77
def __init__(self, url: str | None = None):
    """Create a client bound to an Arrakis server URL.

    Parameters
    ----------
    url : str, optional
        The URL to connect to. If not set, ``parse_url`` resolves it to a
        default server or one set by ARRAKIS_SERVER.

    """
    resolved_url = parse_url(url)
    self.initial_url = resolved_url
    logger.debug("initial url: %s", self.initial_url)
    # one validator instance is reused for every descriptor this client builds
    self._validator = RequestValidator()

count

count(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None, time=None, replay_id=None)

Count channels matching a set of conditions

Parameters:

Name Type Description Default
pattern str

Channel pattern to match channels with, using regular expressions.

DEFAULT_MATCH
data_type numpy.dtype-like | list[numpy.dtype-like]

If set, find all channels with these data types.

None
min_rate int

The minimum sampling rate for channels.

MIN_SAMPLE_RATE
max_rate int

The maximum sampling rate for channels.

MAX_SAMPLE_RATE
publisher str | list[str]

If set, find all channels associated with these publishers.

None
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None
replay_id str

Server-registered replay identifier.

None

Returns:

Type Description
int

The number of channels matching query.

Source code in arrakis/client.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def count(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
    time: float | None = None,
    replay_id: str | None = None,
) -> int:
    """Count the channels that satisfy a set of filter conditions.

    Parameters
    ----------
    pattern : str, optional
        Regular expression that channel names must match.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, only channels with one of these data types are counted.
    min_rate : int, optional
        Lower bound on the channel sampling rate. ``None`` falls back to
        ``constants.MIN_SAMPLE_RATE``.
    max_rate : int, optional
        Upper bound on the channel sampling rate. ``None`` falls back to
        ``constants.MAX_SAMPLE_RATE``.
    publisher : str | list[str], optional
        If set, only channels from these publishers are counted. A single
        string is treated as a one-element list.
    time : float, optional
        GPS time in seconds at which the metadata query is evaluated.
        If None, the request routes to the live backend (current state).
    replay_id : str, optional
        Server-registered replay identifier.

    Returns
    -------
    int
        Total number of matching channels, summed over every endpoint's
        partial ``count``.

    """
    parsed_types = _parse_data_types(data_type)
    lower_rate = constants.MIN_SAMPLE_RATE if min_rate is None else min_rate
    upper_rate = constants.MAX_SAMPLE_RATE if max_rate is None else max_rate
    if publisher is None:
        publishers = []
    elif isinstance(publisher, str):
        publishers = [publisher]
    else:
        publishers = publisher
    query_time_ns = None if time is None else time_as_ns(time)

    descriptor = create_descriptor(
        RequestType.Count,
        pattern=pattern,
        data_type=parsed_types,
        min_rate=lower_rate,
        max_rate=upper_rate,
        publisher=publishers,
        time=query_time_ns,
        replay_id=replay_id,
        validator=self._validator,
    )
    with connect(self.initial_url) as flight_client:
        info = get_flight_info(flight_client, descriptor)
        with MultiFlightReader(info.endpoints, flight_client) as reader:
            # each unpacked record carries a partial count; add them all up
            return sum(record["count"] for record in reader.unpack())

describe

describe(channels, time=None, replay_id=None)

Get channel metadata for the requested channels.

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None
replay_id str

Server-registered replay identifier.

None

Returns:

Type Description
dict[str, Channel]

Mapping of channel names to channel metadata.

Source code in arrakis/client.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def describe(
    self,
    channels: list[str],
    time: float | None = None,
    replay_id: str | None = None,
) -> dict[str, Channel]:
    """Look up metadata for a list of channels.

    Parameters
    ----------
    channels : list[str]
        Names of the channels to describe.
    time : float, optional
        GPS time in seconds at which the metadata query is evaluated.
        If None, the request routes to the live backend (current state).
    replay_id : str, optional
        Server-registered replay identifier.

    Returns
    -------
    dict[str, Channel]
        Channel metadata keyed by channel name.

    """
    query_time_ns = None if time is None else time_as_ns(time)
    with connect(self.initial_url) as flight_client:
        described = self._describe(
            flight_client,
            channels,
            time_ns=query_time_ns,
            replay_id=replay_id,
        )
    return described

domains

domains()

Get the list of domains available on the server.

Returns:

Type Description
list[str]

Sorted list of domain names.

Source code in arrakis/client.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def domains(self) -> list[str]:
    """List every domain that appears in the server's scope map.

    Returns
    -------
    list[str]
        Distinct domain names collected from the ``scopes`` of all
        endpoints, in sorted order.

    """
    endpoints = self.scope_map()["endpoints"]
    unique_domains = {
        domain
        for info in endpoints.values()
        for domain in info["scopes"]
    }
    return sorted(unique_domains)

fetch

fetch(channels, start, end, replay_start=None, replay_end=None, replay_id=None)

Fetch timeseries data

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
start float

GPS start time, in seconds.

required
end float

GPS end time, in seconds.

required
replay_start float

GPS start time of the replay window, in seconds.

None
replay_end float

GPS end time of the replay window, in seconds.

None
replay_id str

Server-registered replay identifier. Mutually exclusive with replay_start/replay_end.

None

Returns:

Type Description
SeriesBlock

Dictionary-like object containing all requested channel data.

Source code in arrakis/client.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def fetch(
    self,
    channels: list[str],
    start: float,
    end: float,
    replay_start: float | None = None,
    replay_end: float | None = None,
    replay_id: str | None = None,
) -> SeriesBlock:
    """Retrieve timeseries data for a fixed time interval in one call.

    Parameters
    ----------
    channels : list[str]
        Names of the channels to retrieve.
    start : float
        GPS start time, in seconds.
    end : float
        GPS end time, in seconds.
    replay_start : float, optional
        GPS start time of the replay window, in seconds.
    replay_end : float, optional
        GPS end time of the replay window, in seconds.
    replay_id : str, optional
        Server-registered replay identifier. Mutually exclusive with
        replay_start/replay_end.

    Returns
    -------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    """
    # Consume every block that stream() yields for the requested interval
    # and merge them into a single SeriesBlock.
    blocks = self.stream(
        channels,
        start,
        end,
        replay_start=replay_start,
        replay_end=replay_end,
        replay_id=replay_id,
    )
    return concatenate_blocks(*blocks)

find

find(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None, time=None, replay_id=None)

Find channels matching a set of conditions

Parameters:

Name Type Description Default
pattern str

Channel pattern to match channels with, using regular expressions.

DEFAULT_MATCH
data_type numpy.dtype-like | list[numpy.dtype-like]

If set, find all channels with these data types.

None
min_rate int

Minimum sampling rate for channels.

MIN_SAMPLE_RATE
max_rate int

Maximum sampling rate for channels.

MAX_SAMPLE_RATE
publisher str | list[str]

If set, find all channels associated with these publishers.

None
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None
replay_id str

Server-registered replay identifier.

None

Yields:

Type Description
Channel

Channel objects for all channels matching query.

Source code in arrakis/client.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def find(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
    time: float | None = None,
    replay_id: str | None = None,
) -> Generator[Channel, None, None]:
    """Yield the channels that satisfy a set of filter conditions.

    Parameters
    ----------
    pattern : str, optional
        Regular expression that channel names must match.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, only channels with one of these data types are yielded.
    min_rate : int, optional
        Lower bound on the channel sampling rate. ``None`` falls back to
        ``constants.MIN_SAMPLE_RATE``.
    max_rate : int, optional
        Upper bound on the channel sampling rate. ``None`` falls back to
        ``constants.MAX_SAMPLE_RATE``.
    publisher : str | list[str], optional
        If set, only channels from these publishers are yielded. A single
        string is treated as a one-element list.
    time : float, optional
        GPS time in seconds at which the metadata query is evaluated.
        If None, the request routes to the live backend (current state).
    replay_id : str, optional
        Server-registered replay identifier.

    Yields
    ------
    Channel
        One Channel object per matching channel.

    Notes
    -----
    This is a generator. No argument processing or connection happens
    until iteration starts.

    """
    parsed_types = _parse_data_types(data_type)
    lower_rate = constants.MIN_SAMPLE_RATE if min_rate is None else min_rate
    upper_rate = constants.MAX_SAMPLE_RATE if max_rate is None else max_rate
    if publisher is None:
        publishers = []
    elif isinstance(publisher, str):
        publishers = [publisher]
    else:
        publishers = publisher
    query_time_ns = None if time is None else time_as_ns(time)

    descriptor = create_descriptor(
        RequestType.Find,
        pattern=pattern,
        data_type=parsed_types,
        min_rate=lower_rate,
        max_rate=upper_rate,
        publisher=publishers,
        time=query_time_ns,
        replay_id=replay_id,
        validator=self._validator,
    )
    # the connection stays open for as long as the caller keeps iterating
    with connect(self.initial_url) as flight_client:
        yield from self._stream_channel_metadata(flight_client, descriptor)

replays

replays()

Get available replay windows with start/end times.

Returns:

Type Description
dict

Mapping of replay IDs to their start/end GPS times in seconds.

Source code in arrakis/client.py
423
424
425
426
427
428
429
430
431
432
def replays(self) -> dict:
    """Ask the server which replay windows it offers.

    Returns
    -------
    dict
        Replay IDs mapped to their start/end GPS times in seconds, as
        returned by the server's ``replays`` action.

    """
    response = self._do_action("replays")
    return response

scope_map

scope_map()

Get the mapping of endpoints to their scopes.

Returns:

Type Description
dict

Mapping of endpoint information including scopes per domain.

Source code in arrakis/client.py
412
413
414
415
416
417
418
419
420
421
def scope_map(self) -> dict:
    """Ask the server how its endpoints map onto scopes.

    Returns
    -------
    dict
        Endpoint information including scopes per domain, as returned
        by the server's ``scope-map`` action.

    """
    response = self._do_action("scope-map")
    return response

server_info

server_info()

Get server version and capability metadata.

Returns:

Type Description
dict

Server metadata including version info, backend type, capabilities, and domains.

Source code in arrakis/client.py
400
401
402
403
404
405
406
407
408
409
410
def server_info(self) -> dict:
    """Ask the server for its version and capability metadata.

    Returns
    -------
    dict
        Server metadata including version info, backend type,
        capabilities, and domains, as returned by the server's
        ``server-info`` action.

    """
    response = self._do_action("server-info")
    return response

stream

stream(channels, start=None, end=None, kafka_url=None, replay_start=None, replay_end=None, replay_id=None)

Stream live or offline timeseries data

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
start float

GPS start time, in seconds.

None
end float

GPS end time, in seconds.

None
kafka_url str

URL passed through unchanged to the underlying stream reader.

None
replay_start float

GPS start time of the replay window, in seconds.

None
replay_end float

GPS end time of the replay window, in seconds.

None
replay_id str

Server-registered replay identifier. Mutually exclusive with replay_start/replay_end.

None

Yields:

Type Description
SeriesBlock

Dictionary-like object containing all requested channel data.

Setting neither start nor end begins a live stream starting from now.
Source code in arrakis/client.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def stream(
    self,
    channels: list[str],
    start: float | None = None,
    end: float | None = None,
    kafka_url: str | None = None,
    replay_start: float | None = None,
    replay_end: float | None = None,
    replay_id: str | None = None,
) -> Generator[SeriesBlock, None, None]:
    """Stream live or offline timeseries data

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    start : float, optional
        GPS start time, in seconds.
    end : float, optional
        GPS end time, in seconds.
    kafka_url : str, optional
        URL passed through unchanged to the underlying stream reader.
    replay_start : float, optional
        GPS start time of the replay window, in seconds.
    replay_end : float, optional
        GPS end time of the replay window, in seconds.
    replay_id : str, optional
        Server-registered replay identifier. Mutually exclusive with
        replay_start/replay_end.

    Yields
    ------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    Raises
    ------
    AssertionError
        If the metadata returned for any requested channel lacks stride
        information.

    Notes
    -----
    Setting neither start nor end begins a live stream starting from now.

    """
    start_ns = time_as_ns(start) if start is not None else None
    end_ns = time_as_ns(end) if end is not None else None
    replay_start_ns = time_as_ns(replay_start) if replay_start is not None else None
    replay_end_ns = time_as_ns(replay_end) if replay_end is not None else None
    metadata: dict[str, Channel]
    reader: StreamReader

    # for opportunistic replay, translate start time into the
    # replay window so describe routes to the archival backend
    describe_time: int | None
    if replay_start_ns is not None and replay_end_ns is not None and not replay_id:
        describe_time = translate_time(start_ns, replay_start_ns, replay_end_ns)
    else:
        describe_time = start_ns

    with connect(self.initial_url) as client:
        # initial describe request to get the full metadata needed
        # to construct the appropriate muxer — use start time so the
        # info server routes to the correct backend for the request
        metadata = self._describe(
            client,
            channels,
            time_ns=describe_time,
            replay_id=replay_id,
        )
        for channel in metadata.values():
            # Explicit raise instead of ``assert`` so the check still runs
            # under ``python -O``. The exception type is kept as
            # AssertionError so existing callers see no change.
            if not channel.stride:
                raise AssertionError("Channels do not include stride info.")

        # get the flight info for streams
        descriptor = create_descriptor(
            RequestType.Stream,
            channels=channels,
            start=start_ns,
            end=end_ns,
            replay_start=replay_start_ns,
            replay_end=replay_end_ns,
            replay_id=replay_id,
            validator=self._validator,
        )
        flight_info = get_flight_info(client, descriptor)

    reader = _construct_stream_reader(
        flight_info.endpoints,
        metadata,
        start_ns,
        kafka_url=kafka_url,
        initial_url=self.initial_url,
    )

    mux: BlockMuxStream = BlockMuxStream(reader, start=start_ns)

    # ``yield from`` (rather than a manual for/yield loop) forwards close()
    # to the inner generator. A caller that stops iterating early therefore
    # shuts down the mux stream deterministically, without waiting for GC.
    yield from mux.stream(end_ns)