import functools
import inspect
from collections import defaultdict
from enum import Enum
from multiprocessing import Manager, Pool
from pathlib import Path
from threading import Event
from typing import (Any, Callable, ContextManager, Dict, Iterable, List,
NewType, Optional, Tuple, Union)
import numpy as np
import numpy.random as npr
from d3d.abstraction import EgoPose, Target3DArray, TransformSet
from numpy import ndarray as NdArray
from PIL.Image import Image
from sortedcontainers import SortedDict
from tqdm import tqdm, trange
def split_trainval(phase: str,
total_count: int,
trainval_split: Union[float, List[int]],
trainval_random: Union[bool, int, str]) -> Iterable[int]:
'''
Split frames for training or validation set
    :param phase: training, validation or testing
    :param total_count: total number of frames in the trainval part of the dataset
    :param trainval_split: the ratio used to split the training dataset.
        * If it's a number, it's the fraction of frames assigned to the training set.
        * If it's 1, the validation set is empty; if it's 0, the training set is empty.
        * If it's a list of numbers, it directly defines the frame indices to return (ignoring :obj:`trainval_random`).
    :param trainval_random: whether to select the train/val split randomly.
        * If it's a bool, the split is done with or without shuffling.
        * If it's an int, it's used as the seed for random shuffling.
        * If it's a string, a predefined order is used. {r: reverse}
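
    A minimal usage sketch (deterministic 80/20 split of 10 frames):

    .. code-block:: python

        train_idx = split_trainval('training', 10, 0.8, False)    # indices 0..7
        val_idx = split_trainval('validation', 10, 0.8, False)    # indices 8..9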
'''
    if isinstance(trainval_split, list):
        # a list directly defines the frame indices to return
        return trainval_split
    else: # trainval_split is a number
if isinstance(trainval_random, bool):
frames = npr.default_rng().permutation(total_count) \
if trainval_random else np.arange(total_count)
elif isinstance(trainval_random, int):
gen = npr.default_rng(seed=trainval_random)
frames = gen.permutation(total_count)
elif trainval_random == "r": # predefined shuffle type
frames = np.arange(total_count)[::-1]
else:
raise ValueError("Invalid trainval_random type!")
# split phase
if phase == 'training':
frames = frames[:int(total_count * trainval_split)]
elif phase == 'validation':
frames = frames[int(total_count * trainval_split):]
else: # testing
pass
return frames
_SortedCountDict = NewType('SortedCountDict', (SortedDict, Dict[Any, int]))
def split_trainval_seq(phase: str,
seq_counts: _SortedCountDict,
trainval_split: Union[float, List[int]],
trainval_random: Union[bool, int, str],
by_seq: bool = False) -> Iterable[int]:
'''
    Split frames for training or validation, either by frame or by sequence
    TODO: consider nframes
    :param phase: training, validation or testing
    :param seq_counts: mapping from sequence id to the number of frames in that sequence
    :param trainval_split: the ratio used to split the training dataset.
        * If it's a number, it's the fraction of frames (or sequences) assigned to the training set.
        * If it's 1, the validation set is empty; if it's 0, the training set is empty.
        * If it's a list of frame indices and :obj:`by_seq` is False, it directly defines the indices to return (ignoring :obj:`trainval_random`).
        * If it's a list of sequence names and :obj:`by_seq` is True, it directly defines the sequences to be chosen.
    :param trainval_random: whether to select the train/val split randomly.
        * If it's a bool, the split is done with or without shuffling.
        * If it's an int, it's used as the seed for random shuffling.
        * If it's a string, a predefined order is used. {r: reverse}
    :param by_seq: whether to split the trainval partitions by sequence instead of by frame
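
    A minimal usage sketch (splitting two sequences by sequence, without shuffling):

    .. code-block:: python

        from sortedcontainers import SortedDict

        counts = SortedDict({'seq0': 3, 'seq1': 2})
        train_idx = split_trainval_seq('training', counts, 0.5, False, by_seq=True)
        # frame indices of the first sequence only: [0, 1, 2]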
'''
if not by_seq:
total_count = sum(seq_counts.values())
return split_trainval(phase, total_count, trainval_split, trainval_random)
# calculate starting point of sequences
seqstarts = {}
counter = 0
for seqid, seqcount in seq_counts.items():
seqstarts[seqid] = counter
counter += seqcount
# determine sequences
if isinstance(trainval_split, list):
seqs = trainval_split
else:
seqs = list(seq_counts.keys())
if phase == 'training':
seqs = seqs[:int(len(seqs) * trainval_split)]
elif phase == 'validation':
seqs = seqs[int(len(seqs) * trainval_split):]
        elif phase != 'testing':
            raise ValueError("Incorrect dataset phase!")
# generate frames
frames = []
if isinstance(trainval_random, bool):
if trainval_random:
rng = npr.default_rng()
seqids = rng.permutation(len(seqs))
for sid in seqids:
seq = seqs[sid]
frames.append(rng.permutation(
seq_counts[seq]) + seqstarts[seq])
else:
for seq in seqs:
frames.append(np.arange(seq_counts[seq]) + seqstarts[seq])
elif isinstance(trainval_random, int):
rng = npr.default_rng(seed=trainval_random)
seqids = rng.permutation(len(seqs))
for sid in seqids:
seq = seqs[sid]
frames.append(rng.permutation(seq_counts[seq]) + seqstarts[seq])
    elif trainval_random == "r": # predefined shuffle type
        for seq in seqs[::-1]:
            frames.append(np.arange(seq_counts[seq])[::-1] + seqstarts[seq])
    else:
        raise ValueError("Invalid trainval_random type!")
return np.concatenate(frames)
def check_frames(names: Optional[Union[List[str], str]], valid: List[str]):
    '''
    Check whether the given names are valid options.
    :param names: names to be checked. If None, all valid names are used;
        a single string is converted to a one-element list.
    :param valid: list of the valid names
    :return: a tuple ``(unpack_result, names)``, where ``unpack_result`` tells the caller
        whether to unpack the (single) result and ``names`` is the frame names as a list.
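
    A minimal usage sketch:

    .. code-block:: python

        unpack, names = check_frames("cam0", ["cam0", "cam1"])
        # unpack == True, names == ["cam0"]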
'''
unpack_result = False
if names is None:
names = valid
elif isinstance(names, str):
names = [names]
unpack_result = True
# sanity check
for name in names:
if name not in valid:
message = "Invalid frame name %s, " % name
message += "valid options are " + ", ".join(valid)
raise ValueError(message)
return unpack_result, names
class DatasetBase:
"""
This class acts as the base for all dataset loaders
"""
def __init__(self,
base_path: Union[str, Path],
inzip: bool = False,
phase: str = "training",
trainval_split: Union[float, List[int]] = 1.,
trainval_random: Union[bool, int, str] = False):
"""
:param base_path: directory containing the zip files, or the required data
        :param inzip: whether the dataset is stored in the original zip archives or unzipped
:param phase: training, validation or testing
:param trainval_split: the ratio to split training dataset. See
documentation of :func:`split_trainval` for detail.
:param trainval_random: whether select the train/val split randomly. See
documentation of :func:`split_trainval` for detail.
"""
del trainval_split, trainval_random
self.base_path = Path(base_path)
self.inzip = inzip
self.phase = phase
# trainval_split and trainval_random should be used only in constructor
if phase not in ['training', 'validation', 'testing']:
raise ValueError("Invalid phase tag")
self._return_file_path = False
def __len__(self) -> int:
'''
Return the total number of frames (in the given split)
'''
raise NotImplementedError("abstract function")
class _ReturnPathContext:
def __init__(self, ds: "DatasetBase"):
self.ds = ds
def __enter__(self):
if self.ds.inzip:
raise RuntimeError("Cannot return path from a dataset in zip!")
self.ds._return_file_path = True
def __exit__(self, type, value, traceback):
self.ds._return_file_path = False
    def return_path(self) -> ContextManager:
"""
Make the dataset return the raw paths to the data instead of parsing
it. This method returns a context manager.
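
        A minimal usage sketch (assuming ``ds`` is a concrete, unzipped dataset):

        .. code-block:: python

            with ds.return_path():
                path = ds.lidar_data(0)  # a file path instead of the parsed point cloud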
"""
return DatasetBase._ReturnPathContext(self)
    def identity(self, idx: int) -> tuple:
        '''
        Return something that can track the data back to the original dataset. The result tuple can be
        passed to the data accessors to directly access the given frame.
:param idx: index of requested frame to be parsed
'''
raise NotImplementedError("abstract function")
class MultiModalDatasetMixin:
"""
This class defines basic interface for multi-modal datasets
"""
VALID_CAM_NAMES: List[str]
'''
List of valid sensor names of camera
'''
VALID_LIDAR_NAMES: List[str]
'''
List of valid sensor names of lidar
'''
    def lidar_data(self,
idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None,
formatted: bool = False) -> Union[NdArray, List[NdArray]]:
'''
Return the lidar point cloud data
:param names: name of requested lidar sensors. The default sensor is
the first element in :attr:`VALID_LIDAR_NAMES`.
:param idx: index of requested lidar frames
:param formatted: if true, the point cloud wrapped in a numpy record array will be returned
'''
raise NotImplementedError("abstract function")
    def camera_data(self,
idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None
) -> Union[Image, List[Image]]:
'''
Return the camera image data
:param names: name of requested camera sensors. The default sensor is
the first element in :attr:`VALID_CAM_NAMES`.
:param idx: index of requested image frames
'''
raise NotImplementedError("abstract function")
    def calibration_data(self,
idx: Union[int, tuple],
raw: Optional[bool] = None) -> Union[TransformSet, Any]:
'''
Return the calibration data
:param idx: index of requested frame
:param raw: if false, converted :class:`d3d.abstraction.TransformSet`
will be returned, otherwise raw data will be returned in
original format.
'''
raise NotImplementedError("abstract function")
class DetectionDatasetBase(DatasetBase, MultiModalDatasetMixin):
"""
This class defines basic interface for object detection datasets
"""
VALID_OBJ_CLASSES: Enum
'''
List of valid object labels
'''
def __init__(self,
base_path: Union[str, Path],
inzip: bool = False,
phase: str = "training",
trainval_split: Union[float, List[int]] = 1.,
trainval_random: Union[bool, int, str] = False):
"""
:param base_path: directory containing the zip files, or the required data
        :param inzip: whether the dataset is stored in the original zip archives or unzipped
:param phase: training, validation or testing
:param trainval_split: the ratio to split training dataset. See
documentation of :func:`split_trainval` for detail.
:param trainval_random: whether select the train/val split randomly. See
documentation of :func:`split_trainval` for detail.
"""
DatasetBase.__init__(self, base_path, inzip=inzip, phase=phase,
trainval_split=trainval_split, trainval_random=trainval_random)
    def annotation_3dobject(self, idx: Union[int, tuple],
raw: Optional[bool] = None) -> Union[Target3DArray, Any]:
'''
Return list of converted ground truth targets in lidar frame.
:param idx: index of requested frame
:param raw: if false, targets will be converted to d3d
:class:`d3d.abstraction.Target3DArray` format,
otherwise raw data will be returned in original format
'''
raise NotImplementedError("abstract function")
    def analyze_3dobject(self) -> dict:
'''
Report statistics on 3D object labels
:return: Statistics containing mean dimension
'''
dimensions = defaultdict(list)
for i in trange(len(self), desc="Analyzing"):
for obj in self.annotation_3dobject(i):
dimensions[obj.tag_top].append(obj.dimension)
mean_dimensions = {k: np.mean(v, axis=0)
for k, v in dimensions.items()}
return dict(mean_dimension=mean_dimensions)
class SegmentationDatasetMixin:
    """
    This class defines the basic interface for point cloud segmentation datasets
    """
    VALID_PTS_CLASSES: Enum
    '''
    List of valid point labels
'''
    def annotation_3dpoints(self, idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None,
formatted: Optional[bool] = False):
'''
Return list of point-wise labels in lidar frame.
:param idx: index of requested frame
:param formatted: if True, the point labels will be represented as a numpy record array,
otherwise, the returned object will be a dictionary of numpy arrays.
'''
raise NotImplementedError("abstract function")
class SequenceDatasetBase(DatasetBase):
"""
This class defines basic interface for datasets of sequences
"""
def __init__(self,
base_path: Union[str, Path],
inzip: bool = False,
phase: str = "training",
trainval_split: Union[float, List[int]] = 1.,
trainval_random: Union[bool, int, str] = False,
trainval_byseq=False,
nframes: int = 0):
"""
:param base_path: directory containing the zip files, or the required data
        :param inzip: whether the dataset is stored in the original zip archives or unzipped
:param phase: training, validation or testing
:param trainval_split: the ratio to split training dataset. See
documentation of :func:`split_trainval_seq` for detail.
:param trainval_random: whether select the train/val split randomly. See
documentation of :func:`split_trainval_seq` for detail.
        :param nframes: number of consecutive frames returned from the accessors (see the sketch below)
            * If it's positive, the accessors return that many adjacent frames in addition to the
              requested one, and the effective dataset length is reduced accordingly.
            * If it's negative, its absolute value is used.
            * If it's zero, the dataset acts like an object detection dataset, i.e. the accessors
              return a single, unpacked frame.
        :param trainval_byseq: whether to split the trainval partitions by sequence instead of by frame
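
        A sketch of the nframes behaviour (``SomeSequenceDataset`` is a hypothetical concrete subclass):

        .. code-block:: python

            ds = SomeSequenceDataset("/path/to/data", nframes=2)
            images = ds.camera_data(0)   # a list of nframes + 1 = 3 consecutive images
            ds0 = SomeSequenceDataset("/path/to/data", nframes=0)
            image = ds0.camera_data(0)   # a single image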
"""
del trainval_byseq
super().__init__(base_path, inzip=inzip, phase=phase,
trainval_split=trainval_split, trainval_random=trainval_random)
self.nframes = abs(nframes)
    def _locate_frame(self, idx: int) -> Tuple[Any, int]:
        '''
        Subclasses should implement this function to convert the overall index to (sequence_id, frame_idx)
        in order to support the decorators :func:`expand_idx` and :func:`expand_idx_name`
:return: (seq_id, frame_idx) where frame_idx is the index of starting frame
'''
raise NotImplementedError("_locate_frame is not implemented!")
    def identity(self, idx: int) -> Union[tuple, List[tuple]]:
        '''
        Return something that can track the data back to the original dataset
        :param idx: index of requested frame to be parsed
        :return: if :obj:`nframes` > 0, the function returns a list of ids
                 which are consistent with the other accessors.
'''
raise NotImplementedError("abstract function")
@property
def sequence_sizes(self) -> Dict[Any, int]:
'''
Return the mapping from sequence id to sequence sizes
'''
raise NotImplementedError("abstract function")
@property
def sequence_ids(self) -> List[Any]:
'''
Return the list of sequence ids
'''
raise NotImplementedError("abstract function")
    def timestamp(self,
idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None
) -> Union[int, List[int]]:
'''
        Return the timestamp of the frame specified by the index, represented as a
        Unix timestamp in microseconds (usually a 16-digit integer)
        :param idx: index of requested frame
        :param names: specify the sensor whose timestamp is requested. This option
                      only makes sense when the dataset contains separate
                      timestamps for data from each sensor.
'''
raise NotImplementedError("abstract function")
class MultiModalSequenceDatasetMixin:
"""
This class defines basic interface for multi-modal datasets of sequences
"""
    def lidar_data(self,
idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None,
formatted: bool = False) -> Union[NdArray, List[NdArray], List[List[NdArray]]]:
'''
        If multiple frames are requested, the results will be a list of lists. The outer list
        corresponds to the sensor names and the inner list corresponds to the time sequence, so
        len(names) × len(frames) data objects will be returned (see the sketch below).
        :param names: name of requested lidar sensors. The default sensor is
                      the first element in :attr:`VALID_LIDAR_NAMES`.
        :param idx: index of requested lidar frames
            * If a single integer is given, the frame indexing is done on the whole dataset with trainval split
            * If a tuple is given, it's considered the unique id of the frame (from the :meth:`identity` method);
              trainval split is ignored in this case and the nframes offset is not added
        :param formatted: if True, the point cloud will be returned wrapped in a numpy record array
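
        A sketch of the returned layout (``ds`` is a hypothetical concrete dataset with ``nframes=1``):

        .. code-block:: python

            clouds = ds.lidar_data(10, names=["lidar_top", "lidar_front"])
            # clouds[0] -> 2 consecutive clouds from "lidar_top"
            # clouds[1] -> 2 consecutive clouds from "lidar_front"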
'''
raise NotImplementedError("_locate_frame is not implemented!")
    def camera_data(self,
idx: Union[int, tuple],
names: Optional[Union[str, List[str]]] = None
) -> Union[Image, List[Image], List[List[Image]]]:
'''
Return the camera image data
:param names: name of requested camera sensors. The default sensor is
the first element in :attr:`VALID_CAM_NAMES`.
:param idx: index of requested image frames, see description in
:meth:`lidar_data` method.
'''
raise NotImplementedError("abstract function")
    def calibration_data(self, idx: Union[int, tuple],
raw: Optional[bool] = False) -> Union[TransformSet, Any]:
'''
        Return the calibration data. Note that the calibration is assumed to be
        fixed within one sequence, so a single object is always returned.
:param idx: index of requested lidar frames
:param raw: If false, converted :class:`d3d.abstraction.TransformSet`
will be returned, otherwise raw data will be returned in
original format
'''
raise NotImplementedError("abstract function")
class TrackingDatasetBase(SequenceDatasetBase, MultiModalSequenceDatasetMixin):
    """
    A tracking dataset is defined similarly to a detection dataset. The two major differences are
    1. A tracking dataset uses (sequence_id, frame_id) as the identifier.
    2. A tracking dataset provides unique object ids across time frames.
"""
    def annotation_3dobject(self,
idx: Union[int, tuple],
raw: Optional[bool] = False
) -> Union[Target3DArray, List[Target3DArray]]:
'''
Return list of converted ground truth targets in lidar frame.
:param idx: index of requested frame
:param raw: if false, targets will be converted to d3d
:class:`d3d.abstraction.Target3DArray` format, otherwise
raw data will be returned in original format.
'''
raise NotImplementedError("abstract function")
    def pose(self,
idx: Union[int, tuple],
raw: Optional[bool] = False,
names: Optional[Union[str, List[str]]] = None
) -> Union[EgoPose, Any]:
'''
        Return the (relative) pose of the vehicle for the frame. The base frame should be attached to
        the ground, which means it follows an East-North-Up axis order.
        :param idx: index of requested frame
        :param names: specify the sensor whose pose is requested. This option
                      only makes sense when the dataset contains separate
                      timestamps for data from each sensor. In this case, the
                      pose either comes from the dataset or from interpolation.
:param raw: if false, targets will be converted to d3d
:class:`d3d.abstraction.EgoPose` format, otherwise raw
data will be returned in original format.
'''
raise NotImplementedError("abstract function")
@property
def pose_name(self) -> str:
'''
Return the sensor frame name whose coordinate the pose is reported in. This frame can be different from
the default frame in the calibration TransformSet.
'''
raise NotImplementedError("abstract property")
# Some utilities for implementing tracking dataset
# ref: https://stackoverflow.com/questions/2365701/decorating-python-class-methods-how-do-i-pass-the-instance-to-the-decorator
def expand_idx(func):
    '''
    This decorator wraps a :class:`SequenceDatasetBase` member function that takes an index input. It handles
    the case where ``self.nframes`` > 0 by calling the original function once per frame, so the original
    function only needs to support a single index.
    A parameter :obj:`bypass` is added to the decorated function, which can be used to call
    the original underlying method without expansion.
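
    A sketch of the intended usage (``MyDataset`` and its ``pose`` method are hypothetical):

    .. code-block:: python

        class MyDataset(SequenceDatasetBase):
            @expand_idx
            def pose(self, idx):
                seq_id, frame_idx = idx  # always receives a single (seq_id, frame_idx)
                ...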
'''
@functools.wraps(func)
def wrapper(self: SequenceDatasetBase, idx: Union[int, tuple], *kargs, **kwargs):
bypass = kwargs.pop("bypass", False)
if isinstance(idx, int):
seq_id, frame_idx = self._locate_frame(idx)
else:
seq_id, frame_idx = idx
if self.nframes == 0 or bypass:
return func(self, (seq_id, frame_idx), *kargs, **kwargs)
else:
return [func(self, (seq_id, idx), *kargs, **kwargs)
for idx in range(frame_idx, frame_idx + self.nframes + 1)]
return wrapper
def expand_name(valid_names: List[str]) -> Callable[[Callable], Callable]:
    '''
    This decorator works similarly to :func:`expand_idx`, but distributes sensor names instead of indices.
    Note that this function acts as a decorator factory rather than a decorator.
:param valid_names: List of valid sensor names
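
    A sketch of the intended usage (``MyDataset`` is hypothetical):

    .. code-block:: python

        class MyDataset(DatasetBase, MultiModalDatasetMixin):
            VALID_CAM_NAMES = ["cam0", "cam1"]

            @expand_name(VALID_CAM_NAMES)
            def camera_data(self, idx, names="cam0"):
                ...  # called once per requested name, with a single name each time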
'''
def decorator(func):
default_names = inspect.signature(func).parameters["names"].default
assert default_names != inspect.Parameter.empty, \
"The decorated function should have default names value"
@functools.wraps(func)
def wrapper(self: TrackingDatasetBase, idx, names=default_names, *kargs, **kwargs):
unpack_result, names = check_frames(names, valid_names)
results = []
for name in names:
results.append(func(self, idx, name, *kargs, **kwargs))
return results[0] if unpack_result else results
return wrapper
return decorator
def expand_idx_name(valid_names: List[str]) -> Callable[[Callable], Callable]:
    '''
    This decorator works similarly to :func:`expand_idx`, with support for distributing both indices and
    sensor names. Note that this function acts as a decorator factory rather than a decorator.
    A parameter :obj:`bypass` is added to the decorated function, which can be used to call
    the original underlying method without expansion.
:param valid_names: List of valid sensor names
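
    A sketch of the intended usage (``MyTrackingDataset`` is hypothetical):

    .. code-block:: python

        class MyTrackingDataset(TrackingDatasetBase):
            VALID_CAM_NAMES = ["cam0", "cam1"]

            @expand_idx_name(VALID_CAM_NAMES)
            def camera_data(self, idx, names="cam0"):
                seq_id, frame_idx = idx  # a single index and a single sensor name per call
                ...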
'''
def decorator(func):
default_names = inspect.signature(func).parameters["names"].default
assert default_names != inspect.Parameter.empty, \
"The decorated function should have default names value"
@functools.wraps(func)
def wrapper(self: SequenceDatasetBase, idx, names=default_names, *kargs, **kwargs):
bypass = kwargs.pop("bypass", False)
if isinstance(idx, int):
seq_id, frame_idx = self._locate_frame(idx)
else:
seq_id, frame_idx = idx
unpack_result, names = check_frames(names, valid_names)
results = []
for name in names:
if self.nframes == 0 or bypass:
results.append(func(self, (seq_id, frame_idx),
names=name, *kargs, **kwargs))
else:
results.append([func(self, (seq_id, idx), names=name, *kargs, **kwargs)
for idx in range(frame_idx, frame_idx + self.nframes + 1)])
return results[0] if unpack_result else results
return wrapper
return decorator
class NumberPool:
    '''
    This class is a utility for multiprocessing with tqdm progress bars. Define the task as
.. code-block:: python
def task(ntqdm, ...):
...
for data in tqdm(..., position=ntqdm, leave=False):
...
Then the parallel progress bars will be displayed in place.
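
    A usage sketch (``task`` defined as above, ``items`` is a placeholder iterable):

    .. code-block:: python

        pool = NumberPool(4)
        for item in items:
            pool.wait_for_once()
            pool.apply_async(task, (item,))
        pool.close()
        pool.join()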
'''
def __init__(self,
processes: int,
offset: int = 0,
*args, **kargs):
"""
:param processes: Number of processes available in the pool. If processes < 1,
then functions will be executed in current thread.
        :param offset: The offset added to the `ntqdm` value of all processes. This is useful
                       when you want to display a progress bar in the outer loop.
"""
        if processes < 1:
self._single_thread = True
else:
self._single_thread = False
self._ppool = Pool(processes, initializer=tqdm.set_lock, # pool of processes
initargs=(tqdm.get_lock(),), *args, **kargs)
self._npool = Manager().Array(
'B', [0] * processes) # pool of position number
self._nlock = Manager().Lock() # lock for self._npool
self._nqueue = 0 # number of tasks in pool
self._offset = offset
self._complete_event = Event()
@staticmethod
def _wrap_func(func, args, pool, nlock, offset):
n = -1
with nlock:
n = next(i for i, v in enumerate(pool) if v == 0)
pool[n] = 1
ret = func(n + offset, *args)
return (n, ret)
    def apply_async(self, func, args=(), callback=None):
if self._single_thread:
result = func(0, *args)
if callback is not None:
callback(result)
return result
def _wrap_cb(ret):
n, oret = ret
with self._nlock:
self._npool[n] = 0
self._nqueue -= 1
if callback is not None:
callback(oret)
self._complete_event.set()
self._nqueue += 1
self._ppool.apply_async(NumberPool._wrap_func,
(func, args, self._npool,
self._nlock, self._offset),
callback=_wrap_cb,
error_callback=lambda e: print(
f"{type(e).__name__}: {e}")
)
    def wait_for_once(self, margin: int = 0):
        """
        Block the current thread and wait until a process becomes available
        :param margin: Defines when the pool is considered full. The method blocks
                       only while there are at least :obj:`nprocess + margin` tasks
                       in the pool, and returns after one of them completes.
"""
if self._nqueue >= len(self._npool) + margin: # only wait if the pool is full
self._complete_event.wait()
self._complete_event.clear()
def close(self):
self._ppool.close()
def join(self):
self._ppool.join()
# TODO: add imap function and support progress bar for imap