Vision
The KevinbotLib Vision System is a pipeline-based computer vision framework building on top of OpenCV .
Cameras
A KevinbotLib Camera will retrieve frames from a connected camera and can set its resolution and frame rate.
KevinbotLib includes two built-in camera connectors, CameraByIndex and CameraByDevicePath .
Creating Custom Cameras
Custom camera implementations must extend BaseCamera for proper simulation support.
See more info here .
Pipelines
A pipeline will take in camera frames (or another video source), and produce output calculations as well as a frame for visualization.
KevinbotLib includes one basic pipeline, the EmptyPipeline .
The EmptyPipeline will output exactly what is put in.
Comms, Encoding, and Decoding
The KevinbotLib Vision Systems includes comm sendables and encode/decode support for (M)JPG and PNG
Warning
The sendables are not activated by default for performance reasons.
Use the following to activate them before attempting to set/get video data.
from kevinbotlib.vision import VisionCommUtils
VisionCommUtils . init_comms_types ( client )
Examples
Pipeline
examples/vision/pipeline.py 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 from collections.abc import Callable
from typing import Any
import cv2
from cv2.typing import MatLike
from kevinbotlib.logger import Logger , LoggerConfiguration
from kevinbotlib.vision import CameraByIndex , VisionPipeline
Logger () . configure ( LoggerConfiguration ())
camera = CameraByIndex ( None , 0 )
class MyPipeline ( VisionPipeline ):
def __init__ ( self , source : Callable [[], tuple [ bool , MatLike ]]):
super () . __init__ ( source )
def run ( self ) -> tuple [ bool , MatLike | None ]:
# convert to grayscale as an example
inp = self . input_frame
return inp [ 0 ], cv2 . cvtColor ( inp [ 1 ], cv2 . COLOR_BGR2GRAY )
def return_values ( self ) -> Any :
return None
while True :
pipeline = MyPipeline ( camera . get_frame )
ok , frame = pipeline . run ()
if ok :
cv2 . imshow ( "image" , frame )
if cv2 . waitKey ( 1 ) == ord ( "q" ):
break
Note
The following examples require a running Redis server for RedisCommClient.
Sending a Video Stream over Redis
examples/vision/video_sender.py 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.logger import FileLoggerConfig , Logger , LoggerConfiguration
from kevinbotlib.vision import (
CameraByIndex ,
EmptyPipeline ,
FrameEncoders ,
MjpegStreamSendable ,
VisionCommUtils ,
)
logger = Logger ()
logger . configure ( LoggerConfiguration ( file_logger = FileLoggerConfig ()))
client = RedisCommClient ()
VisionCommUtils . init_comms_types ( client )
client . connect ()
client . wait_until_connected ()
camera = CameraByIndex ( None , 0 )
camera . set_resolution ( 1920 , 1080 )
pipeline = EmptyPipeline ( camera . get_frame )
while True :
ok , frame = pipeline . run ()
if ok :
encoded = FrameEncoders . encode_jpg ( frame , 100 )
client . set (
"streams/camera0" ,
MjpegStreamSendable ( value = encoded , quality = 100 , resolution = frame . shape [: 2 ]),
)
Receiving a Video Stream over Redis
examples/vision/video_rx.py 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import cv2
from kevinbotlib.comm.redis import RedisCommClient
from kevinbotlib.logger import Logger , LoggerConfiguration
from kevinbotlib.vision import FrameDecoders , MjpegStreamSendable , VisionCommUtils
logger = Logger ()
logger . configure ( LoggerConfiguration ())
client = RedisCommClient ( "robot.local" , 6379 )
VisionCommUtils . init_comms_types ( client )
client . connect ()
client . wait_until_connected ()
try :
while True :
sendable = client . get ( "streams/camera0" , MjpegStreamSendable )
if sendable :
cv2 . imshow ( "image" , FrameDecoders . decode_sendable ( sendable ))
cv2 . waitKey ( 1 )
except KeyboardInterrupt :
client . close ()
See also
Vision System Reference