Skip to content

Communication Examples

Setter

examples/comm/redis/setter.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time

from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.comm.request import SetRequest
from kevinbotlib.comm.sendables import IntegerSendable, StringSendable
from kevinbotlib.logger import Logger, LoggerConfiguration

# Configure KevinbotLib logging with the default configuration.
logger = Logger()
logger.configure(LoggerConfiguration())

# Create the Redis comm client and wait for the connection before publishing.
client = RedisCommClient()
client.connect()
client.wait_until_connected()

i = 0  # counter published on every iteration
try:
    while True:
        # ! don't do this
        # client.set("example/hierarchy/test", IntegerSendable(value=i))
        # client.set("example/hierarchy/test2", StringSendable(value=f"demo {i}"))
        #
        # Publish both keys with one multi_set() call instead of one set()
        # call per key (presumably fewer round trips to Redis - confirm
        # against the KevinbotLib documentation).
        client.multi_set(
            [
                SetRequest("example/hierarchy/test", IntegerSendable(value=i)),
                SetRequest("example/hierarchy/test2", StringSendable(value=f"demo {i}")),
            ]
        )
        time.sleep(0.5)  # pause half a second between updates
        i += 1
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl+C, so an
    # unexpected error inside the loop does not leave the connection open.
    client.close()

Getter

examples/comm/redis/getter.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time

from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.comm.request import GetRequest
from kevinbotlib.comm.sendables import IntegerSendable, StringSendable
from kevinbotlib.logger import Logger, LoggerConfiguration

# Configure KevinbotLib logging with the default configuration.
logger = Logger()
logger.configure(LoggerConfiguration())

# Create the Redis comm client and wait for the connection before reading.
client = RedisCommClient()
client.connect()
client.wait_until_connected()

try:
    while True:
        # ! don't do this
        # print(client.get("example/hierarchy/test", IntegerSendable))
        # print(client.get("example/hierarchy/test2", StringSendable))
        #
        # Fetch both keys with one multi_get() call instead of one get() call
        # per key. The two results are unpacked positionally (presumably in
        # the same order as the requests - confirm against the KevinbotLib
        # documentation).
        test, test2 = client.multi_get(
            [
                GetRequest("example/hierarchy/test", IntegerSendable),
                GetRequest("example/hierarchy/test2", StringSendable),
            ]
        )
        print(test)
        print(test2)
        time.sleep(0.1)  # pause a tenth of a second between polls
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl+C, so an
    # unexpected error inside the loop does not leave the connection open.
    client.close()

Getter with Hooks

examples/comm/redis/getter_hooks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time

from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.comm.sendables import IntegerSendable
from kevinbotlib.logger import Logger, LoggerConfiguration

# Set up KevinbotLib logging using a default configuration object.
logger = Logger()
default_config = LoggerConfiguration()
logger.configure(default_config)

# Open the Redis comm client and wait for the connection before going further.
client = RedisCommClient()
client.connect()
client.wait_until_connected()


def hook(key, message) -> None:
    """Print a line reporting the key and the message that was received.

    Args:
        key: Key the message was received on.
        message: Received message; it is formatted with ``str()``.
    """
    report = f"Received message from {key}: {message}"
    print(report)


# Register `hook` for the key, passing IntegerSendable as the sendable type.
client.add_hook("example/hierarchy/test", IntegerSendable, hook)

try:
    # Keep the script alive; the hook is presumably invoked in the background
    # by the client while this loop sleeps (confirm against the KevinbotLib
    # documentation).
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl+C, so an
    # unexpected error does not leave the connection open.
    client.close()

Sendable Generator

examples/comm/redis/sendable_generator.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import random
import time

from kevinbotlib.comm.redis import (
    RedisCommClient,
)
from kevinbotlib.comm.sendables import (
    BaseSendable,
    IntegerSendable,
    SendableGenerator,
)
from kevinbotlib.logger import Logger, LoggerConfiguration

# Set up KevinbotLib logging using a default configuration object.
logger = Logger()
default_config = LoggerConfiguration()
logger.configure(default_config)

# Open the Redis comm client and wait for the connection before going further.
client = RedisCommClient()
client.connect()
client.wait_until_connected()


class TestGenerator(SendableGenerator):
    """Sendable generator that produces a new random integer on each call."""

    def generate_sendable(self) -> BaseSendable:
        """Return an IntegerSendable holding a random value from 0 to 100 inclusive."""
        random_value = random.randint(0, 100)
        return IntegerSendable(value=random_value)


generator = TestGenerator()

try:
    while True:
        # The generator object itself is handed to set(); presumably the
        # client calls generate_sendable() to obtain the value to publish
        # (confirm against the KevinbotLib documentation).
        client.set("example/hierarchy/test", generator)
        time.sleep(0.5)  # pause half a second between updates
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl+C, so an
    # unexpected error inside the loop does not leave the connection open.
    client.close()

Robot Controller using Redis Comms

examples/robot/comms_robot.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from kevinbotlib.comm.request import SetRequest
from kevinbotlib.comm.sendables import (
    AnyListSendable,
    BooleanSendable,
    FloatSendable,
    IntegerSendable,
    StringSendable,
)
from kevinbotlib.logger import Level
from kevinbotlib.robot import BaseRobot


class DemoRobot(BaseRobot):
    """Example robot that publishes a handful of demo values when it starts."""

    def __init__(self):
        """Configure the base robot with four demo opmodes and TRACE-level logging."""
        demo_opmodes = [
            "TestOp1",
            "TestOp2",
            "TestOp3",
            "TestOp4",
        ]
        super().__init__(
            opmodes=demo_opmodes,  # robot's operational modes
            log_level=Level.TRACE,  # lowest logging level
            enable_stderr_logger=True,
            cycle_time=20,  # loop our robot code 20x per second - it is recommended to run much higher in practice
            metrics_publish_timer=0,  # the test robot doesn't use metrics - see the metrics_robot.py example for a metrics usage example
        )

    def robot_start(self) -> None:
        """Run once as the robot starts: announce startup, then publish the demo values."""
        super().robot_start()

        # print statements are redirected to the KevinbotLib logging system - please don't do this in production
        print("Starting robot...")

        # One request per demo key, all sent together in a single multi_set() call.
        demo_values = [
            SetRequest("example/string", StringSendable(value="Hello World!")),
            SetRequest("example/integer", IntegerSendable(value=1234)),
            SetRequest("example/float", FloatSendable(value=1234.56)),
            SetRequest("example/list", AnyListSendable(value=[1, 2, 3, 4])),
            SetRequest("example/boolean", BooleanSendable(value=True)),
        ]
        self.comm_client.multi_set(demo_values)


# Only start the robot when this file is executed directly as a script.
if __name__ == "__main__":
    demo_robot = DemoRobot()
    demo_robot.run()