import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Union
from zipfile import ZipFile
import zipfile
from io import RawIOBase
import numpy as np
from scipy.spatial.transform import Rotation
from d3d.abstraction import (ObjectTag, ObjectTarget3D, Target3DArray,
TransformSet)
from d3d.dataset.base import DetectionDatasetBase, check_frames, split_trainval, expand_name
from d3d.dataset.kitti import utils
from d3d.dataset.kitti.utils import KittiObjectClass
from d3d.dataset.zip import PatchedZipFile
def load_label(basepath, file):
'''
Load label or result from text file in KITTI object detection label format
'''
data = []
if isinstance(basepath, (str, Path)):
fin = Path(basepath, file).open()
else: # assume ZipFile object
fin = basepath.open(str(file))
with fin:
for line in fin.readlines():
if not line.strip():
continue
if isinstance(line, bytes):
line = line.decode()
values = [KittiObjectClass[value] if idx == 0 else float(value)
for idx, value in enumerate(line.split(' '))]
data.append(values)
return data
def parse_label(label: list, raw_calib: dict):
'''
Generate object array from loaded label or result text
'''
Tr = raw_calib['Tr_velo_to_cam'].reshape(3, 4)
RRect = Rotation.from_matrix(raw_calib['R0_rect'].reshape(3, 3))
HR, HT = Rotation.from_matrix(Tr[:,:3]), Tr[:,3]
objects = Target3DArray()
objects.frame = "velo"
for item in label:
if item[0] == KittiObjectClass.DontCare:
continue
h,w,l = item[8:11] # height, width, length
position = item[11:14] # x, y, z in camera coordinate
ry = item[14] # rotation of y axis in camera coordinate
position[1] -= h/2
# parse object position and orientation
position = np.dot(position, RRect.inv().as_matrix().T)
position = HR.inv().as_matrix().dot(position - HT)
orientation = HR.inv() * RRect.inv() * Rotation.from_euler('y', ry)
orientation *= Rotation.from_euler("x", np.pi/2) # change dimension from l,h,w to l,w,h
score = item[15] if len(item) == 16 else None
tag = ObjectTag(item[0], KittiObjectClass, scores=score)
target = ObjectTarget3D(position, orientation, [l,w,h], tag)
objects.append(target)
return objects
def _line_box_intersect(p0, p1, width, height):
# p0: inlier point
# p1: outlier point
k = (p1[1] - p0[1]) / (p1[0] - p0[0])
# find which border
case = None
if p1[0] < p0[0]:
if p1[1] < p0[1]:
k0 = p0[1] / p0[0]
case = 2 if k > k0 else 3
else:
k0 = - (height - p0[1]) / p0[0]
case = 3 if k > k0 else 0
else:
if p1[1] < p0[1]:
k0 = - p0[1] / (width - p0[0])
case = 1 if k > k0 else 2
else:
k0 = (height - p0[1]) / (width - p0[0])
case = 0 if k > k0 else 1
# do intersection
if case == 0: # y > height border
x = p0[0] + (height - p0[1]) / k
y = height
elif case == 1: # x > width border
x = width
y = p0[1] + (width - p0[0]) * k
elif case == 2: # y < 0 border
x = p1[0] + (-p1[1]) / k
y = 0
elif case == 3: # x < 0 border
x = 0
y = p1[1] + (-p1[0]) * k
assert 0 <= x <= width, "x = %.2f" % x
assert 0 <= y <= height, "y = %.2f" % y
return (x, y)
[docs]class KittiObjectLoader(DetectionDatasetBase):
"""
Loader for KITTI object detection dataset, please organize the files into following structure
* Zip Files::
- data_object_calib.zip
- data_object_image_2.zip
- data_object_image_3.zip
- data_object_label_2.zip
- data_object_velodyne.zip
* Unzipped Structure::
- <base_path directory>
- training
- calib
- image_2
- label_2
- velodyne
- testing
- calib
- image_2
- velodyne
Note that the 3d objects labelled as `DontCare` are removed from the result of :meth:`annotation_3dobject`.
For description of constructor parameters, please refer to :class:`d3d.dataset.base.DetectionDatasetBase`
"""
VALID_CAM_NAMES = ["cam2", "cam3"] # TODO: rename to camera_gray_left, etc.
VALID_LIDAR_NAMES = ["velo"]
VALID_OBJ_CLASSES = KittiObjectClass
def __init__(self, base_path, inzip=False, phase="training", trainval_split=0.8, trainval_random=False):
super().__init__(base_path, inzip=inzip, phase=phase,
trainval_split=trainval_split, trainval_random=trainval_random)
self.phase_path = 'training' if phase == 'validation' else phase
# count total number of frames
total_count = None
if self.inzip:
for folder in ['image_2', 'image_3', 'velodyne', 'label_2']:
data_zip = self.base_path / ("data_object_%s.zip" % folder)
if data_zip.exists():
with ZipFile(data_zip) as data:
total_count = total_count or sum(1 for name in data.namelist() \
if name.startswith(self.phase_path) and not name.endswith('/'))
break
else:
for folder in ['image_2', 'image_3', 'velodyne', 'label_2']:
fpath = self.base_path / self.phase_path / folder
if fpath.exists():
total_count = sum(1 for _ in fpath.iterdir())
break
if not total_count:
raise ValueError("Cannot parse dataset, please check path, inzip option and file structure")
self.frames = split_trainval(phase, total_count, trainval_split, trainval_random)
self._image_size_cache = {} # used to store the image size
def __len__(self):
return len(self.frames)
def _parse_idx(self, idx):
if isinstance(idx, int):
uidx = self.frames[idx]
else:
uidx, = idx
return uidx
[docs] @expand_name(VALID_CAM_NAMES)
def camera_data(self, idx, names='cam2'):
if names == "cam2":
folder_name = "image_2"
elif names == "cam3":
folder_name = "image_3"
uidx = self._parse_idx(idx)
fname = Path(self.phase_path, folder_name, '%06d.png' % uidx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / ("data_object_%s.zip" % folder_name), to_extract=fname) as source:
image = utils.load_image(source, fname, gray=False)
else:
image = utils.load_image(self.base_path, fname, gray=False)
# save image size for calibration
if uidx not in self._image_size_cache:
self._image_size_cache[uidx] = image.size
return image
[docs] @expand_name(VALID_LIDAR_NAMES)
def lidar_data(self, idx, names='velo', formatted=False):
assert names == 'velo'
uidx = self._parse_idx(idx)
fname = Path(self.phase_path, 'velodyne', '%06d.bin' % uidx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / "data_object_velodyne.zip", to_extract=fname) as source:
return utils.load_velo_scan(source, fname, formatted=formatted)
else:
return utils.load_velo_scan(self.base_path, fname, formatted=formatted)
def _load_calib(self, basepath, uidx, raw=False):
# load the calibration file
fname = Path(self.phase_path, 'calib', '%06d.txt' % uidx)
filedata = utils.load_calib_file(basepath, fname)
if raw:
return filedata
# load image size, which could take additional time
if uidx not in self._image_size_cache:
self.camera_data((uidx,)) # fill image size cache
image_size = self._image_size_cache[uidx]
# load matrics
data = TransformSet("velo")
rect = filedata['R0_rect'].reshape(3, 3)
velo_to_cam = filedata['Tr_velo_to_cam'].reshape(3, 4)
for i in range(4):
P = filedata['P%d' % i].reshape(3, 4)
intri, offset = P[:, :3], P[:, 3]
projection = intri.dot(rect)
offset_cartesian = np.linalg.inv(projection).dot(offset)
extri = np.vstack([velo_to_cam, np.array([0,0,0,1])])
extri[:3, 3] += offset_cartesian
frame = "cam%d" % i
data.set_intrinsic_camera(frame, projection, image_size, rotate=False)
data.set_extrinsic(extri, frame_to=frame)
data.set_intrinsic_general("imu")
data.set_extrinsic(filedata['Tr_imu_to_velo'].reshape(3, 4), frame_from="imu")
return data
[docs] def calibration_data(self, idx, raw=False):
uidx = self._parse_idx(idx)
fname = Path(self.phase_path, 'calib', '%06d.txt' % uidx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / "data_object_calib.zip", to_extract=fname) as source:
return self._load_calib(source, uidx, raw)
else:
return self._load_calib(self.base_path, uidx, raw)
[docs] def annotation_3dobject(self, idx, raw=False):
assert self.phase_path != "testing", "Testing dataset doesn't contain label data"
uidx = self._parse_idx(idx)
fname = Path(self.phase_path, 'label_2', '%06d.txt' % uidx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / "data_object_label_2.zip", to_extract=fname) as source:
label = load_label(source, fname)
else:
label = load_label(self.base_path, fname)
if raw:
return label
return parse_label(label, self.calibration_data((uidx,), raw=True))
[docs] def identity(self, idx):
return (self.frames[idx],)
[docs] def dump_detection_output(self, idx: Union[int, tuple], detections: Target3DArray, fout: Union[str, Path, RawIOBase]) -> None:
'''
Save the detection in KITTI output format. We need raw calibration for R0_rect
'''
uidx = self._parse_idx(idx)
calib = self.calibration_data(uidx)
raw_calib = self.calibration_data(uidx, raw=True)
# get intrinsics
assert detections.frame == "velo"
Tr = raw_calib['Tr_velo_to_cam'].reshape(3, 4)
RRect = Rotation.from_matrix(raw_calib['R0_rect'].reshape(3, 3))
HR, HT = Rotation.from_matrix(Tr[:,:3]), Tr[:,3]
meta = calib.intrinsics_meta['cam2']
width, height = meta.width, meta.height
# process detections
output_lines = []
output_format = "%s 0 0 0" + " %.2f" * 12
for box in detections:
# calculate bounding box 2D
uv, mask, dmask = calib.project_points_to_camera(box.corners,
frame_to="cam2", frame_from="velo", remove_outlier=False, return_dmask=True)
if len(uv[mask]) < 1: continue # ignore boxes that is outside the image
bdpoints = []
pairs = [(0, 1), (2, 3), (4, 5), (6, 7), # box lines
(0, 4), (1, 5), (2, 6), (3, 7),
(0, 2), (1, 3), (4, 6), (5, 7)]
inlier = [i in mask for i in range(len(uv))]
for i, j in pairs:
if not inlier[i] and not inlier[j]:
continue
if i not in dmask or j not in dmask: # only calculate for points ahead
continue
if not inlier[i]:
bdpoints.append(_line_box_intersect(uv[j], uv[i], width, height))
if not inlier[j]:
bdpoints.append(_line_box_intersect(uv[i], uv[j], width, height))
uv = np.array(uv[mask].tolist() + bdpoints)
umin, vmin = np.min(uv, axis=0)
umax, vmax = np.max(uv, axis=0)
# calculate position in original 3D frame
l,w,h = box.dimension
position = RRect.as_matrix().dot(HR.as_matrix().dot(box.position) + HT)
position[1] += h/2
orientation = box.orientation * Rotation.from_euler("x", np.pi/2)
orientation = RRect * HR * orientation
yaw = orientation.as_euler("YZX")[0]
output_values = (box.tag_top.name,)
output_values += (umin, vmin, umax, vmax)
output_values += (h, w, l)
output_values += tuple(position.tolist())
output_values += (yaw, box.tag_top_score)
output_lines.append(output_format % output_values)
content = "\n".join(output_lines)
if isinstance(fout, (str, Path)):
Path(fout).write_text(content)
else:
fout.write(content.encode())
[docs]def execute_official_evaluator(exec_path, label_path, result_path, output_path, model_name=None, show_output=True):
'''
Execute official evaluator from KITTI devkit
:param label_path: path to the extracted labels of kitti dataset
:param result_path: path to the results created by dump_detection_output
:parma output_path: path to the output evaluation results
:param model_name: unique name of your model. KITTI tool requires sha1 as model name, but that's not mandatory.
:param show_output: show_output option passed to Kitti evaluator
Note: to install prerequisites `sudo apt install gnuplot texlive-extra-utils`
'''
model_name = model_name or "noname"
# create temporary directory
temp_path = Path(tempfile.mkdtemp())
temp_label_path = temp_path / "data" / "object"
temp_result_path = temp_path / "results" / model_name
temp_label_path.mkdir(parents=True, exist_ok=True)
temp_result_path.mkdir(parents=True, exist_ok=True)
output_path = Path(output_path)
output_path.mkdir(parents=True, exist_ok=True)
try:
# execute
(temp_label_path / "label_2").symlink_to(label_path, target_is_directory=True)
(temp_result_path / "data").symlink_to(result_path, target_is_directory=True)
proc = subprocess.Popen([exec_path, model_name], cwd=temp_path, stdout=None if show_output else subprocess.PIPE)
proc.wait()
# move result files
for dirname in temp_result_path.iterdir():
if dirname.name == "data":
continue
shutil.move(dirname, output_path)
finally:
# clean
shutil.rmtree(temp_path)
def create_submission(result_path, output_file):
'''
Create submission file from dumped detection results (using dump_detection_output)
:param output_file: Output zip file. If not ended with `.zip`, `.zip` will be appended
'''
fsubmission = Path(output_file)
if fsubmission.suffix != ".zip":
fsubmission = fsubmission.parent / (fsubmission.name + ".zip")
fsubmission.parent.mkdir(exist_ok=True, parents=True)
with zipfile.ZipFile(fsubmission, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for file in Path(result_path).iterdir():
archive.write(file, file.name)
print("Submission file created at", fsubmission)
def parse_detection_output():
from argparse import ArgumentParser
from tqdm import tqdm
parser = ArgumentParser(description='Convert detection output to dumped binary files with d3d object array.')
parser.add_argument('input', type=str,
help='Directory of detection output files')
parser.add_argument('-o', '--output', type=str,
help='Output directory. If not provided, it will be the same as input')
parser.add_argument('-d', '--dataset-path', type=str, dest="dspath",
help='Path of the KITTI object dataset')
parser.add_argument('-p', '--phase', type=str, default='training',
choices=['training', 'testing'], help="Dataset phase, either training or testing")
parser.add_argument('-z', '--inzip', action="store_true",
help="Whether the dataset is in zip archives")
args = parser.parse_args()
loader = KittiObjectLoader(args.dspath, inzip=args.inzip, phase=args.phase, trainval_split=1)
input_path = Path(args.input)
output_path = Path(args.output or args.input)
output_path.mkdir(parents=True, exist_ok=True)
for txtpath in tqdm(input_path.iterdir(), total=sum(1 for _ in input_path.iterdir())):
relpath = txtpath.relative_to(input_path)
boxes = load_label(input_path, relpath)
calib = loader.calibration_data(int(relpath.stem), raw=True)
boxes = parse_label(boxes, calib)
boxes.dump(output_path / relpath.with_suffix('.objs'))