Skip to content

The Command Scheduler

Architecture Diagram

scheduler-diagram-dark.svg scheduler-diagram-light.svg

Why use the command scheduler?

The command scheduler offers a unique way to run commands in a robot program. Commands can be run in parallel, sequentially, or in the main scheduler FIFO queue. This allows for more flexibility when compared with traditional linear programming. Commands may be scheduled using a trigger.

Examples

Basic Usage

examples/scheduler/basic_example.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from kevinbotlib.scheduler import Command, CommandScheduler


class PrintCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False

    def init(self):
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)
        self._finished = True

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return self._finished


scheduler = CommandScheduler()
scheduler.schedule(PrintCommand("Test"))
scheduler.schedule(PrintCommand("Test2"))
scheduler.iterate()

Parallel Commands

examples/scheduler/parallel.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time

from kevinbotlib.scheduler import Command, CommandScheduler, ParallelCommand


class PrintForOneSecondCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False
        self.start = time.time()

    def init(self):
        self.start = time.time()
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return time.time() > self.start + 1


start_time = time.time()


scheduler = CommandScheduler()
scheduler.schedule(ParallelCommand([PrintForOneSecondCommand("command 1"), PrintForOneSecondCommand("command 2")]))

while True:
    scheduler.iterate()
    time.sleep(0.1)

Sequential Commands

examples/scheduler/sequential.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time

from kevinbotlib.scheduler import Command, CommandScheduler, SequentialCommand


class PrintForOneSecondCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False
        self.start = time.time()

    def init(self):
        self.start = time.time()
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return time.time() > self.start + 1


start_time = time.time()


scheduler = CommandScheduler()
scheduler.schedule(SequentialCommand([PrintForOneSecondCommand("command 1"), PrintForOneSecondCommand("command 2")]))

while True:
    scheduler.iterate()
    time.sleep(0.1)

Conditionally Forked Commands

examples/scheduler/conditionally_forked.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from kevinbotlib.scheduler import Command, CommandScheduler, ConditionallyForkedCommand


class PrintCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False

    def init(self):
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)
        self._finished = True

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return self._finished


if __name__ == "__main__":
    scheduler = CommandScheduler()
    scheduler.schedule(
        ConditionallyForkedCommand(lambda: True, PrintCommand("Condition Met"), PrintCommand("Condition Not Met"))
    )
    scheduler.schedule(
        ConditionallyForkedCommand(lambda: False, PrintCommand("Condition Met"), PrintCommand("Condition Not Met"))
    )

    for _ in range(2):
        scheduler.iterate()

Named Controller Command Trigger

examples/scheduler/joystick_trigger.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time

from kevinbotlib.joystick import LocalNamedController
from kevinbotlib.logger import Logger, LoggerConfiguration
from kevinbotlib.scheduler import Command, CommandScheduler

logger = Logger()
logger.configure(LoggerConfiguration())


class PrintCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False

    def init(self):
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)
        self._finished = True

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return self._finished


class PrintForOneSecondCommand(Command):
    def __init__(self, message: str):
        self.message = message
        self._finished = False
        self.start = time.time()

    def init(self):
        self.start = time.time()
        print(f"Initializing: {self.message}")

    def execute(self):
        print(self.message)

    def end(self):
        print(f"Ending: {self.message}")

    def finished(self):
        return time.time() > self.start + 1


start_time = time.time()


scheduler = CommandScheduler()

controller = LocalNamedController(0)
controller.start_polling()

controller.command.a().while_true(PrintForOneSecondCommand("A Button Command"))
controller.command.b().on_true(PrintForOneSecondCommand("B Button Command"))
controller.command.x().on_true(PrintCommand("X Button Command"))

while True:
    scheduler.iterate()
    time.sleep(0.1)