Skip to content

Reference

BaseKevinbotSubsystem

The base subsystem class.

Not to be used directly

Source code in src/kevinbotlib/core.py
33
34
35
36
37
38
39
40
41
class BaseKevinbotSubsystem:
    """Common parent of every robot subsystem.

    Meant to be subclassed; not to be used directly
    """

    def __init__(self, robot: "SerialKevinbot | MqttKevinbot") -> None:
        """Remember the owning robot and register this subsystem with it.

        Args:
            robot (SerialKevinbot | MqttKevinbot): Robot this subsystem is attached to
        """
        self.robot = robot
        # The robot keeps track of every subsystem that was created for it
        robot._register_component(self)  # noqa: SLF001

BaseKevinbot

The base robot class.

Not to be used directly

Source code in src/kevinbotlib/core.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class BaseKevinbot:
    """The base robot class.

    Not to be used directly
    """

    def __init__(self) -> None:
        self._state = KevinbotState()
        self._server_state = KevinbotServerState()
        self.type = KevinbotConnectionType.BASE
        self._subsystems: list[BaseKevinbotSubsystem] = []

        self._auto_disconnect = True
        self._auto_disable = True

    def get_state(self) -> KevinbotState:
        """Gets the current state of the robot

        Returns:
            KevinbotState: State class
        """
        return self._state

    @property
    def server_state(self) -> KevinbotServerState:
        return self._server_state

    def disconnect(self):
        """Basic robot disconnect"""
        self._state.connected = False
        if self.auto_disable:
            self.request_disable()

    @property
    def auto_disconnect(self) -> bool:
        """Getter for auto disconnect state.

        Returns:
            bool: Whether to disconnect on application exit
        """
        return self._auto_disconnect

    @auto_disconnect.setter
    def auto_disconnect(self, value: bool):
        """Setter for auto disconnect.

        Args:
            value (bool): Whether to disconnect on application exit
        """
        self._auto_disconnect = value
        if value:
            atexit.register(self.disconnect)
        else:
            atexit.unregister(self.disconnect)

    @property
    def auto_disable(self) -> bool:
        """Getter for auto disable state.

        Returns:
            bool: Whether to disconnect on application exit
        """
        return self._auto_disable

    @auto_disable.setter
    def auto_disable(self, value: bool):
        """Setter for auto disable.

        Args:
            value (bool): Whether to disconnect on application exit
        """
        self._auto_disable = value

    def send(self, data: str):
        """Null implementation of the send method

        Args:
            data (str): Data to send nowhere

        Raises:
            NotImplementedError: Always raised
        """
        msg = f"Function not implemented, attempting to send {data}"
        raise NotImplementedError(msg)

    def request_enable(self) -> int:
        """Request the core to enable

        Returns:
            int: Always 1
        """
        self.send("kevinbot.tryenable=1")
        return 1

    def request_disable(self) -> int:
        """Request the core to disable

        Returns:
            int: Always 1
        """
        self.send("kevinbot.tryenable=0")
        return 1

    def e_stop(self):
        """Attempt to send and E-Stop signal to the Core"""
        self.send("system.estop")
        self._state.estop = True

    def _register_component(self, component: BaseKevinbotSubsystem):
        self._subsystems.append(component)

auto_disconnect: bool property writable

Getter for auto disconnect state.

Returns:

Name Type Description
bool bool

Whether to disconnect on application exit

auto_disable: bool property writable

Getter for auto disable state.

Returns:

Name Type Description
bool bool

Whether to request a disable when the robot disconnects

get_state()

Gets the current state of the robot

Returns:

Name Type Description
KevinbotState KevinbotState

State class

Source code in src/kevinbotlib/core.py
59
60
61
62
63
64
65
def get_state(self) -> KevinbotState:
    """Return the state object currently held by the robot

    Returns:
        KevinbotState: State class
    """
    current = self._state
    return current

disconnect()

Basic robot disconnect

Source code in src/kevinbotlib/core.py
71
72
73
74
75
def disconnect(self):
    """Flag the robot as disconnected and, when auto-disable is on, request a disable"""
    self._state.connected = False
    if not self.auto_disable:
        return
    self.request_disable()

send(data)

Null implementation of the send method

Parameters:

Name Type Description Default
data str

Data to send nowhere

required

Raises:

Type Description
NotImplementedError

Always raised

Source code in src/kevinbotlib/core.py
117
118
119
120
121
122
123
124
125
126
127
def send(self, data: str):
    """Placeholder send that has nowhere to deliver the data

    Args:
        data (str): Data that would have been sent

    Raises:
        NotImplementedError: Always raised
    """
    raise NotImplementedError(f"Function not implemented, attempting to send {data}")

request_enable()

Request the core to enable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
129
130
131
132
133
134
135
136
def request_enable(self) -> int:
    """Ask the core to enable by sending the try-enable command with value 1

    Returns:
        int: Always 1
    """
    command = "kevinbot.tryenable=1"
    self.send(command)
    return 1

request_disable()

Request the core to disable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
138
139
140
141
142
143
144
145
def request_disable(self) -> int:
    """Ask the core to disable by sending the try-enable command with value 0

    Returns:
        int: Always 1
    """
    command = "kevinbot.tryenable=0"
    self.send(command)
    return 1

e_stop()

Attempt to send an E-Stop signal to the Core

Source code in src/kevinbotlib/core.py
147
148
149
150
def e_stop(self):
    """Attempt to send an E-Stop signal to the Core and record it in the local state"""
    estop_command = "system.estop"
    self.send(estop_command)
    # Only reached when send() did not raise
    self._state.estop = True

SerialKevinbot

Bases: BaseKevinbot

The main serial robot class

Source code in src/kevinbotlib/core.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
class SerialKevinbot(BaseKevinbot):
    """The main serial robot class"""

    def __init__(self) -> None:
        super().__init__()
        self.type = KevinbotConnectionType.SERIAL

        self.serial: Serial | None = None
        self.rx_thread: Thread | None = None

        self._callback: Callable[[str, str | None], Any] | None = None

        atexit.register(self.disconnect)

    def connect(
        self,
        port: str,
        baud: int,
        timeout: float,
        tick_interval: float,
        ser_timeout: float = 0.5,
        *,
        tick_thread: bool = True,
    ):
        """Start a connection with Kevinbot Core

        Args:
            port (str): Serial port to use (`/dev/ttyAMA2` is standard with the typical Kevinbot Hardware)
            baud (int): Baud rate to use (`921600` is typical for the defualt Core configs)
            timeout (float): Timeout for handshake
            tick_interval (float): How often a heartbeat should be produced
            ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.
            tick_thread (bool, optional): Whether a tick thread should be started. Defaults to True.

        Raises:
            HandshakeTimeoutException: Core didn't respond to the connection handshake before the timeout
        """
        serial = self._setup_serial(port, baud, ser_timeout)

        start_time = time.monotonic()
        while True:
            serial.write(b"connection.isready=0\n")

            line = serial.readline().decode("utf-8", errors="ignore").strip("\n")

            if line == "ready":
                serial.write(b"connection.start\n")
                serial.write(b"core.errors.clear\n")
                serial.write(b"connection.ok\n")
                break

            if time.monotonic() - start_time > timeout:
                msg = "Handshake timed out"
                raise HandshakeTimeoutException(msg)

            time.sleep(0.1)  # Avoid spamming the connection

        # Data rx thread
        self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
        self.rx_thread.name = "KevinbotLib.Rx"
        self.rx_thread.start()

        if tick_thread:
            thread = Thread(target=self.tick_loop, args=(tick_interval,), daemon=True)
            thread.start()
            thread.name = "KevinbotLib.Tick"

        self._state.connected = True

    def disconnect(self):
        """Disconnect core gracefully"""
        super().disconnect()
        if self.serial and self.serial.is_open:
            self.send("core.link.unlink")
            self.serial.flush()
            self.serial.close()
        else:
            logger.warning("Already disconnected")

    @property
    def on_data(self) -> Callable[[str, str | None], Any] | None:
        """Return the data recieved callback function

        Returns:
            Callable[[str, str | None], Any] | None: Callback function
        """
        return self._callback

    @on_data.setter
    def on_data(self, callback: Callable[[str, str | None], Any]) -> None:
        """Set the data recieved callback function

        Args:
            callback (Callable[[str, str  |  None], Any]): Callback function
        """
        self._callback = callback

    def tick_loop(self, interval: float = 1):
        """Send ticks indefinetely

        Args:
            interval (float, optional): Interval between ticks in seconds. Defaults to 1.
        """
        while True:
            self._tick()
            time.sleep(interval)

    def send(self, data: str):
        """Send a string through serial.

        Automatically adds a newline.

        Args:
            data (str): Data to send
        """
        self.raw_tx((data + "\n").encode("utf-8"))

    def raw_tx(self, data: bytes):
        """Send raw bytes over serial.

        Args:
            data (bytes): Raw data
        """
        if self.serial:
            self.serial.write(data)
        else:
            logger.warning(f"Couldn't transmit data: {data!r}, Core isn't connected")

    def _rx_loop(self, serial: Serial, delimeter: str = "="):
        while True:
            try:
                raw: bytes = serial.readline()
            except TypeError:
                # serial has been stopped
                return

            cmd: str = raw.decode("utf-8").split(delimeter, maxsplit=1)[0].strip()
            if not cmd:
                continue

            val: str | None = None
            if len(raw.decode("utf-8").split(delimeter)) > 1:
                val = raw.decode("utf-8").split(delimeter, maxsplit=1)[1].strip("\r\n")

            match cmd:
                case "ready":
                    pass
                case "core.enabled":
                    if not val:
                        logger.warning("No value recieved for 'core.enabled'")
                        continue
                    if val.lower() in ["true", "t", "1"]:
                        self._state.enabled = True
                    else:
                        self._state.enabled = False
                case "core.uptime":
                    if val:
                        self._state.uptime = int(val)
                case "core.uptime_ms":
                    if val:
                        self._state.uptime_ms = int(val)
                case "connection.requesthandshake":
                    serial.write(b"connection.start\n")
                    serial.write(b"core.errors.clear\n")
                    serial.write(b"connection.ok\n")
                    logger.warning("A handshake was re-requested. This could indicate a core power fault or reset")
                case "motors.amps":
                    if not val:
                        logger.error("No value given for motors.amps")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of motion.amps are not ints: {val}")
                        continue

                    self._state.motion.amps = list(map(float, val.split(",")))
                case "motors.watts":
                    if not val:
                        logger.error("No value given for motors.watts")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of motion.watts are not ints: {val}")
                        continue

                    self._state.motion.watts = list(map(float, val.split(",")))
                case "motors.status":
                    if not val:
                        logger.error("No value given for motors.status")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of motion.status are not ints: {val}")
                        continue

                    self._state.motion.status = [MotorDriveStatus(int(x)) for x in val.split(",")]
                case "bms.voltages":
                    if not val:
                        logger.error("No value given for bms.voltages")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of bms.voltages are not ints: {val}")
                        continue

                    self._state.battery.voltages = [float(x) / 10 for x in val.split(",")]
                case "bms.raw_voltages":
                    if val:
                        self._state.battery.raw_voltages = [float(x) / 10 for x in val.split(",")]
                case "bms.status":
                    if val:
                        self._state.battery.states = [BmsBatteryState(int(x)) for x in val.split(",")]
                case "sensors.gyro":
                    if not val:
                        logger.error("No value given for sensors.gyro")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of sensors.gyro are not ints: {val}")
                        continue

                    self._state.imu.gyro = [int(x) for x in val.split(",")]
                case "sensors.accel":
                    if not val:
                        logger.error("No value given for sensors.accel")
                        continue

                    try:
                        [int(sv) for sv in val.split(",")]
                    except ValueError:
                        logger.error(f"Values of sensors.accel are not ints: {val}")
                        continue

                    self._state.imu.accel = [int(x) for x in val.split(",")]
                case "sensors.temps":
                    if val:
                        temps = val.split(",")
                        valid = True
                        for temp in temps:
                            if not re.match("^[-+]?[0-9]+$", temp):
                                logger.error(f"Found non-integer value in temps, {temps}")
                                valid = False
                                break
                        if valid:
                            self._state.thermal.left_motor = int(temps[0]) / 100
                            self._state.thermal.right_motor = int(temps[1]) / 100
                            self._state.thermal.internal = int(temps[2]) / 100
                case "sensors.bme":
                    if val:
                        vals = val.split(",")
                        for value in vals:
                            if not re.match("^[-+]?[0-9]+$", value):
                                logger.error(f"Found non-integer value in bme values, {temps}")
                                continue

                        self._state.enviro.temperature = int(vals[0])
                        self._state.enviro.humidity = int(vals[2])
                        self._state.enviro.pressure = int(vals[3])
                case _:
                    logger.warning(f"Got a command that isn't supported yet: {cmd} with value {val}")

            if self.on_data:
                self.on_data(cmd, val)

    def _setup_serial(self, port: str, baud: int, timeout: float = 1):
        self.serial = Serial(port, baud, timeout=timeout)
        return self.serial

    def _tick(self):
        if self.serial and self.serial.is_open:
            self.serial.write(b"core.tick\n")

on_data: Callable[[str, str | None], Any] | None property writable

Return the data received callback function

Returns:

Type Description
Callable[[str, str | None], Any] | None

Callable[[str, str | None], Any] | None: Callback function

connect(port, baud, timeout, tick_interval, ser_timeout=0.5, *, tick_thread=True)

Start a connection with Kevinbot Core

Parameters:

Name Type Description Default
port str

Serial port to use (/dev/ttyAMA2 is standard with the typical Kevinbot Hardware)

required
baud int

Baud rate to use (921600 is typical for the default Core configs)

required
timeout float

Timeout for handshake

required
tick_interval float

How often a heartbeat should be produced

required
ser_timeout float

Readline timeout, should be lower than timeout. Defaults to 0.5.

0.5
tick_thread bool

Whether a tick thread should be started. Defaults to True.

True

Raises:

Type Description
HandshakeTimeoutException

Core didn't respond to the connection handshake before the timeout

Source code in src/kevinbotlib/core.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def connect(
    self,
    port: str,
    baud: int,
    timeout: float,
    tick_interval: float,
    ser_timeout: float = 0.5,
    *,
    tick_thread: bool = True,
):
    """Start a connection with Kevinbot Core

    Opens the serial port, repeats the ready request until the Core answers
    `ready` (or the timeout elapses), then starts the receive thread and,
    optionally, the tick thread.

    Args:
        port (str): Serial port to use (`/dev/ttyAMA2` is standard with the typical Kevinbot Hardware)
        baud (int): Baud rate to use (`921600` is typical for the default Core configs)
        timeout (float): Timeout for handshake
        tick_interval (float): How often a heartbeat should be produced
        ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.
        tick_thread (bool, optional): Whether a tick thread should be started. Defaults to True.

    Raises:
        HandshakeTimeoutException: Core didn't respond to the connection handshake before the timeout
    """
    serial = self._setup_serial(port, baud, ser_timeout)

    start_time = time.monotonic()
    while True:
        serial.write(b"connection.isready=0\n")

        line = serial.readline().decode("utf-8", errors="ignore").strip("\n")

        if line == "ready":
            serial.write(b"connection.start\n")
            serial.write(b"core.errors.clear\n")
            serial.write(b"connection.ok\n")
            break

        if time.monotonic() - start_time > timeout:
            # Release the port so a later connect() attempt can open it again
            serial.close()
            msg = "Handshake timed out"
            raise HandshakeTimeoutException(msg)

        time.sleep(0.1)  # Avoid spamming the connection

    # Data rx thread
    self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
    self.rx_thread.name = "KevinbotLib.Rx"
    self.rx_thread.start()

    if tick_thread:
        thread = Thread(target=self.tick_loop, args=(tick_interval,), daemon=True)
        # Name the thread before starting it, same as the rx thread above
        thread.name = "KevinbotLib.Tick"
        thread.start()

    self._state.connected = True

disconnect()

Disconnect core gracefully

Source code in src/kevinbotlib/core.py
225
226
227
228
229
230
231
232
233
def disconnect(self):
    """Run the base disconnect, then unlink from the core and close the serial port"""
    super().disconnect()

    port = self.serial
    if not (port and port.is_open):
        # Nothing left to close
        logger.warning("Already disconnected")
        return

    self.send("core.link.unlink")
    port.flush()
    port.close()

tick_loop(interval=1)

Send ticks indefinitely

Parameters:

Name Type Description Default
interval float

Interval between ticks in seconds. Defaults to 1.

1
Source code in src/kevinbotlib/core.py
253
254
255
256
257
258
259
260
261
def tick_loop(self, interval: float = 1):
    """Send ticks indefinitely, pausing between each one

    This call never returns on its own.

    Args:
        interval (float, optional): Interval between ticks in seconds. Defaults to 1.
    """
    pause = time.sleep
    while True:
        self._tick()
        pause(interval)

send(data)

Send a string through serial.

Automatically adds a newline.

Parameters:

Name Type Description Default
data str

Data to send

required
Source code in src/kevinbotlib/core.py
263
264
265
266
267
268
269
270
271
def send(self, data: str):
    """Encode a string as UTF-8 and transmit it through serial.

    A newline is appended automatically.

    Args:
        data (str): Data to send
    """
    line = data + "\n"
    self.raw_tx(line.encode("utf-8"))

raw_tx(data)

Send raw bytes over serial.

Parameters:

Name Type Description Default
data bytes

Raw data

required
Source code in src/kevinbotlib/core.py
273
274
275
276
277
278
279
280
281
282
def raw_tx(self, data: bytes):
    """Send raw bytes over serial.

    Nothing is written (a warning is logged instead) when there is no serial
    port or the port has already been closed.

    Args:
        data (bytes): Raw data
    """
    # A closed port object is still truthy, so check is_open as well; otherwise a
    # send after the port was closed would write to a closed port
    if self.serial and self.serial.is_open:
        self.serial.write(data)
    else:
        logger.warning(f"Couldn't transmit data: {data!r}, Core isn't connected")

MqttKevinbot

Bases: BaseKevinbot

KevinbotLib interface over MQTT

Source code in src/kevinbotlib/core.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
class MqttKevinbot(BaseKevinbot):
    """KevinbotLib interface over MQTT"""

    def __init__(self, cid: str | None = None) -> None:
        """Instansiate a new KevinbotLib interface over MQTT

        Args:
            cid (str | None, optional): MQTT Client id. Defaults to an auto-generated uuid.
        """
        super().__init__()
        self.type = KevinbotConnectionType.MQTT

        self.root_topic = "kevinbot"
        self.host = "localhost"
        self.port = 1883
        self.keepalive = 60
        self.connected = False

        self._hb_thread: Thread | None = None  # thread to produce client's heartbeat
        self._server_hb_thread: Thread | None = None  # thread to check in server heartbeat is slow/stopped

        self._callback: Callable[[list[str], str], Any] | None = None  # message callback
        self._on_server_startup: Callable[[], Any] | None = None
        self._on_server_disconnect: Callable[[], Any] | None = None

        self._eyes: MqttEyes | None = None

        self.cid = cid if cid else f"kevinbotlib-{shortuuid.random()}"  # client id
        self.client = Client(CallbackAPIVersion.VERSION2, self.cid)
        self.client.on_message = self._on_message

        atexit.register(self.disconnect)

    @property
    def callback(self) -> Callable[[list[str], str], Any] | None:
        return self._callback

    @callback.setter
    def callback(self, callback: Callable[[list[str], str], Any] | None) -> None:
        self._callback = callback

    @property
    def on_server_startup(self) -> Callable[[], Any] | None:
        return self._on_server_startup

    @on_server_startup.setter
    def on_server_startup(self, callback: Callable[[], Any] | None) -> None:
        self._on_server_startup = callback

    @property
    def on_server_disconnect(self) -> Callable[[], Any] | None:
        return self._on_server_disconnect

    @on_server_disconnect.setter
    def on_server_disconnect(self, callback: Callable[[], Any] | None) -> None:
        self._on_server_disconnect = callback

    @property
    def mqtt_connected(self) -> bool:
        return self.client.is_connected()

    def connect(
        self,
        root_topic: str = "kevinbot",
        host: str = "localhost",
        port: int = 1883,
        timeout: float = 5.0,
        keepalive: int = 60,
        heartbeat: float = 1.0,
    ) -> MQTTErrorCode:
        """Connect to MQTT Broker

        Args:
            root_topic (str, optional): Root communication topic. Defaults to "kevinbot".
            host (str, optional): KevinbotLib server host. Defaults to "localhost".
            port (int, optional): Kevinbot MQTT Broker port. Defaults to 1883.
            timeout (float, optional): KevinbotLib connection timeout in seconds. Defaults to 5.
            keepalive (int, optional): Maximum period in seconds between communications with the broker. Defaults to 60.
            heartbeat (float, optional): Heartbeat interval in seconds. Defaults to 1.0.

        Returns:
            MQTTErrorCode: Connection error
        """
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.root_topic = root_topic
        self.connected = False

        self._last_ts_update = datetime.fromtimestamp(0, timezone.utc)
        self._last_server_hb = datetime.fromtimestamp(0, timezone.utc)

        rc = self.client.connect(self.host, self.port, self.keepalive)
        self.client.subscribe(f"{self.root_topic}/state", 0)
        self.client.subscribe(f"{self.root_topic}/eyes/state", 0)
        self.client.subscribe(f"{self.root_topic}/serverstate", 0)
        self.client.subscribe(f"{self.root_topic}/server/startup", 0)
        self.client.subscribe(f"{self.root_topic}/server/shutdown", 0)
        self.client.subscribe(f"{self.root_topic}/clients/connect/ack", 0)
        self.client.loop_start()

        connect_time = time.time()
        while (not self.server_state.mqtt_connected) or (self.server_state.heartbeat_freq == -1):
            time.sleep(0.01)
            if connect_time < time.time() - timeout:
                msg = "KevinbotLib over MQTT handhsake timed out."
                self.client.loop_stop()
                self.client.disconnect()
                raise HandshakeTimeoutException(msg)

        self.connected = True

        self.client.publish(f"{self.root_topic}/clients/connect", self.cid, 0)

        self._hb_thread = Thread(target=self._hb_loop, args=(heartbeat,), daemon=True)
        self._hb_thread.name = f"KevinbotLib.Mqtt.Heartbeat:{self.cid}"
        self._hb_thread.start()

        self._server_hb_thread = Thread(target=self._server_hb_loop, daemon=True)
        self._server_hb_thread.name = f"KevinbotLib.Mqtt.ServerHeartbeat:{self.cid}"
        self._server_hb_thread.start()

        return rc

    def _server_hb_loop(self):
        while True:
            if not self.connected:
                break

            if self.server_state.heartbeat_freq == -1:
                time.sleep(1)
                continue

            if self._last_server_hb < datetime.fromtimestamp(0, timezone.utc) - timedelta(
                seconds=self.server_state.heartbeat_freq
            ):
                # server heartbeat is slow or stopped
                self.connected = False
                if self.on_server_disconnect:
                    self.on_server_disconnect()

            time.sleep(self.server_state.heartbeat_freq)

    def _hb_loop(self, heartbeat: float):
        while True:
            if not self.connected:
                break

            self.client.publish(f"{self.root_topic}/clients/heartbeat", f"{self.cid}:{self.ts.timestamp()}", 0)
            time.sleep(heartbeat)

    def send(self, data: str):
        """Determine topic and publish data. Compatible with send of `SerialKevinbot`

        Args:
            data (str): Data to parse and publish
        """
        if len(data.split("=", 2)) > 1:
            cmd, val = data.split("=", 2)
        else:
            cmd = data
            val = None

        self.client.publish(f"{self.root_topic}/{cmd.replace('.', '/')}", val, 0)

    def disconnect(self):
        """Disconnect from server"""
        super().disconnect()

        if self.mqtt_connected:
            self.client.publish(f"{self.root_topic}/clients/disconnect", self.cid, 0).wait_for_publish(1)
            self.client.loop_stop()
            self.client.disconnect()
        self.connected = False

    def request_enable(self) -> int:
        """Request the core to enable

        Returns:
            int: Always 1
        """
        self.client.publish(f"{self.root_topic}/main/state_request", "enable", 1)
        return 1

    def request_disable(self) -> int:
        """Request the core to disable

        Returns:
            int: Always 1
        """
        self.client.publish(f"{self.root_topic}/main/state_request", "disable", 1)
        return 1

    def e_stop(self):
        """Attempt to send and E-Stop signal to the Core"""
        self.client.publish(f"{self.root_topic}/main/estop", 1)

    @property
    def ts(self) -> datetime:
        """
        Get a semi-accurate timestamp from the server. Used for drivebase timeouts.

        Returns:
            datetime: Server time or UNIX timestamp 0 if server hasn't broadcasted a timestamp yet
        """
        ts = self.server_state.timestamp
        if ts:
            ts += datetime.now(timezone.utc) - self._last_ts_update
            return ts
        return datetime.fromtimestamp(0, timezone.utc)

    def _on_message(self, _, __, msg: MQTTMessage):
        logger.trace(f"Got MQTT message at: {msg.topic} payload={msg.payload!r} with qos={msg.qos}")

        if msg.topic[0] == "/" or msg.topic[-1] == "/":
            logger.warning(f"MQTT topic: {msg.topic} has a leading/trailing slash. Removing it.")
            topic = msg.topic.strip("/")
        else:
            topic = msg.topic

        value = msg.payload.decode("utf-8")

        subtopics = topic.split("/")[1:]
        match subtopics:
            case ["state"]:
                self._state = KevinbotState(**json.loads(value))
            case ["eyes", "state"]:
                if self._eyes:
                    self._eyes._load_data(value)  # noqa: SLF001
            case ["serverstate"]:
                new_state = KevinbotServerState(**json.loads(value))

                if self._server_state.timestamp != new_state.timestamp:
                    self._last_ts_update = datetime.now(timezone.utc)

                self._server_state = new_state
            case ["server", "startup"]:
                # we must reconnect
                self.client.publish(f"{self.root_topic}/clients/connect", self.cid, 0)
                self.connected = True

                if self.on_server_startup:
                    self.on_server_startup()
            case ["server", "shutdown"]:
                self.connected = False
                if self.on_server_disconnect:
                    self.on_server_disconnect()
            case ["clients", "connect", "ack"]:
                if value == f"ack:{self.cid}":
                    self.connected = True

        if self.callback:
            self.callback(subtopics, value)

ts: datetime property

Get a semi-accurate timestamp from the server. Used for drivebase timeouts.

Returns:

Name Type Description
datetime datetime

Server time or UNIX timestamp 0 if server hasn't broadcasted a timestamp yet

__init__(cid=None)

Instantiate a new KevinbotLib interface over MQTT

Parameters:

Name Type Description Default
cid str | None

MQTT Client id. Defaults to an auto-generated uuid.

None
Source code in src/kevinbotlib/core.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def __init__(self, cid: str | None = None) -> None:
    """Instantiate a new KevinbotLib interface over MQTT

    Args:
        cid (str | None, optional): MQTT Client id. Defaults to an auto-generated uuid.
    """
    super().__init__()
    self.type = KevinbotConnectionType.MQTT

    # Connection settings start at their defaults and are not yet connected
    self.connected = False
    self.keepalive = 60
    self.port = 1883
    self.host = "localhost"
    self.root_topic = "kevinbot"

    # Background workers; both stay unset at construction time
    self._hb_thread: Thread | None = None  # produces this client's heartbeat
    self._server_hb_thread: Thread | None = None  # watches for a slow/stopped server heartbeat

    # Optional user hooks
    self._callback: Callable[[list[str], str], Any] | None = None  # message callback
    self._on_server_startup: Callable[[], Any] | None = None
    self._on_server_disconnect: Callable[[], Any] | None = None

    self._eyes: MqttEyes | None = None

    # A falsy cid (None or "") falls back to a generated client id
    self.cid = cid or f"kevinbotlib-{shortuuid.random()}"
    mqtt_client = Client(CallbackAPIVersion.VERSION2, self.cid)
    mqtt_client.on_message = self._on_message
    self.client = mqtt_client

    # Make sure the robot is disconnected when the interpreter exits
    atexit.register(self.disconnect)

connect(root_topic='kevinbot', host='localhost', port=1883, timeout=5.0, keepalive=60, heartbeat=1.0)

Connect to MQTT Broker

Parameters:

Name Type Description Default
root_topic str

Root communication topic. Defaults to "kevinbot".

'kevinbot'
host str

KevinbotLib server host. Defaults to "localhost".

'localhost'
port int

Kevinbot MQTT Broker port. Defaults to 1883.

1883
timeout float

KevinbotLib connection timeout in seconds. Defaults to 5.

5.0
keepalive int

Maximum period in seconds between communications with the broker. Defaults to 60.

60
heartbeat float

Heartbeat interval in seconds. Defaults to 1.0.

1.0

Returns:

Name Type Description
MQTTErrorCode MQTTErrorCode

Connection error

Source code in src/kevinbotlib/core.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
def connect(
    self,
    root_topic: str = "kevinbot",
    host: str = "localhost",
    port: int = 1883,
    timeout: float = 5.0,
    keepalive: int = 60,
    heartbeat: float = 1.0,
) -> MQTTErrorCode:
    """Connect to MQTT Broker and wait for the KevinbotLib server handshake

    Args:
        root_topic (str, optional): Root communication topic. Defaults to "kevinbot".
        host (str, optional): KevinbotLib server host. Defaults to "localhost".
        port (int, optional): Kevinbot MQTT Broker port. Defaults to 1883.
        timeout (float, optional): KevinbotLib connection timeout in seconds. Defaults to 5.
        keepalive (int, optional): Maximum period in seconds between communications with the broker. Defaults to 60.
        heartbeat (float, optional): Heartbeat interval in seconds. Defaults to 1.0.

    Raises:
        HandshakeTimeoutException: The server state did not report an MQTT connection and a
            heartbeat frequency within `timeout` seconds

    Returns:
        MQTTErrorCode: Connection error
    """
    self.host = host
    self.port = port
    self.keepalive = keepalive
    self.root_topic = root_topic
    self.connected = False

    # Start both "last seen" markers at the UNIX epoch (timezone-aware, UTC)
    epoch = datetime.fromtimestamp(0, timezone.utc)
    self._last_ts_update = epoch
    self._last_server_hb = epoch

    rc = self.client.connect(self.host, self.port, self.keepalive)
    for suffix in (
        "state",
        "eyes/state",
        "serverstate",
        "server/startup",
        "server/shutdown",
        "clients/connect/ack",
    ):
        self.client.subscribe(f"{self.root_topic}/{suffix}", 0)
    self.client.loop_start()

    # Poll the server state until the handshake data shows up.
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while (not self.server_state.mqtt_connected) or (self.server_state.heartbeat_freq == -1):
        time.sleep(0.01)
        if time.monotonic() > deadline:
            # Tear the network loop down before reporting the failure
            self.client.loop_stop()
            self.client.disconnect()
            msg = "KevinbotLib over MQTT handshake timed out."
            raise HandshakeTimeoutException(msg)

    self.connected = True

    # Announce this client to the server
    self.client.publish(f"{self.root_topic}/clients/connect", self.cid, 0)

    self._hb_thread = Thread(target=self._hb_loop, args=(heartbeat,), daemon=True)
    self._hb_thread.name = f"KevinbotLib.Mqtt.Heartbeat:{self.cid}"
    self._hb_thread.start()

    self._server_hb_thread = Thread(target=self._server_hb_loop, daemon=True)
    self._server_hb_thread.name = f"KevinbotLib.Mqtt.ServerHeartbeat:{self.cid}"
    self._server_hb_thread.start()

    return rc

send(data)

Determine topic and publish data. Compatible with send of SerialKevinbot

Parameters:

Name Type Description Default
data str

Data to parse and publish

required
Source code in src/kevinbotlib/core.py
590
591
592
593
594
595
596
597
598
599
600
601
602
def send(self, data: str):
    """Determine topic and publish data. Compatible with send of `SerialKevinbot`

    The text before the first `=` is the command; its dots become topic
    separators under the root topic. Everything after the first `=` is the
    payload, so payloads may themselves contain `=`.

    Args:
        data (str): Data to parse and publish, as `command` or `command=value`
    """
    # partition() splits on the first "=" only, so values containing "=" stay intact
    cmd, separator, val = data.partition("=")
    if not separator:
        # No "=" present: publish the bare command without a payload
        val = None

    self.client.publish(f"{self.root_topic}/{cmd.replace('.', '/')}", val, 0)

disconnect()

Disconnect from server

Source code in src/kevinbotlib/core.py
604
605
606
607
608
609
610
611
612
def disconnect(self):
    """Disconnect from server

    Runs the base-class disconnect first, then, if the MQTT link is up,
    announces the disconnect and shuts the client down.
    """
    super().disconnect()

    if self.mqtt_connected:
        goodbye_topic = f"{self.root_topic}/clients/disconnect"
        receipt = self.client.publish(goodbye_topic, self.cid, 0)
        # Wait up to 1 second for the message to go out before stopping the loop
        receipt.wait_for_publish(1)
        self.client.loop_stop()
        self.client.disconnect()

    self.connected = False

request_enable()

Request the core to enable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
614
615
616
617
618
619
620
621
def request_enable(self) -> int:
    """Ask the core to enable by publishing to the state-request topic

    Returns:
        int: Always 1
    """
    state_request_topic = f"{self.root_topic}/main/state_request"
    self.client.publish(state_request_topic, "enable", 1)
    return 1

request_disable()

Request the core to disable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
623
624
625
626
627
628
629
630
def request_disable(self) -> int:
    """Ask the core to disable by publishing to the state-request topic

    Returns:
        int: Always 1
    """
    state_request_topic = f"{self.root_topic}/main/state_request"
    self.client.publish(state_request_topic, "disable", 1)
    return 1

e_stop()

Attempt to send an E-Stop signal to the Core

Source code in src/kevinbotlib/core.py
632
633
634
def e_stop(self):
    """Attempt to send an E-Stop signal to the Core"""
    estop_topic = f"{self.root_topic}/main/estop"
    self.client.publish(estop_topic, 1)

Drivebase

Bases: BaseKevinbotSubsystem

Drivebase subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
class Drivebase(BaseKevinbotSubsystem):
    """Drivebase subsystem for Kevinbot

    Read accessors return values from the `motion` section of the robot state;
    `drive_at_power` and `stop` send drive commands over the robot's link.
    """

    def get_amps(self) -> list[float]:
        """Read the drivebase current draw from the robot state

        Returns:
            list[float]: Amps
        """
        motion = self.robot.get_state().motion
        return motion.amps

    def get_watts(self) -> list[float]:
        """Read the drivebase power draw from the robot state

        Returns:
            list[float]: Watts
        """
        motion = self.robot.get_state().motion
        return motion.watts

    def get_powers(self) -> tuple[int, int]:
        """Read the currently set wheel speeds in percent

        Returns:
            tuple[int, int]: (left, right) percent values from 0 to 100
        """
        left_power = self.robot.get_state().motion.left_power
        right_power = self.robot.get_state().motion.right_power
        return left_power, right_power

    def get_states(self) -> list[MotorDriveStatus]:
        """Read the wheel drive statuses from the robot state

        Returns:
            list[MotorDriveStatus]: States
        """
        motion = self.robot.get_state().motion
        return motion.status

    def drive_at_power(self, left: float, right: float):
        """Set the drive power for wheels. 0 to 1

        Powers are scaled by 100 and truncated to integers before sending.
        Nothing is sent if the robot is neither a serial nor an MQTT robot.

        Args:
            left (float): Left motor power
            right (float): Right motor power
        """
        robot = self.robot
        if isinstance(robot, SerialKevinbot):
            robot.send(f"drive.power={int(left*100)},{int(right*100)}")
            return
        if isinstance(robot, MqttKevinbot):
            topic = f"{robot.root_topic}/drive/power"
            # The MQTT payload also carries the client id and the robot's `ts` value
            payload = f"{int(left*100)},{int(right*100)},{robot.cid},{robot.ts}"
            robot.client.publish(topic, payload, 1)

    def stop(self):
        """Set all wheel powers to 0"""
        self.drive_at_power(0, 0)

get_amps()

Get the amps being used by the drivebase

Returns:

Type Description
list[float]

list[float]: Amps

Source code in src/kevinbotlib/core.py
697
698
699
700
701
702
703
def get_amps(self) -> list[float]:
    """Read the drivebase current draw from the robot state

    Returns:
        list[float]: Amps
    """
    motion = self.robot.get_state().motion
    return motion.amps

get_watts()

Get the watts being used by the drivebase

Returns:

Type Description
list[float]

list[float]: Watts

Source code in src/kevinbotlib/core.py
705
706
707
708
709
710
711
def get_watts(self) -> list[float]:
    """Read the drivebase power draw from the robot state

    Returns:
        list[float]: Watts
    """
    motion = self.robot.get_state().motion
    return motion.watts

get_powers()

Get the currently set wheel speeds in percent

Returns:

Type Description
tuple[int, int]

tuple[int, int]: Percent values from 0 to 100

Source code in src/kevinbotlib/core.py
713
714
715
716
717
718
719
def get_powers(self) -> tuple[int, int]:
    """Read the currently set wheel speeds in percent

    Returns:
        tuple[int, int]: (left, right) percent values from 0 to 100
    """
    left_power = self.robot.get_state().motion.left_power
    right_power = self.robot.get_state().motion.right_power
    return left_power, right_power

get_states()

Get the wheels states

Returns:

Type Description
list[MotorDriveStatus]

list[MotorDriveStatus]: States

Source code in src/kevinbotlib/core.py
721
722
723
724
725
726
727
def get_states(self) -> list[MotorDriveStatus]:
    """Read the wheel drive statuses from the robot state

    Returns:
        list[MotorDriveStatus]: States
    """
    motion = self.robot.get_state().motion
    return motion.status

drive_at_power(left, right)

Set the drive power for wheels. 0 to 1

Parameters:

Name Type Description Default
left float

Left motor power

required
right float

Right motor power

required
Source code in src/kevinbotlib/core.py
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
def drive_at_power(self, left: float, right: float):
    """Set the drive power for wheels. 0 to 1

    Powers are scaled by 100 and truncated to integers before sending.
    Nothing is sent if the robot is neither a serial nor an MQTT robot.

    Args:
        left (float): Left motor power
        right (float): Right motor power
    """
    robot = self.robot
    if isinstance(robot, SerialKevinbot):
        robot.send(f"drive.power={int(left*100)},{int(right*100)}")
        return
    if isinstance(robot, MqttKevinbot):
        topic = f"{robot.root_topic}/drive/power"
        # The MQTT payload also carries the client id and the robot's `ts` value
        payload = f"{int(left*100)},{int(right*100)},{robot.cid},{robot.ts}"
        robot.client.publish(topic, payload, 1)

stop()

Set all wheel powers to 0

Source code in src/kevinbotlib/core.py
745
746
747
def stop(self):
    """Halt the drivebase by commanding zero power on both wheels"""
    halted = 0
    self.drive_at_power(halted, halted)

Servo

Individually controllable servo

Source code in src/kevinbotlib/core.py
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
class Servo:
    """Individually controllable servo

    Holds the robot it belongs to and its channel index.
    """

    def __init__(self, robot: SerialKevinbot | MqttKevinbot, index: int) -> None:
        self.index = index
        self.robot = robot

    @property
    def bank(self) -> int:
        """Get the bank the servo is in (16 servo indices per bank)

        Returns:
            int: Bank number
        """
        bank_number, _ = divmod(self.index, 16)
        return bank_number

    @property
    def angle(self) -> int:
        """Get the optimistic current servo angle

        Returns:
            int: Angle in degrees
        """
        servo_state = self.robot.get_state().servos
        return servo_state.angles[self.index]

    @angle.setter
    def angle(self, angle: int):
        """Set the optimistic servo angle

        The command is sent first, then the angle is recorded in the local
        state. Nothing happens if the robot is neither serial nor MQTT.

        Args:
            angle (int): Angle in degrees
        """
        robot = self.robot
        if not isinstance(robot, (SerialKevinbot, MqttKevinbot)):
            return

        if isinstance(robot, SerialKevinbot):
            robot.send(f"s={self.index},{angle}")
        else:
            robot.client.publish(f"{robot.root_topic}/servo/set", f"{self.index},{angle}", 0)

        robot.get_state().servos.angles[self.index] = angle

bank: int property

Get the bank the servo is in

Returns:

Name Type Description
int int

Bank number

angle: int property writable

Get the optimistic current servo angle

Returns:

Name Type Description
int int

Angle in degrees

Servos

Bases: BaseKevinbotSubsystem

Servo subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
class Servos(BaseKevinbotSubsystem):
    """Servo subsystem for Kevinbot

    Behaves like a fixed-length sequence of `Servo` handles with indices
    0 through `len(self) - 1`.
    """

    def __len__(self) -> int:
        """Length will always be 32 since the P2 Kevinbot Board can only control 32

        Returns:
            int: Number of servos in the subsystem
        """
        return 32

    def __iter__(self):
        """Yield a `Servo` handle for every channel, in index order"""
        for i in range(len(self)):
            yield Servo(self.robot, i)

    def __getitem__(self, index: int):
        """Get an individual servo by index

        Args:
            index (int): Servo index from 0 to `len(self) - 1`

        Raises:
            IndexError: The index is negative or not below the servo count

        Returns:
            Servo: Individual servo
        """
        # Valid indices stop at len - 1, so the count itself must be rejected too
        if index >= len(self):
            msg = f"Servo index {index} >= {len(self)}"
            raise IndexError(msg)
        if index < 0:
            msg = f"Servo index {index} < 0"
            raise IndexError(msg)
        return Servo(self.robot, index)

    def get_servo(self, channel: int) -> Servo:
        """Get an individual servo in the subsystem

        Args:
            channel (int): PWM Port, from 0 to `len(self) - 1`

        Raises:
            IndexError: The channel is negative or not below the servo count

        Returns:
            Servo: Individual servo
        """
        if channel >= len(self) or channel < 0:
            msg = f"Servo channel {channel} is out of bounds."
            raise IndexError(msg)
        return Servo(self.robot, channel)

    @property
    def all(self) -> int:
        """Get the angle shared by every servo

        Returns:
            int: The common angle when all recorded angles are equal, otherwise -1
        """
        angles = self.robot.get_state().servos.angles
        if all(i == angles[0] for i in angles):
            return angles[0]
        return -1

    @all.setter
    def all(self, angle: int):
        """Set every servo to the same angle and record it in the local state

        Args:
            angle (int): Angle in degrees
        """
        self.robot.send(f"servo.all={angle}")
        self.robot.get_state().servos.angles = [angle] * len(self)

__len__()

Length will always be 32 since the P2 Kevinbot Board can only control 32

Returns:

Name Type Description
int int

Number of servos in the subsystem

Source code in src/kevinbotlib/core.py
795
796
797
798
799
800
801
def __len__(self) -> int:
    """Report the fixed servo count; the P2 Kevinbot Board can only control 32

    Returns:
        int: Number of servos in the subsystem
    """
    servo_count = 32
    return servo_count

get_servo(channel)

Get an individual servo in the subsystem

Parameters:

Name Type Description Default
channel int

PWM Port

required

Returns:

Name Type Description
Servo Servo

Individual servo

Source code in src/kevinbotlib/core.py
816
817
818
819
820
821
822
823
824
825
826
827
828
def get_servo(self, channel: int) -> Servo:
    """Get an individual servo in the subsystem

    Args:
        channel (int): PWM Port, from 0 to `len(self) - 1`

    Raises:
        IndexError: The channel is negative or not below the servo count

    Returns:
        Servo: Individual servo
    """
    # Valid channels stop at len - 1, so the count itself must be rejected too
    if channel >= len(self) or channel < 0:
        msg = f"Servo channel {channel} is out of bounds."
        raise IndexError(msg)
    return Servo(self.robot, channel)

Lighting

Bases: BaseKevinbotSubsystem

Lighting subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
class Lighting(BaseKevinbotSubsystem):
    """Lighting subsystem for Kevinbot"""

    class Channel(Enum):
        """Lighting segment identifier"""

        Head = "head"
        Body = "body"
        Base = "base"

    def get_state(self) -> LightingState:
        """Get the state of the robot's light segments

        Returns:
            LightingState: State
        """
        return self.robot.get_state().lighting

    def set_cam_brightness(self, brightness: int):
        """Set brightness of camera illumination

        Args:
            brightness (int): Brightness from 0 to 255
        """
        self.robot.send(f"lighting.cam.bright={brightness}")
        self.robot.get_state().lighting.camera = brightness

    def set_brightness(self, channel: Channel, brightness: int):
        """Set the brightness of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            brightness (int): Brightness from 0 to 255
        """
        self.robot.send(f"lighting.{channel.value}.bright={brightness}")
        match channel:
            case self.Channel.Base:
                self.robot.get_state().lighting.base_bright = brightness
            case self.Channel.Body:
                self.robot.get_state().lighting.body_bright = brightness
            case self.Channel.Head:
                self.robot.get_state().lighting.head_bright = brightness

    def set_color1(self, channel: Channel, color: list[int] | tuple[int, int, int]):
        """Set the Color 1 of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            color (Iterable[int]): RGB Color values. Must have a length of 3
        """
        self.robot.send(f"lighting.{channel.value}.color1={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
        match channel:
            case self.Channel.Base:
                self.robot.get_state().lighting.base_color1 = list(color)
            case self.Channel.Body:
                self.robot.get_state().lighting.base_color1 = list(color)
            case self.Channel.Head:
                self.robot.get_state().lighting.base_color1 = list(color)

    def set_color2(self, channel: Channel, color: list[int] | tuple[int, int, int]):
        """Set the Color 2 of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            color (Iterable[int]): RGB Color values. Must have a length of 3
        """
        self.robot.send(f"lighting.{channel.value}.color2={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
        match channel:
            case self.Channel.Base:
                self.robot.get_state().lighting.base_color2 = list(color)
            case self.Channel.Body:
                self.robot.get_state().lighting.base_color2 = list(color)
            case self.Channel.Head:
                self.robot.get_state().lighting.base_color2 = list(color)

    def set_effect(self, channel: Channel, effect: str):
        """Set the animation of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            effect (str): Animation ID
        """
        self.robot.send(f"lighting.{channel.value}.effect={effect}")
        match channel:
            case self.Channel.Base:
                self.robot.get_state().lighting.base_effect = effect
            case self.Channel.Body:
                self.robot.get_state().lighting.base_effect = effect
            case self.Channel.Head:
                self.robot.get_state().lighting.base_effect = effect

    def set_update(self, channel: Channel, update: int):
        """Set the animation of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            update (int): Update rate (no fixed unit)
        """
        self.robot.send(f"lighting.{channel.value}.update={update}")
        match channel:
            case self.Channel.Base:
                self.robot.get_state().lighting.base_update = update
            case self.Channel.Body:
                self.robot.get_state().lighting.base_update = update
            case self.Channel.Head:
                self.robot.get_state().lighting.base_update = update

Channel

Bases: Enum

Lighting segment identifier

Source code in src/kevinbotlib/core.py
845
846
847
848
849
850
class Channel(Enum):
    """Lighting segment identifier"""

    # Each member's value is the lowercase name of its segment
    Head = "head"
    Body = "body"
    Base = "base"

get_state()

Get the state of the robot's light segments

Returns:

Name Type Description
LightingState LightingState

State

Source code in src/kevinbotlib/core.py
852
853
854
855
856
857
858
def get_state(self) -> LightingState:
    """Fetch the lighting section of the robot state

    Returns:
        LightingState: State
    """
    robot_state = self.robot.get_state()
    return robot_state.lighting

set_cam_brightness(brightness)

Set brightness of camera illumination

Parameters:

Name Type Description Default
brightness int

Brightness from 0 to 255

required
Source code in src/kevinbotlib/core.py
860
861
862
863
864
865
866
867
def set_cam_brightness(self, brightness: int):
    """Set brightness of camera illumination

    Sends the command, then records the value in the local lighting state.

    Args:
        brightness (int): Brightness from 0 to 255
    """
    command = f"lighting.cam.bright={brightness}"
    self.robot.send(command)

    lighting = self.robot.get_state().lighting
    lighting.camera = brightness

set_brightness(channel, brightness)

Set the brightness of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
brightness int

Brightness from 0 to 255

required
Source code in src/kevinbotlib/core.py
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
def set_brightness(self, channel: Channel, brightness: int):
    """Set the brightness of a lighting segment

    Sends the command, then records the value on the matching segment of the
    local lighting state. An unrecognized channel leaves the local state alone.

    Args:
        channel (Channel): Base, Body, or Head
        brightness (int): Brightness from 0 to 255
    """
    self.robot.send(f"lighting.{channel.value}.bright={brightness}")

    segments = self.Channel
    if channel == segments.Base:
        self.robot.get_state().lighting.base_bright = brightness
    elif channel == segments.Body:
        self.robot.get_state().lighting.body_bright = brightness
    elif channel == segments.Head:
        self.robot.get_state().lighting.head_bright = brightness

set_color1(channel, color)

Set the Color 1 of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
color Iterable[int]

RGB Color values. Must have a length of 3

required
Source code in src/kevinbotlib/core.py
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
def set_color1(self, channel: Channel, color: list[int] | tuple[int, int, int]):
    """Set the Color 1 of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        color (Iterable[int]): RGB Color values. Must have a length of 3
    """
    self.robot.send(f"lighting.{channel.value}.color1={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_color1 = list(color)
        case self.Channel.Body:
            self.robot.get_state().lighting.base_color1 = list(color)
        case self.Channel.Head:
            self.robot.get_state().lighting.base_color1 = list(color)

set_color2(channel, color)

Set the Color 2 of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
color Iterable[int]

RGB Color values. Must have a length of 3

required
Source code in src/kevinbotlib/core.py
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
def set_color2(self, channel: Channel, color: list[int] | tuple[int, int, int]):
    """Set the Color 2 of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        color (Iterable[int]): RGB Color values. Must have a length of 3
    """
    self.robot.send(f"lighting.{channel.value}.color2={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_color2 = list(color)
        case self.Channel.Body:
            self.robot.get_state().lighting.base_color2 = list(color)
        case self.Channel.Head:
            self.robot.get_state().lighting.base_color2 = list(color)

set_effect(channel, effect)

Set the animation of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
effect str

Animation ID

required
Source code in src/kevinbotlib/core.py
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
def set_effect(self, channel: Channel, effect: str):
    """Set the animation of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        effect (str): Animation ID
    """
    self.robot.send(f"lighting.{channel.value}.effect={effect}")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_effect = effect
        case self.Channel.Body:
            self.robot.get_state().lighting.base_effect = effect
        case self.Channel.Head:
            self.robot.get_state().lighting.base_effect = effect

set_update(channel, update)

Set the update rate of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
update int

Update rate (no fixed unit)

required
Source code in src/kevinbotlib/core.py
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
def set_update(self, channel: Channel, update: int):
    """Set the animation of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        update (int): Update rate (no fixed unit)
    """
    self.robot.send(f"lighting.{channel.value}.update={update}")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_update = update
        case self.Channel.Body:
            self.robot.get_state().lighting.base_update = update
        case self.Channel.Head:
            self.robot.get_state().lighting.base_update = update

BaseKevinbotEyes

The base Kevinbot Eyes class.

Not to be used directly

Source code in src/kevinbotlib/eyes.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
class BaseKevinbotEyes:
    """The base Kevinbot Eyes class.

    Not to be used directly
    """

    def __init__(self) -> None:
        self._state = KevinbotEyesState()
        self.type = KevinbotConnectionType.BASE

        self._auto_disconnect = True

        # Robot used by the MQTT code paths of the setters below
        self._robot: MqttKevinbot = MqttKevinbot()

    def get_state(self) -> KevinbotEyesState:
        """Gets the current state of the eyes

        Returns:
            KevinbotEyesState: State class
        """
        return self._state

    def disconnect(self):
        """Basic disconnect"""
        self._state.connected = False

    @property
    def auto_disconnect(self) -> bool:
        """Getter for auto disconnect state.

        Returns:
            bool: Whether to disconnect on application exit
        """
        return self._auto_disconnect

    @auto_disconnect.setter
    def auto_disconnect(self, value: bool):
        """Setter for auto disconnect.

        Registers or unregisters `disconnect` as an exit handler to match.

        Args:
            value (bool): Whether to disconnect on application exit
        """
        self._auto_disconnect = value
        if value:
            atexit.register(self.disconnect)
        else:
            atexit.unregister(self.disconnect)

    def send(self, data: str):
        """Null implementation of the send method

        Args:
            data (str): Data to send nowhere

        Raises:
            NotImplementedError: Always raised
        """
        msg = f"Function not implemented, attempting to send {data}"
        raise NotImplementedError(msg)

    def set_skin(self, skin: EyeSkin):
        """Set the current skin

        Args:
            skin (EyeSkin): Skin index
        """
        if isinstance(self, SerialEyes):
            self._state.settings.states.page = skin
            self.send(f"setState={skin.value}")
        elif isinstance(self, MqttEyes):
            self._robot.client.publish(f"{self._robot.root_topic}/eyes/skin", skin.value, 0)

    def set_backlight(self, bl: float):
        """Set the current backlight brightness

        Args:
            bl (float): Brightness from 0 to 1
        """
        if isinstance(self, SerialEyes):
            # Serial eyes store and send a percentage capped at 100
            self._state.settings.display.backlight = min(int(bl * 100), 100)
            self.send(f"setBacklight={self._state.settings.display.backlight}")
        elif isinstance(self, MqttEyes):
            self._robot.client.publish(f"{self._robot.root_topic}/eyes/backlight", int(255 * bl), 0)

    def get_backlight(self):
        """Get the current backlight setting

        Returns:
            float: Value from 0 to 1
        """
        # NOTE(review): the serial path of set_backlight stores a 0-100 value, but this
        # divides by 255 -- confirm which scale the stored backlight is expected to use
        return self._state.settings.display.backlight / 255


    def set_motion(self, motion: EyeMotion):
        """Set the current motion mode

        Args:
            motion (EyeMotion): Motion mode
        """
        if isinstance(self, SerialEyes):
            self._state.settings.states.motion = motion
            self.send(f"setMotion={motion.value}")
        elif isinstance(self, MqttEyes):
            self._robot.client.publish(f"{self._robot.root_topic}/eyes/motion", motion.value, 0)

    def set_manual_pos(self, x: int, y: int):
        """Set the on-screen position of pupil

        Args:
            x (int): X Position of pupil
            y (int): Y Position of pupil
        """
        if isinstance(self, SerialEyes):
            self._state.settings.motions.pos = x, y
            self.send(f"setPosition={x},{y}")
        elif isinstance(self, MqttEyes):
            self._robot.client.publish(f"{self._robot.root_topic}/eyes/pos", f"{x},{y}", 0)

    def set_skin_option(self, data: list):
        """Set a raw skin option.

        The first key names the skin, any middle keys walk into nested
        settings, and the last key is the attribute that receives the value.
        The local state is updated first; the option is then sent on as a
        colon-joined string. Invalid input is logged and nothing is sent.

        Args:
            data (list): list of keys, last item is the value
        """
        if len(data) < 3:  # noqa: PLR2004
            logger.error("Data must have at least 2 keys and one value.")
            return

        keys = data[:-1]
        value = data[-1]

        skin_key = keys[0]
        if skin_key not in self._state.settings.skins.model_dump():
            logger.error(f"Invalid skin key: {skin_key}")
            return

        # Split the remaining keys into the path to walk and the attribute to assign.
        # keys has at least two items here, so there is always a final key.
        *path, final_key = keys[1:]

        skin = getattr(self._state.settings.skins, skin_key)
        for key in path:
            if not hasattr(skin, key):
                logger.error(f"Invalid key '{key}' for skin '{skin_key}'")
                return
            skin = getattr(skin, key)

        if not hasattr(skin, final_key):
            logger.error(f"Invalid key '{final_key}' for skin '{skin_key}'")
            return
        setattr(skin, final_key, value)

        if isinstance(self, SerialEyes):
            self.send(f"setSkinOption={':'.join(map(str, data))}")
        elif isinstance(self, MqttEyes):
            self._robot.client.publish(f"{self._robot.root_topic}/eyes/skinopt", ":".join(map(str, data)), 0)

    @property
    def skins(self) -> _EyeSkinManager:
        """Get a skin manager bound to these eyes

        Returns:
            _EyeSkinManager: A new manager created for this instance
        """
        return _EyeSkinManager(self)

auto_disconnect: bool property writable

Getter for auto disconnect state.

Returns:

Name Type Description
bool bool

Whether to disconnect on application exit

get_state()

Gets the current state of the eyes

Returns:

Name Type Description
KevinbotEyesState KevinbotEyesState

State class

Source code in src/kevinbotlib/eyes.py
289
290
291
292
293
294
295
def get_state(self) -> KevinbotEyesState:
    """Return the eyes' current state object.

    The stored state instance itself is returned, not a copy.

    Returns:
        KevinbotEyesState: State class
    """
    current = self._state
    return current

disconnect()

Basic disconnect

Source code in src/kevinbotlib/eyes.py
297
298
299
def disconnect(self):
    """Mark the eyes as disconnected in the local state (no I/O is done here)."""
    state = self._state
    state.connected = False

send(data)

Null implementation of the send method

Parameters:

Name Type Description Default
data str

Data to send nowhere

required

Raises:

Type Description
NotImplementedError

Always raised

Source code in src/kevinbotlib/eyes.py
323
324
325
326
327
328
329
330
331
332
333
def send(self, data: str):
    """Placeholder transport: the base eyes class cannot send anything.

    Args:
        data (str): Data that would have been sent

    Raises:
        NotImplementedError: Always raised
    """
    raise NotImplementedError(f"Function not implemented, attempting to send {data}")

set_skin(skin)

Set the current skin

Parameters:

Name Type Description Default
skin EyeSkin

Skin index

required
Source code in src/kevinbotlib/eyes.py
335
336
337
338
339
340
341
342
343
344
345
def set_skin(self, skin: EyeSkin):
    """Switch the eyes to another skin.

    Serial eyes store the skin as the current page in the local state and send
    a ``setState`` command; MQTT eyes publish the skin value on ``eyes/skin``.
    Any other eye type is silently ignored.

    Args:
        skin (EyeSkin): Skin index
    """
    if isinstance(self, SerialEyes):
        self._state.settings.states.page = skin
        self.send(f"setState={skin.value}")
        return

    if isinstance(self, MqttEyes):
        topic = f"{self._robot.root_topic}/eyes/skin"
        self._robot.client.publish(topic, skin.value, 0)

set_backlight(bl)

Set the current backlight brightness

Parameters:

Name Type Description Default
bl float

Brightness from 0 to 1

required
Source code in src/kevinbotlib/eyes.py
347
348
349
350
351
352
353
354
355
356
357
def set_backlight(self, bl: float):
    """Set the current backlight brightness

    Serial eyes store ``int(bl * 100)`` capped at 100 and send that stored
    value with ``setBacklight``; MQTT eyes publish ``int(255 * bl)`` on
    ``eyes/backlight``. Any other eye type is silently ignored.

    NOTE(review): the serial path works on a 0-100 scale, the MQTT path on a
    0-255 scale, and ``get_backlight`` divides by 255 - confirm which scale the
    stored setting is meant to use. Values below 0 are not clamped.

    Args:
        bl (float): Brightness from 0 to 1
    """
    if isinstance(self, SerialEyes):
        display = self._state.settings.display
        display.backlight = min(int(bl * 100), 100)
        # Send what was actually stored, not the raw argument
        self.send(f"setBacklight={display.backlight}")
        return

    if isinstance(self, MqttEyes):
        level = int(255 * bl)
        self._robot.client.publish(f"{self._robot.root_topic}/eyes/backlight", level, 0)

get_backlight()

Get the current backlight setting

Returns:

Name Type Description
float

Value from 0 to 1

Source code in src/kevinbotlib/eyes.py
359
360
361
362
363
364
365
def get_backlight(self):
    """Get the current backlight setting

    The stored backlight value is divided by 255.

    NOTE(review): ``set_backlight`` stores a 0-100 value for serial eyes, so
    the result only spans 0 to 1 if the stored value is on a 0-255 scale -
    confirm the intended scale.

    Returns:
        float: Stored backlight divided by 255
    """
    stored = self._state.settings.display.backlight
    return stored / 255

set_motion(motion)

Set the current motion mode

Parameters:

Name Type Description Default
motion EyeMotion

Motion mode

required
Source code in src/kevinbotlib/eyes.py
368
369
370
371
372
373
374
375
376
377
378
def set_motion(self, motion: EyeMotion):
    """Set the current motion mode

    Serial eyes store the mode in the local state and send a ``setMotion``
    command; MQTT eyes publish the mode value on ``eyes/motion``. Any other
    eye type is silently ignored.

    Args:
        motion (EyeMotion): Motion mode
    """
    if isinstance(self, SerialEyes):
        self._state.settings.states.motion = motion
        self.send(f"setMotion={motion.value}")
        return

    if isinstance(self, MqttEyes):
        topic = f"{self._robot.root_topic}/eyes/motion"
        self._robot.client.publish(topic, motion.value, 0)

set_manual_pos(x, y)

Set the on-screen position of pupil

Parameters:

Name Type Description Default
x int

X Position of pupil

required
y int

Y Position of pupil

required
Source code in src/kevinbotlib/eyes.py
380
381
382
383
384
385
386
387
388
389
390
391
def set_manual_pos(self, x: int, y: int):
    """Move the pupil to an explicit on-screen position.

    Serial eyes record the position in the local settings and send a
    ``setPosition`` command; MQTT eyes publish it on the ``eyes/pos`` topic.
    Any other eye type is silently ignored.

    Args:
        x (int): X Position of pupil
        y (int): Y Position of pupil
    """
    if isinstance(self, SerialEyes):
        position = (x, y)
        self._state.settings.motions.pos = position
        self.send(f"setPosition={x},{y}")
        return

    if isinstance(self, MqttEyes):
        topic = f"{self._robot.root_topic}/eyes/pos"
        payload = f"{x},{y}"
        self._robot.client.publish(topic, payload, 0)

set_skin_option(data)

Set a raw skin option.

Parameters:

Name Type Description Default
data list

list of keys, last item is the value

required
Source code in src/kevinbotlib/eyes.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def set_skin_option(self, data: list):
    """Set a raw skin option.

    ``data`` has the form ``[skin_key, attr, ..., attr, value]``: the first
    item selects a skin, the following items are attribute names walked from
    that skin, and the last item is the value assigned to the final attribute.
    The local state is updated first, then the raw option is forwarded to the
    eyes (serial command or MQTT publish).

    Invalid input is logged and ignored; nothing is sent in that case.

    Args:
        data (list): list of keys, last item is the value
    """
    if len(data) < 3:  # noqa: PLR2004
        logger.error("Data must have at least 2 keys and one value.")
        return

    skin_key, *attr_path = data[:-1]
    value = data[-1]

    if skin_key not in self._state.settings.skins.model_dump():
        logger.error(f"Invalid skin key: {skin_key}")
        return

    # Split the attribute path by position (not by name lookup) so that the
    # last attribute is always the one assigned, even when the same name
    # occurs more than once in the path.
    *parents, leaf = attr_path

    target = getattr(self._state.settings.skins, skin_key)
    for key in parents:
        if not hasattr(target, key):
            logger.error(f"Invalid key '{key}' for skin '{skin_key}'")
            return
        target = getattr(target, key)

    if not hasattr(target, leaf):
        logger.error(f"Invalid key '{leaf}' for skin '{skin_key}'")
        return
    setattr(target, leaf, value)

    payload = ":".join(map(str, data))
    if isinstance(self, SerialEyes):
        self.send(f"setSkinOption={payload}")
    elif isinstance(self, MqttEyes):
        self._robot.client.publish(f"{self._robot.root_topic}/eyes/skinopt", payload, 0)

SerialEyes

Bases: BaseKevinbotEyes

The main serial Kevinbot Eyes class

Source code in src/kevinbotlib/eyes.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
class SerialEyes(BaseKevinbotEyes):
    """The main serial Kevinbot Eyes class

    `connect` opens the port, performs the handshake and loads the initial
    settings. Afterwards a daemon thread running `_rx_loop` applies incoming
    ``eyeSettings.*`` messages to ``self._state.settings`` and forwards every
    received command to the optional `callback`.
    """

    def __init__(self) -> None:
        super().__init__()
        self.type = KevinbotConnectionType.SERIAL

        self.serial: Serial | None = None
        self.rx_thread: Thread | None = None

        self._callback: Callable[[str, str | None], Any] | None = None
        self._state_callback: Callable[[KevinbotEyesState], Any] | None = None

        # Run disconnect() when the interpreter exits
        atexit.register(self.disconnect)

    def connect(
        self,
        port: str,
        baud: int,
        timeout: float,
        ser_timeout: float = 0.5,
    ):
        """Start a connection with Kevinbot Eyes

        Repeats ``connectionReady`` until the eyes answer with
        ``handshake.request``, then requests the settings and applies every
        ``eyeSettings.*`` line until ``settTx.done`` arrives. Finally the
        receive thread is started and the state is marked as connected.

        If the handshake times out the serial port is left open; call
        `disconnect` to release it.

        Args:
            port (str): Serial port to use (`/dev/ttyUSB0` is standard with the typical Kevinbot Hardware)
            baud (int): Baud rate to use (`115200` is typical for the default eye configs)
            timeout (float): Timeout for handshake
            ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.

        Raises:
            HandshakeTimeoutException: Eyes didn't respond to the connection handshake before the timeout
        """
        serial = self._setup_serial(port, baud, ser_timeout)

        start_time = time.monotonic()
        hs_started = False
        while True:
            if not hs_started:
                serial.write(b"connectionReady\n")

            # Strip "\r" as well so CRLF-terminated lines still match below
            line = serial.readline().decode("utf-8", errors="ignore").strip("\r\n")

            if line == "handshake.request":
                hs_started = True
                serial.write(b"getSettings=true\n")

            if line == "settTx.done":
                serial.write(b"handshake.complete\n")
                break

            # Split on the first "=" only so values may themselves contain "="
            cmd, sep, rest = line.partition("=")
            val = rest if sep else None

            if cmd.startswith("eyeSettings."):
                self._apply_setting(cmd, val)

            # Checked on every iteration, including after a rejected setting
            if time.monotonic() - start_time > timeout:
                msg = "Handshake timed out"
                raise HandshakeTimeoutException(msg)

            time.sleep(0.1)  # Avoid spamming the connection

        # Data rx thread
        self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
        self.rx_thread.name = "KevinbotLib.Eyes.Rx"
        self.rx_thread.start()

        self._state.connected = True

    def disconnect(self):
        """Reset the eye connection and close the serial port.

        Logs a warning instead when no open port is present.
        """
        super().disconnect()
        if self.serial and self.serial.is_open:
            self.send("resetConnection")
            self.serial.flush()
            self.serial.close()
        else:
            logger.warning("Already disconnected")

    def update(self):
        """Retrieve updated settings from eyes"""

        self.send("getSettings=true")

    @property
    def on_state_updated(self) -> Callable[[KevinbotEyesState], Any] | None:
        """Callback invoked with the state when a ``settTx.done`` message is received."""
        return self._state_callback

    @on_state_updated.setter
    def on_state_updated(self, callback: Callable[[KevinbotEyesState], Any] | None):
        self._state_callback = callback

    def send(self, data: str):
        """Send a string through serial.

        Automatically adds a newline.

        Args:
            data (str): Data to send
        """
        self.raw_tx((data + "\n").encode("utf-8"))

    def raw_tx(self, data: bytes):
        """Send raw bytes over serial.

        Logs a warning and drops the data when no serial object is set.

        Args:
            data (bytes): Raw data
        """
        if self.serial:
            self.serial.write(data)
        else:
            logger.warning(f"Couldn't transmit data: {data!r}, Eyes aren't connected")

    @property
    def callback(self) -> Callable[[str, str | None], Any] | None:
        """Callback invoked with ``(cmd, val)`` for commands handled by the receive loop."""
        return self._callback

    @callback.setter
    def callback(self, callback: Callable[[str, str | None], Any]) -> None:
        self._callback = callback

    def _setup_serial(self, port: str, baud: int, timeout: float = 1):
        """Open the serial port, remember it on ``self.serial`` and return it."""
        self.serial = Serial(port, baud, timeout=timeout)
        return self.serial

    @staticmethod
    def _parse_setting_value(val: str) -> Any:
        """Convert the text of a setting value into a Python value.

        ``[a, b]`` becomes a tuple of ints, ``#...`` is kept as-is, a quoted
        string loses its quotes, anything else becomes an int when possible
        and stays a string otherwise.

        Raises:
            ValueError: A bracketed value contains a non-integer item
        """
        if val.startswith("[") and val.endswith("]"):
            return tuple(int(item.strip()) for item in val.strip("[]").split(","))
        if val.startswith("#"):
            return val
        if val.startswith('"') and val.endswith('"'):
            return val.strip('"')
        try:
            return int(val)
        except ValueError:
            return val

    def _apply_setting(self, cmd: str, val: str | None) -> bool:
        """Apply one ``eyeSettings.<path>=<value>`` message to the local settings.

        Args:
            cmd (str): Full command name, including the ``eyeSettings.`` prefix
            val (str | None): Raw value text, or None when the message had none

        Returns:
            bool: True when the settings were updated, False when the message
            was rejected (the reason is logged)
        """
        # Remove prefix and split into path segments
        path = cmd[len("eyeSettings.") :].split(".")

        if not val:
            logger.error(f"Got eyeSettings command without a value: {cmd} :: {val}")
            return False

        try:
            value = self._parse_setting_value(val)
        except ValueError:
            logger.error(f"Invalid array value for {cmd}: {val}")
            return False

        # Work on a dict copy of the settings and re-validate it afterwards
        settings_dict = self._state.settings.model_dump()

        # Navigate to the nested dictionary that holds the final key.
        # Returning here (instead of skipping one segment) keeps the value from
        # being written at the wrong nesting level.
        current_dict = settings_dict
        for i, key in enumerate(path[:-1]):
            if key not in current_dict:
                logger.error(f"Invalid path: {'.'.join(path[:i+1])}")
                return False
            if not isinstance(current_dict[key], dict):
                logger.error(f"Cannot navigate through non-dict value at {'.'.join(path[:i+1])}")
                return False
            current_dict = current_dict[key]

        if path[-1] not in current_dict:
            logger.error(f"Invalid setting: {'.'.join(path)}")
            return False
        current_dict[path[-1]] = value

        # Create new settings instance with updated values
        self._state.settings = EyeSettings.model_validate(settings_dict)
        return True

    def _rx_loop(self, serial: Serial, delimeter: str = "="):
        """Receive loop: read lines forever and dispatch them.

        ``eyeSettings.*`` messages update the local settings, ``settTx.done``
        triggers `on_state_updated`, and every command that was not rejected
        is passed to `callback` as ``(cmd, val)``.

        Args:
            serial (Serial): Open serial port to read from
            delimeter (str, optional): Separator between command and value. Defaults to "=".
        """
        while True:
            try:
                raw: bytes = serial.readline()
            except TypeError:
                # serial has been stopped
                return

            # Undecodable bytes are dropped rather than ending the thread
            text = raw.decode("utf-8", errors="ignore")
            head, sep, tail = text.partition(delimeter)

            cmd: str = head.strip().replace("\00", "")
            if not cmd:
                continue

            val: str | None = tail.strip("\r\n").replace("\00", "") if sep else None

            if cmd.startswith("eyeSettings."):
                if not self._apply_setting(cmd, val):
                    continue
            elif cmd == "settTx.done" and self.on_state_updated:
                self.on_state_updated(self._state)

            if self.callback:
                self.callback(cmd, val)

connect(port, baud, timeout, ser_timeout=0.5)

Start a connection with Kevinbot Eyes

Parameters:

Name Type Description Default
port str

Serial port to use (/dev/ttyUSB0 is standard with the typical Kevinbot Hardware)

required
baud int

Baud rate to use (115200 is typical for the default eye configs)

required
timeout float

Timeout for handshake

required
ser_timeout float

Readline timeout, should be lower than timeout. Defaults to 0.5.

0.5

Raises:

Type Description
HandshakeTimeoutException

Eyes didn't respond to the connection handshake before the timeout

Source code in src/kevinbotlib/eyes.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def connect(
    self,
    port: str,
    baud: int,
    timeout: float,
    ser_timeout: float = 0.5,
):
    """Start a connection with Kevinbot Eyes

    Repeats ``connectionReady`` until the eyes answer with
    ``handshake.request``, then requests the settings and applies every
    ``eyeSettings.*`` line until ``settTx.done`` arrives. Finally the receive
    thread is started and the state is marked as connected.

    If the handshake times out the serial port is left open.

    Args:
        port (str): Serial port to use (`/dev/ttyUSB0` is standard with the typical Kevinbot Hardware)
        baud (int): Baud rate to use (`115200` is typical for the default eye configs)
        timeout (float): Timeout for handshake
        ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.

    Raises:
        HandshakeTimeoutException: Eyes didn't respond to the connection handshake before the timeout
    """
    serial = self._setup_serial(port, baud, ser_timeout)

    start_time = time.monotonic()
    hs_started = False
    while True:
        if not hs_started:
            serial.write(b"connectionReady\n")

        # Strip "\r" as well so CRLF-terminated lines still match below
        line = serial.readline().decode("utf-8", errors="ignore").strip("\r\n")

        if line == "handshake.request":
            hs_started = True
            serial.write(b"getSettings=true\n")

        if line == "settTx.done":
            serial.write(b"handshake.complete\n")
            break

        # Split on the first "=" only so values may themselves contain "="
        cmd, sep, rest = line.partition("=")
        val = rest if sep else None

        if cmd.startswith("eyeSettings."):
            # Remove prefix and split into path segments
            path = cmd[len("eyeSettings.") :].split(".")

            if not val:
                logger.error(f"Got eyeSettings command without a value: {cmd} :: {val}")
            else:
                # Handle array values [x, y]
                if val.startswith("[") and val.endswith("]"):
                    value = tuple(int(x.strip()) for x in val.strip("[]").split(","))
                # Handle hex colors
                elif val.startswith("#"):
                    value = val
                # Handle quoted strings
                elif val.startswith('"') and val.endswith('"'):
                    value = val.strip('"')
                # Handle numbers
                else:
                    try:
                        value = int(val)
                    except ValueError:
                        value = val

                # Work on a dict copy of the settings and re-validate it afterwards
                settings_dict = self._state.settings.model_dump()

                # Navigate to the nested dictionary that holds the final key.
                # Breaking out (instead of skipping one segment) keeps the
                # value from being written at the wrong nesting level.
                current_dict = settings_dict
                for i, key in enumerate(path[:-1]):
                    if key not in current_dict:
                        logger.error(f"Invalid path: {'.'.join(path[:i+1])}")
                        break
                    if not isinstance(current_dict[key], dict):
                        logger.error(f"Cannot navigate through non-dict value at {'.'.join(path[:i+1])}")
                        break
                    current_dict = current_dict[key]
                else:
                    # Only reached when the whole path was valid
                    if path[-1] not in current_dict:
                        logger.error(f"Invalid setting: {'.'.join(path)}")
                    else:
                        current_dict[path[-1]] = value
                        # Create new settings instance with updated values
                        self._state.settings = EyeSettings.model_validate(settings_dict)

        # Checked on every iteration, including after a rejected setting
        if time.monotonic() - start_time > timeout:
            msg = "Handshake timed out"
            raise HandshakeTimeoutException(msg)

        time.sleep(0.1)  # Avoid spamming the connection

    # Data rx thread
    self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
    self.rx_thread.name = "KevinbotLib.Eyes.Rx"
    self.rx_thread.start()

    self._state.connected = True

update()

Retrieve updated settings from eyes

Source code in src/kevinbotlib/eyes.py
558
559
560
561
def update(self):
    """Retrieve updated settings from eyes

    Sends the ``getSettings=true`` request through `send`.
    """
    request = "getSettings=true"
    self.send(request)

send(data)

Send a string through serial.

Automatically adds a newline.

Parameters:

Name Type Description Default
data str

Data to send

required
Source code in src/kevinbotlib/eyes.py
571
572
573
574
575
576
577
578
579
def send(self, data: str):
    """Send one line of text over serial.

    A newline is appended, the text is UTF-8 encoded and handed to `raw_tx`.

    Args:
        data (str): Data to send
    """
    line = data + "\n"
    encoded = line.encode("utf-8")
    self.raw_tx(encoded)

raw_tx(data)

Send raw bytes over serial.

Parameters:

Name Type Description Default
data bytes

Raw data

required
Source code in src/kevinbotlib/eyes.py
581
582
583
584
585
586
587
588
589
590
def raw_tx(self, data: bytes):
    """Write raw bytes to the serial port.

    When no serial object is set, a warning is logged and the data is dropped.

    Args:
        data (bytes): Raw data
    """
    if not self.serial:
        logger.warning(f"Couldn't transmit data: {data!r}, Eyes aren't connected")
        return

    self.serial.write(data)

MqttEyes

Bases: BaseKevinbotEyes

The main MQTT Kevinbot Eyes class

Source code in src/kevinbotlib/eyes.py
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
class MqttEyes(BaseKevinbotEyes):
    """The main MQTT Kevinbot Eyes class

    Registers itself on an `MqttKevinbot`, requests the eye settings over
    MQTT and blocks in the constructor until `_load_data` has delivered them.
    """

    def __init__(self, robot: MqttKevinbot) -> None:
        """Attach to ``robot`` and wait for the initial eye state.

        The constructor polls until `_load_data` has been called, so that call
        has to come from another thread; there is no timeout on the wait.

        Args:
            robot (MqttKevinbot): Connected MQTT robot to attach the eyes to
        """
        super().__init__()
        self.type = KevinbotConnectionType.MQTT

        self.serial: Serial | None = None
        self.rx_thread: Thread | None = None

        self._callback: Callable[[str, str | None], Any] | None = None

        self._robot: MqttKevinbot = robot
        self._robot._eyes = self  # noqa: SLF001

        self._state_loaded = False
        robot.client.publish(f"{robot.root_topic}/eyes/get", "request_settings", 0)

        # Wait for _load_data to deliver the state
        while not self._state_loaded:
            time.sleep(0.01)

        atexit.register(self.disconnect)

    def update(self):
        """Retrieve updated settings from eyes"""

        self._robot.client.publish(f"{self._robot.root_topic}/eyes/get", "request_settings", 0)

    def _load_data(self, data: str):
        """Replace the local state with the JSON-encoded state in ``data``.

        Args:
            data (str): JSON object whose keys are `KevinbotEyesState` fields
        """
        # Store the state before raising the flag: the constructor stops
        # waiting as soon as the flag is set, so the state must already be in
        # place. A payload that fails to parse no longer marks it as loaded.
        self._state = KevinbotEyesState(**json.loads(data))
        self._state_loaded = True

update()

Retrieve updated settings from eyes

Source code in src/kevinbotlib/eyes.py
702
703
704
705
def update(self):
    """Retrieve updated settings from eyes

    Publishes ``request_settings`` on the robot's ``eyes/get`` topic.
    """
    robot = self._robot
    topic = f"{robot.root_topic}/eyes/get"
    robot.client.publish(topic, "request_settings", 0)

WirelessRadio

Bases: BaseKevinbotSubsystem

Source code in src/kevinbotlib/xbee.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@deprecated(
    "XBee radio support is deprecated. Please use WiFi or use a custom implementation. It will be removed in a future version"
)
class WirelessRadio(BaseKevinbotSubsystem):
    """Deprecated XBee radio subsystem: opens a serial port and wraps it in an XBee object."""

    def __init__(self, robot: SerialKevinbot, port: str, baud: int, api: int, timeout: float) -> None:
        """Initialize Kevinbot Wireless Radio (XBee)

        Args:
            robot (Kevinbot): The main robot class
            port (str): Serial port to connect to `/dev/ttyAMA0` for typical Kevinbot hardware
            baud (int): Baud rate for serial interface `921600` for typical Kevinbot configs
            api (int): API mode for xbee interface `2` for typical Kevinbot configs (`0` isn't supported yet)
            timeout (float): Timeout for serial operations
        """
        super().__init__(robot)

        if api not in [1, 2]:
            logger.error(f"XBee API Mode {api} isn't supported. Assuming API escaped (2)")
            api = 2

        # NOTE(review): the callback is still None when the XBee object is
        # created below, so assigning `self.callback` later does not reach it.
        self.callback: Callable | None = None

        self.serial = Serial(port, baud, timeout=timeout)
        # API mode 2 is the escaped mode; pass it on so `api` actually takes effect
        self.xbee = xbee.XBee(self.serial, callback=self.callback, escaped=api == 2)  # noqa: PLR2004

    def get(self) -> dict:
        """Get the latest packet (blocking)

        Returns:
            dict: Data packet
        """
        return self.xbee.wait_read_frame()

    def disconnect(self):
        """Disconnect robot radio, and halt processing"""
        self.xbee.halt()
        self.serial.close()

__init__(robot, port, baud, api, timeout)

Initialize Kevinbot Wireless Radio (XBee)

Parameters:

Name Type Description Default
robot Kevinbot

The main robot class

required
port str

Serial port to connect to /dev/ttyAMA0 for typical Kevinbot hardware

required
baud int

Baud rate for serial interface 921600 for typical Kevinbot configs

required
api int

API mode for xbee interface 2 for typical Kevinbot configs (0 isn't supported yet)

required
timeout float

Timeout for serial operations

required
Source code in src/kevinbotlib/xbee.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, robot: SerialKevinbot, port: str, baud: int, api: int, timeout: float) -> None:
    """Initialize Kevinbot Wireless Radio (XBee)

    Args:
        robot (Kevinbot): The main robot class
        port (str): Serial port to connect to `/dev/ttyAMA0` for typical Kevinbot hardware
        baud (int): Baud rate for serial interface `921600` for typical Kevinbot configs
        api (int): API mode for xbee interface `2` for typical Kevinbot configs (`0` isn't supported yet)
        timeout (float): Timeout for serial operations
    """
    super().__init__(robot)

    if api not in [1, 2]:
        logger.error(f"XBee API Mode {api} isn't supported. Assuming API escaped (2)")
        api = 2

    # NOTE(review): the callback is still None when the XBee object is created
    # below, so assigning `self.callback` later does not reach it.
    self.callback: Callable | None = None

    self.serial = Serial(port, baud, timeout=timeout)
    # API mode 2 is the escaped mode; pass it on so `api` actually takes effect
    self.xbee = xbee.XBee(self.serial, callback=self.callback, escaped=api == 2)  # noqa: PLR2004

get()

Get the latest packet (blocking)

Returns:

Name Type Description
dict dict

Data packet

Source code in src/kevinbotlib/xbee.py
39
40
41
42
43
44
45
def get(self) -> dict:
    """Block until the radio delivers a frame and return it.

    Returns:
        dict: Data packet
    """
    frame = self.xbee.wait_read_frame()
    return frame

disconnect()

Disconnect robot radio, and halt processing

Source code in src/kevinbotlib/xbee.py
47
48
49
50
def disconnect(self):
    """Halt the XBee object, then close the serial port."""
    radio = self.xbee
    radio.halt()

    port = self.serial
    port.close()

CoreErrors

Bases: Enum

Kevinbot Core Error States

Source code in src/kevinbotlib/states.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class CoreErrors(Enum):
    """
    Kevinbot Core Error States

    Integer error codes from 0 (no error) to 22, each documented below.
    """

    # --- General ---
    OK = 0
    """No errors are present"""
    UNKNOWN = 1
    """Error state unknown"""

    # --- One-Wire bus ---
    OW_SHORT = 2
    """One-Wire bus is shorted"""
    OW_ERROR = 3
    """One-Wire bus error"""
    OW_DNF = 4
    """One-Wire device not found"""

    # --- Peripherals and core runtime ---
    LCD_INIT_FAIL = 5
    """LCD Init failed"""
    PCA_INIT_FAIL = 6
    """PCA9685 (servos) init fail"""
    TICK_FAIL = 7
    """Failure to receive core tick"""
    QUEUE_OVERRUN = 8
    """Serial queue overrun"""
    ESTOP = 9
    """Core is in E-Stop state"""

    # --- Environment sensor (BME) ---
    BME_CHIP_ID = 10
    """Error getting environment sensor chip id"""
    BME_CALIB_NVM = 11
    """Error with environment sensor calibration"""
    BME_CALIB_TP = 12
    """Error with environment sensor calibration"""
    BME_CALIB_HUM = 13
    """Error with environment sensor calibration"""
    BME_THP = 14
    """Error with environment sensor"""
    BME_MEAS_TIMEOUT = 15
    """Timeout with environment sensor measurement"""
    BME_NOT_NORMAL_MODE = 16
    """Environment sensor is not in normal mode"""

    # --- Batteries ---
    BATT1_UV = 17
    """Battery #1 Undervoltage"""
    BATT1_OV = 18
    """Battery #1 Overvoltage"""
    BATT2_UV = 19
    """Battery #2 Undervoltage"""
    BATT2_OV = 20
    """Battery #2 Overvoltage"""
    BATT_UV = 21
    """Battery Undervoltage (single battery mode)"""
    BATT_OV = 22
    """Battery Overvoltage (single battery mode)"""

OK = 0 class-attribute instance-attribute

No errors are present

UNKNOWN = 1 class-attribute instance-attribute

Error state unknown

OW_SHORT = 2 class-attribute instance-attribute

One-Wire bus is shorted

OW_ERROR = 3 class-attribute instance-attribute

One-Wire bus error

OW_DNF = 4 class-attribute instance-attribute

One-Wire device not found

LCD_INIT_FAIL = 5 class-attribute instance-attribute

LCD Init failed

PCA_INIT_FAIL = 6 class-attribute instance-attribute

PCA9685 (servos) init fail

TICK_FAIL = 7 class-attribute instance-attribute

Failure to receive core tick

QUEUE_OVERRUN = 8 class-attribute instance-attribute

Serial queue overrun

ESTOP = 9 class-attribute instance-attribute

Core is in E-Stop state

BME_CHIP_ID = 10 class-attribute instance-attribute

Error getting environment sensor chip id

BME_CALIB_NVM = 11 class-attribute instance-attribute

Error with environment sensor calibration

BME_CALIB_TP = 12 class-attribute instance-attribute

Error with environment sensor calibration

BME_CALIB_HUM = 13 class-attribute instance-attribute

Error with environment sensor calibration

BME_THP = 14 class-attribute instance-attribute

Error with environment sensor

BME_MEAS_TIMEOUT = 15 class-attribute instance-attribute

Timeout with environment sensor measurement

BME_NOT_NORMAL_MODE = 16 class-attribute instance-attribute

Environment sensor is not in normal mode

BATT1_UV = 17 class-attribute instance-attribute

Battery #1 Undervoltage

BATT1_OV = 18 class-attribute instance-attribute

Battery #1 Overvoltage

BATT2_UV = 19 class-attribute instance-attribute

Battery #2 Undervoltage

BATT2_OV = 20 class-attribute instance-attribute

Battery #2 Overvoltage

BATT_UV = 21 class-attribute instance-attribute

Battery Undervoltage (single battery mode)

BATT_OV = 22 class-attribute instance-attribute

Battery Overvoltage (single battery mode)

MotorDriveStatus

Bases: Enum

The status of each motor in the drivebase

Source code in src/kevinbotlib/states.py
64
65
66
67
68
69
70
71
72
73
74
75
76
class MotorDriveStatus(Enum):
    """
    The status of each motor in the drivebase

    Four states with the integer values 10 through 13.
    NOTE(review): the values start at 10 rather than 0 - presumably to match
    codes reported by the robot; confirm before renumbering.
    """

    UNKNOWN = 10
    """Motor status is unknown"""
    MOVING = 11
    """Motor is rotating"""
    HOLDING = 12
    """Motor is holding at position"""
    OFF = 13
    """Motor is off"""

UNKNOWN = 10 class-attribute instance-attribute

Motor status is unknown

MOVING = 11 class-attribute instance-attribute

Motor is rotating

HOLDING = 12 class-attribute instance-attribute

Motor is holding at position

OFF = 13 class-attribute instance-attribute

Motor is off

BmsBatteryState

Bases: Enum

The status of a single battery attached to the BMS

Source code in src/kevinbotlib/states.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class BmsBatteryState(Enum):
    """
    The status of a single battery attached to the BMS

    Five states with the integer values 0 through 4; ``UNKNOWN`` is 0.
    """

    UNKNOWN = 0
    """State is unknown (usually at bootup)"""
    NORMAL = 1
    """Battery is normal"""
    UNDER = 2
    """Battery is undervoltage"""
    OVER = 3
    """Battery is overvoltage"""
    STOPPED = 4  # Stopped state if BMS driver crashed
    """BMS has crashed or stopped"""

UNKNOWN = 0 class-attribute instance-attribute

State is unknown (usually at bootup)

NORMAL = 1 class-attribute instance-attribute

Battery is normal

UNDER = 2 class-attribute instance-attribute

Battery is undervoltage

OVER = 3 class-attribute instance-attribute

Battery is overvoltage

STOPPED = 4 class-attribute instance-attribute

BMS has crashed or stopped

DrivebaseState

Bases: BaseModel

The state of the drivebase as a whole

Source code in src/kevinbotlib/states.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class DrivebaseState(BaseModel):
    """The state of the drivebase as a whole

    ``amps``, ``watts`` and ``status`` each default to a two-element list,
    one entry per motor.
    """

    # NOTE(review): the index order of the two-element lists (presumably
    # [left, right]) is not established here - confirm against the producer.
    left_power: int = 0
    """Current power of the left motor"""
    right_power: int = 0
    """Current power of the right motor"""
    amps: list[float] = Field(default_factory=lambda: [0, 0])
    """Current amps for both motors"""
    watts: list[float] = Field(default_factory=lambda: [0, 0])
    """Current watts for both motors"""
    status: list[MotorDriveStatus] = Field(default_factory=lambda: [MotorDriveStatus.UNKNOWN, MotorDriveStatus.UNKNOWN])
    """Current status for both motors"""

left_power: int = 0 class-attribute instance-attribute

Current power of the left motor

right_power: int = 0 class-attribute instance-attribute

Current power of the right motor

amps: list[float] = Field(default_factory=lambda: [0, 0]) class-attribute instance-attribute

Current amps for both motors

watts: list[float] = Field(default_factory=lambda: [0, 0]) class-attribute instance-attribute

Current watts for both motors

status: list[MotorDriveStatus] = Field(default_factory=lambda: [MotorDriveStatus.UNKNOWN, MotorDriveStatus.UNKNOWN]) class-attribute instance-attribute

Current status for both motors

ServoState

Bases: BaseModel

The state of the servo subsystem

Source code in src/kevinbotlib/states.py
111
112
113
114
115
class ServoState(BaseModel):
    """The state of the servo subsystem

    ``angles`` defaults to a list of 32 entries, each set to -1.
    """

    # NOTE(review): -1 presumably marks a servo whose angle has not been
    # reported yet - confirm.
    angles: list[int] = Field(default_factory=lambda: [-1] * 32)
    """Servo angles"""

angles: list[int] = Field(default_factory=lambda: [-1] * 32) class-attribute instance-attribute

Servo angles

BMState

Bases: BaseModel

The state of the BMS (Battery Management System)

Source code in src/kevinbotlib/states.py
118
119
120
121
122
123
class BMState(BaseModel):
    """The state of the BMS (Battery Management System)

    Every field defaults to a two-element list.
    """

    # Battery voltages; default [0.0, 0.0].
    voltages: list[float] = Field(default_factory=lambda: [0.0, 0.0])
    # NOTE(review): presumably the unprocessed readings behind `voltages` - confirm.
    raw_voltages: list[float] = Field(default_factory=lambda: [0.0, 0.0])
    # Per-battery state; both entries start as BmsBatteryState.UNKNOWN.
    states: list[BmsBatteryState] = Field(default_factory=lambda: [BmsBatteryState.UNKNOWN, BmsBatteryState.UNKNOWN])

IMUState

Bases: BaseModel

The state of the IMU (Inertial Measurement Unit)

Source code in src/kevinbotlib/states.py
126
127
128
129
130
class IMUState(BaseModel):
    """The state of the IMU (Inertial Measurement Unit)

    Both fields default to three-element lists filled with -1.
    """

    # Accelerometer reading, ordered X, Y, Z.
    accel: list[int] = Field(default_factory=lambda: [-1] * 3)  # X Y Z
    # Gyroscope reading; "R P Y" presumably means roll, pitch, yaw - confirm.
    gyro: list[int] = Field(default_factory=lambda: [-1] * 3)  # R P Y

ThermometerState

Bases: BaseModel

The state of the DS18B20 Thermometers (does not include BME280)

Source code in src/kevinbotlib/states.py
133
134
135
136
137
138
class ThermometerState(BaseModel):
    """The state of the DS18B20 Thermometers (does not include BME280)

    All three readings default to -1.
    """

    # NOTE(review): -1 is presumably a placeholder until a reading arrives,
    # and the temperature unit is not stated here - confirm both.
    left_motor: float = -1
    right_motor: float = -1
    internal: float = -1

EnviroState

Bases: BaseModel

The state of the BME280 Environmental sensor

Source code in src/kevinbotlib/states.py
141
142
143
144
145
146
class EnviroState(BaseModel):
    """The state of the BME280 Environmental sensor"""

    # NOTE(review): units are not stated here - confirm against the sensor driver.
    temperature: float = -1  # defaults to -1, unlike the other two fields
    humidity: float = 0
    pressure: int = 0  # integer-typed, unlike temperature and humidity

LightingState

Bases: BaseModel

The state of Kevinbot's LED segments

Source code in src/kevinbotlib/states.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class LightingState(BaseModel):
    """The state of Kevinbot's LED segments

    The ``head``, ``body`` and ``base`` segments each carry the same five
    fields: ``*_effect``, ``*_bright``, ``*_update``, ``*_color1`` and
    ``*_color2``. Colour lists default to ``[0, 0, 0]`` and are declared
    with ``min_length=3``.
    """

    # NOTE(review): colour lists are presumably [R, G, B] - confirm.
    camera: int = 0

    # Head segment
    head_effect: str = "unknown"
    head_bright: int = 0
    head_update: int = -1
    head_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    head_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

    # Body segment
    body_effect: str = "unknown"
    body_bright: int = 0
    body_update: int = -1
    body_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    body_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

    # Base segment
    base_effect: str = "unknown"
    base_bright: int = 0
    base_update: int = -1
    base_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    base_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

KevinbotState

Bases: BaseModel

The state of the robot as a whole

Source code in src/kevinbotlib/states.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class KevinbotState(BaseModel):
    """The state of the robot as a whole

    Holds the top-level flags plus one nested state model per subsystem.
    Each nested model is built by its own ``default_factory``.
    """

    # Top-level flags and counters
    connected: bool = False
    enabled: bool = False
    error: CoreErrors = CoreErrors.OK
    estop: bool = False
    uptime: int = 0  # NOTE(review): presumably seconds, given the separate uptime_ms - confirm
    uptime_ms: int = 0

    # Per-subsystem state
    motion: DrivebaseState = Field(default_factory=DrivebaseState)
    servos: ServoState = Field(default_factory=ServoState)
    battery: BMState = Field(default_factory=BMState)
    imu: IMUState = Field(default_factory=IMUState)
    thermal: ThermometerState = Field(default_factory=ThermometerState)
    enviro: EnviroState = Field(default_factory=EnviroState)
    lighting: LightingState = Field(default_factory=LightingState)

EyeSkin

Bases: Enum

Eye Skins for the eye system

Source code in src/kevinbotlib/states.py
188
189
190
191
192
193
194
195
196
197
198
199
200
class EyeSkin(Enum):
    """
    Eye Skins for the eye system

    Members are numbered 0 through 3.
    """

    TV_STATIC = 0
    """TV Static-style random colors"""
    SIMPLE = 1
    """Simple skin with variable pupil, iris, and background"""
    METAL = 2
    """Skin with fancy pupil and iris over an aluminum background"""
    NEON = 3
    """Neon skin"""

TV_STATIC = 0 class-attribute instance-attribute

TV Static-style random colors

SIMPLE = 1 class-attribute instance-attribute

Simple skin with variable pupil, iris, and background

METAL = 2 class-attribute instance-attribute

Skin with fancy pupil and iris over an aluminum background

NEON = 3 class-attribute instance-attribute

Neon skin

EyeMotion

Bases: Enum

Motion modes for the eye system

Source code in src/kevinbotlib/states.py
203
204
205
206
207
208
209
210
211
212
213
214
215
class EyeMotion(Enum):
    """
    Motion modes for the eye system

    Members are numbered 0 through 3; ``DISABLE`` (0) means no motion.
    """

    DISABLE = 0
    """No motion"""
    LEFT_RIGHT = 1
    """Smooth left to right and back"""
    JUMP = 2
    """Jumpy left to right and back"""
    MANUAL = 3
    """Allow manual control of pupil position"""

DISABLE = 0 class-attribute instance-attribute

No motion

LEFT_RIGHT = 1 class-attribute instance-attribute

Smooth left to right and back

JUMP = 2 class-attribute instance-attribute

Jumpy left to right and back

MANUAL = 3 class-attribute instance-attribute

Allow manual control of pupil position

KevinbotEyesState

Bases: BaseModel

The state of the eye system

Source code in src/kevinbotlib/states.py
286
287
288
289
290
class KevinbotEyesState(BaseModel):
    """The state of the eye system

    Starts out disconnected, with ``settings`` set to a default-constructed
    ``EyeSettings``.
    """

    connected: bool = False
    # The default EyeSettings() is constructed once, when this class body runs.
    settings: EyeSettings = EyeSettings()

KevinbotServerState

Bases: BaseModel

The state system used internally in the Kevinbot Server

Source code in src/kevinbotlib/states.py
293
294
295
296
297
298
299
300
301
302
303
304
305
class KevinbotServerState(BaseModel):
    """The state system used internally in the Kevinbot Server

    All collections start empty and all optional fields start as ``None``.
    """

    # NOTE(review): "cid" presumably stands for client ID - confirm.
    mqtt_connected: bool = False
    clients: int = 0
    heartbeat_freq: float = -1  # -1 by default; unit not stated here
    connected_cids: list[str] = []
    dead_cids: list[str] = []
    cid_heartbeats: dict[str, float] = {}  # keyed by cid string
    last_driver_cid: str | None = None
    driver_cid: str | None = None
    last_drive_command_time: datetime | None = None
    timestamp: datetime | None = None

HandshakeTimeoutException

Bases: BaseException

Exception that is produced when the connection handshake times out

Source code in src/kevinbotlib/exceptions.py
6
7
class HandshakeTimeoutException(Exception):
    """Exception that is produced when the connection handshake times out

    Derives from ``Exception`` rather than ``BaseException`` so that ordinary
    ``except Exception`` handlers can catch it. ``BaseException`` is meant for
    interpreter-level exits such as ``KeyboardInterrupt`` and ``SystemExit``.
    """

Configuration manager for KevinbotLib

ConfigLocation

Bases: Enum

Enum to represent the location of the config file

Source code in src/kevinbotlib/config.py
14
15
16
17
18
19
20
21
class ConfigLocation(Enum):
    """Enum to represent the location of the config file

    The comments on each member describe how
    ``KevinbotConfig._get_config_path`` resolves it.
    """

    USER = "user"  # always the per-user settings file
    SYSTEM = "system"  # always the system-wide settings file
    AUTO = "auto"  # user file if it exists, else system file if it exists, else the user path
    NONE = "none"  # no settings file at all
    MANUAL = "manual"  # the explicit path given to KevinbotConfig; no file if none was given

KevinbotConfig

Source code in src/kevinbotlib/config.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
class KevinbotConfig:
    """Manager for the KevinbotLib YAML settings file.

    Resolves which settings file to use, loads it into the ``mqtt``, ``core``,
    ``server`` and ``eyes`` sections, and writes those sections back out.
    """

    def __init__(self, location: ConfigLocation = ConfigLocation.AUTO, path: str | Path | None = None):
        """Resolve the settings path and load the configuration.

        Args:
            location: Where to look for the settings file.
            path: Explicit settings file; only consulted when ``location`` is
                ``ConfigLocation.MANUAL``.
        """
        self.config_location = location

        self.user_config_path = Path(user_config_dir("kevinbotlib")) / "settings.yaml"
        self.system_config_path = Path(site_config_dir("kevinbotlib")) / "settings.yaml"

        self.manual_path: Path | None = None
        if path:
            self.manual_path = Path(path)

        self.config_path = self._get_config_path()

        self.config: dict = {}

        # Start every section empty so the attributes exist before load() runs.
        self.mqtt: _MQTT = _MQTT({}, self)
        self.core: _Core = _Core({}, self)
        self.server: _Server = _Server({}, self)
        self.eyes: _Eyes = _Eyes({}, self)

        self.load()

    def _get_config_path(self) -> Path | None:
        """Get the optimal configuration path

        Returns:
            Path | None: File location, or None when no file should be used
        """
        if self.config_location == ConfigLocation.NONE:
            return None
        if self.config_location == ConfigLocation.MANUAL:
            if self.manual_path:
                return Path(self.manual_path)
            logger.warning("ConfigLocation.MANUAL set without config path, defaulting to ConfigLocation.NONE")
            # Reached when MANUAL is requested without a path: behave like NONE.
            return None
        if self.config_location == ConfigLocation.USER:
            return self.user_config_path
        if self.config_location == ConfigLocation.SYSTEM:
            return self.system_config_path
        # AUTO: Prefer user, else system, if none, return user
        if self.user_config_path.exists():
            return self.user_config_path
        if self.system_config_path.exists():
            return self.system_config_path
        return self.user_config_path

    def load(self) -> None:
        """Read the settings file (if any) and rebuild every section from it.

        A missing file leaves ``self.config`` untouched. An empty file, or one
        whose top level is not a mapping, is treated as an empty configuration.
        """
        if self.config_path and self.config_path.exists():
            with open(self.config_path, encoding="utf-8") as file:
                loaded = yaml.safe_load(file) or {}
            if isinstance(loaded, dict):
                self.config = loaded
            else:
                # A list or scalar at the top level would break the .get() calls below.
                logger.warning("Configuration root is not a mapping, ignoring file contents")
                self.config = {}

        self.mqtt = _MQTT(self.config.get("mqtt", {}), self)
        self.core = _Core(self.config.get("core", {}), self)
        self.server = _Server(self.config.get("server", {}), self)
        self.eyes = _Eyes(self.config.get("eyes", {}), self)

    def save(self) -> None:
        """Write every section to the settings file as YAML.

        Logs an error and does nothing when there is no settings path.
        """
        if not self.config_path:
            logger.error("Couldn't save configuration to empty path")
            return

        # The config directory may not exist yet (e.g. first save to the user path).
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, "w", encoding="utf-8") as file:
            yaml.dump(self._get_data(), file, default_flow_style=False)

    def dump(self) -> str:
        """Dump configuration

        Returns:
            str: YAML
        """
        return yaml.dump(self._get_data(), default_flow_style=False)

    def _get_data(self):
        """Collect the raw data of every section into one dict keyed by section name."""
        return {
            "mqtt": self.mqtt.data,
            "core": self.core.data,
            "eyes": self.eyes.data,
            "server": self.server.data,
        }

    def __repr__(self):
        """Return the default object repr followed by the YAML dump of all sections."""
        return f"{super().__repr__()}\n\n{yaml.dump(self._get_data(), default_flow_style=False)}"

dump()

Dump configuration

Returns:

Name Type Description
str str

YAML

Source code in src/kevinbotlib/config.py
327
328
329
330
331
332
333
def dump(self) -> str:
    """Serialize the current configuration sections to YAML text.

    Returns:
        str: YAML
    """
    sections = self._get_data()
    return yaml.dump(sections, default_flow_style=False)

KevinbotLib Robot Server: allows accessing KevinbotLib APIs over MQTT