Vision

The KevinbotLib Vision System is a pipeline-based computer vision framework built on top of OpenCV.

Cameras

A KevinbotLib Camera retrieves frames from a connected camera and can set that camera's resolution and frame rate.

KevinbotLib includes two built-in camera connectors, CameraByIndex and CameraByDevicePath.

Creating Custom Cameras

Custom camera implementations must extend BaseCamera for proper simulation support. See more info here.

Pipelines

A pipeline takes in camera frames (or frames from another video source) and produces calculated output values as well as a frame for visualization.

[Figure: vision pipeline diagram]

KevinbotLib includes one basic pipeline, the EmptyPipeline. The EmptyPipeline outputs its input frame unchanged.
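The pipeline contract described above (a frame source goes in; a status flag, an output frame, and optional calculated values come out) can be sketched without the library. This is only an illustration: SketchPipeline, PassthroughPipeline, InvertPipeline, and fake_camera are hypothetical names that are not part of KevinbotLib, and frames are plain nested lists rather than OpenCV images.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any

# Illustration only: a "frame" here is a nested list of grayscale pixel values.
# The real library passes OpenCV images instead.
Frame = list[list[int]]
FrameSource = Callable[[], tuple[bool, Frame]]


class SketchPipeline:
    """Hypothetical stand-in for the pipeline contract (not a KevinbotLib class)."""

    def __init__(self, source: FrameSource) -> None:
        self.source = source

    def run(self) -> tuple[bool, Frame | None]:
        raise NotImplementedError

    def return_values(self) -> Any:
        return None


class PassthroughPipeline(SketchPipeline):
    """Outputs each input frame unchanged, as the EmptyPipeline is described to do."""

    def run(self) -> tuple[bool, Frame | None]:
        ok, frame = self.source()
        return ok, (frame if ok else None)


class InvertPipeline(SketchPipeline):
    """Produces a visualization frame plus one calculated value."""

    def __init__(self, source: FrameSource) -> None:
        super().__init__(source)
        self.mean: float | None = None

    def run(self) -> tuple[bool, Frame | None]:
        ok, frame = self.source()
        if not ok:
            return False, None
        pixels = [p for row in frame for p in row]
        self.mean = sum(pixels) / len(pixels)  # the "output calculation"
        return True, [[255 - p for p in row] for row in frame]  # the output frame

    def return_values(self) -> Any:
        return {"mean_brightness": self.mean}


def fake_camera() -> tuple[bool, Frame]:
    """Hypothetical frame source: always succeeds with a fixed 2x2 frame."""
    return True, [[0, 64], [128, 255]]


pipeline = InvertPipeline(fake_camera)
ok, output_frame = pipeline.run()
values = pipeline.return_values()
```

Any callable that returns an (ok, frame) pair can act as the source, which is why the examples further down pass camera.get_frame directly.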

Comms, Encoding, and Decoding

The KevinbotLib Vision System includes comm sendables and encode/decode support for (M)JPG and PNG.

Warning

The sendables are not activated by default for performance reasons. Use the following to activate them before attempting to set/get video data.

from kevinbotlib.vision import VisionCommUtils
VisionCommUtils.init_comms_types(client)    

Examples

Pipeline

examples/vision/pipeline.py
from collections.abc import Callable
from typing import Any

import cv2
from cv2.typing import MatLike

from kevinbotlib.logger import Logger, LoggerConfiguration
from kevinbotlib.vision import CameraByIndex, VisionPipeline

Logger().configure(LoggerConfiguration())

camera = CameraByIndex(None, 0)


class MyPipeline(VisionPipeline):
    def __init__(self, source: Callable[[], tuple[bool, MatLike]]):
        super().__init__(source)

    def run(self) -> tuple[bool, MatLike | None]:
        # convert to grayscale as an example
        ok, frame = self.input_frame
        if not ok:
            # no frame was captured, so skip the conversion
            return False, None
        return True, cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    def return_values(self) -> Any:
        return None


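# build the pipeline once instead of re-creating it for every frame
pipeline = MyPipeline(camera.get_frame)

while True:
    ok, frame = pipeline.run()
    if ok:
        cv2.imshow("image", frame)
    # press "q" in the preview window to quit
    if cv2.waitKey(1) == ord("q"):
        break

cv2.destroyAllWindows()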

Note

The following examples require a running Redis server for RedisCommClient.

Sending a Video Stream over Redis

examples/vision/video_sender.py
from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.logger import FileLoggerConfig, Logger, LoggerConfiguration
from kevinbotlib.vision import (
    CameraByIndex,
    EmptyPipeline,
    FrameEncoders,
    MjpegStreamSendable,
    VisionCommUtils,
)

logger = Logger()
logger.configure(LoggerConfiguration(file_logger=FileLoggerConfig()))

client = RedisCommClient()
VisionCommUtils.init_comms_types(client)

client.connect()
client.wait_until_connected()

camera = CameraByIndex(None, 0)
camera.set_resolution(1920, 1080)

pipeline = EmptyPipeline(camera.get_frame)

while True:
    ok, frame = pipeline.run()
    if ok:
        encoded = FrameEncoders.encode_jpg(frame, 100)
        client.set(
            "streams/camera0",
            MjpegStreamSendable(value=encoded, quality=100, resolution=frame.shape[:2]),
        )

Receiving a Video Stream over Redis

examples/vision/video_rx.py
import cv2

from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.logger import Logger, LoggerConfiguration
from kevinbotlib.vision import FrameDecoders, MjpegStreamSendable, VisionCommUtils

logger = Logger()
logger.configure(LoggerConfiguration())
client = RedisCommClient("robot.local", 6379)
VisionCommUtils.init_comms_types(client)

client.connect()
client.wait_until_connected()

try:
    while True:
        sendable = client.get("streams/camera0", MjpegStreamSendable)
        if sendable:
            cv2.imshow("image", FrameDecoders.decode_sendable(sendable))
        cv2.waitKey(1)
except KeyboardInterrupt:
    client.close()

See also

Vision System Reference