Skip to content

Reference

BaseKevinbotSubsystem

The base subsystem class.

Not to be used directly

Source code in src/kevinbotlib/core.py
29
30
31
32
33
34
35
36
37
class BaseKevinbotSubsystem:
    """Common parent of every Kevinbot subsystem.

    Stores the owning robot and registers the subsystem with it.
    Not to be used directly
    """

    def __init__(self, robot: "SerialKevinbot | MqttKevinbot") -> None:
        # Store the handle first; the registration below goes through it
        self.robot = robot
        owner = self.robot
        owner._register_component(self)  # noqa: SLF001

BaseKevinbot

The base robot class.

Not to be used directly

Source code in src/kevinbotlib/core.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class BaseKevinbot:
    """The base robot class.

    Not to be used directly
    """

    def __init__(self) -> None:
        self._state = KevinbotState()
        self.type = KevinbotConnectionType.BASE
        # Subsystems add themselves here through _register_component()
        self._subsystems: list[BaseKevinbotSubsystem] = []

        self._auto_disconnect = True
        self._auto_disable = True

    def get_state(self) -> KevinbotState:
        """Gets the current state of the robot

        Returns:
            KevinbotState: State class
        """
        return self._state

    def disconnect(self):
        """Basic robot disconnect

        Marks the state as disconnected and, when `auto_disable` is set,
        requests a disable.
        """
        self._state.connected = False
        if self.auto_disable:
            self.request_disable()

    @property
    def auto_disconnect(self) -> bool:
        """Getter for auto disconnect state.

        Returns:
            bool: Whether to disconnect on application exit
        """
        return self._auto_disconnect

    @auto_disconnect.setter
    def auto_disconnect(self, value: bool):
        """Setter for auto disconnect.

        Args:
            value (bool): Whether to disconnect on application exit
        """
        self._auto_disconnect = value
        # Always drop earlier registrations first so that enabling repeatedly
        # never stacks several exit handlers for the same robot.
        atexit.unregister(self.disconnect)
        if value:
            atexit.register(self.disconnect)

    @property
    def auto_disable(self) -> bool:
        """Getter for auto disable state.

        Returns:
            bool: Whether a disable is requested when disconnecting
        """
        return self._auto_disable

    @auto_disable.setter
    def auto_disable(self, value: bool):
        """Setter for auto disable.

        Args:
            value (bool): Whether a disable is requested when disconnecting
        """
        self._auto_disable = value

    def send(self, data: str):
        """Null implementation of the send method

        Args:
            data (str): Data to send nowhere

        Raises:
            NotImplementedError: Always raised
        """
        msg = f"Function not implemented, attempting to send {data}"
        raise NotImplementedError(msg)

    def request_enable(self) -> int:
        """Request the core to enable

        Returns:
            int: Always 1
        """
        self.send("kevinbot.tryenable=1")
        return 1

    def request_disable(self) -> int:
        """Request the core to disable

        Returns:
            int: Always 1
        """
        self.send("kevinbot.tryenable=0")
        return 1

    def e_stop(self):
        """Attempt to send an E-Stop signal to the Core"""
        self.send("system.estop")
        self._state.estop = True

    def _register_component(self, component: BaseKevinbotSubsystem):
        """Record a subsystem that attached itself to this robot."""
        self._subsystems.append(component)

auto_disconnect: bool property writable

Getter for auto disconnect state.

Returns:

Name Type Description
bool bool

Whether to disconnect on application exit

auto_disable: bool property writable

Getter for auto disable state.

Returns:

Name Type Description
bool bool

Whether a disable is requested when disconnecting

get_state()

Gets the current state of the robot

Returns:

Name Type Description
KevinbotState KevinbotState

State class

Source code in src/kevinbotlib/core.py
54
55
56
57
58
59
60
def get_state(self) -> KevinbotState:
    """Return the state object currently held by the robot

    Returns:
        KevinbotState: The robot's state instance
    """
    current = self._state
    return current

disconnect()

Basic robot disconnect

Source code in src/kevinbotlib/core.py
62
63
64
65
66
def disconnect(self):
    """Mark the robot as disconnected, requesting a disable if `auto_disable` is set"""
    self._state.connected = False
    if not self.auto_disable:
        return
    self.request_disable()

send(data)

Null implementation of the send method

Parameters:

Name Type Description Default
data str

Data to send nowhere

required

Raises:

Type Description
NotImplementedError

Always raised

Source code in src/kevinbotlib/core.py
108
109
110
111
112
113
114
115
116
117
118
def send(self, data: str):
    """Placeholder transport that refuses to send anything

    Args:
        data (str): Data that would have been sent

    Raises:
        NotImplementedError: Always raised
    """
    raise NotImplementedError(f"Function not implemented, attempting to send {data}")

request_enable()

Request the core to enable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
120
121
122
123
124
125
126
127
def request_enable(self) -> int:
    """Ask the core to enable by sending the enable command

    Returns:
        int: Always 1
    """
    command = "kevinbot.tryenable=1"
    self.send(command)
    return 1

request_disable()

Request the core to disable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
129
130
131
132
133
134
135
136
def request_disable(self) -> int:
    """Ask the core to disable by sending the disable command

    Returns:
        int: Always 1
    """
    command = "kevinbot.tryenable=0"
    self.send(command)
    return 1

e_stop()

Attempt to send an E-Stop signal to the Core

Source code in src/kevinbotlib/core.py
138
139
140
141
def e_stop(self):
    """Send the E-Stop command to the Core, then flag the local state as e-stopped"""
    command = "system.estop"
    self.send(command)
    # Only reached when the send did not raise
    self._state.estop = True

SerialKevinbot

Bases: BaseKevinbot

The main serial robot class

Source code in src/kevinbotlib/core.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
class SerialKevinbot(BaseKevinbot):
    """The main serial robot class"""

    def __init__(self) -> None:
        super().__init__()
        self.type = KevinbotConnectionType.SERIAL

        self.serial: Serial | None = None
        self.rx_thread: Thread | None = None

        self._callback: Callable[[str, str | None], Any] | None = None

        atexit.register(self.disconnect)

    def connect(
        self,
        port: str,
        baud: int,
        timeout: float,
        tick_interval: float,
        ser_timeout: float = 0.5,
        *,
        tick_thread: bool = True,
    ):
        """Start a connection with Kevinbot Core

        Args:
            port (str): Serial port to use (`/dev/ttyAMA2` is standard with the typical Kevinbot Hardware)
            baud (int): Baud rate to use (`921600` is typical for the defualt Core configs)
            timeout (float): Timeout for handshake
            tick_interval (float): How often a heartbeat should be produced
            ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.
            tick_thread (bool, optional): Whether a tick thread should be started. Defaults to True.

        Raises:
            HandshakeTimeoutException: Core didn't respond to the connection handshake before the timeout
        """
        serial = self._setup_serial(port, baud, ser_timeout)

        start_time = time.monotonic()
        while True:
            serial.write(b"connection.isready=0\n")

            line = serial.readline().decode("utf-8", errors="ignore").strip("\n")

            if line == "ready":
                serial.write(b"connection.start\n")
                serial.write(b"core.errors.clear\n")
                serial.write(b"connection.ok\n")
                break

            if time.monotonic() - start_time > timeout:
                msg = "Handshake timed out"
                raise HandshakeTimeoutException(msg)

            time.sleep(0.1)  # Avoid spamming the connection

        # Data rx thread
        self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
        self.rx_thread.name = "KevinbotLib.Rx"
        self.rx_thread.start()

        if tick_thread:
            thread = Thread(target=self.tick_loop, args=(tick_interval,), daemon=True)
            thread.start()
            thread.name = "KevinbotLib.Tick"

        self._state.connected = True

    def disconnect(self):
        """Disconnect core gracefully"""
        super().disconnect()
        if self.serial and self.serial.is_open:
            self.send("core.link.unlink")
            self.serial.flush()
            self.serial.close()
        else:
            logger.warning("Already disconnected")

    @property
    def callback(self) -> Callable[[str, str | None], Any] | None:
        return self._callback

    @callback.setter
    def callback(self, callback: Callable[[str, str | None], Any]) -> None:
        self._callback = callback

    def tick_loop(self, interval: float = 1):
        """Send ticks indefinetely

        Args:
            interval (float, optional): Interval between ticks in seconds. Defaults to 1.
        """
        while True:
            self._tick()
            time.sleep(interval)

    def send(self, data: str):
        """Send a string through serial.

        Automatically adds a newline.

        Args:
            data (str): Data to send
        """
        self.raw_tx((data + "\n").encode("utf-8"))

    def raw_tx(self, data: bytes):
        """Send raw bytes over serial.

        Args:
            data (bytes): Raw data
        """
        if self.serial:
            self.serial.write(data)
        else:
            logger.warning(f"Couldn't transmit data: {data!r}, Core isn't connected")

    def _rx_loop(self, serial: Serial, delimeter: str = "="):
        while True:
            try:
                raw: bytes = serial.readline()
            except TypeError:
                # serial has been stopped
                return

            cmd: str = raw.decode("utf-8").split(delimeter, maxsplit=1)[0].strip()
            if not cmd:
                continue

            val: str | None = None
            if len(raw.decode("utf-8").split(delimeter)) > 1:
                val = raw.decode("utf-8").split(delimeter, maxsplit=1)[1].strip("\r\n")

            match cmd:
                case "ready":
                    pass
                case "core.enabled":
                    if not val:
                        logger.warning("No value recieved for 'core.enabled'")
                        continue
                    if val.lower() in ["true", "t", "1"]:
                        self._state.enabled = True
                    else:
                        self._state.enabled = False
                case "core.uptime":
                    if val:
                        self._state.uptime = int(val)
                case "core.uptime_ms":
                    if val:
                        self._state.uptime_ms = int(val)
                case "connection.requesthandshake":
                    serial.write(b"connection.start\n")
                    serial.write(b"core.errors.clear\n")
                    serial.write(b"connection.ok\n")
                    logger.warning("A handshake was re-requested. This could indicate a core power fault or reset")
                case "motors.amps":
                    if val:
                        self._state.motion.amps = list(map(float, val.split(",")))
                case "motors.watts":
                    if val:
                        self._state.motion.watts = list(map(float, val.split(",")))
                case "motors.status":
                    if val:
                        self._state.motion.status = [MotorDriveStatus(int(x)) for x in val.split(",")]
                case "bms.voltages":
                    if val:
                        self._state.battery.voltages = [float(x) / 10 for x in val.split(",")]
                case "bms.raw_voltages":
                    if val:
                        self._state.battery.raw_voltages = [float(x) / 10 for x in val.split(",")]
                case "bms.status":
                    if val:
                        self._state.battery.states = [BmsBatteryState(int(x)) for x in val.split(",")]
                case "sensors.gyro":
                    if val:
                        self._state.imu.gyro = [int(x) for x in val.split(",")]
                case "sensors.accel":
                    if val:
                        self._state.imu.accel = [int(x) for x in val.split(",")]
                case "sensors.temps":
                    if val:
                        temps = val.split(",")
                        valid = True
                        for temp in temps:
                            if not re.match("^[-+]?[0-9]+$", temp):
                                logger.error(f"Found non-integer value in temps, {temps}")
                                valid = False
                                break
                        if valid:
                            self._state.thermal.left_motor = int(temps[0]) / 100
                            self._state.thermal.right_motor = int(temps[1]) / 100
                            self._state.thermal.internal = int(temps[2]) / 100
                case "sensors.bme":
                    if val:
                        vals = val.split(",")
                        for value in vals:
                            if not re.match("^[-+]?[0-9]+$", value):
                                logger.error(f"Found non-integer value in bme values, {temps}")
                                continue

                        self._state.enviro.temperature = int(vals[0])
                        self._state.enviro.humidity = int(vals[2])
                        self._state.enviro.pressure = int(vals[3])
                case _:
                    logger.warning(f"Got a command that isn't supported yet: {cmd} with value {val}")

            if self.callback:
                self.callback(cmd, val)

    def _setup_serial(self, port: str, baud: int, timeout: float = 1):
        self.serial = Serial(port, baud, timeout=timeout)
        return self.serial

    def _tick(self):
        if self.serial and self.serial.is_open:
            self.serial.write(b"core.tick\n")

connect(port, baud, timeout, tick_interval, ser_timeout=0.5, *, tick_thread=True)

Start a connection with Kevinbot Core

Parameters:

Name Type Description Default
port str

Serial port to use (/dev/ttyAMA2 is standard with the typical Kevinbot Hardware)

required
baud int

Baud rate to use (921600 is typical for the default Core configs)

required
timeout float

Timeout for handshake

required
tick_interval float

How often a heartbeat should be produced

required
ser_timeout float

Readline timeout, should be lower than timeout. Defaults to 0.5.

0.5
tick_thread bool

Whether a tick thread should be started. Defaults to True.

True

Raises:

Type Description
HandshakeTimeoutException

Core didn't respond to the connection handshake before the timeout

Source code in src/kevinbotlib/core.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def connect(
    self,
    port: str,
    baud: int,
    timeout: float,
    tick_interval: float,
    ser_timeout: float = 0.5,
    *,
    tick_thread: bool = True,
):
    """Start a connection with Kevinbot Core

    Repeats the readiness query until the core answers `ready`, completes the
    handshake, then starts the receive thread and (optionally) the tick thread.

    Args:
        port (str): Serial port to use (`/dev/ttyAMA2` is standard with the typical Kevinbot Hardware)
        baud (int): Baud rate to use (`921600` is typical for the default Core configs)
        timeout (float): Timeout for handshake
        tick_interval (float): How often a heartbeat should be produced
        ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.
        tick_thread (bool, optional): Whether a tick thread should be started. Defaults to True.

    Raises:
        HandshakeTimeoutException: Core didn't respond to the connection handshake before the timeout
    """
    serial = self._setup_serial(port, baud, ser_timeout)

    start_time = time.monotonic()
    while True:
        serial.write(b"connection.isready=0\n")

        line = serial.readline().decode("utf-8", errors="ignore").strip("\n")

        if line == "ready":
            serial.write(b"connection.start\n")
            serial.write(b"core.errors.clear\n")
            serial.write(b"connection.ok\n")
            break

        if time.monotonic() - start_time > timeout:
            msg = "Handshake timed out"
            raise HandshakeTimeoutException(msg)

        time.sleep(0.1)  # Avoid spamming the connection

    # Data rx thread
    self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
    self.rx_thread.name = "KevinbotLib.Rx"
    self.rx_thread.start()

    if tick_thread:
        thread = Thread(target=self.tick_loop, args=(tick_interval,), daemon=True)
        # Name the thread before starting it so it never runs under a default name
        thread.name = "KevinbotLib.Tick"
        thread.start()

    self._state.connected = True

disconnect()

Disconnect core gracefully

Source code in src/kevinbotlib/core.py
216
217
218
219
220
221
222
223
224
def disconnect(self):
    """Run the base disconnect, then unlink from the core and close the port if it is open"""
    super().disconnect()
    if not (self.serial and self.serial.is_open):
        # Nothing to close: the port was never opened or is already closed
        logger.warning("Already disconnected")
        return
    self.send("core.link.unlink")
    self.serial.flush()
    self.serial.close()

tick_loop(interval=1)

Send ticks indefinitely

Parameters:

Name Type Description Default
interval float

Interval between ticks in seconds. Defaults to 1.

1
Source code in src/kevinbotlib/core.py
234
235
236
237
238
239
240
241
242
def tick_loop(self, interval: float = 1):
    """Emit a tick, sleep, and repeat without end

    Args:
        interval (float, optional): Seconds to wait between ticks. Defaults to 1.
    """
    while True:
        self._tick()
        # Pause before the next heartbeat
        time.sleep(interval)

send(data)

Send a string through serial.

Automatically adds a newline.

Parameters:

Name Type Description Default
data str

Data to send

required
Source code in src/kevinbotlib/core.py
244
245
246
247
248
249
250
251
252
def send(self, data: str):
    """Transmit one text line over serial.

    A newline is appended and the result is UTF-8 encoded before it is
    handed to `raw_tx`.

    Args:
        data (str): Text to send
    """
    line = data + "\n"
    payload = line.encode("utf-8")
    self.raw_tx(payload)

raw_tx(data)

Send raw bytes over serial.

Parameters:

Name Type Description Default
data bytes

Raw data

required
Source code in src/kevinbotlib/core.py
254
255
256
257
258
259
260
261
262
263
def raw_tx(self, data: bytes):
    """Send raw bytes over serial.

    Nothing is written (a warning is logged instead) when the port was
    never opened or has already been closed.

    Args:
        data (bytes): Raw data
    """
    # Writing to a closed port raises, which would break a second disconnect()
    if self.serial and self.serial.is_open:
        self.serial.write(data)
    else:
        logger.warning(f"Couldn't transmit data: {data!r}, Core isn't connected")

MqttKevinbot

Bases: BaseKevinbot

KevinbotLib interface over MQTT

Source code in src/kevinbotlib/core.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
class MqttKevinbot(BaseKevinbot):
    """KevinbotLib interface over MQTT"""

    def __init__(self, cid: str | None = None) -> None:
        """Instansiate a new KevinbotLib interface over MQTT

        Args:
            cid (str | None, optional): MQTT Client id. Defaults to an auto-generated uuid.
        """
        super().__init__()
        self.type = KevinbotConnectionType.MQTT

        self.root_topic = "kevinbot"
        self.host = "localhost"
        self.port = 1883
        self.keepalive = 60

        self.cid = cid if cid else f"kevinbotlib-{shortuuid.random()}"
        self.client = Client(CallbackAPIVersion.VERSION2, self.cid)
        self.client.on_message = self._on_message

        atexit.register(self.disconnect)

    def connect(
        self, root_topic: str = "kevinbot", host: str = "localhost", port: int = 1883, keepalive: int = 60
    ) -> MQTTErrorCode:
        """Connect to MQTT Broker

        Args:
            root_topic (str, optional): Root communication topic. Defaults to "kevinbot".
            host (str, optional): KevinbotLib server host. Defaults to "localhost".
            port (int, optional): Kevinbot MQTT Broker port. Defaults to 1883.
            keepalive (int, optional): Maximum period in seconds between communications with the broker. Defaults to 60.

        Returns:
            MQTTErrorCode: Connection error
        """
        self.host = host
        self.port = port
        self.keepalive = keepalive
        self.root_topic = root_topic
        self.connected = False

        rc = self.client.connect(self.host, self.port, self.keepalive)
        self.client.subscribe(f"{self.root_topic}/state", 0)
        self.client.subscribe(f"{self.root_topic}/clients/connect/ack", 0)
        self.client.publish(f"{self.root_topic}/clients/connect", self.cid, 0)
        self.client.loop_start()

        while not self.connected:
            time.sleep(0.01)

        return rc

    def send(self, data: str):
        """Determine topic and publish data. Compatible with send of `SerialKevinbot`

        Args:
            data (str): Data to parse and publish
        """
        if len(data.split("=", 2)) > 1:
            cmd, val = data.split("=", 2)
        else:
            cmd = data
            val = None

        self.client.publish(f"{self.root_topic}/{cmd.replace('.', '/')}", val, 0)

    def disconnect(self):
        """Disconnect from server"""
        super().disconnect()
        self.client.publish(f"{self.root_topic}/clients/disconnect", self.cid, 0).wait_for_publish(1)

    def request_enable(self) -> int:
        """Request the core to enable

        Returns:
            int: Always 1
        """
        self.client.publish(f"{self.root_topic}/main/state_request", "enable", 1)
        return 1

    def request_disable(self) -> int:
        """Request the core to disable

        Returns:
            int: Always 1
        """
        self.client.publish(f"{self.root_topic}/main/state_request", "disable", 1)
        return 1

    def e_stop(self):
        """Attempt to send and E-Stop signal to the Core"""
        self.client.publish(f"{self.root_topic}/main/estop", 1)

    def _on_message(self, _, __, msg: MQTTMessage):
        logger.trace(f"Got MQTT message at: {msg.topic} payload={msg.payload!r} with qos={msg.qos}")

        if msg.topic[0] == "/" or msg.topic[-1] == "/":
            logger.warning(f"MQTT topic: {msg.topic} has a leading/trailing slash. Removing it.")
            topic = msg.topic.strip("/")
        else:
            topic = msg.topic

        value = msg.payload.decode("utf-8")

        subtopics = topic.split("/")[1:]
        match subtopics:
            case ["state"]:
                self._state = KevinbotState(**json.loads(value))
            case ["clients", "connect", "ack"]:
                if value == f"ack:{self.cid}":
                    self.connected = True

__init__(cid=None)

Instantiate a new KevinbotLib interface over MQTT

Parameters:

Name Type Description Default
cid str | None

MQTT Client id. Defaults to an auto-generated uuid.

None
Source code in src/kevinbotlib/core.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def __init__(self, cid: str | None = None) -> None:
    """Create a new KevinbotLib interface over MQTT

    Args:
        cid (str | None, optional): MQTT Client id. Defaults to an auto-generated uuid.
    """
    super().__init__()
    self.type = KevinbotConnectionType.MQTT

    # Connection defaults; connect() overwrites them
    self.root_topic, self.host = "kevinbot", "localhost"
    self.port, self.keepalive = 1883, 60

    # Any falsy cid (None or empty) gets a generated id
    self.cid = cid or f"kevinbotlib-{shortuuid.random()}"
    mqtt_client = Client(CallbackAPIVersion.VERSION2, self.cid)
    self.client = mqtt_client
    self.client.on_message = self._on_message

    atexit.register(self.disconnect)

connect(root_topic='kevinbot', host='localhost', port=1883, keepalive=60)

Connect to MQTT Broker

Parameters:

Name Type Description Default
root_topic str

Root communication topic. Defaults to "kevinbot".

'kevinbot'
host str

KevinbotLib server host. Defaults to "localhost".

'localhost'
port int

Kevinbot MQTT Broker port. Defaults to 1883.

1883
keepalive int

Maximum period in seconds between communications with the broker. Defaults to 60.

60

Returns:

Name Type Description
MQTTErrorCode MQTTErrorCode

Connection error

Source code in src/kevinbotlib/core.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
def connect(
    self, root_topic: str = "kevinbot", host: str = "localhost", port: int = 1883, keepalive: int = 60
) -> MQTTErrorCode:
    """Open the broker connection and wait for the server acknowledgement

    Args:
        root_topic (str, optional): Root communication topic. Defaults to "kevinbot".
        host (str, optional): KevinbotLib server host. Defaults to "localhost".
        port (int, optional): Kevinbot MQTT Broker port. Defaults to 1883.
        keepalive (int, optional): Maximum period in seconds between communications with the broker. Defaults to 60.

    Returns:
        MQTTErrorCode: Connection error
    """
    self.host, self.port, self.keepalive = host, port, keepalive
    self.root_topic = root_topic
    self.connected = False

    rc = self.client.connect(self.host, self.port, self.keepalive)
    for suffix in ("state", "clients/connect/ack"):
        self.client.subscribe(f"{self.root_topic}/{suffix}", 0)
    self.client.publish(f"{self.root_topic}/clients/connect", self.cid, 0)
    self.client.loop_start()

    # Poll until something sets self.connected
    while True:
        if self.connected:
            break
        time.sleep(0.01)

    return rc

send(data)

Determine topic and publish data. Compatible with send of SerialKevinbot

Parameters:

Name Type Description Default
data str

Data to parse and publish

required
Source code in src/kevinbotlib/core.py
420
421
422
423
424
425
426
427
428
429
430
431
432
def send(self, data: str):
    """Determine topic and publish data. Compatible with send of `SerialKevinbot`

    The text before the first `=` becomes the topic (dots turned into
    slashes); everything after it is the payload, even if it contains
    further `=` characters. Without `=` the payload is None.

    Args:
        data (str): Data to parse and publish
    """
    # Split on the first "=" only so values may themselves contain "="
    cmd, found, rest = data.partition("=")
    val = rest if found else None

    self.client.publish(f"{self.root_topic}/{cmd.replace('.', '/')}", val, 0)

disconnect()

Disconnect from server

Source code in src/kevinbotlib/core.py
434
435
436
437
def disconnect(self):
    """Run the base disconnect, then announce this client's departure to the server"""
    super().disconnect()
    topic = f"{self.root_topic}/clients/disconnect"
    receipt = self.client.publish(topic, self.cid, 0)
    # Wait up to one second for the message to go out
    receipt.wait_for_publish(1)

request_enable()

Request the core to enable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
439
440
441
442
443
444
445
446
def request_enable(self) -> int:
    """Publish an enable request on the state_request topic

    Returns:
        int: Always 1
    """
    topic = f"{self.root_topic}/main/state_request"
    self.client.publish(topic, "enable", 1)
    return 1

request_disable()

Request the core to disable

Returns:

Name Type Description
int int

Always 1

Source code in src/kevinbotlib/core.py
448
449
450
451
452
453
454
455
def request_disable(self) -> int:
    """Publish a disable request on the state_request topic

    Returns:
        int: Always 1
    """
    topic = f"{self.root_topic}/main/state_request"
    self.client.publish(topic, "disable", 1)
    return 1

e_stop()

Attempt to send an E-Stop signal to the Core

Source code in src/kevinbotlib/core.py
457
458
459
def e_stop(self):
    """Publish an E-Stop signal on the estop topic"""
    topic = f"{self.root_topic}/main/estop"
    self.client.publish(topic, 1)

Drivebase

Bases: BaseKevinbotSubsystem

Drivebase subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
class Drivebase(BaseKevinbotSubsystem):
    """Drivebase subsystem for Kevinbot"""

    def get_amps(self) -> list[float]:
        """Get the amps being used by the drivebase

        Returns:
            list[float]: Amps
        """
        return self.robot.get_state().motion.amps

    def get_watts(self) -> list[float]:
        """Get the watts being used by the drivebase

        Returns:
            list[float]: Watts
        """
        return self.robot.get_state().motion.watts

    def get_powers(self) -> tuple[int, int]:
        """Get the currently set wheel speeds in percent

        Returns:
            tuple[int, int]: Percent values from 0 to 100
        """
        return self.robot.get_state().motion.left_power, self.robot.get_state().motion.right_power

    def get_states(self) -> list[MotorDriveStatus]:
        """Get the wheels states

        Returns:
            list[MotorDriveStatus]: States
        """
        return self.robot.get_state().motion.status

    def drive_at_power(self, left: float, right: float):
        """Set the drive power for wheels. 0 to 1

        The powers are sent as whole percentages, rounded to the nearest integer.

        Args:
            left (float): Left motor power
            right (float): Right motor power
        """
        # round() rather than int(): 0.29 * 100 is 28.999..., which int() would cut to 28
        left_pct = round(left * 100)
        right_pct = round(right * 100)
        if isinstance(self.robot, SerialKevinbot):
            self.robot.send(f"drive.power={left_pct},{right_pct}")
        elif isinstance(self.robot, MqttKevinbot):
            self.robot.client.publish(f"{self.robot.root_topic}/drive/power", f"{left_pct},{right_pct},{self.robot.cid}", 1)

    def stop(self):
        """Set all wheel powers to 0"""
        if isinstance(self.robot, SerialKevinbot):
            self.robot.send("drive.power=0,0")
        elif isinstance(self.robot, MqttKevinbot):
            # f-string so the client id is interpolated, matching drive_at_power
            self.robot.client.publish(f"{self.robot.root_topic}/drive/power", f"0,0,{self.robot.cid}", 1)

get_amps()

Get the amps being used by the drivebase

Returns:

Type Description
list[float]

list[float]: Amps

Source code in src/kevinbotlib/core.py
484
485
486
487
488
489
490
def get_amps(self) -> list[float]:
    """Read the drivebase current draw from the robot state

    Returns:
        list[float]: Amps
    """
    motion = self.robot.get_state().motion
    return motion.amps

get_watts()

Get the watts being used by the drivebase

Returns:

Type Description
list[float]

list[float]: Watts

Source code in src/kevinbotlib/core.py
492
493
494
495
496
497
498
def get_watts(self) -> list[float]:
    """Report the power drawn by the drivebase

    Returns:
        list[float]: The ``watts`` entry of the robot's motion state
    """
    motion = self.robot.get_state().motion
    return motion.watts

get_powers()

Get the currently set wheel speeds in percent

Returns:

Type Description
tuple[int, int]

tuple[int, int]: Percent values from 0 to 100

Source code in src/kevinbotlib/core.py
500
501
502
503
504
505
506
def get_powers(self) -> tuple[int, int]:
    """Report the wheel powers currently stored in the robot's motion state

    Returns:
        tuple[int, int]: Left and right power, in percent
    """
    motion = self.robot.get_state().motion
    return motion.left_power, motion.right_power

get_states()

Get the wheels states

Returns:

Type Description
list[MotorDriveStatus]

list[MotorDriveStatus]: States

Source code in src/kevinbotlib/core.py
508
509
510
511
512
513
514
def get_states(self) -> list[MotorDriveStatus]:
    """Report the status of each wheel as stored in the robot's motion state

    Returns:
        list[MotorDriveStatus]: The ``status`` entry of the motion state
    """
    motion = self.robot.get_state().motion
    return motion.status

drive_at_power(left, right)

Set the drive power for wheels. 0 to 1

Parameters:

Name Type Description Default
left float

Left motor power

required
right float

Right motor power

required
Source code in src/kevinbotlib/core.py
516
517
518
519
520
521
522
523
524
525
526
def drive_at_power(self, left: float, right: float):
    """Command both wheels to a fraction of full power

    Each fraction is multiplied by 100 and truncated to an integer percent
    before it is transmitted. Nothing is sent when the robot is neither a
    serial nor an MQTT robot.

    Args:
        left (float): Left motor power, 0 to 1
        right (float): Right motor power, 0 to 1
    """
    robot = self.robot
    if isinstance(robot, SerialKevinbot):
        robot.send(f"drive.power={int(left*100)},{int(right*100)}")
        return
    if isinstance(robot, MqttKevinbot):
        topic = f"{robot.root_topic}/drive/power"
        # The client id travels with the payload as a third field
        payload = f"{int(left*100)},{int(right*100)},{robot.cid}"
        robot.client.publish(topic, payload, 1)

stop()

Set all wheel powers to 0

Source code in src/kevinbotlib/core.py
528
529
530
531
532
533
def stop(self):
    """Set all wheel powers to 0

    Sends a zero-power drive command over serial or MQTT, depending on the
    robot type. Nothing is sent for any other robot type.
    """
    if isinstance(self.robot, SerialKevinbot):
        self.robot.send("drive.power=0,0")
    elif isinstance(self.robot, MqttKevinbot):
        # The payload must be an f-string so the client id is interpolated,
        # matching the "left,right,cid" payload built by drive_at_power()
        self.robot.client.publish(f"{self.robot.root_topic}/drive/power", f"0,0,{self.robot.cid}", 1)

Servo

Individually controllable servo

Source code in src/kevinbotlib/core.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
class Servo:
    """Handle for one servo channel, addressed through its robot"""

    def __init__(self, robot: SerialKevinbot | MqttKevinbot, index: int) -> None:
        self.robot = robot
        self.index = index

    @property
    def bank(self) -> int:
        """Bank that contains this servo (sixteen channels per bank)

        Returns:
            int: Bank number
        """
        bank_number, _ = divmod(self.index, 16)
        return bank_number

    @property
    def angle(self) -> int:
        """Last angle recorded for this servo in the robot state

        The value is optimistic: it is whatever was stored locally, not a
        reading taken from the hardware.

        Returns:
            int: Angle in degrees
        """
        recorded = self.robot.get_state().servos.angles
        return recorded[self.index]

    @angle.setter
    def angle(self, angle: int):
        """Command a new angle and record it in the robot state

        Args:
            angle (int): Angle in degrees
        """
        robot = self.robot
        if isinstance(robot, SerialKevinbot):
            robot.send(f"s={self.index},{angle}")
        elif isinstance(robot, MqttKevinbot):
            robot.client.publish(f"{robot.root_topic}/servo/set", f"{self.index},{angle}", 0)
        else:
            # Unknown robot type: nothing was sent, so the recorded angle stays as it is
            return

        robot.get_state().servos.angles[self.index] = angle

bank: int property

Get the bank the servo is in

Returns:

Name Type Description
int int

Bank number

angle: int property writable

Get the optimistic current servo angle

Returns:

Name Type Description
int int

Angle in degrees

Servos

Bases: BaseKevinbotSubsystem

Servo subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
class Servos(BaseKevinbotSubsystem):
    """Servo subsystem for Kevinbot"""

    def __len__(self) -> int:
        """Length will always be 32 since the P2 Kevinbot Board can only control 32 servos

        Returns:
            int: Number of servos in the subsystem
        """
        return 32

    def __iter__(self):
        """Yield a `Servo` handle for every channel, in channel order"""
        for i in range(len(self)):
            yield Servo(self.robot, i)

    def __getitem__(self, index: int):
        """Get the servo at `index`

        Args:
            index (int): Channel, from 0 to `len(self) - 1`

        Raises:
            IndexError: `index` is negative or not below `len(self)`

        Returns:
            Servo: Individual servo
        """
        # Valid channels are 0..len-1, so len itself is already out of range
        if index >= len(self):
            msg = f"Servo index {index} >= {len(self)}"
            raise IndexError(msg)
        if index < 0:
            msg = f"Servo index {index} < 0"
            raise IndexError(msg)
        return Servo(self.robot, index)

    def get_servo(self, channel: int) -> Servo:
        """Get an individual servo in the subsystem

        Args:
            channel (int): PWM Port, from 0 to `len(self) - 1`

        Raises:
            IndexError: `channel` is negative or not below `len(self)`

        Returns:
            Servo: Individual servo
        """
        if not 0 <= channel < len(self):
            msg = f"Servo channel {channel} is out of bounds."
            raise IndexError(msg)
        return Servo(self.robot, channel)

    @property
    def all(self) -> int:
        """Angle shared by every servo in the recorded state

        Returns:
            int: The common angle, or -1 when the recorded angles differ
        """
        angles = self.robot.get_state().servos.angles
        if all(i == angles[0] for i in angles):
            return angles[0]
        return -1

    @all.setter
    def all(self, angle: int):
        """Command every servo to `angle` and record it for all channels

        Args:
            angle (int): Angle in degrees
        """
        self.robot.send(f"servo.all={angle}")
        self.robot.get_state().servos.angles = [angle] * len(self)

__len__()

Length will always be 32 since the P2 Kevinbot Board can only control 32

Returns:

Name Type Description
int int

Number of servos in the subsystem

Source code in src/kevinbotlib/core.py
581
582
583
584
585
586
587
def __len__(self) -> int:
    """Report the fixed size of the servo subsystem

    The P2 Kevinbot Board can only control 32 servos, so the length never changes.

    Returns:
        int: Number of servos in the subsystem
    """
    servo_count = 32
    return servo_count

get_servo(channel)

Get an individual servo in the subsystem

Parameters:

Name Type Description Default
channel int

PWM Port

required

Returns:

Name Type Description
Servo Servo

Individual servo

Source code in src/kevinbotlib/core.py
602
603
604
605
606
607
608
609
610
611
612
613
614
def get_servo(self, channel: int) -> Servo:
    """Get an individual servo in the subsystem

    Args:
        channel (int): PWM Port, from 0 to `len(self) - 1`

    Raises:
        IndexError: `channel` is negative or not below `len(self)`

    Returns:
        Servo: Individual servo
    """
    # Valid channels are 0..len-1, so len itself is already out of bounds
    if not 0 <= channel < len(self):
        msg = f"Servo channel {channel} is out of bounds."
        raise IndexError(msg)
    return Servo(self.robot, channel)

Lighting

Bases: BaseKevinbotSubsystem

Lighting subsystem for Kevinbot

Source code in src/kevinbotlib/core.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
class Lighting(BaseKevinbotSubsystem):
    """Lighting subsystem for Kevinbot"""

    class Channel(Enum):
        """Lighting segment identifier"""

        Head = "head"
        Body = "body"
        Base = "base"

    def get_state(self) -> LightingState:
        """Get the state of the robot's light segments

        Returns:
            LightingState: State
        """
        return self.robot.get_state().lighting

    def _store_segment_value(self, channel: Channel, field: str, value) -> None:
        """Record `value` in the cached lighting state under `<channel>_<field>`

        Values that are not members of `Lighting.Channel` are ignored.
        """
        # NOTE(review): assumes LightingState has head_*/body_* counterparts for every
        # base_* field, as it visibly does for *_bright. Confirm against LightingState.
        if isinstance(channel, self.Channel):
            setattr(self.robot.get_state().lighting, f"{channel.value}_{field}", value)

    def set_cam_brightness(self, brightness: int):
        """Set brightness of camera illumination

        Args:
            brightness (int): Brightness from 0 to 255
        """
        self.robot.send(f"lighting.cam.bright={brightness}")
        self.robot.get_state().lighting.camera = brightness

    def set_brightness(self, channel: Channel, brightness: int):
        """Set the brightness of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            brightness (int): Brightness from 0 to 255
        """
        self.robot.send(f"lighting.{channel.value}.bright={brightness}")
        self._store_segment_value(channel, "bright", brightness)

    def set_color1(self, channel: Channel, color: list[int] | tuple[int, int, int]):
        """Set the Color 1 of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            color (Iterable[int]): RGB Color values. Must have a length of 3
        """
        # Sent as rrggbb hex followed by a fixed "00" byte
        self.robot.send(f"lighting.{channel.value}.color1={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
        self._store_segment_value(channel, "color1", list(color))

    def set_color2(self, channel: Channel, color: list[int] | tuple[int, int, int]):
        """Set the Color 2 of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            color (Iterable[int]): RGB Color values. Must have a length of 3
        """
        # Sent as rrggbb hex followed by a fixed "00" byte
        self.robot.send(f"lighting.{channel.value}.color2={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
        self._store_segment_value(channel, "color2", list(color))

    def set_effect(self, channel: Channel, effect: str):
        """Set the animation of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            effect (str): Animation ID
        """
        self.robot.send(f"lighting.{channel.value}.effect={effect}")
        self._store_segment_value(channel, "effect", effect)

    def set_update(self, channel: Channel, update: int):
        """Set the update rate of a lighting segment

        Args:
            channel (Channel): Base, Body, or Head
            update (int): Update rate (no fixed unit)
        """
        self.robot.send(f"lighting.{channel.value}.update={update}")
        self._store_segment_value(channel, "update", update)

Channel

Bases: Enum

Lighting segment identifier

Source code in src/kevinbotlib/core.py
631
632
633
634
635
636
class Channel(Enum):
    """Names the light segment a lighting command targets

    Each member's value is the segment name used inside the command string.
    """

    Head = "head"
    Body = "body"
    Base = "base"

get_state()

Get the state of the robot's light segments

Returns:

Name Type Description
LightingState LightingState

State

Source code in src/kevinbotlib/core.py
638
639
640
641
642
643
644
def get_state(self) -> LightingState:
    """Fetch the lighting portion of the robot state

    Returns:
        LightingState: State of the robot's light segments
    """
    robot_state = self.robot.get_state()
    return robot_state.lighting

set_cam_brightness(brightness)

Set brightness of camera illumination

Parameters:

Name Type Description Default
brightness int

Brightness from 0 to 255

required
Source code in src/kevinbotlib/core.py
646
647
648
649
650
651
652
653
def set_cam_brightness(self, brightness: int):
    """Command the camera illumination brightness and record it in the lighting state

    Args:
        brightness (int): Brightness from 0 to 255
    """
    command = f"lighting.cam.bright={brightness}"
    self.robot.send(command)

    lighting = self.robot.get_state().lighting
    lighting.camera = brightness

set_brightness(channel, brightness)

Set the brightness of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
brightness int

Brightness from 0 to 255

required
Source code in src/kevinbotlib/core.py
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
def set_brightness(self, channel: Channel, brightness: int):
    """Command a segment's brightness and record it in the lighting state

    Args:
        channel (Channel): Base, Body, or Head
        brightness (int): Brightness from 0 to 255
    """
    self.robot.send(f"lighting.{channel.value}.bright={brightness}")

    # Mirror the value into the field that belongs to the chosen segment
    if channel == self.Channel.Base:
        self.robot.get_state().lighting.base_bright = brightness
    elif channel == self.Channel.Body:
        self.robot.get_state().lighting.body_bright = brightness
    elif channel == self.Channel.Head:
        self.robot.get_state().lighting.head_bright = brightness

set_color1(channel, color)

Set the Color 1 of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
color Iterable[int]

RGB Color values. Must have a length of 3

required
Source code in src/kevinbotlib/core.py
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
def set_color1(self, channel: Channel, color: list[int] | tuple[int, int, int]):
    """Set the Color 1 of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        color (Iterable[int]): RGB Color values. Must have a length of 3
    """
    self.robot.send(f"lighting.{channel.value}.color1={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_color1 = list(color)
        case self.Channel.Body:
            self.robot.get_state().lighting.base_color1 = list(color)
        case self.Channel.Head:
            self.robot.get_state().lighting.base_color1 = list(color)

set_color2(channel, color)

Set the Color 2 of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
color Iterable[int]

RGB Color values. Must have a length of 3

required
Source code in src/kevinbotlib/core.py
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
def set_color2(self, channel: Channel, color: list[int] | tuple[int, int, int]):
    """Set the Color 2 of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        color (Iterable[int]): RGB Color values. Must have a length of 3
    """
    self.robot.send(f"lighting.{channel.value}.color2={color[0]:02x}{color[1]:02x}{color[2]:02x}00")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_color2 = list(color)
        case self.Channel.Body:
            self.robot.get_state().lighting.base_color2 = list(color)
        case self.Channel.Head:
            self.robot.get_state().lighting.base_color2 = list(color)

set_effect(channel, effect)

Set the animation of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
effect str

Animation ID

required
Source code in src/kevinbotlib/core.py
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
def set_effect(self, channel: Channel, effect: str):
    """Set the animation of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        effect (str): Animation ID
    """
    self.robot.send(f"lighting.{channel.value}.effect={effect}")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_effect = effect
        case self.Channel.Body:
            self.robot.get_state().lighting.base_effect = effect
        case self.Channel.Head:
            self.robot.get_state().lighting.base_effect = effect

set_update(channel, update)

Set the update rate of a lighting segment

Parameters:

Name Type Description Default
channel Channel

Base, Body, or Head

required
update int

Update rate (no fixed unit)

required
Source code in src/kevinbotlib/core.py
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
def set_update(self, channel: Channel, update: int):
    """Set the animation of a lighting segment

    Args:
        channel (Channel): Base, Body, or Head
        update (int): Update rate (no fixed unit)
    """
    self.robot.send(f"lighting.{channel.value}.update={update}")
    match channel:
        case self.Channel.Base:
            self.robot.get_state().lighting.base_update = update
        case self.Channel.Body:
            self.robot.get_state().lighting.base_update = update
        case self.Channel.Head:
            self.robot.get_state().lighting.base_update = update

BaseKevinbotEyes

The base Kevinbot Eyes class.

Not to be used directly

Source code in src/kevinbotlib/eyes.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class BaseKevinbotEyes:
    """Shared foundation for the Kevinbot Eyes connection classes.

    Holds the eyes state and the auto-disconnect flag. Not meant to be
    instantiated directly, since `send` is not implemented here.
    """

    def __init__(self) -> None:
        self._state = KevinbotEyesState()
        self.type = KevinbotConnectionType.BASE

        self._auto_disconnect = True

    def get_state(self) -> KevinbotEyesState:
        """Return the state object that tracks the eyes

        Returns:
            KevinbotEyesState: State class
        """
        return self._state

    def disconnect(self):
        """Flag the eyes as no longer connected"""
        self._state.connected = False

    @property
    def auto_disconnect(self) -> bool:
        """Whether `disconnect` is scheduled to run on application exit.

        Returns:
            bool: Whether to disconnect on application exit
        """
        return self._auto_disconnect

    @auto_disconnect.setter
    def auto_disconnect(self, value: bool):
        """Enable or disable disconnecting on application exit.

        Args:
            value (bool): Whether to disconnect on application exit
        """
        self._auto_disconnect = value
        # A truthy value adds the exit hook, a falsy one removes it
        update_hook = atexit.register if value else atexit.unregister
        update_hook(self.disconnect)

    def send(self, data: str):
        """Placeholder transmit method; this base class has no transport

        Args:
            data (str): Data to send nowhere

        Raises:
            NotImplementedError: Always raised
        """
        raise NotImplementedError(f"Function not implemented, attempting to send {data}")

auto_disconnect: bool property writable

Getter for auto disconnect state.

Returns:

Name Type Description
bool bool

Whether to disconnect on application exit

get_state()

Gets the current state of the eyes

Returns:

Name Type Description
KevinbotEyesState KevinbotEyesState

State class

Source code in src/kevinbotlib/eyes.py
31
32
33
34
35
36
37
def get_state(self) -> KevinbotEyesState:
    """Return the state object that tracks the eyes

    Returns:
        KevinbotEyesState: State class
    """
    eyes_state = self._state
    return eyes_state

disconnect()

Basic disconnect

Source code in src/kevinbotlib/eyes.py
39
40
41
def disconnect(self):
    """Flag the eyes as no longer connected"""
    eyes_state = self._state
    eyes_state.connected = False

send(data)

Null implementation of the send method

Parameters:

Name Type Description Default
data str

Data to send nowhere

required

Raises:

Type Description
NotImplementedError

Always raised

Source code in src/kevinbotlib/eyes.py
65
66
67
68
69
70
71
72
73
74
75
def send(self, data: str):
    """Placeholder transmit method; this base class has no transport

    Args:
        data (str): Data to send nowhere

    Raises:
        NotImplementedError: Always raised
    """
    raise NotImplementedError(f"Function not implemented, attempting to send {data}")

SerialEyes

Bases: BaseKevinbotEyes

The main serial Kevinbot Eyes class

Source code in src/kevinbotlib/eyes.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class SerialEyes(BaseKevinbotEyes):
    """The main serial Kevinbot Eyes class"""

    def __init__(self) -> None:
        super().__init__()
        self.type = KevinbotConnectionType.SERIAL

        self.serial: Serial | None = None
        self.rx_thread: Thread | None = None

        # Optional hook invoked with (command, value) for every received line
        self._callback: Callable[[str, str | None], Any] | None = None

        atexit.register(self.disconnect)

    def connect(
        self,
        port: str,
        baud: int,
        timeout: float,
        ser_timeout: float = 0.5,
    ):
        """Start a connection with Kevinbot Eyes

        Args:
            port (str): Serial port to use (`/dev/ttyUSB0` is standard with the typical Kevinbot Hardware)
            baud (int): Baud rate to use (`115200` is typical for the default eye configs)
            timeout (float): Timeout for handshake
            ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.

        Raises:
            HandshakeTimeoutException: Eyes didn't respond to the connection handshake before the timeout
        """
        serial = self._setup_serial(port, baud, ser_timeout)

        start_time = time.monotonic()
        while True:
            serial.write(b"connectionReady\n")

            line = serial.readline().decode("utf-8", errors="ignore").strip("\n")

            if line == "handshake.request":
                serial.write(b"getSettings=true\n")
                serial.write(b"handshake.complete\n")
                break

            if time.monotonic() - start_time > timeout:
                msg = "Handshake timed out"
                raise HandshakeTimeoutException(msg)

            time.sleep(0.1)  # Avoid spamming the connection

        # Data rx thread
        self.rx_thread = Thread(target=self._rx_loop, args=(serial, "="), daemon=True)
        self.rx_thread.name = "KevinbotLib.Eyes.Rx"
        self.rx_thread.start()

        self._state.connected = True

    def disconnect(self):
        """Mark the eyes as disconnected and close the serial port if it is open

        When the port is open, a `resetConnection` command is sent and flushed
        before closing. Otherwise a warning is logged.
        """
        super().disconnect()
        if self.serial and self.serial.is_open:
            self.send("resetConnection")
            self.serial.flush()
            self.serial.close()
        else:
            logger.warning("Already disconnected")

    def send(self, data: str):
        """Send a string through serial.

        Automatically adds a newline.

        Args:
            data (str): Data to send
        """
        self.raw_tx((data + "\n").encode("utf-8"))

    def raw_tx(self, data: bytes):
        """Send raw bytes over serial.

        Logs a warning instead of sending when no serial port has been set up.

        Args:
            data (bytes): Raw data
        """
        if self.serial:
            self.serial.write(data)
        else:
            logger.warning(f"Couldn't transmit data: {data!r}, Eyes aren't connected")

    @property
    def callback(self) -> Callable[[str, str | None], Any] | None:
        """Function called with `(command, value)` for every received line, or None"""
        return self._callback

    @callback.setter
    def callback(self, callback: Callable[[str, str | None], Any]) -> None:
        self._callback = callback

    def _setup_serial(self, port: str, baud: int, timeout: float = 1):
        """Open the serial port, keep it on `self.serial`, and return it"""
        self.serial = Serial(port, baud, timeout=timeout)
        return self.serial

    @staticmethod
    def _parse_setting_value(val: str):
        """Convert the text of an `eyeSettings.*` value into a Python value

        `[a, b]` becomes a tuple of ints, `#...` stays a string, a quoted
        string loses its quotes, and anything else becomes an int when
        possible or is returned unchanged.

        Raises:
            ValueError: A bracketed list contains a non-integer entry
        """
        # Handle array values [x, y]
        if val.startswith("[") and val.endswith("]"):
            value_str = val.strip("[]")
            try:
                return tuple(int(x.strip()) for x in value_str.split(","))
            except ValueError as e:
                msg = f"Failed to parse value '{value_str}': {e!s}"
                raise ValueError(msg) from e
        # Handle hex colors
        if val.startswith("#"):
            return val
        # Handle quoted strings
        if val.startswith('"') and val.endswith('"'):
            return val.strip('"')
        # Handle numbers, falling back to the raw text
        try:
            return int(val)
        except ValueError:
            return val

    def _apply_setting(self, path: list[str], value) -> None:
        """Write `value` at the dotted `path` inside the eye settings and re-validate them

        Raises:
            ValueError: `path` does not lead to an existing setting
        """
        # Work on a dict copy so the settings model is rebuilt and validated as a whole
        settings_dict = self._state.settings.model_dump()

        # Navigate to the dictionary that holds the final key
        current_dict = settings_dict
        for i, key in enumerate(path[:-1]):
            if key not in current_dict:
                raise ValueError(f"Invalid path: {'.'.join(path[:i+1])}")
            if not isinstance(current_dict[key], dict):
                raise ValueError(f"Cannot navigate through non-dict value at {'.'.join(path[:i+1])}")
            current_dict = current_dict[key]

        if path[-1] not in current_dict:
            raise ValueError(f"Invalid setting: {'.'.join(path)}")
        current_dict[path[-1]] = value

        self._state.settings = EyeSettings.model_validate(settings_dict)

    def _rx_loop(self, serial: Serial, delimeter: str = "="):
        """Read lines from `serial` forever, apply eye settings and notify the callback

        Each line is split once on `delimeter` into a command and an optional
        value. `eyeSettings.*` commands update the stored settings. Every
        non-empty command is then passed to the callback, if one is set.
        """
        while True:
            try:
                raw: bytes = serial.readline()
            except TypeError:
                # serial has been stopped
                return

            # Decode once and drop undecodable bytes, as connect() does, so that
            # line noise cannot end the receive thread with a UnicodeDecodeError
            text = raw.decode("utf-8", errors="ignore")
            head, found, tail = text.partition(delimeter)

            cmd: str = head.strip().replace("\x00", "")
            if not cmd:
                continue

            val: str | None = tail.strip("\r\n").replace("\x00", "") if found else None

            if cmd.startswith("eyeSettings."):
                if not val:
                    logger.error(f"Got eyeSettings command without a value: {cmd} :: {val}")
                    continue

                # Remove the prefix; what is left is the dotted path of the setting
                path = cmd[len("eyeSettings."):].split(".")
                self._apply_setting(path, self._parse_setting_value(val))

            if self.callback:
                self.callback(cmd, val)

    def set_skin(self, skin: Skin):
        """Set the current skin

        Args:
            skin (Skin): Skin to show; its `value` is sent to the eyes
        """
        self._state.settings.states.page = skin
        self.send(f"setState={skin.value}")

connect(port, baud, timeout, ser_timeout=0.5)

Start a connection with Kevinbot Eyes

Parameters:

Name Type Description Default
port str

Serial port to use (/dev/ttyUSB0 is standard with the typical Kevinbot Hardware)

required
baud int

Baud rate to use (115200 is typical for the default eye configs)

required
timeout float

Timeout for handshake

required
ser_timeout float

Readline timeout, should be lower than timeout. Defaults to 0.5.

0.5

Raises:

Type Description
HandshakeTimeoutException

Eyes didn't respond to the connection handshake before the timeout

Source code in src/kevinbotlib/eyes.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def connect(
    self,
    port: str,
    baud: int,
    timeout: float,
    ser_timeout: float = 0.5,
):
    """Open the serial port and perform the Kevinbot Eyes handshake

    `connectionReady` is written repeatedly until the eyes answer with
    `handshake.request`. The settings are then requested, the handshake is
    completed, and the receive thread is started.

    Args:
        port (str): Serial port to use (`/dev/ttyUSB0` is standard with the typical Kevinbot Hardware)
        baud (int): Baud rate to use (`115200` is typical for the default eye configs)
        timeout (float): Timeout for handshake
        ser_timeout (float, optional): Readline timeout, should be lower than `timeout`. Defaults to 0.5.

    Raises:
        HandshakeTimeoutException: Eyes didn't respond to the connection handshake before the timeout
    """
    link = self._setup_serial(port, baud, ser_timeout)

    began = time.monotonic()
    handshake_done = False
    while not handshake_done:
        link.write(b"connectionReady\n")
        reply = link.readline().decode("utf-8", errors="ignore").strip("\n")

        if reply == "handshake.request":
            link.write(b"getSettings=true\n")
            link.write(b"handshake.complete\n")
            handshake_done = True
            continue

        if time.monotonic() - began > timeout:
            msg = "Handshake timed out"
            raise HandshakeTimeoutException(msg)

        # Pause between attempts so the link is not flooded
        time.sleep(0.1)

    # Background thread that receives data from the eyes
    receiver = Thread(target=self._rx_loop, args=(link, "="), daemon=True)
    receiver.name = "KevinbotLib.Eyes.Rx"
    self.rx_thread = receiver
    receiver.start()

    self._state.connected = True

send(data)

Send a string through serial.

Automatically adds a newline.

Parameters:

Name Type Description Default
data str

Data to send

required
Source code in src/kevinbotlib/eyes.py
145
146
147
148
149
150
151
152
153
def send(self, data: str):
    """Transmit one line of text over serial.

    A newline is appended and the result is UTF-8 encoded before it is
    handed to `raw_tx`.

    Args:
        data (str): Data to send
    """
    line = data + "\n"
    self.raw_tx(line.encode("utf-8"))

raw_tx(data)

Send raw bytes over serial.

Parameters:

Name Type Description Default
data bytes

Raw data

required
Source code in src/kevinbotlib/eyes.py
155
156
157
158
159
160
161
162
163
164
def raw_tx(self, data: bytes):
    """Write bytes to the serial port exactly as given.

    When no serial port is set, nothing is written and a warning is logged.

    Args:
        data (bytes): Raw data
    """
    if not self.serial:
        logger.warning(f"Couldn't transmit data: {data!r}, Eyes aren't connected")
        return
    self.serial.write(data)

set_skin(skin)

Set the current skin

Parameters:

Name Type Description Default
skin Skin

Skin index

required
Source code in src/kevinbotlib/eyes.py
249
250
251
252
253
254
255
256
def set_skin(self, skin: Skin):
    """Switch the eyes to another skin

    The skin is recorded as the current page in the stored settings, then
    its `value` is sent in a `setState` command.

    Args:
        skin (Skin): Skin to show
    """
    page_states = self._state.settings.states
    page_states.page = skin

    command = f"setState={skin.value}"
    self.send(command)

WirelessRadio

Bases: BaseKevinbotSubsystem

Source code in src/kevinbotlib/xbee.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class WirelessRadio(BaseKevinbotSubsystem):
    """XBee radio subsystem that talks to the radio over a serial port."""

    def __init__(self, robot: SerialKevinbot, port: str, baud: int, api: int, timeout: float) -> None:
        """Initialize Kevinbot Wireless Radio (XBee)

        Args:
            robot (SerialKevinbot): The main robot class
            port (str): Serial port to connect to `/dev/ttyAMA0` for typical Kevinbot hardware
            baud (int): Baud rate for serial interface `921600` for typical Kevinbot configs
            api (int): API mode for xbee interface `2` for typical Kevinbot configs (`0` isn't supported yet)
            timeout (float): Timeout for serial operations
        """
        super().__init__(robot)

        if api not in [1, 2]:
            logger.error(f"XBee API Mode {api} isn't supported. Assuming API escaped (2)")
            api = 2

        self.callback: Callable | None = None

        self.serial = Serial(port, baud, timeout=timeout)
        # API mode 2 escapes reserved bytes on the wire; the driver only
        # un-escapes them when told to, so the validated mode must be passed on.
        self.xbee = xbee.XBee(self.serial, callback=self.callback, escaped=(api == 2))

    def get(self) -> dict:
        """Get the latest packet (blocking)

        Returns:
            dict: Data packet
        """
        return self.xbee.wait_read_frame()

    def disconnect(self):
        """Disconnect robot radio, and halt processing

        The serial port is closed even if halting the XBee driver raises.
        """
        try:
            self.xbee.halt()
        finally:
            self.serial.close()

__init__(robot, port, baud, api, timeout)

Initialize Kevinbot Wireless Radio (XBee)

Parameters:

Name Type Description Default
robot SerialKevinbot

The main robot class

required
port str

Serial port to connect to /dev/ttyAMA0 for typical Kevinbot hardware

required
baud int

Baud rate for serial interface 921600 for typical Kevinbot configs

required
api int

API mode for xbee interface 2 for typical Kevinbot configs (0 isn't supported yet)

required
timeout float

Timeout for serial operations

required
Source code in src/kevinbotlib/xbee.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, robot: SerialKevinbot, port: str, baud: int, api: int, timeout: float) -> None:
    """Initialize Kevinbot Wireless Radio (XBee)

    Args:
        robot (SerialKevinbot): The main robot class
        port (str): Serial port to connect to `/dev/ttyAMA0` for typical Kevinbot hardware
        baud (int): Baud rate for serial interface `921600` for typical Kevinbot configs
        api (int): API mode for xbee interface `2` for typical Kevinbot configs (`0` isn't supported yet)
        timeout (float): Timeout for serial operations
    """
    super().__init__(robot)

    if api not in [1, 2]:
        logger.error(f"XBee API Mode {api} isn't supported. Assuming API escaped (2)")
        api = 2

    self.callback: Callable | None = None

    self.serial = Serial(port, baud, timeout=timeout)
    # API mode 2 escapes reserved bytes on the wire; the driver only
    # un-escapes them when told to, so the validated mode must be passed on.
    self.xbee = xbee.XBee(self.serial, callback=self.callback, escaped=(api == 2))

get()

Get the latest packet (blocking)

Returns:

Name Type Description
dict dict

Data packet

Source code in src/kevinbotlib/xbee.py
35
36
37
38
39
40
41
def get(self) -> dict:
    """Wait for the next XBee frame and hand it back.

    Returns:
        dict: The frame returned by the XBee driver's `wait_read_frame`
    """
    frame = self.xbee.wait_read_frame()
    return frame

disconnect()

Disconnect robot radio, and halt processing

Source code in src/kevinbotlib/xbee.py
43
44
45
46
def disconnect(self):
    """Disconnect robot radio, and halt processing

    The serial port is closed even if halting the XBee driver raises, so a
    failed halt cannot leave the port open.
    """
    try:
        self.xbee.halt()
    finally:
        self.serial.close()

CoreErrors

Bases: Enum

Kevinbot Core Error States

Source code in src/kevinbotlib/states.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class CoreErrors(Enum):
    """
    Kevinbot Core Error States
    """

    OK = 0
    """No errors are present"""
    UNKNOWN = 1
    """Error state unknown"""
    OW_SHORT = 2
    """One-Wire bus is shorted"""
    OW_ERROR = 3
    """One-Wire bus error"""
    OW_DNF = 4
    """One-Wire device not found"""
    LCD_INIT_FAIL = 5
    """LCD init failed"""
    PCA_INIT_FAIL = 6
    """PCA9685 (servos) init failed"""
    TICK_FAIL = 7
    """Failure to receive core tick"""
    QUEUE_OVERRUN = 8
    """Serial queue overrun"""
    ESTOP = 9
    """Core is in E-Stop state"""
    BME_CHIP_ID = 10
    """Error getting environment sensor chip id"""
    BME_CALIB_NVM = 11
    """Error with environment sensor calibration"""
    BME_CALIB_TP = 12
    """Error with environment sensor calibration"""
    BME_CALIB_HUM = 13
    """Error with environment sensor calibration"""
    BME_THP = 14
    """Error with environment sensor"""
    BME_MEAS_TIMEOUT = 15
    """Timeout with environment sensor measurement"""
    BME_NOT_NORMAL_MODE = 16
    """Environment sensor is not in normal mode"""
    BATT1_UV = 17
    """Battery #1 Undervoltage"""
    BATT1_OV = 18
    """Battery #1 Overvoltage"""
    BATT2_UV = 19
    """Battery #2 Undervoltage"""
    BATT2_OV = 20
    """Battery #2 Overvoltage"""
    BATT_UV = 21
    """Battery Undervoltage (single battery mode)"""
    BATT_OV = 22
    """Battery Overvoltage (single battery mode)"""

OK = 0 class-attribute instance-attribute

No errors are present

UNKNOWN = 1 class-attribute instance-attribute

Error state unknown

OW_SHORT = 2 class-attribute instance-attribute

One-Wire bus is shorted

OW_ERROR = 3 class-attribute instance-attribute

One-Wire bus error

OW_DNF = 4 class-attribute instance-attribute

One-Wire device not found

LCD_INIT_FAIL = 5 class-attribute instance-attribute

LCD Init failed

PCA_INIT_FAIL = 6 class-attribute instance-attribute

PCA9685 (servos) init fail

TICK_FAIL = 7 class-attribute instance-attribute

Failure to receive core tick

QUEUE_OVERRUN = 8 class-attribute instance-attribute

Serial queue overrun

ESTOP = 9 class-attribute instance-attribute

Core is in E-Stop state

BME_CHIP_ID = 10 class-attribute instance-attribute

Error getting environment sensor chip id

BME_CALIB_NVM = 11 class-attribute instance-attribute

Error with environment sensor calibration

BME_CALIB_TP = 12 class-attribute instance-attribute

Error with environment sensor calibration

BME_CALIB_HUM = 13 class-attribute instance-attribute

Error with environment sensor calibration

BME_THP = 14 class-attribute instance-attribute

Error with environment sensor

BME_MEAS_TIMEOUT = 15 class-attribute instance-attribute

Timeout with environment sensor measurement

BME_NOT_NORMAL_MODE = 16 class-attribute instance-attribute

Environment sensor is not in normal mode

BATT1_UV = 17 class-attribute instance-attribute

Battery #1 Undervoltage

BATT1_OV = 18 class-attribute instance-attribute

Battery #1 Overvoltage

BATT2_UV = 19 class-attribute instance-attribute

Battery #2 Undervoltage

BATT2_OV = 20 class-attribute instance-attribute

Battery #2 Overvoltage

BATT_UV = 21 class-attribute instance-attribute

Battery Undervoltage (single battery mode)

BATT_OV = 22 class-attribute instance-attribute

Battery Overvoltage (single battery mode)

MotorDriveStatus

Bases: Enum

The status of each motor in the drivebase

Source code in src/kevinbotlib/states.py
63
64
65
66
67
68
69
70
71
72
73
74
75
class MotorDriveStatus(Enum):
    """
    The status of each motor in the drivebase

    Member values start at 10 rather than 0.
    """

    UNKNOWN = 10
    """Motor status is unknown"""
    MOVING = 11
    """Motor is rotating"""
    HOLDING = 12
    """Motor is holding at position"""
    OFF = 13
    """Motor is off"""

UNKNOWN = 10 class-attribute instance-attribute

Motor status is unknown

MOVING = 11 class-attribute instance-attribute

Motor is rotating

HOLDING = 12 class-attribute instance-attribute

Motor is holding at position

OFF = 13 class-attribute instance-attribute

Motor is off

BmsBatteryState

Bases: Enum

The status of a single battery attached to the BMS

Source code in src/kevinbotlib/states.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class BmsBatteryState(Enum):
    """
    The status of a single battery attached to the BMS

    `BMState.states` holds one of these per battery.
    """

    UNKNOWN = 0
    """State is unknown (usually at bootup)"""
    NORMAL = 1
    """Battery is normal"""
    UNDER = 2
    """Battery is undervoltage"""
    OVER = 3
    """Battery is overvoltage"""
    STOPPED = 4  # Stopped state if BMS driver crashed
    """BMS has crashed or stopped"""

UNKNOWN = 0 class-attribute instance-attribute

State is unknown (usually at bootup)

NORMAL = 1 class-attribute instance-attribute

Battery is normal

UNDER = 2 class-attribute instance-attribute

Battery is undervoltage

OVER = 3 class-attribute instance-attribute

Battery is overvoltage

STOPPED = 4 class-attribute instance-attribute

BMS has crashed or stopped

DrivebaseState

Bases: BaseModel

The state of the drivebase as a whole

Source code in src/kevinbotlib/states.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class DrivebaseState(BaseModel):
    """The state of the drivebase as a whole

    `amps`, `watts` and `status` each default to a two-element list, one
    entry per motor (presumably ordered left, right; confirm against the
    code that fills them in).
    """

    left_power: int = 0
    """Current power of the left motor"""
    right_power: int = 0
    """Current power of the right motor"""
    amps: list[float] = Field(default_factory=lambda: [0, 0])
    """Current amps for both motors"""
    watts: list[float] = Field(default_factory=lambda: [0, 0])
    """Current watts for both motors"""
    status: list[MotorDriveStatus] = Field(default_factory=lambda: [MotorDriveStatus.UNKNOWN, MotorDriveStatus.UNKNOWN])
    """Current status for both motors"""

left_power: int = 0 class-attribute instance-attribute

Current power of the left motor

right_power: int = 0 class-attribute instance-attribute

Current power of the right motor

amps: list[float] = Field(default_factory=lambda: [0, 0]) class-attribute instance-attribute

Current amps for both motors

watts: list[float] = Field(default_factory=lambda: [0, 0]) class-attribute instance-attribute

Current watts for both motors

status: list[MotorDriveStatus] = Field(default_factory=lambda: [MotorDriveStatus.UNKNOWN, MotorDriveStatus.UNKNOWN]) class-attribute instance-attribute

Current status for both motors

ServoState

Bases: BaseModel

The state of the servo subsystem

Source code in src/kevinbotlib/states.py
110
111
112
113
class ServoState(BaseModel):
    """The state of the servo subsystem"""

    # 32 angle entries, each starting at -1
    # (presumably -1 means "not reported yet"; confirm against the code that fills this in)
    angles: list[int] = Field(default_factory=lambda: [-1] * 32)

BMState

Bases: BaseModel

The state of the BMS (Battery Management System)

Source code in src/kevinbotlib/states.py
116
117
118
119
120
121
class BMState(BaseModel):
    """The state of the BMS (Battery Management System)

    Every field defaults to a two-element list, one entry per battery.
    """

    # Battery voltages; start at 0.0
    voltages: list[float] = Field(default_factory=lambda: [0.0, 0.0])
    # NOTE(review): how raw_voltages differs from voltages is not visible here - confirm
    raw_voltages: list[float] = Field(default_factory=lambda: [0.0, 0.0])
    # Per-battery state; UNKNOWN until updated
    states: list[BmsBatteryState] = Field(default_factory=lambda: [BmsBatteryState.UNKNOWN, BmsBatteryState.UNKNOWN])

IMUState

Bases: BaseModel

The state of the IMU (Inertial Measurement Unit)

Source code in src/kevinbotlib/states.py
124
125
126
127
128
class IMUState(BaseModel):
    """The state of the IMU (Inertial Measurement Unit)

    Both fields default to three-element lists filled with -1.
    """

    accel: list[int] = Field(default_factory=lambda: [-1] * 3)  # X Y Z
    gyro: list[int] = Field(default_factory=lambda: [-1] * 3)  # R P Y

ThermometerState

Bases: BaseModel

The state of the DS18B20 Thermometers (does not include BME280)

Source code in src/kevinbotlib/states.py
131
132
133
134
135
136
class ThermometerState(BaseModel):
    """The state of the DS18B20 Thermometers (does not include BME280)"""

    # Every reading defaults to -1
    # (presumably "no reading yet"; units are not visible here - confirm)
    left_motor: float = -1
    right_motor: float = -1
    internal: float = -1

EnviroState

Bases: BaseModel

The state of the BME280 environmental sensor

Source code in src/kevinbotlib/states.py
139
140
141
142
143
144
class EnviroState(BaseModel):
    """The state of the BME280 environmental sensor"""

    # NOTE(review): units are not visible here - confirm against the code that fills these in
    temperature: float = -1
    humidity: float = 0
    pressure: int = 0

LightingState

Bases: BaseModel

The state of Kevinbot's LED segments

Source code in src/kevinbotlib/states.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class LightingState(BaseModel):
    """The state of Kevinbot's LED segments

    The head, body and base segments each carry the same five fields:
    effect, brightness, update value and two colors.
    """

    camera: int = 0

    # Head segment; colors are lists of at least three ints (presumably R, G, B - confirm)
    head_effect: str = "unknown"
    head_bright: int = 0
    head_update: int = -1
    head_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    head_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

    # Body segment
    body_effect: str = "unknown"
    body_bright: int = 0
    body_update: int = -1
    body_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    body_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

    # Base segment
    base_effect: str = "unknown"
    base_bright: int = 0
    base_update: int = -1
    base_color1: list[int] = Field(default=[0, 0, 0], min_length=3)
    base_color2: list[int] = Field(default=[0, 0, 0], min_length=3)

KevinbotState

Bases: BaseModel

The state of the robot as a whole

Source code in src/kevinbotlib/states.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class KevinbotState(BaseModel):
    """The state of the robot as a whole

    Combines top-level flags with one nested state model per subsystem; each
    nested model is created fresh for every instance.
    """

    # Top-level connection / safety flags
    connected: bool = False
    enabled: bool = False
    error: CoreErrors = CoreErrors.OK
    estop: bool = False

    # NOTE(review): units of uptime vs uptime_ms are presumably seconds and milliseconds - confirm
    uptime: int = 0
    uptime_ms: int = 0

    # Per-subsystem state
    motion: DrivebaseState = Field(default_factory=DrivebaseState)
    servos: ServoState = Field(default_factory=ServoState)
    battery: BMState = Field(default_factory=BMState)
    imu: IMUState = Field(default_factory=IMUState)
    thermal: ThermometerState = Field(default_factory=ThermometerState)
    enviro: EnviroState = Field(default_factory=EnviroState)
    lighting: LightingState = Field(default_factory=LightingState)

KevinbotEyesState

Bases: BaseModel

The state of the eye system

Source code in src/kevinbotlib/states.py
263
264
265
266
267
class KevinbotEyesState(BaseModel):
    """The state of the eye system"""

    # Starts False; the eyes connect routine sets it to True once its handshake completes
    connected: bool = False
    # Cached eye settings; defaults to a default-constructed EyeSettings
    settings: EyeSettings = EyeSettings()

KevinbotServerState

Bases: BaseModel

The state system used internally in the Kevinbot Server

Source code in src/kevinbotlib/states.py
270
271
272
273
274
275
276
277
class KevinbotServerState(BaseModel):
    """The state system used internally in the Kevinbot Server"""

    mqtt_connected: bool = False
    clients: int = 0
    # NOTE(review): "cid" presumably means client id - confirm against the server code
    connected_cids: list[str] = []
    last_driver_cid: str | None = None
    driver_cid: str | None = None
HandshakeTimeoutException

Bases: BaseException

Exception that is produced when the connection handshake times out

Source code in src/kevinbotlib/exceptions.py
6
7
class HandshakeTimeoutException(Exception):
    """Exception that is produced when the connection handshake times out

    Derives from `Exception` (not `BaseException`) so that ordinary
    `except Exception` handlers catch it; `BaseException` is reserved for
    interpreter-level exits such as `KeyboardInterrupt` and `SystemExit`.
    """

Configuration manager for KevinbotLib

ConfigLocation

Bases: Enum

Enum to represent the location of the config file

Source code in src/kevinbotlib/config.py
14
15
16
17
18
19
20
21
class ConfigLocation(Enum):
    """Enum to represent the location of the config file"""

    # Use the per-user settings file
    USER = "user"
    # Use the system-wide settings file
    SYSTEM = "system"
    # Prefer the user file, then the system file; fall back to the user path if neither exists
    AUTO = "auto"
    # No settings file at all
    NONE = "none"
    # Use the path given explicitly to KevinbotConfig
    MANUAL = "manual"

KevinbotConfig

Source code in src/kevinbotlib/config.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class KevinbotConfig:
    """Loads, exposes and saves KevinbotLib settings stored as YAML.

    Settings are split into the `mqtt`, `core`, `xbee` and `server`
    sections, each wrapped in its own helper object.
    """

    def __init__(self, location: ConfigLocation = ConfigLocation.AUTO, path: str | Path | None = None):
        """Resolve the settings file location and load it.

        Args:
            location (ConfigLocation): Where to look for the settings file
            path (str | Path | None): File to use with `ConfigLocation.MANUAL`
        """
        self.config_location = location

        self.user_config_path = Path(user_config_dir("kevinbotlib")) / "settings.yaml"
        self.system_config_path = Path(site_config_dir("kevinbotlib")) / "settings.yaml"

        self.manual_path: Path | None = None
        if path:
            self.manual_path = Path(path)

        self.config_path = self._get_config_path()

        self.config: dict = {}

        # Sections start out empty so the attributes always exist; load() rebuilds them
        self.mqtt: _MQTT = _MQTT({}, self)
        self.core: _Core = _Core({}, self)
        self.xbee: _XBee = _XBee({}, self)
        self.server: _Server = _Server({}, self)

        self.load()

    def _get_config_path(self) -> Path | None:
        """Get the optimal configuration path

        Returns:
            Path | None: File location
        """
        if self.config_location == ConfigLocation.NONE:
            return None
        if self.config_location == ConfigLocation.MANUAL:
            if self.manual_path:
                return self.manual_path
            logger.warning("ConfigLocation.MANUAL set without config path, defaulting to ConfigLocation.NONE")
            return None  # MANUAL was requested but no path was supplied
        if self.config_location == ConfigLocation.USER:
            return self.user_config_path
        if self.config_location == ConfigLocation.SYSTEM:
            return self.system_config_path
        # AUTO: Prefer user, else system, if none, return user
        if self.user_config_path.exists():
            return self.user_config_path
        if self.system_config_path.exists():
            return self.system_config_path
        return self.user_config_path

    def load(self) -> None:
        """Read the settings file (if any) and rebuild the section objects.

        A missing file, an empty file or a file whose top level is not a
        mapping leaves the previously loaded data in place.
        """
        if self.config_path and self.config_path.exists():
            with open(self.config_path, encoding="utf-8") as file:
                loaded = yaml.safe_load(file)
            if isinstance(loaded, dict):
                self.config = loaded
            elif loaded is None:
                # Empty file
                self.config = {}
            else:
                # A list or scalar at the top level cannot hold the sections
                logger.warning(f"Ignoring configuration in {self.config_path}: top level is not a mapping")
                self.config = {}

        self.mqtt = _MQTT(self.config.get("mqtt", {}), self)
        self.core = _Core(self.config.get("core", {}), self)
        self.xbee = _XBee(self.config.get("xbee", {}), self)
        self.server = _Server(self.config.get("server", {}), self)

    def save(self) -> None:
        """Write the current settings to the resolved settings file.

        The parent directory is created when missing. Without a resolved
        path an error is logged and nothing is written.
        """
        if self.config_path:
            # The config directory usually doesn't exist before the first save
            self.config_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.config_path, "w", encoding="utf-8") as file:
                yaml.dump(self._get_data(), file, default_flow_style=False)
        else:
            logger.error("Couldn't save configuration to empty path")

    def dump(self) -> str:
        """Dump configuration

        Returns:
            str: YAML
        """
        return yaml.dump(self._get_data(), default_flow_style=False)

    def _get_data(self):
        """Collect the data of every section into one mapping keyed by section name."""
        return {
            "mqtt": self.mqtt.data,
            "core": self.core.data,
            "xbee": self.xbee.data,
            "server": self.server.data,
        }

    def __repr__(self):
        return f"{super().__repr__()}\n\n{self.dump()}"

dump()

Dump configuration

Returns:

Name Type Description
str str

YAML

Source code in src/kevinbotlib/config.py
250
251
252
253
254
255
256
def dump(self) -> str:
    """Serialize the current configuration to a YAML string.

    Returns:
        str: Block-style YAML of all configuration sections
    """
    snapshot = self._get_data()
    return yaml.dump(snapshot, default_flow_style=False)

KevinbotLib Robot Server: allows accessing KevinbotLib APIs over MQTT and XBee API Mode