Source code for mmocr.datasets.uniform_concat_dataset
# Copyright (c) OpenMMLab. All rights reserved.
import copy
from collections import defaultdict

import numpy as np
from mmcv.utils import print_log
from mmdet.datasets import DATASETS, ConcatDataset, build_dataset

from mmocr.utils import is_2dlist, is_type_list


@DATASETS.register_module()
class UniformConcatDataset(ConcatDataset):
"""A wrapper of ConcatDataset which support dataset pipeline assignment and
replacement.
Args:
datasets (list[dict] | list[list[dict]]): A list of datasets cfgs.
separate_eval (bool): Whether to evaluate the results
separately if it is used as validation dataset.
Defaults to True.
show_mean_scores (str | bool): Whether to compute the mean evaluation
results, only applicable when ``separate_eval=True``. Options are
[True, False, ``auto``]. If ``True``, mean results will be added to
the result dictionary with keys in the form of
``mean_{metric_name}``. If 'auto', mean results will be shown only
when more than 1 dataset is wrapped.
pipeline (None | list[dict] | list[list[dict]]): If ``None``,
each dataset in datasets use its own pipeline;
If ``list[dict]``, it will be assigned to the dataset whose
pipeline is None in datasets;
If ``list[list[dict]]``, pipeline of dataset which is None
in datasets will be replaced by the corresponding pipeline
in the list.
force_apply (bool): If True, apply pipeline above to each dataset
even if it have its own pipeline. Default: False.
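
    Example:
        A minimal, illustrative config sketch. The dataset type, annotation
        files and transforms below are placeholders, not values defined in
        this module::

            train_pipeline = [dict(type='LoadImageFromFile')]
            train = dict(
                type='UniformConcatDataset',
                # Both sub-datasets leave ``pipeline=None``, so
                # ``train_pipeline`` is deep-copied into each of them.
                datasets=[
                    dict(type='OCRDataset', ann_file='train_a.txt',
                         pipeline=None),
                    dict(type='OCRDataset', ann_file='train_b.txt',
                         pipeline=None),
                ],
                pipeline=train_pipeline)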
"""

    def __init__(self,
                 datasets,
                 separate_eval=True,
                 show_mean_scores='auto',
                 pipeline=None,
                 force_apply=False,
                 **kwargs):
        new_datasets = []
        if pipeline is not None:
            assert isinstance(
                pipeline,
                list), 'pipeline must be list[dict] or list[list[dict]].'
            if is_type_list(pipeline, dict):
                self._apply_pipeline(datasets, pipeline, force_apply)
                new_datasets = datasets
            elif is_2dlist(pipeline):
                assert is_2dlist(datasets)
                assert len(datasets) == len(pipeline)
                for sub_datasets, tmp_pipeline in zip(datasets, pipeline):
                    self._apply_pipeline(sub_datasets, tmp_pipeline,
                                         force_apply)
                    new_datasets.extend(sub_datasets)
        else:
            if is_2dlist(datasets):
                for sub_datasets in datasets:
                    new_datasets.extend(sub_datasets)
            else:
                new_datasets = datasets
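
        # Every config dict in ``new_datasets`` now carries a concrete
        # pipeline; build the actual dataset objects and let ConcatDataset
        # handle index bookkeeping across them.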
        datasets = [build_dataset(c, kwargs) for c in new_datasets]
        super().__init__(datasets, separate_eval)

        if not separate_eval:
            raise NotImplementedError(
                'Evaluating datasets as a whole is not'
                ' supported yet. Please use "separate_eval=True"')

        assert isinstance(show_mean_scores, bool) or show_mean_scores == 'auto'
        if show_mean_scores == 'auto':
            show_mean_scores = len(self.datasets) > 1
        self.show_mean_scores = show_mean_scores
        if show_mean_scores is True or show_mean_scores == 'auto' and len(
                self.datasets) > 1:
            if len(set([type(ds) for ds in self.datasets])) != 1:
                raise NotImplementedError(
                    'To compute mean evaluation scores, all datasets '
                    'must have the same type')

    @staticmethod
    def _apply_pipeline(datasets, pipeline, force_apply=False):
        """Assign ``pipeline`` to every dataset cfg whose own pipeline is
        None, or to all of them when ``force_apply`` is True."""
        from_cfg = all(isinstance(x, dict) for x in datasets)
        assert from_cfg, 'datasets should be config dicts'
        assert all(isinstance(x, dict) for x in pipeline)
        for dataset in datasets:
            if dataset['pipeline'] is None or force_apply:
                dataset['pipeline'] = copy.deepcopy(pipeline)

    def evaluate(self, results, logger=None, **kwargs):
"""Evaluate the results.
Args:
results (list[list | tuple]): Testing results of the dataset.
logger (logging.Logger | str | None): Logger used for printing
related information during evaluation. Default: None.
Returns:
dict[str: float]: Results of each separate
dataset if `self.separate_eval=True`.
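
        Example:
            With two wrapped datasets and a single ``hmean-iou:hmean``
            metric, the returned dict has roughly the following shape (the
            metric name and numbers are illustrative only)::

                {
                    '0_hmean-iou:hmean': 0.80,
                    '1_hmean-iou:hmean': 0.70,
                    'mean_hmean-iou:hmean': 0.75,
                }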
"""
        assert len(results) == self.cumulative_sizes[-1], \
            ('Dataset and results have different sizes: '
             f'{self.cumulative_sizes[-1]} vs. {len(results)}')

        # Check whether all the datasets support evaluation
        for dataset in self.datasets:
            assert hasattr(dataset, 'evaluate'), \
                f'{type(dataset)} does not implement evaluate function'

        if self.separate_eval:
            dataset_idx = -1
            total_eval_results = dict()

            if self.show_mean_scores:
                mean_eval_results = defaultdict(list)
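
            # ``cumulative_sizes`` (maintained by ConcatDataset) marks where
            # each wrapped dataset's results start and end, so the flat
            # ``results`` list can be sliced per dataset below.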
            for dataset in self.datasets:
                start_idx = 0 if dataset_idx == -1 else \
                    self.cumulative_sizes[dataset_idx]
                end_idx = self.cumulative_sizes[dataset_idx + 1]
                results_per_dataset = results[start_idx:end_idx]
                print_log(
                    f'\nEvaluating {dataset.ann_file} with '
                    f'{len(results_per_dataset)} images now',
                    logger=logger)

                eval_results_per_dataset = dataset.evaluate(
                    results_per_dataset, logger=logger, **kwargs)
                dataset_idx += 1
                for k, v in eval_results_per_dataset.items():
                    total_eval_results.update({f'{dataset_idx}_{k}': v})
                    if self.show_mean_scores:
                        mean_eval_results[k].append(v)

            if self.show_mean_scores:
                for k, v in mean_eval_results.items():
                    total_eval_results[f'mean_{k}'] = np.mean(v)

            return total_eval_results
        else:
            raise NotImplementedError(
                'Evaluating datasets as a whole is not'
                ' supported yet. Please use "separate_eval=True"')