from collections import defaultdict
from pathlib import Path
from zipfile import ZipFile
import numpy as np
from d3d.abstraction import (EgoPose, ObjectTag, ObjectTarget3D, Target3DArray,
TransformSet)
from d3d.dataset.base import (TrackingDatasetBase, expand_idx, expand_idx_name,
split_trainval_seq)
from d3d.dataset.kitti import utils
from d3d.dataset.kitti.utils import KittiObjectClass, OxtData
from d3d.dataset.zip import PatchedZipFile
from scipy.spatial.transform import Rotation
from sortedcontainers import SortedDict
def parse_label(label: list, raw_calib: dict) -> Target3DArray:
'''
Generate object array from loaded label or result text
'''
Tr = raw_calib['Tr_velo_cam'].reshape(3, 4)
RRect = Rotation.from_matrix(raw_calib['R_rect'].reshape(3, 3))
HR, HT = Rotation.from_matrix(Tr[:,:3]), Tr[:,3]
objects = Target3DArray()
objects.frame = "velo"
for item in label:
track_id = int(item[0]) # add 1 to prevent 0 tid
if item[1] == KittiObjectClass.DontCare:
continue
h,w,l = item[9:12] # height, width, length
position = item[12:15] # x, y, z in camera coordinate
ry = item[15] # rotation of y axis in camera coordinate
position[1] -= h/2
# parse object position and orientation
position = np.dot(position, RRect.inv().as_matrix().T)
position = HR.inv().as_matrix().dot(position - HT)
orientation = HR.inv() * RRect.inv() * Rotation.from_euler('y', ry)
orientation *= Rotation.from_euler("x", np.pi/2) # change dimension from l,h,w to l,w,h
score = item[16] if len(item) == 17 else None
tag = ObjectTag(item[1], KittiObjectClass, scores=score)
target = ObjectTarget3D(position, orientation, [l,w,h], tag, tid=track_id)
objects.append(target)
return objects
[docs]class KittiTrackingLoader(TrackingDatasetBase):
"""
Loader for KITTI multi-object tracking dataset, please organize the files into following structure
* Zip Files::
- data_tracking_calib.zip
- data_tracking_image_2.zip
- data_tracking_image_3.zip
- data_tracking_label_2.zip
- data_tracking_velodyne.zip
- data_tracking_oxts.zip
* Unzipped Structure::
- <base_path directory>
- training
- calib
- image_02
- label_02
- oxts
- velodyne
- testing
- calib
- image_02
- oxts
- velodyne
For description of constructor parameters, please refer to :class:`d3d.dataset.base.TrackingDatasetBase`.
Note that the 3d objects labelled as `DontCare` are removed from the result of :meth:`annotation_3dobject`.
"""
VALID_CAM_NAMES = ["cam2", "cam3"]
VALID_LIDAR_NAMES = ["velo"]
VALID_OBJ_CLASSES = KittiObjectClass
def __init__(self, base_path, inzip=False, phase="training",
trainval_split=0.8, trainval_random=False, trainval_byseq=False, nframes=0):
super().__init__(base_path, inzip=inzip, phase=phase, nframes=nframes,
trainval_split=trainval_split, trainval_random=trainval_random,
trainval_byseq=trainval_byseq)
self.phase_path = 'training' if phase == 'validation' else phase
# count total number of frames
frame_count = defaultdict(int)
if self.inzip:
for folder in ['image_2', 'image_3', 'velodyne']:
data_zip = self.base_path / ("data_tracking_%s.zip" % folder)
if data_zip.exists():
with ZipFile(data_zip) as data:
for name in data.namelist():
name = Path(name)
if len(name.parts) != 4: # skip directories
continue
phase, _, seq, frame = name.parts
if phase != self.phase_path:
continue
seq = int(seq)
frame_count[seq] = max(frame_count[seq],
int(name.stem) + 1) # idx starts from 0
break
else:
for folder in ['image_02', 'image_03', 'velodyne']:
fpath = self.base_path / self.phase_path / folder
if fpath.exists():
for seq_path in fpath.iterdir():
seq = int(seq_path.name)
frame_count[seq] = sum(1 for _ in seq_path.iterdir())
break
if not len(frame_count):
raise ValueError("Cannot parse dataset, please check path, inzip option and file structure")
self.frame_dict = SortedDict(frame_count)
self.frames = split_trainval_seq(phase, self.frame_dict, trainval_split, trainval_random, trainval_byseq)
self._image_size_cache = {} # used to store the image size (for each sequence)
self._label_cache = {} # used to store parsed label data
self._calib_cache = {} # used to store parsed calibration data
self._pose_cache = {} # used to store parsed pose data
def __len__(self):
return len(self.frames)
@property
def sequence_ids(self):
return list(self.frame_dict.keys())
@property
def sequence_sizes(self):
return dict(self.frame_dict)
def _locate_frame(self, idx):
idx = self.frames[idx] # use underlying frame index
for k, v in self.frame_dict.items():
if idx < (v - self.nframes):
return k, idx
idx -= (v - self.nframes)
raise KeyError("Index larger than dataset size")
def _preload_label(self, seq_id):
if seq_id in self._label_cache:
return
file_name = Path(self.phase_path, "label_02", "%04d.txt" % seq_id)
if self.inzip:
with PatchedZipFile(self.base_path / "data_tracking_label_2.zip", to_extract=file_name) as source:
text = source.read(str(file_name)).decode().split('\n')
else:
with open(self.base_path / file_name, "r") as fin:
text = fin.readlines()
self._label_cache[seq_id] = defaultdict(list)
for line in text:
if not line.strip():
continue
frame_id, track_id, remain = line.split(" ", 2)
values = [KittiObjectClass[value] if idx == 0 else float(value)
for idx, value in enumerate(remain.split(' '))]
self._label_cache[seq_id][int(frame_id)].append([int(track_id)] + values)
def _preload_calib(self, seq_id):
if seq_id in self._calib_cache:
return
file_name = Path(self.phase_path, 'calib', '%04d.txt' % seq_id)
if self.inzip:
with PatchedZipFile(self.base_path / "data_tracking_calib.zip", to_extract=file_name) as source:
self._calib_cache[seq_id] = utils.load_calib_file(source, file_name)
else:
self._calib_cache[seq_id] = utils.load_calib_file(self.base_path, file_name)
def _preload_oxts(self, seq_id):
if seq_id in self._pose_cache:
return
file_name = Path(self.phase_path, 'oxts', '%04d.txt' % seq_id)
if self.inzip:
with PatchedZipFile(self.base_path / "data_tracking_oxts.zip", to_extract=file_name) as source:
text = source.read(str(file_name)).decode().split('\n')
else:
with open(self.base_path / file_name, "r") as fin:
text = fin.readlines()
self._pose_cache[seq_id] = []
for line in text:
line = line.strip()
if not line:
continue
values = list(map(float, line.strip().split(' ')))
values[-5:] = list(map(int, values[-5:]))
self._pose_cache[seq_id].append(OxtData(*values))
[docs] @expand_idx_name(VALID_CAM_NAMES)
def camera_data(self, idx, names='cam2'):
seq_id, frame_idx = idx
_folders = {
"cam2": ("image_02", "data_tracking_image_2.zip"),
"cam3": ("image_03", "data_tracking_image_3.zip")
}
folder_name, zip_name = _folders[names]
fname = Path(self.phase_path, folder_name, "%04d" % seq_id, '%06d.png' % frame_idx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / zip_name, to_extract=fname) as source:
image = utils.load_image(source, fname, gray=False)
else:
image = utils.load_image(self.base_path, fname, gray=False)
# save image size for calibration
if seq_id not in self._image_size_cache:
self._image_size_cache[seq_id] = image.size
return image
[docs] @expand_idx_name(VALID_LIDAR_NAMES)
def lidar_data(self, idx, names='velo', formatted=False):
seq_id, frame_idx = idx
# This is the problem in KITTI dataset itself
if seq_id == 1 and frame_idx in range(177, 181):
raise ValueError("There is missing data in KITTI tracking dataset at seq 1, frame 177-180!")
assert names == 'velo'
fname = Path(self.phase_path, 'velodyne', "%04d" % seq_id, '%06d.bin' % frame_idx)
if self._return_file_path:
return self.base_path / fname
if self.inzip:
with PatchedZipFile(self.base_path / "data_tracking_velodyne.zip", to_extract=fname) as source:
return utils.load_velo_scan(source, fname, formatted=formatted)
else:
return utils.load_velo_scan(self.base_path, fname, formatted=formatted)
def _load_calib(self, seq, raw=False):
# load the calibration file data
self._preload_calib(seq)
filedata = self._calib_cache[seq]
if raw:
return filedata
# load image size, which could take additional time
if seq not in self._image_size_cache:
self.camera_data((seq, self.nframes)) # load first n frames to fill image size cache
image_size = self._image_size_cache[seq]
# load matrics
data = TransformSet("velo")
rect = filedata['R_rect'].reshape(3, 3)
velo_to_cam = filedata['Tr_velo_cam'].reshape(3, 4)
for i in range(4):
P = filedata['P%d' % i].reshape(3, 4)
intri, offset = P[:, :3], P[:, 3]
projection = intri.dot(rect)
offset_cartesian = np.linalg.inv(projection).dot(offset)
extri = np.vstack([velo_to_cam, np.array([0,0,0,1])])
extri[:3, 3] += offset_cartesian
frame = "cam%d" % i
data.set_intrinsic_camera(frame, projection, image_size, rotate=False)
data.set_extrinsic(extri, frame_to=frame)
data.set_intrinsic_general("imu")
data.set_extrinsic(filedata['Tr_imu_velo'].reshape(3, 4), frame_from="imu")
return data
[docs] def calibration_data(self, idx, raw=False):
assert not self._return_file_path, "The calibration is not stored in single file!"
if isinstance(idx, int):
seq_id, _ = self._locate_frame(idx)
else:
seq_id, _ = idx
return self._load_calib(seq_id, raw)
[docs] @expand_idx
def annotation_3dobject(self, idx, raw=False):
assert not self._return_file_path, "The annotation is not stored in single file!"
assert self.phase_path != "testing", "Testing dataset doesn't contain label data"
seq_id, frame_idx = idx
self._preload_label(seq_id)
label_data = self._label_cache[seq_id][frame_idx]
if raw:
return label_data
self._preload_calib(seq_id)
raw_calib = self._calib_cache[seq_id]
return parse_label(label_data, raw_calib)
[docs] @expand_idx
def identity(self, idx):
return idx
[docs] @expand_idx
def pose(self, idx, raw=False):
seq_id, frame_idx = idx
self._preload_oxts(seq_id)
raw_pose = self._pose_cache[seq_id][frame_idx]
if raw:
return raw_pose
return utils.parse_pose_from_oxt(raw_pose)
@property
def pose_name(self):
return 'imu'
[docs] @expand_idx
def timestamp(self, idx, names="velo"):
del names
_, fidx = idx
return fidx * 1e5 + 1 # assuming the data frequency is 10Hz and prepend a delay