Skip to content

Documentation

Documentation

valor_lite.semantic_segmentation.Bitmask dataclass

Represents a binary mask with an associated semantic label.

Parameters:

Name Type Description Default
mask NDArray[bool_]

A NumPy array of boolean values representing the mask.

required
label str

The semantic label associated with the mask.

required

Examples:

>>> import numpy as np
>>> mask = np.array([[True, False], [False, True]], dtype=np.bool_)
>>> bitmask = Bitmask(mask=mask, label='ocean')
Source code in valor_lite/semantic_segmentation/annotation.py
@dataclass
class Bitmask:
    """
    Represents a binary mask with an associated semantic label.

    Parameters
    ----------
    mask : NDArray[np.bool_]
        A NumPy array of boolean values representing the mask.
    label : str
        The semantic label associated with the mask.

    Raises
    ------
    ValueError
        If the dtype of `mask` is not boolean.

    Examples
    --------
    >>> import numpy as np
    >>> mask = np.array([[True, False], [False, True]], dtype=np.bool_)
    >>> bitmask = Bitmask(mask=mask, label='ocean')
    """

    mask: NDArray[np.bool_]
    label: str

    def __post_init__(self):
        """Reject masks whose dtype is not boolean."""
        if self.mask.dtype != np.bool_:
            raise ValueError(
                f"Bitmask received mask with dtype '{self.mask.dtype}'."
            )

valor_lite.semantic_segmentation.Segmentation dataclass

Segmentation data structure holding ground truth and prediction bitmasks for semantic segmentation tasks.

Parameters:

Name Type Description Default
uid str

Unique identifier for the image or sample.

required
groundtruths List[Bitmask]

List of ground truth bitmasks.

required
predictions List[Bitmask]

List of predicted bitmasks.

required
shape tuple of int

The 2-dimensional shape that every ground truth and prediction mask must have. Both dimensions must be positive.

required
size int

The total number of pixels in the masks. This is set automatically after initialization.

0

Examples:

>>> import numpy as np
>>> mask1 = np.array([[True, False], [False, True]], dtype=np.bool_)
>>> groundtruth = Bitmask(mask=mask1, label='object')
>>> mask2 = np.array([[False, True], [True, False]], dtype=np.bool_)
>>> prediction = Bitmask(mask=mask2, label='object')
>>> segmentation = Segmentation(
...     uid='123',
...     groundtruths=[groundtruth],
...     predictions=[prediction],
...     shape=(2, 2),
... )
Source code in valor_lite/semantic_segmentation/annotation.py
@dataclass
class Segmentation:
    """
    Segmentation data structure holding ground truth and prediction bitmasks for semantic segmentation tasks.

    Parameters
    ----------
    uid : str
        Unique identifier for the image or sample.
    groundtruths : List[Bitmask]
        List of ground truth bitmasks.
    predictions : List[Bitmask]
        List of predicted bitmasks.
    shape : tuple of int
        The shape that every mask must have. It must be 2-dimensional with
        positive dimensions.
    size : int, optional
        The total number of pixels in the masks. Any value passed in is
        overwritten with ``shape[0] * shape[1]`` after initialization.

    Raises
    ------
    ValueError
        If `shape` is not 2-dimensional or has a non-positive dimension, if a
        mask's shape differs from `shape`, or if two ground truth masks (or
        two prediction masks) overlap.

    Examples
    --------
    >>> import numpy as np
    >>> mask1 = np.array([[True, False], [False, True]], dtype=np.bool_)
    >>> groundtruth = Bitmask(mask=mask1, label='object')
    >>> mask2 = np.array([[False, True], [True, False]], dtype=np.bool_)
    >>> prediction = Bitmask(mask=mask2, label='object')
    >>> segmentation = Segmentation(
    ...     uid='123',
    ...     groundtruths=[groundtruth],
    ...     predictions=[prediction],
    ...     shape=(2, 2),
    ... )
    """

    uid: str
    groundtruths: list[Bitmask]
    predictions: list[Bitmask]
    shape: tuple[int, ...]
    size: int = field(default=0)

    def __post_init__(self):
        """Validate the shape, derive `size`, and validate both mask lists."""
        if len(self.shape) != 2 or self.shape[0] <= 0 or self.shape[1] <= 0:
            raise ValueError(
                f"segmentations must be 2-dimensional and have non-zero dimensions. Received shape '{self.shape}'"
            )
        self.size = self.shape[0] * self.shape[1]

        # ground truths are checked before predictions
        self._validate_masks(self.groundtruths, kind="ground truth")
        self._validate_masks(self.predictions, kind="prediction")

    def _validate_masks(self, bitmasks: list[Bitmask], kind: str) -> None:
        """Check that every mask matches `self.shape` and that no two masks overlap."""
        occupied = None  # union of the masks seen so far
        for bitmask in bitmasks:
            if self.shape != bitmask.mask.shape:
                raise ValueError(
                    f"{kind} masks for datum '{self.uid}' should have shape '{self.shape}'. Received mask with shape '{bitmask.mask.shape}'"
                )

            if occupied is None:
                # copy so the caller's array is never aliased by the accumulator
                occupied = bitmask.mask.copy()
            elif np.logical_and(occupied, bitmask.mask).any():
                raise ValueError(f"{kind} masks cannot overlap")
            else:
                occupied = occupied | bitmask.mask

valor_lite.semantic_segmentation.DataLoader

Segmentation DataLoader.

Source code in valor_lite/semantic_segmentation/manager.py
class DataLoader:
    """
    Segmentation DataLoader.

    Collects one intermediate confusion matrix per added datum and, on
    `finalize`, packs them into the evaluator it owns.
    """

    def __init__(self):
        self._evaluator = Evaluator()
        self.matrices = []

    def _add_datum(self, uid: str) -> int:
        """
        Registers a new datum uid in the evaluator's lookup tables.

        Parameters
        ----------
        uid : str
            The datum uid.

        Returns
        -------
        int
            The index assigned to the datum.

        Raises
        ------
        ValueError
            If the uid has already been registered.
        """
        uid_to_index = self._evaluator.datum_id_to_index
        if uid in uid_to_index:
            raise ValueError(f"Datum with uid `{uid}` already exists.")
        position = len(uid_to_index)
        uid_to_index[uid] = position
        self._evaluator.index_to_datum_id.append(uid)
        return position

    def _add_label(self, label: str) -> int:
        """
        Looks up a label's index, registering the label first if it is new.

        Parameters
        ----------
        label : str
            A string label.

        Returns
        -------
        int
            The label's index.
        """
        name_to_index = self._evaluator.label_to_index
        if label in name_to_index:
            return name_to_index[label]

        # new labels take the next free position
        names = self._evaluator.index_to_label
        position = len(names)
        name_to_index[label] = position
        names.append(label)
        return position

    def add_data(
        self,
        segmentations: list[Segmentation],
        show_progress: bool = False,
    ):
        """
        Ingests segmentations, caching one intermediate confusion matrix per datum.

        Parameters
        ----------
        segmentations : list[Segmentation]
            A list of Segmentation objects.
        show_progress : bool, default=False
            Toggle for tqdm progress bar.

        Raises
        ------
        ValueError
            If a segmentation's uid has already been added.
        """
        for segmentation in tqdm(segmentations, disable=not show_progress):
            # register the datum first; duplicate uids are rejected here
            self._add_datum(segmentation.uid)

            truths = segmentation.groundtruths
            guesses = segmentation.predictions

            # one label index per mask; ground truth labels are registered
            # before prediction labels
            groundtruth_labels = np.array(
                [self._add_label(truth.label) for truth in truths],
                dtype=np.int64,
            )
            prediction_labels = np.array(
                [self._add_label(guess.label) for guess in guesses],
                dtype=np.int64,
            )

            # one flattened mask per row; a single all-False row stands in
            # when a side has no masks
            combined_groundtruths = (
                np.stack([truth.mask.flatten() for truth in truths], axis=0)
                if truths
                else np.zeros(
                    (1, segmentation.shape[0] * segmentation.shape[1]),
                    dtype=np.bool_,
                )
            )
            combined_predictions = (
                np.stack([guess.mask.flatten() for guess in guesses], axis=0)
                if guesses
                else np.zeros(
                    (1, segmentation.shape[0] * segmentation.shape[1]),
                    dtype=np.bool_,
                )
            )

            intermediate = compute_intermediate_confusion_matrices(
                groundtruths=combined_groundtruths,
                predictions=combined_predictions,
                groundtruth_labels=groundtruth_labels,
                prediction_labels=prediction_labels,
                n_labels=len(self._evaluator.index_to_label),
            )
            self.matrices.append(intermediate)

    def finalize(self) -> Evaluator:
        """
        Packs the cached matrices into the evaluator and computes its metadata.

        Returns
        -------
        Evaluator
            A ready-to-use evaluator object.

        Raises
        ------
        EmptyEvaluatorError
            If no data has been added.
        """
        if not self.matrices:
            raise EmptyEvaluatorError()

        evaluator = self._evaluator
        n_labels = len(evaluator.index_to_label)
        side = n_labels + 1
        stacked = evaluator._confusion_matrices = np.zeros(
            (len(evaluator.index_to_datum_id), side, side), dtype=np.int64
        )

        # each cached matrix fills the top-left corner of its zeroed slot;
        # presumably matrices built when fewer labels were known are smaller
        for datum_idx, partial in enumerate(self.matrices):
            rows, cols = partial.shape
            stacked[datum_idx, :rows, :cols] = partial

        evaluator._label_metadata = compute_label_metadata(
            confusion_matrices=evaluator._confusion_matrices,
            n_labels=n_labels,
        )
        evaluator._metadata = Metadata.create(
            confusion_matrices=evaluator._confusion_matrices,
        )
        return evaluator

add_data(segmentations, show_progress=False)

Adds segmentations to the cache.

Parameters:

Name Type Description Default
segmentations list[Segmentation]

A list of Segmentation objects.

required
show_progress bool

Toggle for tqdm progress bar.

False
Source code in valor_lite/semantic_segmentation/manager.py
def add_data(
    self,
    segmentations: list[Segmentation],
    show_progress: bool = False,
):
    """
    Ingests segmentations, caching one intermediate confusion matrix per datum.

    Parameters
    ----------
    segmentations : list[Segmentation]
        A list of Segmentation objects.
    show_progress : bool, default=False
        Toggle for tqdm progress bar.
    """
    for segmentation in tqdm(segmentations, disable=not show_progress):
        # register the datum before touching its masks
        self._add_datum(segmentation.uid)

        truths = segmentation.groundtruths
        guesses = segmentation.predictions

        # one label index per mask; ground truth labels are registered
        # before prediction labels
        groundtruth_labels = np.array(
            [self._add_label(truth.label) for truth in truths],
            dtype=np.int64,
        )
        prediction_labels = np.array(
            [self._add_label(guess.label) for guess in guesses],
            dtype=np.int64,
        )

        # one flattened mask per row; a single all-False row stands in
        # when a side has no masks
        combined_groundtruths = (
            np.stack([truth.mask.flatten() for truth in truths], axis=0)
            if truths
            else np.zeros(
                (1, segmentation.shape[0] * segmentation.shape[1]),
                dtype=np.bool_,
            )
        )
        combined_predictions = (
            np.stack([guess.mask.flatten() for guess in guesses], axis=0)
            if guesses
            else np.zeros(
                (1, segmentation.shape[0] * segmentation.shape[1]),
                dtype=np.bool_,
            )
        )

        intermediate = compute_intermediate_confusion_matrices(
            groundtruths=combined_groundtruths,
            predictions=combined_predictions,
            groundtruth_labels=groundtruth_labels,
            prediction_labels=prediction_labels,
            n_labels=len(self._evaluator.index_to_label),
        )
        self.matrices.append(intermediate)

finalize()

Performs data finalization and some preprocessing steps.

Returns:

Type Description
Evaluator

A ready-to-use evaluator object.

Source code in valor_lite/semantic_segmentation/manager.py
def finalize(self) -> Evaluator:
    """
    Packs the cached matrices into the evaluator and computes its metadata.

    Returns
    -------
    Evaluator
        A ready-to-use evaluator object.

    Raises
    ------
    EmptyEvaluatorError
        If no matrices have been cached.
    """
    if not self.matrices:
        raise EmptyEvaluatorError()

    evaluator = self._evaluator
    n_labels = len(evaluator.index_to_label)
    side = n_labels + 1
    stacked = evaluator._confusion_matrices = np.zeros(
        (len(evaluator.index_to_datum_id), side, side), dtype=np.int64
    )

    # each cached matrix fills the top-left corner of its zeroed slot;
    # presumably matrices built when fewer labels were known are smaller
    for datum_idx, partial in enumerate(self.matrices):
        rows, cols = partial.shape
        stacked[datum_idx, :rows, :cols] = partial

    evaluator._label_metadata = compute_label_metadata(
        confusion_matrices=evaluator._confusion_matrices,
        n_labels=n_labels,
    )
    evaluator._metadata = Metadata.create(
        confusion_matrices=evaluator._confusion_matrices,
    )
    return evaluator

valor_lite.semantic_segmentation.Evaluator

Segmentation Evaluator

Source code in valor_lite/semantic_segmentation/manager.py
class Evaluator:
    """
    Segmentation Evaluator

    Holds datum and label lookup tables alongside cached confusion matrices,
    and exposes filtering and metric computation over that cache.
    """

    def __init__(self):
        """Creates empty lookup tables and placeholder caches."""
        # lookup tables between external identifiers and internal indices
        self.datum_id_to_index: dict[str, int] = {}
        self.index_to_datum_id: list[str] = []
        self.label_to_index: dict[str, int] = {}
        self.index_to_label: list[str] = []

        # caches; empty placeholders until they are populated
        self._confusion_matrices = np.array([], dtype=np.int64)
        self._label_metadata = np.array([], dtype=np.int64)
        self._metadata = Metadata()

    @property
    def metadata(self) -> Metadata:
        """The evaluator's metadata object."""
        return self._metadata

    @property
    def ignored_prediction_labels(self) -> list[str]:
        """
        Prediction labels that are not present in the ground truth set.
        """
        # column 0 is read as the ground truth count and column 1 as the
        # prediction count per label
        in_groundtruth = set(np.nonzero(self._label_metadata[:, 0] > 0)[0])
        in_prediction = set(np.nonzero(self._label_metadata[:, 1] > 0)[0])
        prediction_only = in_prediction - in_groundtruth
        return [self.index_to_label[label_id] for label_id in prediction_only]

    @property
    def missing_prediction_labels(self) -> list[str]:
        """
        Ground truth labels that are not present in the prediction set.
        """
        in_groundtruth = set(np.nonzero(self._label_metadata[:, 0] > 0)[0])
        in_prediction = set(np.nonzero(self._label_metadata[:, 1] > 0)[0])
        groundtruth_only = in_groundtruth - in_prediction
        return [self.index_to_label[label_id] for label_id in groundtruth_only]

    def create_filter(
        self,
        datum_ids: list[str] | None = None,
        labels: list[str] | None = None,
    ) -> Filter:
        """
        Builds a filter restricting evaluation to the given datums and labels.

        Parameters
        ----------
        datum_ids : list[str], optional
            An optional list of string uids representing datums.
        labels : list[str], optional
            An optional list of labels.

        Returns
        -------
        Filter
            The filter object containing a mask and metadata.

        Raises
        ------
        KeyError
            If a datum uid or label is not known to the evaluator.
        """
        n_datums = self._confusion_matrices.shape[0]
        n_slots = self.metadata.number_of_labels + 1

        # datum_mask marks datums to keep; label_mask marks label slots to drop
        datum_mask = np.ones(n_datums, dtype=np.bool_)
        label_mask = np.zeros(n_slots, dtype=np.bool_)

        if datum_ids is not None:
            if not datum_ids:
                # an explicit empty list selects no datums at all
                return Filter(
                    datum_mask=np.zeros_like(datum_mask),
                    label_mask=label_mask,
                    metadata=Metadata(),
                )
            wanted = np.array(
                sorted(self.datum_id_to_index[uid] for uid in datum_ids),
                dtype=np.int64,
            )
            is_wanted = (
                np.arange(n_datums)[:, None] == wanted[None, :]
            ).any(axis=1)
            datum_mask &= is_wanted

        if labels is not None:
            if not labels:
                # an explicit empty list masks out every label slot
                return Filter(
                    datum_mask=datum_mask,
                    label_mask=np.ones_like(label_mask),
                    metadata=Metadata(),
                )
            # -1 is always kept; it pairs with the extra leading slot
            kept_ids = np.array(
                [self.label_to_index[label] for label in labels] + [-1],
                dtype=np.int64,
            )
            slot_ids = np.arange(n_slots) - 1
            is_kept = (slot_ids[:, None] == kept_ids[None, :]).any(axis=1)
            label_mask |= ~is_kept

        filtered_confusion_matrices, _ = filter_cache(
            confusion_matrices=self._confusion_matrices.copy(),
            datum_mask=datum_mask,
            label_mask=label_mask,
            number_of_labels=self.metadata.number_of_labels,
        )
        filtered_metadata = Metadata.create(
            confusion_matrices=filtered_confusion_matrices,
        )
        return Filter(
            datum_mask=datum_mask,
            label_mask=label_mask,
            metadata=filtered_metadata,
        )

    def filter(
        self, filter_: Filter
    ) -> tuple[NDArray[np.int64], NDArray[np.int64]]:
        """
        Applies a filter to a copy of the cached confusion matrices.

        Parameters
        ----------
        filter_ : Filter
            An object describing the filter operation.

        Returns
        -------
        NDArray[int64]
            Filtered confusion matrices.
        NDArray[int64]
            Filtered label metadata
        """
        # filter_cache receives a copy of the cached matrices
        matrices_copy = self._confusion_matrices.copy()
        return filter_cache(
            confusion_matrices=matrices_copy,
            datum_mask=filter_.datum_mask,
            label_mask=filter_.label_mask,
            number_of_labels=self.metadata.number_of_labels,
        )

    def compute_precision_recall_iou(
        self, filter_: Filter | None = None
    ) -> dict[MetricType, list]:
        """
        Computes metrics over the cache, optionally restricted by a filter.

        Parameters
        ----------
        filter_ : Filter, optional
            When given, the cache is filtered first and the pixel count is
            taken from the filter's metadata.

        Returns
        -------
        dict[MetricType, list]
            A dictionary mapping MetricType enumerations to lists of computed metrics.
        """
        if filter_ is None:
            confusion_matrices = self._confusion_matrices
            label_metadata = self._label_metadata
            n_pixels = self.metadata.number_of_pixels
        else:
            confusion_matrices, label_metadata = self.filter(filter_)
            n_pixels = filter_.metadata.number_of_pixels

        raw_results = compute_metrics(
            confusion_matrices=confusion_matrices,
            label_metadata=label_metadata,
            n_pixels=n_pixels,
        )
        return unpack_precision_recall_iou_into_metric_lists(
            results=raw_results,
            label_metadata=label_metadata,
            index_to_label=self.index_to_label,
        )

    def evaluate(
        self, filter_: Filter | None = None
    ) -> dict[MetricType, list[Metric]]:
        """
        Computes all available metrics.

        Parameters
        ----------
        filter_ : Filter, optional
            Passed through to `compute_precision_recall_iou`.

        Returns
        -------
        dict[MetricType, list[Metric]]
            Lists of metrics organized by metric type.
        """
        metrics = self.compute_precision_recall_iou(filter_=filter_)
        return metrics

ignored_prediction_labels property

Prediction labels that are not present in the ground truth set.

missing_prediction_labels property

Ground truth labels that are not present in the prediction set.

__init__()

Initializes evaluator caches.

Source code in valor_lite/semantic_segmentation/manager.py
def __init__(self):
    """Creates empty lookup tables and placeholder caches."""
    # lookup tables between external identifiers and internal indices
    self.datum_id_to_index: dict[str, int] = {}
    self.index_to_datum_id: list[str] = []
    self.label_to_index: dict[str, int] = {}
    self.index_to_label: list[str] = []

    # caches; empty placeholders until they are populated
    self._confusion_matrices = np.array([], dtype=np.int64)
    self._label_metadata = np.array([], dtype=np.int64)
    self._metadata = Metadata()

compute_precision_recall_iou(filter_=None)

Performs an evaluation and returns metrics.

Returns:

Type Description
dict[MetricType, list]

A dictionary mapping MetricType enumerations to lists of computed metrics.

Source code in valor_lite/semantic_segmentation/manager.py
def compute_precision_recall_iou(
    self, filter_: Filter | None = None
) -> dict[MetricType, list]:
    """
    Computes metrics over the cache, optionally restricted by a filter.

    Parameters
    ----------
    filter_ : Filter, optional
        When given, the cache is filtered first and the pixel count is
        taken from the filter's metadata.

    Returns
    -------
    dict[MetricType, list]
        A dictionary mapping MetricType enumerations to lists of computed metrics.
    """
    if filter_ is None:
        confusion_matrices = self._confusion_matrices
        label_metadata = self._label_metadata
        n_pixels = self.metadata.number_of_pixels
    else:
        confusion_matrices, label_metadata = self.filter(filter_)
        n_pixels = filter_.metadata.number_of_pixels

    raw_results = compute_metrics(
        confusion_matrices=confusion_matrices,
        label_metadata=label_metadata,
        n_pixels=n_pixels,
    )
    return unpack_precision_recall_iou_into_metric_lists(
        results=raw_results,
        label_metadata=label_metadata,
        index_to_label=self.index_to_label,
    )

create_filter(datum_ids=None, labels=None)

Creates a filter for use with the evaluator.

Parameters:

Name Type Description Default
datum_ids list[str]

An optional list of string uids representing datums.

None
labels list[str]

An optional list of labels.

None

Returns:

Type Description
Filter

The filter object containing a mask and metadata.

Source code in valor_lite/semantic_segmentation/manager.py
def create_filter(
    self,
    datum_ids: list[str] | None = None,
    labels: list[str] | None = None,
) -> Filter:
    """
    Builds a filter restricting evaluation to the given datums and labels.

    Parameters
    ----------
    datum_ids : list[str], optional
        An optional list of string uids representing datums.
    labels : list[str], optional
        An optional list of labels.

    Returns
    -------
    Filter
        The filter object containing a mask and metadata.

    Raises
    ------
    KeyError
        If a datum uid or label is not known to the evaluator.
    """
    n_datums = self._confusion_matrices.shape[0]
    n_slots = self.metadata.number_of_labels + 1

    # datum_mask marks datums to keep; label_mask marks label slots to drop
    datum_mask = np.ones(n_datums, dtype=np.bool_)
    label_mask = np.zeros(n_slots, dtype=np.bool_)

    if datum_ids is not None:
        if not datum_ids:
            # an explicit empty list selects no datums at all
            return Filter(
                datum_mask=np.zeros_like(datum_mask),
                label_mask=label_mask,
                metadata=Metadata(),
            )
        wanted = np.array(
            sorted(self.datum_id_to_index[uid] for uid in datum_ids),
            dtype=np.int64,
        )
        is_wanted = (
            np.arange(n_datums)[:, None] == wanted[None, :]
        ).any(axis=1)
        datum_mask &= is_wanted

    if labels is not None:
        if not labels:
            # an explicit empty list masks out every label slot
            return Filter(
                datum_mask=datum_mask,
                label_mask=np.ones_like(label_mask),
                metadata=Metadata(),
            )
        # -1 is always kept; it pairs with the extra leading slot
        kept_ids = np.array(
            [self.label_to_index[label] for label in labels] + [-1],
            dtype=np.int64,
        )
        slot_ids = np.arange(n_slots) - 1
        is_kept = (slot_ids[:, None] == kept_ids[None, :]).any(axis=1)
        label_mask |= ~is_kept

    filtered_confusion_matrices, _ = filter_cache(
        confusion_matrices=self._confusion_matrices.copy(),
        datum_mask=datum_mask,
        label_mask=label_mask,
        number_of_labels=self.metadata.number_of_labels,
    )
    filtered_metadata = Metadata.create(
        confusion_matrices=filtered_confusion_matrices,
    )
    return Filter(
        datum_mask=datum_mask,
        label_mask=label_mask,
        metadata=filtered_metadata,
    )

evaluate(filter_=None)

Computes all available metrics.

Returns:

Type Description
dict[MetricType, list[Metric]]

Lists of metrics organized by metric type.

Source code in valor_lite/semantic_segmentation/manager.py
def evaluate(
    self, filter_: Filter | None = None
) -> dict[MetricType, list[Metric]]:
    """
    Computes all available metrics.

    Parameters
    ----------
    filter_ : Filter, optional
        Passed through to `compute_precision_recall_iou`.

    Returns
    -------
    dict[MetricType, list[Metric]]
        Lists of metrics organized by metric type.
    """
    metrics = self.compute_precision_recall_iou(filter_=filter_)
    return metrics

filter(filter_)

Performs the filter operation over the internal cache.

Parameters:

Name Type Description Default
filter_ Filter

An object describing the filter operation.

required

Returns:

Type Description
NDArray[int64]

Filtered confusion matrices.

NDArray[int64]

Filtered label metadata

Source code in valor_lite/semantic_segmentation/manager.py
def filter(
    self, filter_: Filter
) -> tuple[NDArray[np.int64], NDArray[np.int64]]:
    """
    Applies a filter to a copy of the cached confusion matrices.

    Parameters
    ----------
    filter_ : Filter
        An object describing the filter operation.

    Returns
    -------
    NDArray[int64]
        Filtered confusion matrices.
    NDArray[int64]
        Filtered label metadata
    """
    # filter_cache receives a copy of the cached matrices
    matrices_copy = self._confusion_matrices.copy()
    return filter_cache(
        confusion_matrices=matrices_copy,
        datum_mask=filter_.datum_mask,
        label_mask=filter_.label_mask,
        number_of_labels=self.metadata.number_of_labels,
    )