Skip to content

Communication Examples

Setter

examples/comm/setter.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import time

from kevinbotlib.comm import IntegerSendable, RedisCommClient, StringSendable
from kevinbotlib.logger import Logger, LoggerConfiguration

# Set up the kevinbotlib logger using a default-constructed configuration.
logger = Logger()
logger.configure(LoggerConfiguration())

# Create the comm client, start the connection, and wait for it before
# publishing anything.
client = RedisCommClient()
client.connect()
client.wait_until_connected()

# Publish an incrementing integer and a matching "demo <n>" string every
# half second until the script is interrupted.
i = 0
try:
    while True:
        client.set("example/hierarchy/test", IntegerSendable(value=i))
        client.set("example/hierarchy/test2", StringSendable(value=f"demo {i}"))
        time.sleep(0.5)
        i += 1
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl-C, so an
    # unexpected error inside the loop cannot leave the client open.
    client.close()

Getter

examples/comm/getter.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import time

from kevinbotlib.comm import IntegerSendable, RedisCommClient, StringSendable
from kevinbotlib.logger import Logger, LoggerConfiguration

# Set up the kevinbotlib logger using a default-constructed configuration.
logger = Logger()
logger.configure(LoggerConfiguration())

# Create the comm client, start the connection, and wait for it before
# reading anything.
client = RedisCommClient()
client.connect()
client.wait_until_connected()

# Poll both example keys ten times per second and print whatever the client
# returns for each.
try:
    while True:
        print(client.get("example/hierarchy/test", IntegerSendable))
        print(client.get("example/hierarchy/test2", StringSendable))
        time.sleep(0.1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl-C, so an
    # unexpected error inside the loop cannot leave the client open.
    client.close()

Getter with Hooks

examples/comm/getter_hooks.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time

from kevinbotlib.comm import IntegerSendable, RedisCommClient
from kevinbotlib.logger import Logger, LoggerConfiguration

# Logging: build the logger, then apply a default-constructed configuration.
logger = Logger()
log_config = LoggerConfiguration()
logger.configure(log_config)

# Comm client: create it, start the connection, and wait for it.
client = RedisCommClient()
client.connect()
client.wait_until_connected()


def hook(key, message) -> None:
    """Print a one-line notice naming the key and the message received for it.

    Args:
        key: The key the message was received on.
        message: The received payload; it is formatted into the output as-is.
    """
    notice = f"Received message from {key}: {message}"
    print(notice)


# Register `hook` for the example key, passing IntegerSendable as the
# sendable type for that key.
client.add_hook("example/hierarchy/test", IntegerSendable, hook)

try:
    # The loop only sleeps; it keeps the script running until interrupted.
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl-C.
    client.close()

Sendable Generator

examples/comm/sendable_generator.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import random
import time

from kevinbotlib.comm import (
    BaseSendable,
    IntegerSendable,
    RedisCommClient,
    SendableGenerator,
)
from kevinbotlib.logger import Logger, LoggerConfiguration

# Logging: build the logger, then apply a default-constructed configuration.
logger = Logger()
log_config = LoggerConfiguration()
logger.configure(log_config)

# Comm client: create it, start the connection, and wait for it.
client = RedisCommClient()
client.connect()
client.wait_until_connected()


class TestGenerator(SendableGenerator):
    """Sendable generator that yields a fresh random integer on each call."""

    def generate_sendable(self) -> BaseSendable:
        """Return an IntegerSendable holding a random integer in [0, 100]."""
        random_value = random.randint(0, 100)
        return IntegerSendable(value=random_value)


# One generator instance is reused for every publish below.
generator = TestGenerator()

# Hand the generator itself (not a pre-built sendable) to the client every
# half second until the script is interrupted.
try:
    while True:
        client.set("example/hierarchy/test", generator)
        time.sleep(0.5)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this example; exit quietly.
    pass
finally:
    # Close the client on every exit path, not only on Ctrl-C, so an
    # unexpected error inside the loop cannot leave the client open.
    client.close()