本文介绍了不使用复杂的深度学习算法计算道路交通的方法。该方法基于计算机视觉,仅使用 Python 和 OpenCV,在背景提取算法的帮助下,使用简单的移动侦测来完成任务。
今天我们将学习如何在没有复杂深度学习算法的前提下基于计算机视觉计算道路交通。
该教程中,我们仅使用 Python 和 OpenCV,在背景提取算法的帮助下,使用简单的移动侦测来完成任务。
代码地址:https://github.com/creotiv/object_detection_projects/tree/master/opencv_traffic_counting
这里是我们的计划:
- 了解用于前景检测的背景提取算法的主要思想。
- OpenCV 图像过滤器。
- 基于轮廓的目标检测。
- 构建处理管道,用于进一步的数据处理。
背景提取算法
背景提取有很多不同算法,但是它们的主要思想非常简单。
我们来假设你有一个自己房间的视频,该视频的很多帧都没有人/宠物,因此基本上是静态的,我们称之为 background_layer。那么,要想获取视频中移动的物体,我们只需:
foreground_objects = current_frame - background_layer
但是有时候,我们无法获取静态帧,因为光线的变化、某些物体被移动或一直移动等。在这些情况下,我们保存某些帧,尝试找出它们中相同的像素,这些像素就是 background_layer 的一部分。区别通常在于我们获取 background_layer 和用于使选择更加准确的额外过滤的方式。
本教程中,我们将使用 MOG 算法进行背景提取。视频经算法处理后,如下图所示:
左侧是原始帧,右侧是使用 MOG(带有阴影检测)算法提取的背景
如图所示,前景模板仍存在一些噪声,我们将尝试使用标准过滤技术移除噪声。
代码如下:
import os
import logging
import logging.handlers
import random
import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt
import utils
# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)
# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280) # HxW
# ============================================================================
def train_bg_subtractor(inst, cap, num=500):
'''
BG substractor need process some amount of frames to start giving result
'''
print ('Training BG Subtractor...')
i = 0
for frame in cap:
inst.apply(frame, None, 0.001)
i += 1
if i >= num:
return cap
def main():
log = logging.getLogger("main")
# creting MOG bg subtractor with 500 frames in cache
# and shadow detction
bg_subtractor = cv2.createBackgroundSubtractorMOG2(
history=500, detectShadows=True)
# Set up image source
# You can use also CV2, for some reason it not working for me
cap = skvideo.io.vreader(VIDEO_SOURCE)
# skipping 500 frames to train bg subtractor
train_bg_subtractor(bg_subtractor, cap, num=500)
frame_number = -1
for frame in cap:
if not frame.any():
log.error("Frame capture failed, stopping...")
break
frame_number += 1
utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)
fg_mask = bg_subtractor.apply(frame, None, 0.001)
utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number)
# ============================================================================
if __name__ == "__main__":
log = utils.init_logging()
if not os.path.exists(IMAGE_DIR):
log.debug("Creating image directory `%s`...", IMAGE_DIR)
os.makedirs(IMAGE_DIR)
main()
过滤
我们这种情况需要这些过滤器:
Threshold(http://docs.opencv.org/3.1.0/d7/d4d/tutorial_py_thresholding.html)、Erode、Dilate、Opening 和 Closing(http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html)。请打开链接并阅读,查看这些过滤器的工作方式(而不是简单的复制/粘贴)。
那么,现在我们将使用过滤器移除前景模板上的噪声。
首先,我们将使用 Closing 过滤器移除区域中的缝隙,然后使用 Opening 移除 1–2 个像素点,之后使用 Dilate 使物体更加清晰。
def filter_mask(img):
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
# Fill any small holes
closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
# Remove noise
opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
# Dilate to merge adjacent blobs
dilation = cv2.dilate(opening, kernel, iterations=2)
# threshold
th = dilation[dilation < 240] = 0
return th
处理后的前景如下图所示:
基于轮廓的目标检测
为达到目的,我们使用带有下列参数的标准 cv2.findContours 方法:
cv2.CV_RETR_EXTERNAL—get only outer contours.
cv2.CV_CHAIN_APPROX_TC89_L1 - use Teh-Chin chain approximation algorithm (faster)
def get_centroid(x, y, w, h):
x1 = int(w / 2)
y1 = int(h / 2)
cx = x + x1
cy = y + y1
return (cx, cy)
def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):
matches = []
# finding external contours
im, contours, hierarchy = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
# filtering by with, height
for (i, contour) in enumerate(contours):
(x, y, w, h) = cv2.boundingRect(contour)
contour_valid = (w >= min_contour_width) and (
h >= min_contour_height)
if not contour_valid:
continue
# getting center of the bounding box
centroid = get_centroid(x, y, w, h)
matches.append(((x, y, w, h), centroid))
return matches
在出口区,我们通过高度、宽度添加过滤,并添加质心。
很简单,对吧?
构建处理管道
你必须理解,在机器学习和计算机视觉领域中,没有一种魔术般的算法能够搞定一切,即使我们想象存在这样一种算法,我们仍然无法使用它,因为它在大规模应用时会无效。比如,几年前,Netflix 创办了一个比赛,最佳电影推荐算法奖励 300 万美元。有一支队伍创建了一个最佳算法,但问题是该算法无法大规模应用,因此对该公司没有用处。但是,Netflix 仍然奖励了他们 100 万。:)
那么,现在我们将构建简单的处理管道,该管道不是为了大规模使用,而是为了方便,但原理是一样的。
class PipelineRunner(object):
'''
Very simple pipline.
Just run passed processors in order with passing context from one to
another.
You can also set log level for processors.
'''
def __init__(self, pipeline=None, log_level=logging.DEBUG):
self.pipeline = pipeline or []
self.context = {}
self.log = logging.getLogger(self.__class__.__name__)
self.log.setLevel(log_level)
self.log_level = log_level
self.set_log_level()
def set_context(self, data):
self.context = data
def add(self, processor):
if not isinstance(processor, PipelineProcessor):
raise Exception(
'Processor should be an isinstance of PipelineProcessor.')
processor.log.setLevel(self.log_level)
self.pipeline.append(processor)
def remove(self, name):
for i, p in enumerate(self.pipeline):
if p.__class__.__name__ == name:
del self.pipeline[i]
return True
return False
def set_log_level(self):
for p in self.pipeline:
p.log.setLevel(self.log_level)
def run(self):
for p in self.pipeline:
self.context = p(self.context)
self.log.debug("Frame #%d processed.", self.context['frame_number'])
return self.context
class PipelineProcessor(object):
'''
Base class for processors.
'''
def __init__(self):
self.log = logging.getLogger(self.__class__.__name__)
由于输入构造函数(input constructor)将使用一串处理器,它们将按顺序运行,每个处理器处理一部分工作。那么,现在我们就来创建一个轮廓检测处理器。
class ContourDetection(PipelineProcessor):
'''
Detecting moving objects.
Purpose of this processor is to subtrac background, get moving objects
and detect them with a cv2.findContours method, and then filter off-by
width and height.
bg_subtractor - background subtractor isinstance.
min_contour_width - min bounding rectangle width.
min_contour_height - min bounding rectangle height.
save_image - if True will save detected objects mask to file.
image_dir - where to save images(must exist).
'''
def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):
super(ContourDetection, self).__init__()
self.bg_subtractor = bg_subtractor
self.min_contour_width = min_contour_width
self.min_contour_height = min_contour_height
self.save_image = save_image
self.image_dir = image_dir
def filter_mask(self, img, a=None):
'''
This filters are hand-picked just based on visual tests
'''
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))
# Fill any small holes
closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
# Remove noise
opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
# Dilate to merge adjacent blobs
dilation = cv2.dilate(opening, kernel, iterations=2)
return dilation
def detect_vehicles(self, fg_mask, context):
matches = []
# finding external contours
im2, contours, hierarchy = cv2.findContours(
fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)
for (i, contour) in enumerate(contours):
(x, y, w, h) = cv2.boundingRect(contour)
contour_valid = (w >= self.min_contour_width) and (
h >= self.min_contour_height)
if not contour_valid:
continue
centroid = utils.get_centroid(x, y, w, h)
matches.append(((x, y, w, h), centroid))
return matches
def __call__(self, context):
frame = context['frame'].copy()
frame_number = context['frame_number']
fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
# just thresholding values
fg_mask[fg_mask < 240] = 0
fg_mask = self.filter_mask(fg_mask, frame_number)
if self.save_image:
utils.save_frame(fg_mask, self.image_dir +
"/mask_%04d.png" % frame_number, flip=False)
context['objects'] = self.detect_vehicles(fg_mask, context)
context['fg_mask'] = fg_mask
return contex
其实就是把背景提取、过滤和检测部分合并起来。
现在,我们来创建一个处理器,其将在不同帧上检测到的物体连接起来并创建路径,还能计算出口区的车辆数量。
'''
Counting vehicles that entered in exit zone.
Purpose of this class based on detected object and local cache create
objects pathes and count that entered in exit zone defined by exit masks.
exit_masks - list of the exit masks.
path_size - max number of points in a path.
max_dst - max distance between two points.
'''
def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):
super(VehicleCounter, self).__init__()
self.exit_masks = exit_masks
self.vehicle_count = 0
self.path_size = path_size
self.pathes = []
self.max_dst = max_dst
self.x_weight = x_weight
self.y_weight = y_weight
def check_exit(self, point):
for exit_mask in self.exit_masks:
try:
if exit_mask[point[1]][point[0]] == 255:
return True
except:
return True
return False
def __call__(self, context):
objects = context['objects']
context['exit_masks'] = self.exit_masks
context['pathes'] = self.pathes
context['vehicle_count'] = self.vehicle_count
if not objects:
return context
points = np.array(objects)[:, 0:2]
points = points.tolist()
# add new points if pathes is empty
if not self.pathes:
for match in points:
self.pathes.append([match])
else:
# link new points with old pathes based on minimum distance between
# points
new_pathes = []
for path in self.pathes:
_min = 999999
_match = None
for p in points:
if len(path) == 1:
# distance from last point to current
d = utils.distance(p[0], path[-1][0])
else:
# based on 2 prev points predict next point and calculate
# distance from predicted next point to current
xn = 2 * path[-1][0][0] - path[-2][0][0]
yn = 2 * path[-1][0][1] - path[-2][0][1]
d = utils.distance(
p[0], (xn, yn),
x_weight=self.x_weight,
y_weight=self.y_weight
)
if d < _min:
_min = d
_match = p
if _match and _min <= self.max_dst:
points.remove(_match)
path.append(_match)
new_pathes.append(path)
# do not drop path if current frame has no matches
if _match is None:
new_pathes.append(path)
self.pathes = new_pathes
# add new pathes
if len(points):
for p in points:
# do not add points that already should be counted
if self.check_exit(p[1]):
continue
self.pathes.append([p])
# save only last N points in path
for i, _ in enumerate(self.pathes):
self.pathes[i] = self.pathes[i][self.path_size * -1:]
# count vehicles and drop counted pathes:
new_pathes = []
for i, path in enumerate(self.pathes):
d = path[-2:]
if (
# need at list two points to count
len(d) >= 2 and
# prev point not in exit zone
not self.check_exit(d[0][1]) and
# current point in exit zone
self.check_exit(d[1][1]) and
# path len is bigger then min
self.path_size <= len(path)
):
self.vehicle_count += 1
else:
# prevent linking with path that already in exit zone
add = True
for p in path:
if self.check_exit(p[1]):
add = False
break
if add:
new_pathes.append(path)
self.pathes = new_pathes
context['pathes'] = self.pathes
context['objects'] = objects
context['vehicle_count'] = self.vehicle_count
self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
return context
该教程有一点复杂,我们一部分一部分地过一遍。
下图中绿色的掩膜是出口区,我们在该区域计算车辆的数量。比如,我们将计算长度大于 3 个点(以移除噪声)的路径,其中第 4 个点就在绿色区域。
我们使用掩膜,因为它对很多操作都有效且比使用向量算法更简单。只需要使用二元和(binary and)运算检查该区域的点就可以了。下图显示了我们的设置方式:
EXIT_PTS = np.array([
[[732, 720], [732, 590], [1280, 500], [1280, 720]],
[[0, 400], [645, 400], [645, 0], [0, 0]]
])
base = np.zeros(SHAPE + (3,), dtype='uint8')
exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]
现在,我们连接路径中的点:
new_pathes = []
for path in self.pathes:
_min = 999999
_match = None
for p in points:
if len(path) == 1:
# distance from last point to current
d = utils.distance(p[0], path[-1][0])
else:
# based on 2 prev points predict next point and calculate
# distance from predicted next point to current
xn = 2 * path[-1][0][0] - path[-2][0][0]
yn = 2 * path[-1][0][1] - path[-2][0][1]
d = utils.distance(
p[0], (xn, yn),
x_weight=self.x_weight,
y_weight=self.y_weight
)
if d < _min:
_min = d
_match = p
if _match and _min <= self.max_dst:
points.remove(_match)
path.append(_match)
new_pathes.append(path)
# do not drop path if current frame has no matches
if _match is None:
new_pathes.append(path)
self.pathes = new_pathes
# add new pathes
if len(points):
for p in points:
# do not add points that already should be counted
if self.check_exit(p[1]):
continue
self.pathes.append([p])
# save only last N points in path
for i, _ in enumerate(self.pathes):
self.pathes[i] = self.pathes[i][self.path_size * -1:]
在第一帧上,我们只需添加所有点作为新的路径。
接下来,如果 len(path) == 1,对于高速缓存中的每个路径,我们将尝试从新检测到的物体中找出点(质心),这些物体到路径最后一个点的欧几里得距离最短。
如果 len(path) > 1,我们将使用该路径中的最后两个点在同一条线上预测新的点,找出它和当前点之间的最小距离。
将最小距离的点添加至当前路径的末尾,然后将其从列表中移除。
如果还有剩下的点,我们将它们添加为新的路径。
我们还可以限制该路径中点的数量。
# count vehicles and drop counted pathes:
new_pathes = []
for i, path in enumerate(self.pathes):
d = path[-2:]
if (
# need at list two points to count
len(d) >= 2 and
# prev point not in exit zone
not self.check_exit(d[0][1]) and
# current point in exit zone
self.check_exit(d[1][1]) and
# path len is bigger then min
self.path_size <= len(path)
):
self.vehicle_count += 1
else:
# prevent linking with path that already in exit zone
add = True
for p in path:
if self.check_exit(p[1]):
add = False
break
if add:
new_pathes.append(path)
self.pathes = new_pathes
context['pathes'] = self.pathes
context['objects'] = objects
context['vehicle_count'] = self.vehicle_count
self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)
return context
现在,我们尝试计算进入出口区的车辆的数量。我们需要观察路径中的最后两个点,并在出口区检查,是否其中靠后的一个在出口区,而靠前的不在,并确保 len(path) 比下限值要大。
之后的部分就是阻止新的点回联至出口区的点。
最后两个处理器是 CSV writer,可创建报告 CSV 文件和可视化文件,用于调试和输出更好的画面。
class CsvWriter(PipelineProcessor):
def __init__(self, path, name, start_time=0, fps=15):
super(CsvWriter, self).__init__()
self.fp = open(os.path.join(path, name), 'w')
self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
self.writer.writeheader()
self.start_time = start_time
self.fps = fps
self.path = path
self.name = name
self.prev = None
def __call__(self, context):
frame_number = context['frame_number']
count = _count = context['vehicle_count']
if self.prev:
_count = count - self.prev
time = ((self.start_time + int(frame_number / self.fps)) * 100
+ int(100.0 / self.fps) * (frame_number % self.fps))
self.writer.writerow({'time': time, 'vehicles': _count})
self.prev = count
return context
class Visualizer(PipelineProcessor):
def __init__(self, save_image=True, image_dir='images'):
super(Visualizer, self).__init__()
self.save_image = save_image
self.image_dir = image_dir
def check_exit(self, point, exit_masks=[]):
for exit_mask in exit_masks:
if exit_mask[point[1]][point[0]] == 255:
return True
return False
def draw_pathes(self, img, pathes):
if not img.any():
return
for i, path in enumerate(pathes):
path = np.array(path)[:, 1].tolist()
for point in path:
cv2.circle(img, point, 2, CAR_COLOURS[0], -1)
cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)
return img
def draw_boxes(self, img, pathes, exit_masks=[]):
for (i, match) in enumerate(pathes):
contour, centroid = match[-1][:2]
if self.check_exit(centroid, exit_masks):
continue
x, y, w, h = contour
cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),
BOUNDING_BOX_COLOUR, 1)
cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)
return img
def draw_ui(self, img, vehicle_count, exit_masks=[]):
# this just add green mask with opacity to the image
for exit_mask in exit_masks:
_img = np.zeros(img.shape, img.dtype)
_img[:, :] = EXIT_COLOR
mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
cv2.addWeighted(mask, 1, img, 1, 0, img)
# drawing top block with counts
cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
return img
def __call__(self, context):
frame = context['frame'].copy()
frame_number = context['frame_number']
pathes = context['pathes']
exit_masks = context['exit_masks']
vehicle_count = context['vehicle_count']
frame = self.draw_ui(frame, vehicle_count, exit_masks)
frame = self.draw_pathes(frame, pathes)
frame = self.draw_boxes(frame, pathes, exit_masks)
utils.save_frame(frame, self.image_dir +
"/processed_%04d.png" % frame_number)
return context
CSV writer 按时间保存数据,因为我们需要用它做进一步的分析。因此我使用下列公式向 unix 时间戳添加额外的帧计时:
time = ((self.start_time + int(frame_number / self.fps)) * 100
+ int(100.0 / self.fps) * (frame_number % self.fps))
在 start time=1 000 000 000 和 fps=10 的情况下,结果如下:
frame 1 = 1 000 000 000 010
frame 1 = 1 000 000 000 020
…
获取完整的 csv 报告后,你可以随意聚合这些数据。
该项目的完整代码地址:
https://github.com/creotiv/object_detection_projects/tree/master/opencv_traffic_counting
结论
所以,这并不像人们想象的那么难。
但是,如果你运行该脚本,你会发现该解决方案并不完美,它存在一个问题——背景物体重叠,而且它还无法按类型进行车辆分类(实分析时你肯定需要)。但是,该方法拥有好的摄像头位置(道路上方),能够提供相当不错的准确率。这告诉我们即使简单的小算法用好了也能取得不错的结果。
那么我们要怎么做才能解决当前的问题呢?
一种方式是添加额外的过滤,使物体分离,以进行更好的检测。另一种方式是使用更复杂的算法,比如深度卷积网络。