Skip to content

landingai.data_management

LandingLens

LandingLens client

Example

Create a client by specifying API Key and project id

client = LandingLens(project, api_key)

Parameters

project_id: int — LandingLens project id. This default can be overridden in individual commands. api_key: Optional[str] — LandingLens API Key. If it is not provided, it is read from the environment variable LANDINGAI_API_KEY, or from the .env file in your project root directory.

Source code in landingai/data_management/client.py
class LandingLens:
    """
    LandingLens client

    Example
    -------
    # Create a client by specifying API Key and project id
    >>> client = LandingLens(project, api_key)

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self.project_id = project_id
        # Fall back to the configured credential when no key is passed in.
        if not api_key:
            api_key = load_api_credential().api_key
        self.api_key = api_key

    @property
    def _project_id(self) -> int:
        """The default project id used by this client."""
        return self.project_id

    @property
    def _api_key(self) -> str:
        """The API key used to authenticate requests."""
        return self.api_key

    def _api_async(
        self,
        route_name: str,
        params: Optional[Dict[str, Any]] = None,
        form_data: Optional[Dict[str, Any]] = None,
        resp_with_content: Optional[Dict[str, Any]] = None,
        url_replacements: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send a request to the LandingLens API and return the decoded JSON body.

        Parameters
        ----------
        route_name:
            Key into ``ROUTES`` selecting the endpoint and HTTP method.
        params:
            Optional query-string parameters.
        form_data:
            Optional multipart form fields. When given, the request body is sent
            as multipart form data instead of JSON.
        resp_with_content:
            JSON payload of the request. Required when ``form_data`` is None.
        url_replacements:
            Optional values substituted into the endpoint template.

        Raises
        ------
        HttpError
            If sending the request or decoding the response fails.
        """
        is_form_data = form_data is not None
        # A JSON payload is mandatory unless the body is sent as form data.
        assert is_form_data or resp_with_content is not None

        endpoint, headers, params, root_url, route = self._api_common_setup(
            route_name, url_replacements, resp_with_content, params
        )
        # Create a MultipartEncoder for the form data
        form = MultipartEncoder(fields=form_data) if is_form_data else None
        if form is not None:
            # The multipart boundary is part of the content type.
            headers["Content-Type"] = form.content_type

        try:
            response = requests.request(
                method=route["method"].__name__,
                url=endpoint,
                headers=headers,
                json=None if is_form_data else resp_with_content,
                params=params,
                data=form,
            )

            # Use %s placeholders: passing args without a placeholder makes the
            # logging module report a formatting error once DEBUG is enabled.
            _LOGGER.debug("Request URL: %s", response.url)
            _LOGGER.debug("Response Code: %s", response.status_code)
            _LOGGER.debug("Response Reason: %s", response.reason)

            resp_with_content = response.json()
            _LOGGER.debug(
                "Response Content (500 chars): %s",
                json.dumps(resp_with_content)[:500],
            )
        except requests.exceptions.RequestException as e:
            raise HttpError(
                "HTTP request to LandingLens server failed with error message: \n"
                f"{str(e)}"
            ) from e
        except Exception as e:
            raise HttpError(
                f"An error occurred during the HTTP request: {str(e)}"
            ) from e
        assert resp_with_content is not None
        return resp_with_content

    def _api(
        self,
        route_name: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        url_replacements: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Call a LandingLens API route and return the decoded JSON body.

        Parameters
        ----------
        route_name:
            Key into ``ROUTES`` selecting the endpoint and HTTP method.
        params:
            Optional query-string parameters.
        data:
            Optional JSON payload.
        url_replacements:
            Optional values substituted into the endpoint template.

        Raises
        ------
        HttpError
            If the server answers with a non-OK status code.
        """
        endpoint, headers, params, root_url, route = self._api_common_setup(
            route_name, url_replacements, data, params
        )
        resp = route["method"](
            endpoint,
            params=params,
            json=data,
            headers=headers,
            verify=True,
        )
        _LOGGER.info("Request URL: %s", resp.request.url)
        _LOGGER.debug("Response Code: %s", resp.status_code)
        _LOGGER.debug("Response Reason: %s", resp.reason)
        _LOGGER.debug("Response Content (500 chars): %s", resp.content[:500])
        if not resp.ok:
            try:
                error_message = json.loads(resp.content.decode("utf-8"))["message"]
            except Exception as e:
                # Best effort: fall back to the raw body when it is not the
                # expected JSON document.
                _LOGGER.warning(f"Failed to parse error message into json: {e}")
                error_message = resp.text
            raise HttpError(
                "HTTP request to LandingLens server failed with "
                f"code {resp.status_code}-{resp.reason} and error message: \n"
                f"{error_message}"
            )
        return cast(Dict[str, Any], resp.json())

    def _api_common_setup(
        self,
        route_name: str,
        url_replacements: Optional[Dict[str, Any]],
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Tuple[str, Dict[str, Any], Dict[str, Any], str, Dict[str, Any]]:
        """Resolve a route into everything needed to issue the request.

        Returns
        -------
        A tuple ``(endpoint, headers, params, root_url, route)``.

        Raises
        ------
        ValueError
            If the route refers to a root URL that is not in ``_URL_ROOTS``.

        Notes
        -----
        For POST routes, a missing ``projectId`` is written into ``data`` in
        place: callers send that same dict as the request body afterwards.
        """
        route = ROUTES[route_name]
        headers = {
            "apikey": self.api_key,
            "Content-Type": "application/json",
            "User-Agent": "landingai-python-" + version("landingai"),
        }
        root_url_type = cast(str, route["root_url"])

        if root_url_type not in _URL_ROOTS:
            raise ValueError(f"Unknown URL specified: {root_url_type}")

        root_url = _URL_ROOTS[root_url_type]

        # Work on a copy so the caller's dict is never modified; the resulting
        # dict is handed back through the return value.
        params = dict(params) if params else {}
        if route["method"] == requests.get and not params.get("projectId"):
            params["projectId"] = self.project_id
        if route["method"] == requests.post and data and not data.get("projectId"):
            data["projectId"] = self.project_id
        endpoint = posixpath.join(root_url, cast(str, route["endpoint"]))

        # Explicit replacements take precedence over the default "version".
        endpoint = endpoint.format(
            **{"version": _API_VERSION, **(url_replacements or {})}
        )

        return endpoint, headers, params, root_url, route

    def get_project_property(
        self, project_id: int, property: Optional[str] = None
    ) -> Any:
        """Fetch a project, or a single property of it.

        Parameters
        ----------
        project_id:
            Id of the project to query.
        property:
            Snake-case name of the property to return. When None, the whole
            ``data`` entry of the response is returned.

        Raises
        ------
        HttpError
            If the requested property is missing or None in the project data.
        """
        resp = self._api(GET_PROJECT, params={"projectId": project_id})
        project = resp.get("data")
        if property is None:
            return project
        assert project is not None
        property_value = project.get(to_camel_case(property))
        if property_value is None:
            raise HttpError(f"{property} Id not found")
        return property_value

    # NOTE(review): lru_cache on an instance method keys on `self` and keeps
    # every client instance alive for as long as the cache entry exists.
    @lru_cache(maxsize=_LRU_CACHE_SIZE)
    def get_metadata_mappings(
        self, project_id: int
    ) -> Tuple[Dict[str, Any], Dict[int, str]]:
        """Return the metadata field mappings of a project.

        Returns
        -------
        A tuple ``(metadata_mapping, id_to_metadata)`` where ``metadata_mapping``
        maps a field name to ``(field id, predefined choices)`` and
        ``id_to_metadata`` maps a field id back to its name.
        """
        resp = self._api(METADATA_ITEMS, params={"projectId": project_id})
        metadata_mapping_resp = resp.get("data", {})

        metadata_mapping = {
            metadata_field["name"]: (
                metadata_field["id"],
                metadata_field["predefinedChoices"],
            )
            for metadata_field in metadata_mapping_resp.values()
        }
        id_to_metadata = {v[0]: k for k, v in metadata_mapping.items()}

        return metadata_mapping, id_to_metadata

LegacyTrainingDataset

A client for fetching the training dataset from legacy training flows.

Source code in landingai/data_management/dataset.py
@deprecated(
    " You should not use this class unless you're told by the LandingAI team. It's only intended for training flow migration use cases."
)
class LegacyTrainingDataset:
    """A client for fetching the training dataset from legacy training flows."""

    def __init__(self, project_id: int, cookie: str) -> None:
        self._project_id = project_id
        self._cookie = cookie

    def get_legacy_training_dataset(
        self, output_dir: Path, job_id: str
    ) -> pd.DataFrame:
        """Get the training dataset from legacy training flow by job_id.
        Currently, it only supports segmentation and classification datasets.

        The first page is fetched synchronously; any remaining pages are fetched
        concurrently. A page that cannot be fetched is skipped with a warning, so
        the returned dataframe may then hold fewer rows than the reported total.

        Example output of the returned dataframe for a segmentation dataset:
        ```
            media_id   seg_mask_prediction_path         seg_mask_label_path
        0   10413664  /work/landingai-python/104136...  /work/landingai-python/104136...
        1   10413665  /work/landingai-python/104136...  /work/landingai-python/104136...
        2   10413666  /work/landingai-python/104136...  /work/landingai-python/104136...
        ```

        NOTE:
        1. This dataset has a similar format as the dataset returned by `TrainingDataset.get_training_dataset()`.
        2. Only difference is that the prediction mask is thresholded, i.e. the value of each pixel is either 0 or 1.


        Example output of the returned dataframe for a classification dataset:
        ```
              media_id   label_class  prediction_score prediction_class prediction_type
        0      9789913    black_spot          0.992697       black_spot         correct
        1      9789914    black_spot          0.996753       black_spot         correct
        ...        ...           ...               ...              ...             ...
        1801   9791719  unclassified          0.969400     unclassified         correct
        1802   9791720  unclassified          0.778278     unclassified         correct
        ```

        Raises
        ------
        ValueError
            If the first page of the job cannot be fetched.
        """

        output_dir.mkdir(parents=True, exist_ok=True)
        data = _fetch_gt_and_predictions(
            self._project_id, self._cookie, job_id=job_id, offset=0
        )
        if not data:
            raise ValueError(
                f"Failed to find a classic flow job by job id: {job_id} in project {self._project_id}. Please check the error log for more details and act accordingly."
            )
        dataset_type = data["type"]
        rows: List[Dict[str, Any]] = [
            _extract_gt_and_predictions(d, output_dir, dataset_type)
            for d in data["details"]
        ]
        total = data["totalItems"]
        _LOGGER.info(f"Found {total} records from a {dataset_type} dataset:")
        if total > _PAGE_SIZE:
            # Offsets of every page after the first one (already fetched above).
            new_offsets = list(range(_PAGE_SIZE, total, _PAGE_SIZE))
            with tqdm(total=len(new_offsets)) as pbar:
                with concurrent.futures.ThreadPoolExecutor() as executor:
                    # Remember which offset each future belongs to so that a
                    # failed page can be reported.
                    futures = {
                        executor.submit(
                            _fetch_gt_and_predictions,
                            project_id=self._project_id,
                            cookie=self._cookie,
                            job_id=job_id,
                            offset=new_offset,
                        ): new_offset
                        for new_offset in new_offsets
                    }
                    for future in concurrent.futures.as_completed(futures):
                        new_data = future.result()
                        # Count the page as processed even when it is empty so
                        # the progress bar always reaches its total.
                        pbar.update(1)
                        if not new_data:
                            _LOGGER.warning(
                                f"Failed to fetch records at offset {futures[future]} for job {job_id}. The returned dataset will be incomplete."
                            )
                            continue
                        rows.extend(
                            _extract_gt_and_predictions(d, output_dir, dataset_type)
                            for d in new_data["details"]
                        )
        _LOGGER.info(
            (f"Fetched {len(rows)} image-prediction-label pairs from job {job_id}.")
        )
        return pd.DataFrame(rows)

get_legacy_training_dataset(output_dir, job_id)

Get the training dataset from legacy training flow by job_id. Currently, it only supports segmentation and classification datasets.

Example output of the returned dataframe for a segmentation dataset:

    media_id   seg_mask_prediction_path         seg_mask_label_path
0   10413664  /work/landingai-python/104136...  /work/landingai-python/104136...
1   10413665  /work/landingai-python/104136...  /work/landingai-python/104136...
2   10413666  /work/landingai-python/104136...  /work/landingai-python/104136...

NOTE: 1. This dataset has a format similar to the dataset returned by TrainingDataset.get_training_dataset(). 2. The only difference is that the prediction mask is thresholded, i.e. the value of each pixel is either 0 or 1.

Example output of the returned dataframe for a classification dataset:

      media_id   label_class  prediction_score prediction_class prediction_type
0      9789913    black_spot          0.992697       black_spot         correct
1      9789914    black_spot          0.996753       black_spot         correct
...        ...           ...               ...              ...             ...
1801   9791719  unclassified          0.969400     unclassified         correct
1802   9791720  unclassified          0.778278     unclassified         correct

Source code in landingai/data_management/dataset.py
def get_legacy_training_dataset(
    self, output_dir: Path, job_id: str
) -> pd.DataFrame:
    """Get the training dataset from legacy training flow by job_id.
    Currently, it only supports segmentation and classification datasets.

    The first page is fetched synchronously; any remaining pages are fetched
    concurrently. A page that cannot be fetched is skipped with a warning, so
    the returned dataframe may then hold fewer rows than the reported total.

    Example output of the returned dataframe for a segmentation dataset:
    ```
        media_id   seg_mask_prediction_path         seg_mask_label_path
    0   10413664  /work/landingai-python/104136...  /work/landingai-python/104136...
    1   10413665  /work/landingai-python/104136...  /work/landingai-python/104136...
    2   10413666  /work/landingai-python/104136...  /work/landingai-python/104136...
    ```

    NOTE:
    1. This dataset has a similar format as the dataset returned by `TrainingDataset.get_training_dataset()`.
    2. Only difference is that the prediction mask is thresholded, i.e. the value of each pixel is either 0 or 1.


    Example output of the returned dataframe for a classification dataset:
    ```
          media_id   label_class  prediction_score prediction_class prediction_type
    0      9789913    black_spot          0.992697       black_spot         correct
    1      9789914    black_spot          0.996753       black_spot         correct
    ...        ...           ...               ...              ...             ...
    1801   9791719  unclassified          0.969400     unclassified         correct
    1802   9791720  unclassified          0.778278     unclassified         correct
    ```

    Raises
    ------
    ValueError
        If the first page of the job cannot be fetched.
    """

    output_dir.mkdir(parents=True, exist_ok=True)
    data = _fetch_gt_and_predictions(
        self._project_id, self._cookie, job_id=job_id, offset=0
    )
    if not data:
        raise ValueError(
            f"Failed to find a classic flow job by job id: {job_id} in project {self._project_id}. Please check the error log for more details and act accordingly."
        )
    dataset_type = data["type"]
    rows: List[Dict[str, Any]] = [
        _extract_gt_and_predictions(d, output_dir, dataset_type)
        for d in data["details"]
    ]
    total = data["totalItems"]
    _LOGGER.info(f"Found {total} records from a {dataset_type} dataset:")
    if total > _PAGE_SIZE:
        # Offsets of every page after the first one (already fetched above).
        new_offsets = list(range(_PAGE_SIZE, total, _PAGE_SIZE))
        with tqdm(total=len(new_offsets)) as pbar:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                # Remember which offset each future belongs to so that a
                # failed page can be reported.
                futures = {
                    executor.submit(
                        _fetch_gt_and_predictions,
                        project_id=self._project_id,
                        cookie=self._cookie,
                        job_id=job_id,
                        offset=new_offset,
                    ): new_offset
                    for new_offset in new_offsets
                }
                for future in concurrent.futures.as_completed(futures):
                    new_data = future.result()
                    # Count the page as processed even when it is empty so
                    # the progress bar always reaches its total.
                    pbar.update(1)
                    if not new_data:
                        _LOGGER.warning(
                            f"Failed to fetch records at offset {futures[future]} for job {job_id}. The returned dataset will be incomplete."
                        )
                        continue
                    rows.extend(
                        _extract_gt_and_predictions(d, output_dir, dataset_type)
                        for d in new_data["details"]
                    )
    _LOGGER.info(
        (f"Fetched {len(rows)} image-prediction-label pairs from job {job_id}.")
    )
    return pd.DataFrame(rows)

TrainingDataset

A client for fetching the (Fast & Easy) training dataset.

Source code in landingai/data_management/dataset.py
class TrainingDataset:
    """A client for fetching the (Fast & Easy) training dataset."""

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self._client = LandingLens(project_id=project_id, api_key=api_key)
        self._metadata_client = Metadata(project_id=project_id, api_key=api_key)

    def get_training_dataset(
        self, output_dir: Path, include_image_metadata: bool = False
    ) -> pd.DataFrame:
        """Get the most recently used training dataset.

        Example output of the returned dataframe:
        ```
                id    split  classes  seg_mask_prediction_path  media_level_predicted_score    label_id     seg_mask_label_path media_level_label             metadata
        0   11229595   None       []  images/11229595_pred.npy                          NaN  11301603.0  images/11229595_gt.npy                OK                   {}
        1   11229597   None       []  images/11229597_pred.npy                          NaN         NaN                    None              None                   {}
        2    9918918  train  [screw]   images/9918918_pred.npy                     0.954456   8792257.0   images/9918918_gt.npy                NG                   {}
        3    9918924    dev  [screw]   images/9918924_pred.npy                     0.843393   8792265.0   images/9918924_gt.npy                NG   {'creator': 'bob'}
        4    9918921  train  [screw]   images/9918921_pred.npy                     0.956114   8792260.0   images/9918921_gt.npy                NG                   {}
        5    9918923  train  [screw]   images/9918923_pred.npy                     0.943873   8792262.0   images/9918923_gt.npy                NG   {'creator': 'foo'}
        ```

        NOTE:
        1.  Ground truth and prediction masks will be saved to the output_dir as a serialized numpy binary file.
            The file name is the media_id with a suffix of "_gt.npy" or "_pred.npy".
            You can load the numpy array by calling `np.load(file_path)`.
            The shape of the numpy array is (height, width, num_classes).
            The 0th channel is the first class, the 1st channel is the second class and so on. (The background class is not included.)

        2.  For prediction masks, the value of each pixel is the confidence score of the class, i.e. it's not thresholded.
            For ground truth masks, the value of each pixel is either 0 or 1.

        3.  The serialized mask will be an empty numpy array when there is no prediction or ground truth mask.
            E.g. the ground truth label is OK, i.e. no defect.
            So be sure to check the shape of the ground truth mask before using it.

        4.  The training dataset could also include images that are not used for training.
            Those images will have a None value for below fields: label_id,seg_mask_label_path,media_level_label
            Tip: for evaluating the model performance, you can filter out those images by checking the label_id field.

        5. The split field could be None, train, dev, or test. None means "unassigned" split.

        6. The metadata field is a dictionary that contains the metadata associated with each image. It's empty by default. Only available when `include_image_metadata` is True.

        Raises
        ------
        ValueError
            If the project's label type is not "segmentation".
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        project_id = self._client._project_id
        project_model_info = self.get_project_model_info()
        _LOGGER.info(f"Found the most recent model: {project_model_info}")
        model_id = cast(str, project_model_info["registered_model_id"])
        resp = self._client._api(
            route_name=GET_FAST_TRAINING_EXPORT,
            params={
                "projectId": project_id,
                "datasetVersionId": project_model_info["dataset_version_id"],
                "modelId": model_id,
                "skipCreatingDatasetVersion": "true",
            },
        )
        if resp["data"]["project"]["labelType"] != "segmentation":
            raise ValueError(
                f"Project {project_id} is not a segmentation project. Currently only segmentation projects are supported. For other project types, consider using the dataset snapshot export feature from the LandingLens platform UI."
            )

        dataset_id = resp["data"]["dataset"]["id"]
        medias = [
            {
                "id": int(media["media_id"]),
                "split": media.get("split", None),
                "classes": media["defect_list"],
            }
            for media in resp["data"]["data"]
        ]
        _LOGGER.info(
            f"Found {len(medias)} medias in the training dataset. Querying media details..."
        )
        images_dir = output_dir / "images"
        images_dir.mkdir(parents=True, exist_ok=True)
        # Index the rows by media id; the dicts are shared with `medias`, so
        # updating an entry here also updates the row that ends up in the frame.
        medias_map = {media["id"]: media for media in medias}
        with tqdm(total=len(medias)) as pbar:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        self._get_media_details,
                        media_id=media["id"],
                        dataset_id=dataset_id,
                        model_id=model_id,
                        output_dir=images_dir,
                        include_image_metadata=include_image_metadata,
                    )
                    for media in medias
                ]
                for future in concurrent.futures.as_completed(futures):
                    details = future.result()
                    medias_map[details["id"]].update(details)
                    pbar.update(1)

        return pd.DataFrame(medias)

    def _get_media_details(
        self,
        media_id: int,
        dataset_id: int,
        model_id: str,
        output_dir: Path,
        include_image_metadata: bool,
    ) -> Dict[str, Any]:
        """Get media details and image metadata.

        Saves the ground-truth mask as ``<media_id>_gt.npy`` and the prediction
        mask as ``<media_id>_pred.npy`` under ``output_dir``. A mask that cannot
        be decoded or saved is logged and its path is reported as None; the
        failure is not propagated.

        Returns
        -------
        A dict with the media id, the prediction fields, the label fields and,
        when ``include_image_metadata`` is True, a ``metadata`` entry.
        """
        resp = self._client._api(
            MEDIA_DETAILS,
            params={
                "mediaId": media_id,
                "datasetId": dataset_id,
                "modelId": model_id,
            },
        )
        data = resp["data"]
        # Get label data
        seg_mask_label_path: Optional[Path] = None
        label_id, media_level_label = None, None
        if data.get("label"):
            label_id = data["label"]["id"]
            media_level_label = data["label"]["mediaLevelLabel"]
            if data["label"].get("annotations") is not None:
                try:
                    # Each annotation is decoded into a 2-D bitmap sized by its
                    # inclusive range box.
                    flattened_bitmaps = [
                        np.array(
                            decode_bitmap_rle(ann["segmentationBitmapEncoded"]),
                            dtype=np.uint8,
                        ).reshape(
                            (
                                ann["rangeBox"]["ymax"] - ann["rangeBox"]["ymin"] + 1,
                                ann["rangeBox"]["xmax"] - ann["rangeBox"]["xmin"] + 1,
                            )
                        )
                        for ann in data["label"]["annotations"]
                    ]
                    # With no annotations the empty list is saved as-is, which
                    # yields an empty array on disk.
                    mask = (
                        np.stack(arrays=flattened_bitmaps, axis=2)
                        if flattened_bitmaps
                        else flattened_bitmaps
                    )
                    label_path = output_dir / f"{media_id}_gt.npy"
                    np.save(label_path, mask)
                    # Report the path only once the file has been written.
                    seg_mask_label_path = label_path
                except Exception:
                    _LOGGER.exception(
                        f"Failed to decode the segmentation mask (label) for media {media_id}."
                    )

        # Get prediction data
        seg_mask_prediction_path: Optional[Path] = None
        media_level_predicted_score = None
        if data.get("predictionLabel"):
            try:
                media_level_predicted_score = data["predictionLabel"]["mediaLevelScore"]
                # Close the streamed response once the image has been read.
                with requests.get(
                    data["predictionLabel"]["segImgPath"], stream=True
                ) as image_resp:
                    pred_mask = np.asarray(Image.open(image_resp.raw))
                # Channel 0 is used as the class index and channel 2 as the
                # score scaled to 0-255.
                class_channel = pred_mask[:, :, 0]
                unique_classes = np.unique(class_channel)
                masks = []
                for unique_class in unique_classes:
                    assert (
                        unique_class != 0
                    ), "Unexpected data. Background class should not be included in the prediction mask."
                    mask_score = (
                        pred_mask[:, :, 2]
                        * (class_channel == unique_class).astype(np.float16)
                    ) / 255
                    masks.append(mask_score)
                stacked_mask = np.stack(masks, axis=2)
                prediction_path = output_dir / f"{media_id}_pred.npy"
                np.save(prediction_path, stacked_mask)
                # Report the path only once the file has been written.
                seg_mask_prediction_path = prediction_path
            except Exception:
                _LOGGER.exception(
                    f"Failed to decode the segmentation mask (prediction) for media {media_id}."
                )

        media = {
            "id": media_id,
            # prediction data
            "seg_mask_prediction_path": seg_mask_prediction_path.absolute().as_posix()
            if seg_mask_prediction_path
            else None,
            "media_level_predicted_score": media_level_predicted_score,
            # label data
            "label_id": label_id,
            "seg_mask_label_path": seg_mask_label_path.absolute().as_posix()
            if seg_mask_label_path
            else None,
            "media_level_label": media_level_label,
        }
        if include_image_metadata:
            metadata = self._metadata_client.get(media_id)
            media["metadata"] = metadata
        return media

    def get_project_model_info(self) -> Dict[str, Union[str, int]]:
        """Return the dataset version id and registered model id of the project.

        Returns
        -------
        A dict with the keys ``dataset_version_id`` and ``registered_model_id``.
        """
        project_id = self._client._project_id
        resp = self._client._api(
            GET_PROJECT_MODEL_INFO, params={"projectId": project_id}
        )
        return {
            "dataset_version_id": resp["data"]["datasetVersionId"],
            "registered_model_id": resp["data"]["registeredModelId"],
        }

get_training_dataset(output_dir, include_image_metadata=False)

Get the most recently used training dataset.

Example output of the returned dataframe:

        id    split  classes  seg_mask_prediction_path  media_level_predicted_score    label_id     seg_mask_label_path media_level_label             metadata
0   11229595   None       []  images/11229595_pred.npy                          NaN  11301603.0  images/11229595_gt.npy                OK                   {}
1   11229597   None       []  images/11229597_pred.npy                          NaN         NaN                    None              None                   {}
2    9918918  train  [screw]   images/9918918_pred.npy                     0.954456   8792257.0   images/9918918_gt.npy                NG                   {}
3    9918924    dev  [screw]   images/9918924_pred.npy                     0.843393   8792265.0   images/9918924_gt.npy                NG   {'creator': 'bob'}
4    9918921  train  [screw]   images/9918921_pred.npy                     0.956114   8792260.0   images/9918921_gt.npy                NG                   {}
5    9918923  train  [screw]   images/9918923_pred.npy                     0.943873   8792262.0   images/9918923_gt.npy                NG   {'creator': 'foo'}

NOTE:

  1. Ground truth and prediction masks will be saved to the output_dir as a serialized numpy binary file. The file name is the media_id with a suffix of "_gt.npy" or "_pred.npy". You can load the numpy array by calling np.load(file_path). The shape of the numpy array is (height, width, num_classes). The 0th channel is the first class, the 1st channel is the second class and so on. (The background class is not included.)

  2. For prediction masks, the value of each pixel is the confidence score of the class, i.e. it's not thresholded. For ground truth masks, the value of each pixel is either 0 or 1.

  3. The serialized mask will be an empty numpy array when there is no prediction or ground truth mask. E.g. the ground truth label is OK, i.e. no defect. So be sure to check the shape of the ground truth mask before using it.

  4. The training dataset could also include images that are not used for training. Those images will have a None value for the following fields: label_id, seg_mask_label_path, media_level_label. Tip: for evaluating the model performance, you can filter out those images by checking the label_id field.

  5. The split field could be None, train, dev, or test. None means "unassigned" split.

  6. The metadata field is a dictionary that contains the metadata associated with each image. It's empty by default. Only available when include_image_metadata is True.

Source code in landingai/data_management/dataset.py
def get_training_dataset(
    self, output_dir: Path, include_image_metadata: bool = False
) -> pd.DataFrame:
    """Export the training dataset of the most recent model as a dataframe.

    The model is the one reported by `self.get_project_model_info()`. Only
    segmentation projects are supported; any other label type raises.

    Example of the returned dataframe:
    ```
            id    split  classes  seg_mask_prediction_path  media_level_predicted_score    label_id     seg_mask_label_path media_level_label             metadata
    0   11229595   None       []  images/11229595_pred.npy                          NaN  11301603.0  images/11229595_gt.npy                OK                   {}
    1   11229597   None       []  images/11229597_pred.npy                          NaN         NaN                    None              None                   {}
    2    9918918  train  [screw]   images/9918918_pred.npy                     0.954456   8792257.0   images/9918918_gt.npy                NG                   {}
    3    9918924    dev  [screw]   images/9918924_pred.npy                     0.843393   8792265.0   images/9918924_gt.npy                NG   {'creator': 'bob'}
    4    9918921  train  [screw]   images/9918921_pred.npy                     0.956114   8792260.0   images/9918921_gt.npy                NG                   {}
    5    9918923  train  [screw]   images/9918923_pred.npy                     0.943873   8792262.0   images/9918923_gt.npy                NG   {'creator': 'foo'}
    ```

    NOTE:
    1.  Ground truth and prediction masks are saved under the output_dir as serialized numpy binary files.
        The file name is the media_id with a suffix of "_gt.npy" or "_pred.npy".
        Load a mask with `np.load(file_path)`.
        The shape of the numpy array is (height, width, num_classes).
        The 0th channel is the first class, the 1st channel is the second class and so on. (The background class is not included.)

    2.  For prediction masks, each pixel holds the confidence score of the class, i.e. it's not thresholded.
        For ground truth masks, each pixel is either 0 or 1.

    3.  The serialized mask will be an empty numpy array when there is no prediction or ground truth mask,
        e.g. when the ground truth label is OK (no defect).
        So be sure to check the shape of the ground truth mask before using it.

    4.  The training dataset could also include images that are not used for training.
        Those images have a None value for these fields: label_id, seg_mask_label_path, media_level_label.
        Tip: for evaluating the model performance, you can filter out those images by checking the label_id field.

    5.  The split field could be None, train, dev, or test. None means "unassigned" split.

    6.  The metadata field is a dictionary that contains the metadata associated with each image.
        It's empty by default. Only available when `include_image_metadata` is True.

    Parameters
    ----------
    output_dir: Path
        Directory to write into; it is created if missing, and an `images`
        sub-directory is created inside it and handed to `_get_media_details`.
    include_image_metadata: bool
        Forwarded to `_get_media_details` for every media.

    Returns
    -------
    pd.DataFrame
        One row per media of the export, holding `id`, `split`, `classes` plus
        whatever fields `_get_media_details` returned for that media.

    Raises
    ------
    ValueError
        If the project's label type is not "segmentation".
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    project_id = self._client._project_id
    project_model_info = self.get_project_model_info()
    _LOGGER.info(f"Found the most recent model: {project_model_info}")
    model_id = cast(str, project_model_info["registered_model_id"])

    export_params = {
        "projectId": project_id,
        "datasetVersionId": project_model_info["dataset_version_id"],
        "modelId": model_id,
        "skipCreatingDatasetVersion": "true",
    }
    export = self._client._api(
        route_name=GET_FAST_TRAINING_EXPORT, params=export_params
    )["data"]
    if export["project"]["labelType"] != "segmentation":
        raise ValueError(
            f"Project {project_id} is not a segmentation project. Currently only segmentation projects are supported. For other project types, consider using the dataset snapshot export feature from the LandingLens platform UI."
        )

    dataset_id = export["dataset"]["id"]
    medias = [
        {
            "id": int(entry["media_id"]),
            "split": entry.get("split"),
            "classes": entry["defect_list"],
        }
        for entry in export["data"]
    ]
    _LOGGER.info(
        f"Found {len(medias)} medias in the training dataset. Querying media details..."
    )

    mask_dir = output_dir / "images"
    mask_dir.mkdir(parents=True, exist_ok=True)

    # Index the row dicts by media id so each finished lookup can be merged
    # into its row in place; `medias` shares the very same dict objects.
    row_by_id = {row["id"]: row for row in medias}
    with tqdm(total=len(medias)) as progress, concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(
                self._get_media_details,
                media_id=row["id"],
                dataset_id=dataset_id,
                model_id=model_id,
                output_dir=mask_dir,
                include_image_metadata=include_image_metadata,
            )
            for row in medias
        ]
        # Merge results in completion order; result() re-raises worker errors.
        for finished in concurrent.futures.as_completed(pending):
            info = finished.result()
            row_by_id[info["id"]].update(info)
            progress.update(1)

    return pd.DataFrame(medias)

Label

Label management API client. This class provides a set of APIs to manage the label of a particular project on LandingLens. For example, you can use this class to list all the available labels for a given project.

Example

client = Label(project_id, api_key) client.get_label_map() {'0': 'ok', '1': 'cat', '2': 'dog'}

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/label.py
class Label:
    """Client for the label management API of one LandingLens project.

    Use it to inspect the labels defined on a project, for example to list
    every available label.

    Example
    -------
    >>> client = Label(project_id, api_key)
    >>> client.get_label_map()
    {'0': 'ok', '1': 'cat', '2': 'dog'}

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        # All requests go through a shared LandingLens client.
        self._client = LandingLens(project_id=project_id, api_key=api_key)

    def get_label_map(self) -> Dict[str, str]:
        """Return the labels of the client's project, keyed by label index.

        Returns
        -------
        Dict[str, str]
            Mapping of label index (as a string) to label name, built from the
            `indexId` and `name` fields of the GET_DEFECTS response.
            ```
            # Example output
            {
                "0": "ok",
                "1": "cat",
                "2": "dog",
                "3": "duck",
            }
            ```
        """
        project_id = self._client._project_id
        defects = self._client._api(GET_DEFECTS, params={"projectId": project_id})[
            "data"
        ]
        label_map: Dict[str, str] = {}
        for defect in defects:
            index_key = str(defect["indexId"])
            label_map[index_key] = defect["name"]
        # Index "0" is always reported as "ok", overriding any returned entry.
        label_map["0"] = "ok"
        return label_map

get_label_map()

Get all the available labels for a given project.

Returns

Dict[str, str] A dictionary of label index to label name.

# Example output
{
    "0": "ok",
    "1": "cat",
    "2": "dog",
    "3": "duck",
}

Source code in landingai/data_management/label.py
def get_label_map(self) -> Dict[str, str]:
    """Return the labels of the client's project, keyed by label index.

    Returns
    -------
    Dict[str, str]
        Mapping of label index (as a string) to label name, built from the
        `indexId` and `name` fields of the GET_DEFECTS response.
        ```
        # Example output
        {
            "0": "ok",
            "1": "cat",
            "2": "dog",
            "3": "duck",
        }
        ```
    """
    project_id = self._client._project_id
    defects = self._client._api(GET_DEFECTS, params={"projectId": project_id})["data"]
    label_map: Dict[str, str] = {}
    for defect in defects:
        index_key = str(defect["indexId"])
        label_map[index_key] = defect["name"]
    # Index "0" is always reported as "ok", overriding any returned entry.
    label_map["0"] = "ok"
    return label_map

Media

Media management API client. This class provides a set of APIs to manage the medias (images) uploaded to LandingLens. For example, you can use this class to upload medias (images) to LandingLens or list the medias that are already uploaded to LandingLens.

Example

client = Media(project_id, api_key) client.upload("path/to/image.jpg") client.upload("path/to/image_folder") print(client.ls())

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/media.py
class Media:
    """Media management API client.

    This class provides a set of APIs to manage the medias (images) uploaded to
    LandingLens. For example, you can use this class to upload medias (images) to
    LandingLens or list the medias that are already uploaded to LandingLens.

    Example
    -------
    >>> client = Media(project_id, api_key)
    >>> client.upload("path/to/image.jpg")
    >>> client.upload("path/to/image_folder")
    >>> print(client.ls())

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment
        variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self._client = LandingLens(project_id=project_id, api_key=api_key)
        # Largest page `ls()` accepts in a single call.
        self._media_max_page_size = 1000
        # Not read by any method of this class; presumably a page-size cap for
        # metadata queries made elsewhere -- TODO confirm.
        self._metadata_max_page_size = 500

    def upload(
        self,
        source: Union[str, Path, Image],
        split: str = "",
        classification_name: Optional[str] = None,
        object_detection_xml: Optional[str] = None,
        seg_mask: Optional[str] = None,
        seg_defect_map: Optional[str] = None,
        nothing_to_label: bool = False,
        metadata_dict: Optional[Dict[str, Any]] = None,
        validate_extensions: bool = True,
        tolerate_duplicate_upload: bool = True,
        tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Upload media to platform.

        NOTE: when `source` is a folder, this method only forwards the dataset id,
        project id, `validate_extensions` and `tolerate_duplicate_upload` to
        `_upload_folder`; `split`, the label arguments, `metadata_dict` and `tags`
        are passed on for single-file / PIL image uploads only.

        Parameters
        ----------
        source: Union[str, Path, Image]
            The image source to upload. It can be a path to the local image file, an
            image folder or a PIL Image object. For image files, the supported formats
            are jpg, jpeg, png, bmp and tiff. A PIL Image is uploaded under a
            generated name of the form `image_<timestamp in ms>.png`.
        split: str
            Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned
            and is the default
        classification_name: str
            Set the media's classification if the project type is Classification or
            Anomaly Detection
        object_detection_xml: str
            Path to the Pascal VOC xml file for object detection project
        seg_mask: str
            Path to the segmentation mask file for segmentation project
        seg_defect_map: str
            Path to the segmentation defect_map.json file for segmentation project.
            To get this map, you can use the `landingai.data_management.label.Label` API.
            See below code as an example.
            ```python
            >>> client = Label(project_id, api_key)
            >>> client.get_label_map()
            {'0': 'ok', '1': 'cat', '2': 'dog'}
            ```
        nothing_to_label: bool
            Set the media's label as OK, valid for object detection and segmentation
            project. Takes precedence over every other label argument.
        metadata_dict: dict
            A dictionary of metadata to be updated or inserted. The key of the metadata
            needs to be created/registered (for the first time) on LandingLens before
            media uploading.
        validate_extensions: bool
            Defaults to True. For a single file, an extension outside
            `_ALLOWED_EXTENSIONS` raises a ValueError; for a folder the flag is
            forwarded to `_upload_folder`.
            If set to False, will try to upload all files. Behavior of platform
            for unexpected extensions may not be correct - for example, most likely file
            will be uploaded to s3, but won't show in data browser.
        tolerate_duplicate_upload: bool
            Whether to tolerate duplicate upload. A duplicate upload is identified by
            status code 409. The server returns a 409 status code if the same media file
            content exists in the project. Defaults to True, in which case a duplicate
            is counted in `skipped_count`. If set to False, the `DuplicateUploadError`
            of a single-file upload is re-raised (presumably a
            `landingai.exceptions.HttpError` -- confirm).
        tags: Optional[List[str]]
            Forwarded unchanged to `_upload_media` for single-file / PIL image
            uploads (presumably tag names to attach to the media -- confirm).

        Returns
        -------
        Dict[str, Any]
            The result from the upload().
            ```
            # Example output
            {
                "num_uploaded": 10,
                "skipped_count": 0,
                "error_count": 0,
                "medias": [...],
                "files_with_errors": {},
            }
            ```

        Raises
        ------
        ValueError
            If `source` is a path that does not exist, or a single file whose
            extension fails validation.
        DuplicateUploadError
            For a duplicate single-file upload when `tolerate_duplicate_upload`
            is False.
        """
        if isinstance(source, Path):
            source = str(source)
        if isinstance(source, str) and not os.path.exists(source):
            raise ValueError(
                f"file/folder does not exist at the specified path {source}"
            )

        project_id = self._client._project_id
        project = self._client.get_project_property(project_id)
        dataset_id = project.get("datasetId")
        label_type = project.get("labelType")

        # construct initial_label
        # Only the first matching branch applies; label files are read inside
        # `with` blocks so their handles are closed right away.
        initial_label: Dict[str, Any] = {}
        if nothing_to_label:
            initial_label["unlabeledAsNothingToLabel"] = True
        elif (
            label_type == "classification" or label_type == "anomaly_detection"
        ) and classification_name is not None:
            initial_label["classification"] = classification_name
        elif label_type == "bounding_box" and object_detection_xml is not None:
            with open(object_detection_xml, "rb") as xml_file:
                xml_content = xml_file.read()
            initial_label["objectDetection"] = base64.b64encode(xml_content).decode(
                "utf-8"
            )
        elif (
            label_type == "segmentation"
            and seg_mask is not None
            and seg_defect_map is not None
        ):
            with open(seg_defect_map, "r") as defect_map_file:
                seg_defect_map_content = defect_map_file.read()
            with open(seg_mask, "rb") as mask_file:
                seg_mask_content = mask_file.read()
            initial_label["segMask"] = base64.b64encode(seg_mask_content).decode(
                "utf-8"
            )
            initial_label["segDefectMap"] = seg_defect_map_content

        # construct metadata
        metadata: Dict[str, Any] = {} if metadata_dict is None else metadata_dict
        if metadata != {}:
            # Translate the user-facing metadata keys through the project's mapping.
            metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
            metadata = metadata_to_ids(metadata, metadata_mapping)

        medias: List[Dict[str, Any]] = []
        skipped_count = 0
        error_count = 0
        medias_with_errors: Dict[str, Any] = {}

        assert isinstance(source, (str, Image))
        if isinstance(source, str) and os.path.isdir(source):
            (
                medias,
                skipped_count,
                error_count,
                medias_with_errors,
            ) = _upload_folder(
                self._client,
                dataset_id,
                source,
                project_id,
                validate_extensions,
                tolerate_duplicate_upload,
            )
        else:
            # Resolve filename and extension for _upload_media()
            if isinstance(source, Image):
                ext = "png"
                ts = int(datetime.now().timestamp() * 1000)
                filename = f"image_{ts}.{ext}"
            else:
                assert isinstance(source, str)
                filename = os.path.basename(source)
                ext = os.path.splitext(filename)[-1][1:]
            # Validate extension
            if validate_extensions and ext.upper() not in _ALLOWED_EXTENSIONS:
                raise ValueError(
                    f"""Unexpected extension {ext}. Allowed extensions are: {_ALLOWED_EXTENSIONS}.
                    If you want to attempt the upload anyway, set validate_extensions=False.
                    This may result in an unexpected behavior - e.g. file not showing up in data browser."""
                )
            try:
                resp = _upload_media(
                    self._client,
                    dataset_id,
                    filename,
                    source,
                    project_id,
                    ext,
                    split,
                    initial_label,
                    metadata,
                    tags,
                )
                medias.append(resp)
            except DuplicateUploadError:
                if not tolerate_duplicate_upload:
                    raise
                skipped_count = 1
            except Exception as e:
                # Best effort: any other failure is reported in the result
                # instead of being raised.
                error_count = 1
                medias_with_errors[filename] = str(e)

        return {
            "num_uploaded": len(medias),
            "skipped_count": skipped_count,
            "error_count": error_count,
            "medias": medias,
            "files_with_errors": medias_with_errors,
        }

    def ls(
        self,
        offset: int = 0,
        limit: int = 1000,
        media_status: Union[str, List[str], None] = None,
        **metadata: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """
        List medias with metadata for given project id. Can be filtered using metadata.
        NOTE: pagination is applied with the `offset` and `limit` parameters.

        Parameters
        ----------
        offset: int
            Defaults to 0. As in standard pagination.
        limit: int
            Max 1000. Defaults to 1000. As in standard pagination.
        media_status: Union[str, List]
            Gets only medias with specified statuses. Defaults to None - then medias
            with all statuses are fetched.
            Possible values: raw, pending_labeling, pending_review, rejected, approved
        **metadata:
            Kwargs used as metadata that will be used for server side filtering of the results.

        Returns
        -------
        Dict[str, Any]
            A dictionary with the keys `medias` (the `data` field of the API
            response), `num_requested` (`limit - offset`), `count` (number of
            medias returned), `offset` and `limit`.

        Raises
        ------
        ValueError
            If `limit - offset` exceeds the max page size.
        """
        # NOTE(review): the guard compares `limit - offset`, not `limit`, with the
        # page size, so a large offset lets a limit above the maximum through --
        # confirm against `_build_list_media_request` whether this is intended.
        if limit - offset > self._media_max_page_size:
            raise ValueError(f"Exceeded max page size of {self._media_max_page_size}")

        if media_status is not None:
            _validate_media_status(media_status)

        project_id = self._client._project_id
        assert project_id is not None

        dataset_id = self._client.get_project_property(project_id, "dataset_id")

        metadata_filter_map: Dict[str, Any] = {}
        if metadata and len(metadata) > 0:
            metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
            metadata_filter_map = _metadata_to_filter(metadata, metadata_mapping)

        column_filter_map: Dict[str, Any] = {}
        if media_status is not None:
            # A single status string is filtered on as a one-element list.
            if isinstance(media_status, str):
                media_status = [media_status]
            column_filter_map = {
                "datasetContent": {"mediaStatus": {"CONTAINS_ANY": media_status}}
            }

        resp = self._client._api(
            MEDIA_LIST,
            params=_build_list_media_request(
                limit,
                column_filter_map,
                dataset_id,
                metadata_filter_map,
                offset,
                project_id,
            ),
        )
        medias = resp["data"]

        # A completely full page may mean more medias exist than were returned.
        if len(medias) == self._media_max_page_size:
            _LOGGER.warning(f"fetched medias only up to {self._media_max_page_size}")

        return {
            "medias": medias,
            "num_requested": limit - offset,
            "count": len(medias),
            "offset": offset,
            "limit": limit,
        }

    def update_split_key(
        self,
        media_ids: List[int],
        split_key: str,
    ) -> None:
        """
        Update the split key for a list of medias on the LandingLens platform.

        Parameters
        ----------
        media_ids: List[int]
            A list of media ids to update split key.
        split_key: str
            The split key to set for these medias, it could be 'train', 'dev', 'test'
            or '' (where '' represents Unassigned). Surrounding whitespace is
            stripped and the key is lower-cased before it is validated.

        Raises
        ------
        ValueError
            If the normalized split key is not in `_SUPPORTED_KEYS`.
        AssertionError
            If a non-empty split key is not among the splits of the project.

        Example
        -------
        >>> client = Media(project_id, api_key)
        >>> client.update_split_key(media_ids=[1001, 1002], split_key="test")  # assign split key 'test' for media ids 1001 and 1002
        >>> client.update_split_key(media_ids=[1001, 1002], split_key="")    # remove split key for media ids 1001 and 1002

        """
        split_key = split_key.strip().lower()
        if split_key not in _SUPPORTED_KEYS:
            raise ValueError(
                f"Invalid split key: {split_key}. Supported split keys are: {_SUPPORTED_KEYS}"
            )
        project_id = self._client._project_id
        split_id = 0  # 0 is Unassigned split
        if split_key != "":
            # Resolve the split name to the id this project uses for it.
            resp = self._client._api(
                GET_PROJECT_SPLIT, params={"projectId": project_id}
            )
            split_name_to_id = {
                split["splitSetName"].lower(): split["id"] for split in resp["data"]
            }
            assert (
                split_key in split_name_to_id
            ), f"Split key {split_key} not found in project {project_id}. Available split keys in this project are: {split_name_to_id.keys()}"
            split_id = split_name_to_id[split_key]
        dataset_id = self._client.get_project_property(project_id)["datasetId"]
        self._client._api(
            MEDIA_UPDATE_SPLIT,
            params={
                "projectId": project_id,
                "datasetId": dataset_id,
                "splitSet": split_id,
                "selectMediaOptions": json.dumps({"selectedMedia": media_ids}),
            },
        )
        _LOGGER.info(
            f"Successfully updated split key to '{split_key}' for {len(media_ids)} medias with media ids: {media_ids}"
        )

ls(offset=0, limit=1000, media_status=None, **metadata)

List medias with metadata for given project id. Can be filtered using metadata. NOTE: pagination is applied with the offset and limit parameters.

Parameters

offset: int Defaults to 0. As in standard pagination. limit: int Max 1000. Defaults to 1000. As in standard pagination. media_status: Union[str, List] Gets only medias with specified statuses. Defaults to None - then medias with all statuses are fetched. Possible values: raw, pending_labeling, pending_review, rejected, approved **metadata: Kwargs used as metadata that will be used for server side filtering of the results.

Source code in landingai/data_management/media.py
def ls(
    self,
    offset: int = 0,
    limit: int = 1000,
    media_status: Union[str, List[str], None] = None,
    **metadata: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    List the medias of the client's project, optionally filtered by status and metadata.
    NOTE: pagination is applied with the `offset` and `limit` parameters.

    Parameters
    ----------
    offset: int
        Defaults to 0. As in standard pagination.
    limit: int
        Max 1000. Defaults to 1000. As in standard pagination.
    media_status: Union[str, List]
        Gets only medias with specified statuses. Defaults to None - then medias
        with all statuses are fetched.
        Possible values: raw, pending_labeling, pending_review, rejected, approved
    **metadata:
        Kwargs used as metadata that will be used for server side filtering of the results.

    Returns
    -------
    Dict[str, Any]
        A dictionary with the keys `medias` (the `data` field of the API
        response), `num_requested` (`limit - offset`), `count` (number of
        medias returned), `offset` and `limit`.

    Raises
    ------
    ValueError
        If `limit - offset` exceeds the max page size.
    """
    # NOTE(review): the guard compares `limit - offset`, not `limit`, with the
    # page size -- confirm against `_build_list_media_request` that this is intended.
    num_requested = limit - offset
    if num_requested > self._media_max_page_size:
        raise ValueError(f"Exceeded max page size of {self._media_max_page_size}")

    filter_by_status = media_status is not None
    if filter_by_status:
        _validate_media_status(media_status)

    project_id = self._client._project_id
    assert project_id is not None

    dataset_id = self._client.get_project_property(project_id, "dataset_id")

    # Metadata kwargs are translated through the project's metadata mapping;
    # the mapping is only fetched when at least one kwarg was given.
    if metadata:
        metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
        metadata_filter_map: Dict[str, Any] = _metadata_to_filter(
            metadata, metadata_mapping
        )
    else:
        metadata_filter_map = {}

    # A single status string is filtered on as a one-element list.
    if filter_by_status:
        statuses = [media_status] if isinstance(media_status, str) else media_status
        column_filter_map: Dict[str, Any] = {
            "datasetContent": {"mediaStatus": {"CONTAINS_ANY": statuses}}
        }
    else:
        column_filter_map = {}

    request_params = _build_list_media_request(
        limit,
        column_filter_map,
        dataset_id,
        metadata_filter_map,
        offset,
        project_id,
    )
    medias = self._client._api(MEDIA_LIST, params=request_params)["data"]

    # A completely full page may mean more medias exist than were returned.
    if len(medias) == self._media_max_page_size:
        _LOGGER.warning(f"fetched medias only up to {self._media_max_page_size}")

    return {
        "medias": medias,
        "num_requested": num_requested,
        "count": len(medias),
        "offset": offset,
        "limit": limit,
    }

update_split_key(media_ids, split_key)

Update the split key for a list of medias on the LandingLens platform.

Parameters

media_ids: List[int] A list of media ids to update split key. split_key: str The split key to set for these medias. It can be 'train', 'dev', 'test' or '' (where '' represents Unassigned).

Example

client = Media(project_id, api_key) client.update_split_key(media_ids=[1001, 1002], split_key="test") # assign split key 'test' for media ids 1001 and 1002 client.update_split_key(media_ids=[1001, 1002], split_key="") # remove split key for media ids 1001 and 1002

Source code in landingai/data_management/media.py
def update_split_key(
    self,
    media_ids: List[int],
    split_key: str,
) -> None:
    """
    Assign a list of medias to a split on the LandingLens platform.

    Parameters
    ----------
    media_ids: List[int]
        A list of media ids to update split key.
    split_key: str
        The split key to set for these medias, it could be 'train', 'dev', 'test'
        or '' (where '' represents Unassigned). Surrounding whitespace is
        stripped and the key is lower-cased before it is validated.

    Raises
    ------
    ValueError
        If the normalized split key is not in `_SUPPORTED_KEYS`.
    AssertionError
        If a non-empty split key is not among the splits of the project.

    Example
    -------
    >>> client = Media(project_id, api_key)
    >>> client.update_split_key(media_ids=[1001, 1002], split_key="test")  # assign split key 'test' for media ids 1001 and 1002
    >>> client.update_split_key(media_ids=[1001, 1002], split_key="")    # remove split key for media ids 1001 and 1002

    """
    split_key = split_key.strip().lower()
    if split_key not in _SUPPORTED_KEYS:
        raise ValueError(
            f"Invalid split key: {split_key}. Supported split keys are: {_SUPPORTED_KEYS}"
        )

    project_id = self._client._project_id
    if split_key == "":
        # 0 is Unassigned split
        split_id = 0
    else:
        # Resolve the split name to the id this project uses for it.
        project_splits = self._client._api(
            GET_PROJECT_SPLIT, params={"projectId": project_id}
        )["data"]
        split_name_to_id = {}
        for entry in project_splits:
            name = entry["splitSetName"].lower()
            split_name_to_id[name] = entry["id"]
        assert (
            split_key in split_name_to_id
        ), f"Split key {split_key} not found in project {project_id}. Available split keys in this project are: {split_name_to_id.keys()}"
        split_id = split_name_to_id[split_key]

    dataset_id = self._client.get_project_property(project_id)["datasetId"]
    update_params = {
        "projectId": project_id,
        "datasetId": dataset_id,
        "splitSet": split_id,
        "selectMediaOptions": json.dumps({"selectedMedia": media_ids}),
    }
    self._client._api(MEDIA_UPDATE_SPLIT, params=update_params)
    _LOGGER.info(
        f"Successfully updated split key to '{split_key}' for {len(media_ids)} medias with media ids: {media_ids}"
    )

upload(source, split='', classification_name=None, object_detection_xml=None, seg_mask=None, seg_defect_map=None, nothing_to_label=False, metadata_dict=None, validate_extensions=True, tolerate_duplicate_upload=True, tags=None)

Upload media to platform.

Parameters

source: Union[str, Path, Image] The image source to upload. It can be a path to the local image file, an image folder or a PIL Image object. For image files, the supported formats are jpg, jpeg, png, bmp and tiff. split: str Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned and is the default classification_name: str Set the media's classification if the project type is Classification or Anomaly Detection object_detection_xml: str Path to the Pascal VOC xml file for object detection project seg_mask: str Path to the segmentation mask file for segmentation project seg_defect_map: str Path to the segmentation defect_map.json file for segmentation project. To get this map, you can use the landingai.data_management.label.Label API. See below code as an example.

>>> client = Label(project_id, api_key)
>>> client.get_label_map()
>>> {'0': 'ok', '1': 'cat', '2': 'dog'}
nothing_to_label: bool Set the media's label as OK, valid for object detection and segmentation project metadata_dict: dict A dictionary of metadata to be updated or inserted. The key of the metadata needs to be created/registered (for the first time) on LandingLens before media uploading. validate_extensions: bool Defaults to True. Files other than jpg/jpeg/png/bmp will be skipped. If set to False, will try to upload all files. Behavior of platform for unexpected extensions may not be correct - for example, most likely file will be uploaded to s3, but won't show in data browser. tolerate_duplicate_upload: bool Whether to tolerate duplicate upload. A duplicate upload is identified by status code 409. The server returns a 409 status code if the same media file content exists in the project. Defaults to True. If set to False, will raise a landingai.exceptions.HttpError if it's a duplicate upload.

Returns

Dict[str, Any] The result from the upload().

# Example output
{
    "num_uploaded": 10,
    "skipped_count": 0,
    "error_count": 0,
    "medias": [...],
    "files_with_errors": {},
}

Source code in landingai/data_management/media.py
def upload(
    self,
    source: Union[str, Path, Image],
    split: str = "",
    classification_name: Optional[str] = None,
    object_detection_xml: Optional[str] = None,
    seg_mask: Optional[str] = None,
    seg_defect_map: Optional[str] = None,
    nothing_to_label: bool = False,
    metadata_dict: Optional[Dict[str, Any]] = None,
    validate_extensions: bool = True,
    tolerate_duplicate_upload: bool = True,
    tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Upload media to platform.

    Parameters
    ----------
    source: Union[str, Path, Image]
        The image source to upload. It can be a path to the local image file, an
        image folder or a PIL Image object. For image files, the supported formats
        are jpg, jpeg, png, bmp and tiff.
    split: str
        Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned
        and is the default
    classification_name: str
        Set the media's classification if the project type is Classification or
        Anomaly Detection
    object_detection_xml: str
        Path to the Pascal VOC xml file for object detection project
    seg_mask: str
        Path to the segmentation mask file for segmentation project
    seg_defect_map: str
        Path to the segmentation defect_map.json file for segmentation project.
        To get this map, you can use the `landingai.data_management.label.Label` API.
        See below code as an example.
        ```python
        >>> client = Label(project_id, api_key)
        >>> client.get_label_map()
        {'0': 'ok', '1': 'cat', '2': 'dog'}
        ```
    nothing_to_label: bool
        Set the media's label as OK, valid for object detection and segmentation
        project. When True it takes precedence over the other label arguments.
    metadata_dict: dict
        A dictionary of metadata to be updated or inserted. The key of the metadata
        needs to be created/registered (for the first time) on LandingLens before
        media uploading.
    validate_extensions: bool
        Defaults to True. Files other than jpg/jpeg/png/bmp will be skipped.
        If set to False, will try to upload all files. Behavior of platform
        for unexpected extensions may not be correct - for example, most likely file
        will be uploaded to s3, but won't show in data browser.
        When `source` is a single file, a disallowed extension raises a
        ValueError instead of being skipped.
    tolerate_duplicate_upload: bool
        Whether to tolerate duplicate upload. A duplicate upload is identified by
        status code 409. The server returns a 409 status code if the same media file
        content exists in the project. Defaults to True. If set to False, will raise
        a `landingai.exceptions.HttpError` if it's a duplicate upload.
    tags: Optional[List[str]]
        Tags forwarded to the upload of a single file or PIL Image. Not forwarded
        when `source` is a folder.

    Returns
    -------
    Dict[str, Any]
        The result from the upload().
        ```
        # Example output
        {
            "num_uploaded": 10,
            "skipped_count": 0,
            "error_count": 0,
            "medias": [...],
            "files_with_errors": {},
        }
        ```

    Raises
    ------
    ValueError
        If `source` is a path that does not exist, or if `source` is a single
        file/image whose extension is not allowed while `validate_extensions`
        is True.
    DuplicateUploadError
        Re-raised for a single-media duplicate upload when
        `tolerate_duplicate_upload` is False.
    """
    if isinstance(source, Path):
        source = str(source)
    if isinstance(source, str) and not os.path.exists(source):
        raise ValueError(
            f"file/folder does not exist at the specified path {source}"
        )

    project_id = self._client._project_id
    project = self._client.get_project_property(project_id)
    dataset_id = project.get("datasetId")
    label_type = project.get("labelType")

    # construct initial_label
    # Only the first matching branch applies; label arguments that do not match
    # the project's label type are ignored.
    initial_label: Dict[str, Any] = {}
    if nothing_to_label:
        initial_label["unlabeledAsNothingToLabel"] = True
    elif (
        label_type == "classification" or label_type == "anomaly_detection"
    ) and classification_name is not None:
        initial_label["classification"] = classification_name
    elif label_type == "bounding_box" and object_detection_xml is not None:
        # Context managers guarantee the label files are closed even on error.
        with open(object_detection_xml, "rb") as xml_file:
            xml_content = xml_file.read()
        initial_label["objectDetection"] = base64.b64encode(xml_content).decode(
            "utf-8"
        )
    elif (
        label_type == "segmentation"
        and seg_mask is not None
        and seg_defect_map is not None
    ):
        with open(seg_defect_map, "r") as defect_map_file:
            seg_defect_map_content = defect_map_file.read()
        with open(seg_mask, "rb") as mask_file:
            seg_mask_content = mask_file.read()
        initial_label["segMask"] = base64.b64encode(seg_mask_content).decode(
            "utf-8"
        )
        initial_label["segDefectMap"] = seg_defect_map_content

    # construct metadata
    # Metadata names are translated through the project's metadata mapping.
    metadata: Dict[str, Any] = {} if metadata_dict is None else metadata_dict
    if metadata != {}:
        metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
        metadata = metadata_to_ids(metadata, metadata_mapping)

    medias: List[Dict[str, Any]] = []
    skipped_count = 0
    error_count = 0
    medias_with_errors: Dict[str, Any] = {}

    # Path inputs were converted to str above, so only str or Image remain.
    assert isinstance(source, (str, Image))
    if isinstance(source, str) and os.path.isdir(source):
        # NOTE(review): split, initial_label, metadata and tags are not passed
        # to _upload_folder here - confirm whether that is intentional.
        (
            medias,
            skipped_count,
            error_count,
            medias_with_errors,
        ) = _upload_folder(
            self._client,
            dataset_id,
            source,
            project_id,
            validate_extensions,
            tolerate_duplicate_upload,
        )
    else:
        # Resolve filename and extension for _upload_media()
        if isinstance(source, Image):
            # In-memory images have no name; synthesize one from a ms timestamp.
            ext = "png"
            ts = int(datetime.now().timestamp() * 1000)
            filename = f"image_{ts}.{ext}"
        else:
            assert isinstance(source, str)
            filename = os.path.basename(source)
            ext = os.path.splitext(filename)[-1][1:]
        # Validate extension
        if validate_extensions and ext.upper() not in _ALLOWED_EXTENSIONS:
            raise ValueError(
                f"""Unexpected extension {ext}. Allowed extensions are: {_ALLOWED_EXTENSIONS}.
                If you want to attempt the upload anyway, set validate_extensions=False.
                This may result in an unexpected behavior - e.g. file not showing up in data browser."""
            )
        try:
            resp = _upload_media(
                self._client,
                dataset_id,
                filename,
                source,
                project_id,
                ext,
                split,
                initial_label,
                metadata,
                tags,
            )
            medias.append(resp)
        except DuplicateUploadError:
            if not tolerate_duplicate_upload:
                raise
            skipped_count = 1
        except Exception as e:
            # Best effort: report the failure in the result instead of raising.
            error_count = 1
            medias_with_errors[filename] = str(e)

    return {
        "num_uploaded": len(medias),
        "skipped_count": skipped_count,
        "error_count": error_count,
        "medias": medias,
        "files_with_errors": medias_with_errors,
    }

Metadata

Metadata management API client. This class provides a set of APIs to manage the metadata of the medias (images) uploaded to LandingLens. For example, you can use this class to update the metadata of the uploaded medias.

Example

client = Metadata(project_id, api_key)
client.update([101, 102, 103], creator="tom")

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/metadata.py
class Metadata:
    """Client for managing the metadata of medias (images) uploaded to LandingLens.

    It can update or insert metadata values on already-uploaded medias and read back
    the metadata attached to a single media.

    Example
    -------
    >>> client = Metadata(project_id, api_key)
    >>> client.update([101, 102, 103], creator="tom")

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        # All requests are issued through a LandingLens client bound to the project.
        self._client = LandingLens(project_id=project_id, api_key=api_key)

    def update(
        self,
        media_ids: Union[int, List[int]],
        **input_metadata: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Update or insert metadata values for one or more medias.

        Parameters
        ----------
        media_ids
            A single media id or a list of media ids to update.
        input_metadata
            Keyword arguments naming the metadata to update or insert. Each metadata
            key needs to be created/registered (for the first time) on LandingLens
            before calling update().

        Returns
        ----------
        Dict[str, Any]
            The result from the update().
            ```
            # Example output
            {
                "project_id": 12345,
                "metadata": [...],
                "media_ids": [123, 124],
            }
            ```

        Raises
        ------
        ValueError
            If `media_ids` is empty, zero or a boolean, or if no metadata is given.
        """
        project_id = self._client._project_id

        # bool is a subclass of int, so it has to be rejected explicitly.
        ids_missing = (
            not media_ids
            or isinstance(media_ids, bool)
            or (not isinstance(media_ids, int) and len(media_ids) == 0)
        )
        if ids_missing:
            raise ValueError("Missing required flags: {'media_ids'}")
        if not input_metadata:
            raise ValueError("Missing required flags: {'metadata'}")

        dataset_id = self._client.get_project_property(project_id, "dataset_id")

        # Normalise to a list of plain ints; this also converts things like
        # numpy integers that would otherwise cause errors.
        if isinstance(media_ids, int):
            id_list = [media_ids]
        else:
            id_list = [int(media_id) for media_id in media_ids]

        name_to_id, id_to_name = self._client.get_metadata_mappings(project_id)

        select_option = _SelectOption(id_list)
        project = _Project(project_id, dataset_id)
        metadata_by_id = metadata_to_ids(input_metadata, name_to_id)
        request_body = _MetadataUploadRequestBody(
            selectOption=select_option,
            project=project,
            metadata=metadata_by_id,
        )

        response = self._client._api(METADATA_UPDATE, data=obj_to_dict(request_body))
        updated = response["data"]
        # The metadata reported back is taken from the first returned media.
        reported_metadata = ids_to_metadata(updated[0]["metadata"], id_to_name)
        updated_ids = [entry["mediaId"] for entry in updated]
        return {
            "project_id": project_id,
            "metadata": reported_metadata,
            "media_ids": updated_ids,
        }

    def get(self, media_id: int) -> Dict[str, str]:
        """Return every metadata entry attached to the given media, keyed by metadata name."""
        response = self._client._api(
            METADATA_GET, params={"objectId": media_id, "objectType": "media"}
        )
        _, id_to_name = self._client.get_metadata_mappings(self._client._project_id)
        # The API keys the values by metadata id; translate the ids back to names.
        named: Dict[str, str] = {}
        for raw_id, value in response["data"].items():
            named[id_to_name[int(raw_id)]] = value
        return named

get(media_id)

Return all the metadata associated with a given media.

Source code in landingai/data_management/metadata.py
def get(self, media_id: int) -> Dict[str, str]:
    """Return every metadata entry attached to the given media, keyed by metadata name."""
    response = self._client._api(
        METADATA_GET, params={"objectId": media_id, "objectType": "media"}
    )
    _, id_to_name = self._client.get_metadata_mappings(self._client._project_id)
    # The API keys the values by metadata id; translate the ids back to names.
    named: Dict[str, str] = {}
    for raw_id, value in response["data"].items():
        named[id_to_name[int(raw_id)]] = value
    return named

update(media_ids, **input_metadata)

Update or insert a dictionary of metadata for a set of medias.

Parameters

media_ids Media ids to update. input_metadata A dictionary of metadata to be updated or inserted. The key of the metadata needs to be created/registered (for the first time) on LandingLens before calling update().

Returns

Dict[str, Any] The result from the update().

# Example output
{
    "project_id": 12345,
    "metadata": [...],
    "media_ids": [123, 124],
}

Source code in landingai/data_management/metadata.py
def update(
    self,
    media_ids: Union[int, List[int]],
    **input_metadata: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Update or insert metadata values for one or more medias.

    Parameters
    ----------
    media_ids
        A single media id or a list of media ids to update.
    input_metadata
        Keyword arguments naming the metadata to update or insert. Each metadata
        key needs to be created/registered (for the first time) on LandingLens
        before calling update().

    Returns
    ----------
    Dict[str, Any]
        The result from the update().
        ```
        # Example output
        {
            "project_id": 12345,
            "metadata": [...],
            "media_ids": [123, 124],
        }
        ```

    Raises
    ------
    ValueError
        If `media_ids` is empty, zero or a boolean, or if no metadata is given.
    """
    project_id = self._client._project_id

    # bool is a subclass of int, so it has to be rejected explicitly.
    ids_missing = (
        not media_ids
        or isinstance(media_ids, bool)
        or (not isinstance(media_ids, int) and len(media_ids) == 0)
    )
    if ids_missing:
        raise ValueError("Missing required flags: {'media_ids'}")
    if not input_metadata:
        raise ValueError("Missing required flags: {'metadata'}")

    dataset_id = self._client.get_project_property(project_id, "dataset_id")

    # Normalise to a list of plain ints; this also converts things like
    # numpy integers that would otherwise cause errors.
    if isinstance(media_ids, int):
        id_list = [media_ids]
    else:
        id_list = [int(media_id) for media_id in media_ids]

    name_to_id, id_to_name = self._client.get_metadata_mappings(project_id)

    select_option = _SelectOption(id_list)
    project = _Project(project_id, dataset_id)
    metadata_by_id = metadata_to_ids(input_metadata, name_to_id)
    request_body = _MetadataUploadRequestBody(
        selectOption=select_option,
        project=project,
        metadata=metadata_by_id,
    )

    response = self._client._api(METADATA_UPDATE, data=obj_to_dict(request_body))
    updated = response["data"]
    # The metadata reported back is taken from the first returned media.
    reported_metadata = ids_to_metadata(updated[0]["metadata"], id_to_name)
    updated_ids = [entry["mediaId"] for entry in updated]
    return {
        "project_id": project_id,
        "metadata": reported_metadata,
        "media_ids": updated_ids,
    }

Encoder

Bases: JSONEncoder

JSON encoder that converts all keys to camel case

Source code in landingai/data_management/utils.py
class Encoder(json.JSONEncoder):
    """JSON encoder that emits camel-case keys for objects it has to convert itself."""

    def default(self, obj: object) -> Any:
        """Convert `obj` into something JSON-serializable.

        Enum members become their member name; anything else is turned into a
        dict of its attributes (or of its items, for a dict) with camel-case keys.
        """
        if isinstance(obj, dict):
            fields = obj
        elif isinstance(obj, Enum):
            return obj._name_
        else:
            fields = obj.__dict__
        return {to_camel_case(key): value for key, value in fields.items()}

PrettyPrintable

A mix-in class that enables its subclass to be serialized into pretty printed string

Source code in landingai/data_management/utils.py
class PrettyPrintable:
    """Mix-in that gives a subclass a pretty-printed string form of its attributes."""

    def to_str(self) -> str:
        """Return the instance's attribute dict formatted by `pprint.pformat`."""
        attributes = self.__dict__
        return pprint.pformat(attributes)

    def __repr__(self) -> str:
        """Use the pretty-printed form for `print` and `pprint`."""
        pretty = self.to_str()
        return pretty

__repr__()

For print and pprint

Source code in landingai/data_management/utils.py
def __repr__(self) -> str:
    """Return the same text as `to_str()`; used for `print` and `pprint`."""
    # Delegates so that a subclass overriding to_str() also changes its repr.
    return self.to_str()

to_str()

Returns the string representation of the model

Source code in landingai/data_management/utils.py
def to_str(self) -> str:
    """Return the instance's attribute dict formatted by `pprint.pformat`."""
    attributes = self.__dict__
    return pprint.pformat(attributes)

obj_to_dict(obj)

Convert an object to a json dictionary with camel case keys

Source code in landingai/data_management/utils.py
def obj_to_dict(obj: object) -> Dict[str, Any]:
    """Serialize `obj` with `Encoder` and parse it back into a plain JSON dictionary."""
    # Round-tripping through JSON text applies Encoder to every nested object.
    serialized = json.dumps(obj, cls=Encoder)
    parsed = json.loads(serialized)
    return cast(Dict[str, Any], parsed)

obj_to_params(obj)

Convert an object to query parameters in dict format where the dict keys are in camel case.

Source code in landingai/data_management/utils.py
def obj_to_params(obj: object) -> Dict[str, Any]:
    """Build a query-parameter dict from an object's attributes, using camel-case keys.

    List values are passed through unchanged; every other value is encoded as a
    JSON string with `Encoder`.
    """
    params: Dict[str, Any] = {}
    for attr_name, attr_value in obj.__dict__.items():
        param_key = to_camel_case(attr_name)
        if isinstance(attr_value, list):
            params[param_key] = attr_value
        else:
            params[param_key] = json.dumps(attr_value, cls=Encoder)
    return params

to_camel_case(snake_str)

Convert a snake case string to camel case

Source code in landingai/data_management/utils.py
def to_camel_case(snake_str: str) -> str:
    """Turn a snake_case string into camelCase.

    The text before the first underscore is kept as-is; each following
    underscore-separated word is title-cased and appended.
    """
    head, *tail = snake_str.split("_")
    return head + "".join(map(str.title, tail))

validate_metadata(input_metadata, metadata_mapping)

Validate the input metadata against the metadata mapping. Raise ValueError if any metadata keys are not available.

Source code in landingai/data_management/utils.py
def validate_metadata(
    input_metadata: Dict[str, Any], metadata_mapping: Dict[str, Any]
) -> None:
    """Check that every key of `input_metadata` is a key of `metadata_mapping`.

    Raises
    ------
    ValueError
        If at least one input key is missing from the mapping. The message lists
        the offending keys and the available ones.
    """
    unknown_fields = set(input_metadata.keys()) - set(metadata_mapping.keys())
    # TODO: Validate also values and maybe types. Or shouldn't it be the job of the server?
    if not unknown_fields:
        return
    message = f"""Not allowed fields: {unknown_fields}.
Available fields are {metadata_mapping.keys()}.
If you want to add new fields, please add it to the associated project on the LandingLens platform."""
    raise ValueError(message)