# Source code for d3d.dataset.kitti.raw (recovered from generated documentation)

from collections import defaultdict
from itertools import chain
from pathlib import Path
from zipfile import ZipFile

import numpy as np
from d3d.abstraction import (ObjectTag, ObjectTarget3D, Target3DArray,
                             TransformSet)
from d3d.dataset.base import (TrackingDatasetBase, expand_idx, expand_idx_name,
                              split_trainval_seq)
from d3d.dataset.kitti import utils
from d3d.dataset.kitti.utils import KittiObjectClass, OxtData
from d3d.dataset.zip import PatchedZipFile
from scipy.spatial.transform import Rotation
from sortedcontainers import SortedDict


class KittiRawLoader(TrackingDatasetBase):
    """
    Load and parse raw data into a usable format, please organize the files into following structure

    * Zip Files::

        - 2011_09_26_calib.zip [required]
        - 2011_09_26_drive_0001_extract.zip
        - ...
        - 2011_09_26_drive_0001_sync.zip
        - ...
        - 2011_09_26_drive_0001_tracklets.zip
        - ...

    * Unzipped Structure::

        - <base_path directory>
            - 2011_09_26
                - calib_cam_to_cam.txt
                - calib_imu_to_velo.txt
                - calib_velo_to_cam.txt
                - 2011_09_26_drive_0001_extract
                    - image_00
                    - image_01
                    - image_02
                    - image_03
                    - oxts
                    - velodyne_points
                    - ...
                - 2011_09_26_drive_0001_sync
                    - image_00
                    - image_01
                    - image_02
                    - image_03
                    - oxts
                    - velodyne_points
                    - tracklet_labels.xml
                - ...

    For description of constructor parameters, please refer to :class:`d3d.dataset.base.TrackingDatasetBase`
    Note that the 3d objects labelled as `DontCare` are removed from the result of :meth:`annotation_3dobject`.

    :param datatype: 'sync' (synced) / 'extract' (unsynced)
    :type datatype: str
    """
    VALID_CAM_NAMES = ["cam0", "cam1", "cam2", "cam3"]
    VALID_LIDAR_NAMES = ["velo"]
    VALID_OBJ_CLASSES = KittiObjectClass

    # Mapping from abstract sensor frame names to the on-disk folder names.
    _frame2folder = {
        "cam0": "image_00",
        "cam1": "image_01",
        "cam2": "image_02",
        "cam3": "image_03",
        "velo": "velodyne_points",
        "imu": "oxts"
    }

    def __init__(self, base_path, datatype: str = 'sync', inzip=True, phase="training",
                 trainval_split=1, trainval_random=False, trainval_byseq=False, nframes=0):
        super().__init__(base_path, inzip=inzip, phase=phase, nframes=nframes,
                         trainval_split=trainval_split, trainval_random=trainval_random,
                         trainval_byseq=trainval_byseq)
        self.datatype = datatype

        if phase == "testing":
            raise ValueError("There's no testing split for raw data!")
        if datatype != "sync":
            raise NotImplementedError("Currently only synced raw data are supported!")

        # count total number of frames
        frame_count = dict()
        # The recording dates published for the KITTI raw dataset.
        _dates = ["2011_09_26", "2011_09_28", "2011_09_29", "2011_09_30", "2011_10_03"]
        if self.inzip:
            globs = [self.base_path.glob(f"{date}_drive_*_{datatype}.zip")
                     for date in _dates]
            for archive in chain(*globs):
                with ZipFile(archive) as data:
                    # frame count = number of velodyne .bin files in the archive
                    velo_files = (name for name in data.namelist() if name.endswith(".bin"))
                    seq = archive.stem
                    frame_count[seq] = sum(1 for _ in velo_files)
        else:
            for date in _dates:
                if not (self.base_path / date).exists():
                    continue
                for archive in (self.base_path / date).iterdir():
                    if not archive.is_dir(): # skip calibration files
                        continue
                    seq = archive.name
                    frame_count[seq] = sum(1 for _ in (archive / "velodyne_points" / "data").iterdir())
        if not len(frame_count):
            raise ValueError("Cannot parse dataset or empty dataset, please check path, inzip option and file structure")
        self.frame_dict = SortedDict(frame_count)

        self.frames = split_trainval_seq(phase, self.frame_dict, trainval_split, trainval_random, trainval_byseq)

        self._label_cache = {} # used to store parsed label data
        self._calib_cache = {} # used to store parsed calibration data
        self._timestamp_cache = {} # used to store parsed timestamp
        self._pose_cache = {} # used to store parsed pose data
        self._tracklet_cache = {} # used to store parsed tracklet data
        self._tracklet_mapping = {} # inverse mapping for tracklets

    def __len__(self):
        # Number of selectable frames in the current split.
        return len(self.frames)

    @property
    def sequence_ids(self):
        # All sequence names, e.g. "2011_09_26_drive_0001_sync".
        return list(self.frame_dict.keys())

    @property
    def sequence_sizes(self):
        # Mapping from sequence name to its total frame count.
        return dict(self.frame_dict)

    def _get_date(self, seq_id):
        # Sequence ids start with the date, e.g. "2011_09_26_drive_0001_sync"[:10].
        return seq_id[:10]

    def _locate_frame(self, idx):
        """Translate a linear dataset index into a (sequence id, frame index) pair."""
        # use underlying frame index
        idx = self.frames[idx]

        # Each sequence contributes (size - nframes) starting positions,
        # presumably so a window of nframes preceding frames always exists
        # — TODO confirm against TrackingDatasetBase.
        for k, v in self.frame_dict.items():
            if idx < (v - self.nframes):
                return k, idx
            idx -= (v - self.nframes)
        raise ValueError("Index larger than dataset size")

    def _preload_calib(self, seq_id):
        """Read the three calibration text files for the sequence's date into the cache."""
        date = self._get_date(seq_id)
        if date in self._calib_cache:
            return

        if self.inzip:
            with ZipFile(self.base_path / f"{date}_calib.zip") as source:
                self._calib_cache[date] = {
                    "cam_to_cam": utils.load_calib_file(source, f"{date}/calib_cam_to_cam.txt"),
                    "imu_to_velo": utils.load_calib_file(source, f"{date}/calib_imu_to_velo.txt"),
                    "velo_to_cam": utils.load_calib_file(source, f"{date}/calib_velo_to_cam.txt")
                }
        else:
            source = self.base_path / date
            self._calib_cache[date] = {
                "cam_to_cam": utils.load_calib_file(source, "calib_cam_to_cam.txt"),
                "imu_to_velo": utils.load_calib_file(source, "calib_imu_to_velo.txt"),
                "velo_to_cam": utils.load_calib_file(source, "calib_velo_to_cam.txt")
            }

    def _load_calib(self, seq, raw=False):
        """Build a TransformSet (base frame "velo") from the cached calibration files.

        With ``raw=True`` the parsed calibration dictionaries are returned instead.
        """
        # load the calibration file data
        self._preload_calib(seq)
        date = self._get_date(seq)
        filedata = self._calib_cache[date]
        if raw:
            return filedata

        # load matrics
        data = TransformSet("velo")
        velo_to_cam = np.empty((3, 4))
        velo_to_cam[:3, :3] = filedata['velo_to_cam']['R'].reshape(3, 3)
        velo_to_cam[:3, 3] = filedata['velo_to_cam']['T']
        for i in range(4):
            S = filedata['cam_to_cam']['S_rect_%02d' % i].tolist()
            # TODO: here we have different R_rect's, what's the difference of them against the one used in object detection?
            R = filedata['cam_to_cam']['R_rect_%02d' % i].reshape(3, 3)
            P = filedata['cam_to_cam']['P_rect_%02d' % i].reshape(3, 4)
            intri, offset = P[:, :3], P[:, 3]
            projection = intri.dot(R)
            # Convert the projective offset into a cartesian translation and fold
            # it into the velo->cam extrinsic for this camera.
            offset_cartesian = np.linalg.inv(projection).dot(offset)
            extri = np.vstack([velo_to_cam, np.array([0,0,0,1])])
            extri[:3, 3] += offset_cartesian

            frame = "cam%d" % i
            data.set_intrinsic_camera(frame, projection, S, rotate=False)
            data.set_extrinsic(extri, frame_to=frame)

        imu_to_velo = np.empty((3, 4))
        imu_to_velo[:3, :3] = filedata['imu_to_velo']['R'].reshape(3, 3)
        imu_to_velo[:3, 3] = filedata['imu_to_velo']['T']
        data.set_intrinsic_general("imu")
        data.set_extrinsic(imu_to_velo, frame_from="imu")

        # add position of vehicle bottom center and rear axis center
        # NOTE(review): the fixed offsets below look like the vehicle geometry
        # constants from the KITTI setup — confirm against the sensor manual.
        bc_rt = np.array([
            [1, 0, 0, -0.27],
            [0, 1, 0, 0],
            [0, 0, 1, 1.73]
        ], dtype='f4')
        data.set_intrinsic_general("bottom_center")
        data.set_extrinsic(bc_rt, frame_to="bottom_center")
        rc_rt = np.array([
            [1, 0, 0, -0.805],
            [0, 1, 0, 0],
            [0, 0, 1, 0.30]
        ])
        data.set_intrinsic_general("rear_center")
        data.set_extrinsic(rc_rt, frame_from="bottom_center", frame_to="rear_center")

        return data
[docs] def calibration_data(self, idx, raw=False): assert not self._return_file_path, "The calibration is not stored in single file!" if isinstance(idx, int): seq_id, _ = self._locate_frame(idx) else: seq_id, _ = idx return self._load_calib(seq_id, raw=raw)
def _preload_timestamp(self, seq_id): date = self._get_date(seq_id) if seq_id in self._timestamp_cache: return tsdict = {} for frame, folder in self._frame2folder.items(): fname = Path(date, seq_id, folder, "timestamps.txt") if self.inzip: with PatchedZipFile(self.base_path / f"{seq_id}.zip", to_extract=fname) as data: tsdict[frame] = utils.load_timestamps(data, fname, formatted=True).astype(int) // 1000 else: tsdict[frame] = utils.load_timestamps(self.base_path, fname, formatted=True).astype(int) // 1000 self._timestamp_cache[seq_id] = tsdict
[docs] @expand_idx_name(VALID_CAM_NAMES + VALID_LIDAR_NAMES) def timestamp(self, idx, names="velo"): assert not self._return_file_path, "The timestamp is not stored in single file!" seq_id, frame_idx = idx self._preload_timestamp(seq_id) return self._timestamp_cache[seq_id][names][frame_idx]
def _preload_tracklets(self, seq_id): if seq_id in self._tracklet_cache: return date = self._get_date(seq_id) fname = Path(date, seq_id, "tracklet_labels.xml") if self.inzip: zname = seq_id[:-len(self.datatype)] + "tracklets" with ZipFile(self.base_path / f"{zname}.zip") as data: tracklets = utils.load_tracklets(data, fname) else: tracklets = utils.load_tracklets(self.base_path, fname) # inverse mapping objs = defaultdict(list) # (frame -> list of objects) for tid, tr in enumerate(tracklets): dim = [tr.l, tr.w, tr.h] tag = ObjectTag(tr.objectType, KittiObjectClass) for pose_idx, pose in enumerate(tr.poses): pos = [pose.tx, pose.ty, pose.tz] pos[2] += dim[2] / 2 ori = Rotation.from_euler("ZYX", (pose.rz, pose.ry, pose.rx)) objs[pose_idx + tr.first_frame].append(ObjectTarget3D(pos, ori, dim, tag, tid=tid)) self._tracklet_cache[seq_id] = {k: Target3DArray(l, frame="velo") for k, l in objs.items()}
[docs] @expand_idx def annotation_3dobject(self, idx): assert not self._return_file_path, "The annotation is not stored in single file!" seq_id, frame_idx = idx self._preload_tracklets(seq_id) return self._tracklet_cache[seq_id][frame_idx]
[docs] @expand_idx def pose(self, idx, raw=False): seq_id, frame_idx = idx date = self._get_date(seq_id) file_name = Path(date, seq_id, "oxts", "data", "%010d.txt" % frame_idx) if self._return_file_path: return self.base_path / file_name if self.inzip: with PatchedZipFile(self.base_path / f"{seq_id}.zip", to_extract=file_name) as data: oxt = utils.load_oxt_file(data, file_name)[0] else: oxt = utils.load_oxt_file(self.base_path, file_name)[0] if raw: return oxt return utils.parse_pose_from_oxt(oxt)
    @property
    def pose_name(self):
        # Poses returned by this loader are expressed in the "imu" frame.
        return 'imu'
[docs] @expand_idx_name(VALID_CAM_NAMES) def camera_data(self, idx, names='cam2'): seq_id, frame_idx = idx date = self._get_date(seq_id) fname = Path(date, seq_id, self._frame2folder[names], 'data', '%010d.png' % frame_idx) if self._return_file_path: return self.base_path / fname gray = names in ['cam0', 'cam1'] if self.inzip: with PatchedZipFile(self.base_path / f"{seq_id}.zip", to_extract=fname) as source: return utils.load_image(source, fname, gray=gray) else: return utils.load_image(self.base_path, fname, gray=gray)
[docs] @expand_idx_name(VALID_LIDAR_NAMES) def lidar_data(self, idx, names='velo', formatted=False): seq_id, frame_idx = idx date = self._get_date(seq_id) fname = Path(date, seq_id, 'velodyne_points', 'data', '%010d.bin' % frame_idx) if self._return_file_path: return self.base_path / fname if self.inzip: with PatchedZipFile(self.base_path / f"{seq_id}.zip", to_extract=fname) as source: return utils.load_velo_scan(source, fname, formatted=formatted) else: return utils.load_velo_scan(self.base_path, fname, formatted=formatted)
    @expand_idx
    def identity(self, idx):
        # The (sequence id, frame index) pair produced by @expand_idx is itself
        # the unique identity of a frame.
        return idx