Skip to content

landingai.data_management

LandingLens

LandingLens client

Example

Create a client by specifying API Key and project id

client = LandingLens(project, api_key)

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/client.py
class LandingLens:
    """
    LandingLens client

    Thin HTTP client that attaches the API key and default project id to
    requests sent to the LandingLens routes declared in ``ROUTES``.

    Example
    -------
    # Create a client by specifying API Key and project id
    >>> client = LandingLens(project_id, api_key)

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self.project_id = project_id
        if not api_key:
            # No (or empty) key supplied: fall back to the stored credential.
            api_key = load_api_credential().api_key
        self.api_key = api_key

    @property
    def _project_id(self) -> int:
        """Project id this client was created with."""
        return self.project_id

    @property
    def _api_key(self) -> str:
        """API key this client authenticates with."""
        return self.api_key

    def _api_async(
        self,
        route_name: str,
        params: Optional[Dict[str, Any]] = None,
        form_data: Optional[Dict[str, Any]] = None,
        resp_with_content: Optional[Dict[str, Any]] = None,
        url_replacements: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send a request to the LandingLens API and return the decoded JSON body.

        Despite its name, this method issues a blocking ``requests.request``
        call. Unlike ``_api`` it does not inspect the HTTP status code; it
        returns whatever JSON the server responded with.

        Parameters
        ----------
        route_name: str
            Key into ``ROUTES`` selecting the endpoint and HTTP method.
        params: Optional[Dict[str, Any]]
            Query parameters.
        form_data: Optional[Dict[str, Any]]
            When given, the request body is sent as multipart form data.
        resp_with_content: Optional[Dict[str, Any]]
            JSON request body; required when ``form_data`` is not given.
        url_replacements: Optional[Dict[str, Any]]
            Values substituted into the endpoint URL template.

        Raises
        ------
        HttpError
            If the request fails or the response cannot be decoded as JSON.
        """
        is_form_data = form_data is not None
        # A JSON body is mandatory unless the payload is sent as form data.
        assert is_form_data or resp_with_content is not None

        endpoint, headers, params, _root_url, route = self._api_common_setup(
            route_name, url_replacements, resp_with_content, params
        )
        form = None
        if is_form_data:
            # Create a MultipartEncoder for the form data; its content type
            # (which carries the multipart boundary) replaces the JSON default.
            form = MultipartEncoder(fields=form_data)
            headers["Content-Type"] = form.content_type

        try:
            response = requests.request(
                method=route["method"].__name__,
                url=endpoint,
                headers=headers,
                json=None if is_form_data else resp_with_content,
                params=params,
                data=form,
            )

            # Use %s placeholders: logging interpolates args into the message.
            _LOGGER.debug("Request URL: %s", response.url)
            _LOGGER.debug("Response Code: %s", response.status_code)
            _LOGGER.debug("Response Reason: %s", response.reason)

            resp_with_content = response.json()
            _LOGGER.debug(
                "Response Content (500 chars): %s",
                json.dumps(resp_with_content)[:500],
            )
        except requests.exceptions.RequestException as e:
            raise HttpError(
                "HTTP request to LandingLens server failed with error message: \n"
                f"{str(e)}"
            ) from e
        except Exception as e:
            raise HttpError(
                f"An error occurred during the HTTP request: {str(e)}"
            ) from e
        assert resp_with_content is not None
        return resp_with_content

    def _api(
        self,
        route_name: str,
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Dict[str, Any]] = None,
        url_replacements: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Call a LandingLens route and return the decoded JSON response.

        Parameters
        ----------
        route_name: str
            Key into ``ROUTES`` selecting the endpoint and HTTP method.
        params: Optional[Dict[str, Any]]
            Query parameters.
        data: Optional[Dict[str, Any]]
            JSON request body.
        url_replacements: Optional[Dict[str, Any]]
            Values substituted into the endpoint URL template.

        Raises
        ------
        HttpError
            If the server answers with a non-OK status code.
        """
        endpoint, headers, params, _root_url, route = self._api_common_setup(
            route_name, url_replacements, data, params
        )
        resp = route["method"](
            endpoint,
            params=params,
            json=data,
            headers=headers,
            verify=True,
        )
        _LOGGER.info(f"Request URL: {resp.request.url}")
        # Use %s placeholders: logging interpolates args into the message.
        _LOGGER.debug("Response Code: %s", resp.status_code)
        _LOGGER.debug("Response Reason: %s", resp.reason)
        _LOGGER.debug("Response Content (500 chars): %s", resp.content[:500])
        if not resp.ok:
            try:
                error_message = json.loads(resp.content.decode("utf-8"))["message"]
            except Exception as e:
                # Body is not JSON (or has no "message"): report the raw text.
                _LOGGER.warning(f"Failed to parse error message into json: {e}")
                error_message = resp.text
            raise HttpError(
                "HTTP request to LandingLens server failed with "
                f"code {resp.status_code}-{resp.reason} and error message: \n"
                f"{error_message}"
            )
        return cast(Dict[str, Any], resp.json())

    def _api_common_setup(
        self,
        route_name: str,
        url_replacements: Optional[Dict[str, Any]],
        data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Tuple[str, Dict[str, Any], Dict[str, Any], str, Dict[str, Any]]:
        """Resolve a route into the pieces needed to issue the request.

        Fills in the client's default project id when the caller did not
        provide one: into ``params`` for GET routes, and into ``data`` (which
        is modified in place) for POST routes with a non-empty body.

        Returns
        -------
        Tuple[str, Dict[str, Any], Dict[str, Any], str, Dict[str, Any]]
            ``(endpoint, headers, params, root_url, route)``.

        Raises
        ------
        ValueError
            If the route refers to a root URL type missing from ``_URL_ROOTS``.
        """
        route = ROUTES[route_name]
        headers = {
            "apikey": self.api_key,
            "Content-Type": "application/json",
            "User-Agent": "landingai-python-" + version("landingai"),
        }
        root_url_type = cast(str, route["root_url"])

        if root_url_type not in _URL_ROOTS:
            raise ValueError(f"Unknown URL specified: {root_url_type}")

        root_url = _URL_ROOTS[root_url_type]

        if not params:
            params = {}
        if route["method"] == requests.get and not params.get("projectId"):
            params["projectId"] = self.project_id
        if route["method"] == requests.post and data and not data.get("projectId"):
            data["projectId"] = self.project_id
        endpoint = posixpath.join(root_url, cast(str, route["endpoint"]))

        # Caller-supplied replacements take precedence over the default version.
        format_values = {"version": _API_VERSION, **(url_replacements or {})}
        endpoint = endpoint.format(**format_values)

        return endpoint, headers, params, root_url, route

    def get_project_property(
        self, project_id: int, property: Optional[str] = None
    ) -> Any:
        """Fetch a project, or a single property of it.

        Parameters
        ----------
        project_id: int
            Id of the project to look up.
        property: Optional[str]
            Property name; it is converted with ``to_camel_case`` before the
            lookup. When omitted, the whole project payload is returned.

        Raises
        ------
        HttpError
            If the request fails, or the requested property is missing or None.
        """
        resp = self._api(GET_PROJECT, params={"projectId": project_id})
        project = resp.get("data")
        if property is None:
            return project
        assert project is not None
        property_value = project.get(to_camel_case(property))
        if property_value is None:
            raise HttpError(f"{property} Id not found")
        return property_value

    # NOTE(review): lru_cache on an instance method keys on (self, project_id)
    # and keeps the instance referenced for as long as the entry is cached;
    # cached mappings are also not refreshed if metadata changes on the server.
    @lru_cache(maxsize=_LRU_CACHE_SIZE)
    def get_metadata_mappings(
        self, project_id: int
    ) -> Tuple[Dict[str, Any], Dict[int, str]]:
        """Return the metadata lookup tables of a project.

        Returns
        -------
        Tuple[Dict[str, Any], Dict[int, str]]
            ``metadata_mapping`` maps a metadata name to
            ``(id, predefinedChoices)``; ``id_to_metadata`` maps the id back
            to the name.
        """
        resp = self._api(METADATA_ITEMS, params={"projectId": project_id})
        metadata_mapping_resp = resp.get("data", {})

        metadata_mapping = {
            metadata_field["name"]: (
                metadata_field["id"],
                metadata_field["predefinedChoices"],
            )
            for metadata_field in metadata_mapping_resp.values()
        }
        id_to_metadata = {v[0]: k for k, v in metadata_mapping.items()}

        return metadata_mapping, id_to_metadata

Label

Label management API client. This class provides a set of APIs to manage the label of a particular project on LandingLens. For example, you can use this class to list all the available labels for a given project.

Example

client = Label(project_id, api_key) client.get_label_map() {'0': 'ok', '1': 'cat', '2': 'dog'}

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/label.py
class Label:
    """Label management API client.
    This class provides a set of APIs to manage the labels of a particular project on LandingLens.
    For example, you can use this class to list all the available labels for a given project.

    Example
    -------
    >>> client = Label(project_id, api_key)
    >>> client.get_label_map()
    {'0': 'ok', '1': 'cat', '2': 'dog'}

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self._client = LandingLens(project_id=project_id, api_key=api_key)

    def get_label_map(self) -> Dict[str, str]:
        """Get all the available labels for a given project.

        Returns
        -------
        Dict[str, str]
            A dictionary of label index to label name. Index "0" is always
            mapped to "ok".
            ```
            # Example output
            {
                "0": "ok",
                "1": "cat",
                "2": "dog",
                "3": "duck",
            }
            ```
        """
        client = self._client
        project_id = client._project_id
        response = client._api(GET_DEFECTS, params={"projectId": project_id})

        labels: Dict[str, str] = {}
        for defect in response["data"]:
            labels[str(defect["indexId"])] = defect["name"]
        # Index 0 is always reported as "ok", whatever the server returned.
        labels["0"] = "ok"
        return labels

get_label_map()

Get all the available labels for a given project.

Returns

Dict[str, str] A dictionary of label index to label name.

# Example output
{
    "0": "ok",
    "1": "cat",
    "2": "dog",
    "3": "duck",
}

Source code in landingai/data_management/label.py
def get_label_map(self) -> Dict[str, str]:
    """Get all the available labels for a given project.

    Returns
    -------
    Dict[str, str]
        A dictionary of label index to label name. Index "0" is always
        mapped to "ok".
        ```
        # Example output
        {
            "0": "ok",
            "1": "cat",
            "2": "dog",
            "3": "duck",
        }
        ```
    """
    client = self._client
    project_id = client._project_id
    response = client._api(GET_DEFECTS, params={"projectId": project_id})

    labels: Dict[str, str] = {}
    for defect in response["data"]:
        labels[str(defect["indexId"])] = defect["name"]
    # Index 0 is always reported as "ok", whatever the server returned.
    labels["0"] = "ok"
    return labels

Media

Media management API client. This class provides a set of APIs to manage the medias (images) uploaded to LandingLens. For example, you can use this class to upload medias (images) to LandingLens or list the medias that are already uploaded to LandingLens.

Example

client = Media(project_id, api_key) client.upload("path/to/image.jpg") client.upload("path/to/image_folder") print(client.ls())

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/media.py
class Media:
    """Media management API client.
    This class provides a set of APIs to manage the medias (images) uploaded to LandingLens.
    For example, you can use this class to upload medias (images) to LandingLens or list
    the medias that are already uploaded to LandingLens.

    Example
    -------
    >>> client = Media(project_id, api_key)
    >>> client.upload("path/to/image.jpg")
    >>> client.upload("path/to/image_folder")
    >>> print(client.ls())

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment
        variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        self._client = LandingLens(project_id=project_id, api_key=api_key)
        # Page-size limits; ls() validates against _media_max_page_size.
        self._media_max_page_size = 1000
        self._metadata_max_page_size = 500

    def upload(
        self,
        source: Union[str, Path, Image],
        split: str = "",
        classification_name: Optional[str] = None,
        object_detection_xml: Optional[str] = None,
        seg_mask: Optional[str] = None,
        seg_defect_map: Optional[str] = None,
        nothing_to_label: bool = False,
        metadata_dict: Optional[Dict[str, Any]] = None,
        validate_extensions: bool = True,
        tolerate_duplicate_upload: bool = True,
        tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Upload media to platform.

        NOTE: when `source` is a folder, only `validate_extensions` and
        `tolerate_duplicate_upload` are forwarded to the folder upload;
        `split`, the label arguments, `metadata_dict` and `tags` are applied
        to single-file and PIL Image uploads only.

        Parameters
        ----------
        source: Union[str, Path, Image]
            The image source to upload. It can be a path to the local image file, an
            image folder or a PIL Image object. For image files, the supported formats
            are jpg, jpeg, png, bmp and tiff.
        split: str
            Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned
            and is the default
        classification_name: str
            Set the media's classification if the project type is Classification or
            Anomaly Detection
        object_detection_xml: str
            Path to the Pascal VOC xml file for object detection project
        seg_mask: str
            Path to the segmentation mask file for segmentation project
        seg_defect_map: str
            Path to the segmentation defect_map.json file for segmentation project.
            To get this map, you can use the `landingai.data_management.label.Label` API.
            See below code as an example.
            ```python
            >>> client = Label(project_id, api_key)
            >>> client.get_label_map()
            {'0': 'ok', '1': 'cat', '2': 'dog'}
            ```
        nothing_to_label: bool
            Set the media's label as OK, valid for object detection and segmentation
            project
        metadata_dict: dict
            A dictionary of metadata to be updated or inserted. The key of the metadata
            needs to be created/registered (for the first time) on LandingLens before
            media uploading.
        validate_extensions: bool
            Defaults to True. Files other than jpg/jpeg/png/bmp will be skipped.
            If set to False, will try to upload all files. Behavior of platform
            for unexpected extensions may not be correct - for example, most likely file
            will be uploaded to s3, but won't show in data browser.
        tolerate_duplicate_upload: bool
            Whether to tolerate duplicate upload. A duplicate upload is identified by
            status code 409. The server returns a 409 status code if the same media file
            content exists in the project. Defaults to True. If set to False, will raise
            a `landingai.exceptions.HttpError` if it's a duplicate upload.
        tags: Optional[List[str]]
            Tags passed along with the upload request of a single media.

        Returns
        -------
        Dict[str, Any]
            The result from the upload().
            ```
            # Example output
            {
                "num_uploaded": 10,
                "skipped_count": 0,
                "error_count": 0,
                "medias": [...],
                "files_with_errors": {},
            }
            ```

        Raises
        ------
        ValueError
            If `source` is a path that does not exist, or a single file has an
            extension that is not allowed while `validate_extensions` is True.
        """
        if isinstance(source, Path):
            source = str(source)
        if isinstance(source, str) and not os.path.exists(source):
            raise ValueError(
                f"file/folder does not exist at the specified path {source}"
            )

        project_id = self._client._project_id
        project = self._client.get_project_property(project_id)
        dataset_id = project.get("datasetId")
        label_type = project.get("labelType")

        # construct initial_label
        initial_label: Dict[str, Any] = {}
        if nothing_to_label:
            initial_label["unlabeledAsNothingToLabel"] = True
        elif (
            label_type in ("classification", "anomaly_detection")
            and classification_name is not None
        ):
            initial_label["classification"] = classification_name
        elif label_type == "bounding_box" and object_detection_xml is not None:
            # Context managers make sure the label files are closed again.
            with open(object_detection_xml, "rb") as xml_file:
                xml_content = xml_file.read()
            initial_label["objectDetection"] = base64.b64encode(xml_content).decode(
                "utf-8"
            )
        elif (
            label_type == "segmentation"
            and seg_mask is not None
            and seg_defect_map is not None
        ):
            with open(seg_defect_map, "r") as defect_map_file:
                seg_defect_map_content = defect_map_file.read()
            with open(seg_mask, "rb") as mask_file:
                seg_mask_content = mask_file.read()
            initial_label["segMask"] = base64.b64encode(seg_mask_content).decode(
                "utf-8"
            )
            initial_label["segDefectMap"] = seg_defect_map_content

        # construct metadata
        metadata: Dict[str, Any] = {} if metadata_dict is None else metadata_dict
        if metadata != {}:
            metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
            metadata = metadata_to_ids(metadata, metadata_mapping)

        medias: List[Dict[str, Any]] = []
        skipped_count = 0
        error_count = 0
        medias_with_errors: Dict[str, Any] = {}

        assert isinstance(source, (str, Image))
        if isinstance(source, str) and os.path.isdir(source):
            (
                medias,
                skipped_count,
                error_count,
                medias_with_errors,
            ) = _upload_folder(
                self._client,
                dataset_id,
                source,
                project_id,
                validate_extensions,
                tolerate_duplicate_upload,
            )
        else:
            # Resolve filename and extension for _upload_media()
            if isinstance(source, Image):
                # In-memory images have no name; derive one from the current time.
                ext = "png"
                ts = int(datetime.now().timestamp() * 1000)
                filename = f"image_{ts}.{ext}"
            else:
                assert isinstance(source, str)
                filename = os.path.basename(source)
                ext = os.path.splitext(filename)[-1][1:]
            # Validate extension
            if validate_extensions and ext.upper() not in _ALLOWED_EXTENSIONS:
                raise ValueError(
                    f"""Unexpected extension {ext}. Allowed extensions are: {_ALLOWED_EXTENSIONS}.
                    If you want to attempt the upload anyway, set validate_extensions=False.
                    This may result in an unexpected behavior - e.g. file not showing up in data browser."""
                )
            try:
                resp = _upload_media(
                    self._client,
                    dataset_id,
                    filename,
                    source,
                    project_id,
                    ext,
                    split,
                    initial_label,
                    metadata,
                    tags,
                )
                medias.append(resp)
            except DuplicateUploadError:
                if not tolerate_duplicate_upload:
                    raise
                skipped_count = 1
            except Exception as e:
                # Best effort: report the failure in the result instead of raising.
                error_count = 1
                medias_with_errors[filename] = str(e)

        return {
            "num_uploaded": len(medias),
            "skipped_count": skipped_count,
            "error_count": error_count,
            "medias": medias,
            "files_with_errors": medias_with_errors,
        }

    def ls(
        self,
        offset: int = 0,
        limit: int = 1000,
        media_status: Union[str, List[str], None] = None,
        **metadata: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """
        List medias with metadata for given project id. Can be filtered using metadata.
        NOTE: pagination is applied with the `offset` and `limit` parameters.

        Parameters
        ----------
        offset: int
            Defaults to 0. As in standard pagination.
        limit: int
            Max 1000. Defaults to 1000. As in standard pagination.
        media_status: Union[str, List]
            Gets only medias with specified statuses. Defaults to None - then medias
            with all statuses are fetched.
            Possible values: raw, pending_labeling, pending_review, rejected, approved
        **metadata:
            Kwargs used as metadata that will be used for server side filtering of the results.

        Returns
        -------
        Dict[str, Any]
            A dictionary with the keys "medias", "num_requested", "count",
            "offset" and "limit".

        Raises
        ------
        ValueError
            If `limit - offset` exceeds the max page size.
        """
        # NOTE(review): the page-size check and "num_requested" use
        # `limit - offset`, which treats `limit` as an end index rather than
        # a page size - confirm the intended pagination semantics.
        if limit - offset > self._media_max_page_size:
            raise ValueError(f"Exceeded max page size of {self._media_max_page_size}")

        if media_status is not None:
            _validate_media_status(media_status)

        project_id = self._client._project_id
        assert project_id is not None

        dataset_id = self._client.get_project_property(project_id, "dataset_id")

        metadata_filter_map: Dict[str, Any] = {}
        if metadata:
            metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
            metadata_filter_map = _metadata_to_filter(metadata, metadata_mapping)

        column_filter_map: Dict[str, Any] = {}
        if media_status is not None:
            # A single status is filtered the same way as a one-element list.
            if isinstance(media_status, str):
                media_status = [media_status]
            column_filter_map = {
                "datasetContent": {"mediaStatus": {"CONTAINS_ANY": media_status}}
            }

        resp = self._client._api(
            MEDIA_LIST,
            params=_build_list_media_request(
                limit,
                column_filter_map,
                dataset_id,
                metadata_filter_map,
                offset,
                project_id,
            ),
        )
        medias = resp["data"]

        if len(medias) == self._media_max_page_size:
            _LOGGER.warning(f"fetched medias only up to {self._media_max_page_size}")

        return {
            "medias": medias,
            "num_requested": limit - offset,
            "count": len(medias),
            "offset": offset,
            "limit": limit,
        }

    def update_split_key(
        self,
        media_ids: List[int],
        split_key: str,
    ) -> None:
        """
        Update the split key for a list of medias on the LandingLens platform.

        Parameters
        ----------
        media_ids: List[int]
            A list of media ids to update split key.
        split_key: str
            The split key to set for these medias, it could be 'train', 'dev', 'test' or '' (where '' represents Unassigned).
            Leading/trailing whitespace is ignored and the key is matched case-insensitively.

        Raises
        ------
        ValueError
            If `split_key` is not one of the supported split keys.

        Example
        -------
        >>> client = Media(project_id, api_key)
        >>> client.update_split_key(media_ids=[1001, 1002], split_key="test")  # assign split key 'test' for media ids 1001 and 1002
        >>> client.update_split_key(media_ids=[1001, 1002], split_key="")    # remove split key for media ids 1001 and 1002

        """
        split_key = split_key.strip().lower()
        if split_key not in _SUPPORTED_KEYS:
            raise ValueError(
                f"Invalid split key: {split_key}. Supported split keys are: {_SUPPORTED_KEYS}"
            )
        project_id = self._client._project_id
        split_id = 0  # 0 is Unassigned split
        if split_key != "":
            # Resolve the split name to the id used by this project.
            resp = self._client._api(
                GET_PROJECT_SPLIT, params={"projectId": project_id}
            )
            split_name_to_id = {
                split["splitSetName"].lower(): split["id"] for split in resp["data"]
            }
            assert (
                split_key in split_name_to_id
            ), f"Split key {split_key} not found in project {project_id}. Available split keys in this project are: {split_name_to_id.keys()}"
            split_id = split_name_to_id[split_key]
        dataset_id = self._client.get_project_property(project_id)["datasetId"]
        self._client._api(
            MEDIA_UPDATE_SPLIT,
            params={
                "projectId": project_id,
                "datasetId": dataset_id,
                "splitSet": split_id,
                "selectMediaOptions": json.dumps({"selectedMedia": media_ids}),
            },
        )
        _LOGGER.info(
            f"Successfully updated split key to '{split_key}' for {len(media_ids)} medias with media ids: {media_ids}"
        )

ls(offset=0, limit=1000, media_status=None, **metadata)

List medias with metadata for given project id. Can be filtered using metadata. NOTE: pagination is applied with the offset and limit parameters.

Parameters

offset: int Defaults to 0. As in standard pagination. limit: int Max 1000. Defaults to 1000. As in standard pagination. media_status: Union[str, List] Gets only medias with specified statuses. Defaults to None - then medias with all statuses are fetched. Possible values: raw, pending_labeling, pending_review, rejected, approved **metadata: Kwargs used as metadata that will be used for server side filtering of the results.

Source code in landingai/data_management/media.py
def ls(
    self,
    offset: int = 0,
    limit: int = 1000,
    media_status: Union[str, List[str], None] = None,
    **metadata: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """
    List the medias of the client's project, optionally filtered by media
    status and by metadata.
    NOTE: pagination is applied with the `offset` and `limit` parameters.

    Parameters
    ----------
    offset: int
        Defaults to 0. As in standard pagination.
    limit: int
        Max 1000. Defaults to 1000. As in standard pagination.
    media_status: Union[str, List]
        Gets only medias with specified statuses. Defaults to None - then medias
        with all statuses are fetched.
        Possible values: raw, pending_labeling, pending_review, rejected, approved
    **metadata:
        Kwargs used as metadata that will be used for server side filtering of the results.

    Returns
    -------
    Dict[str, Any]
        A dictionary with the keys "medias", "num_requested", "count",
        "offset" and "limit".

    Raises
    ------
    ValueError
        If `limit - offset` exceeds the max page size.
    """
    max_page_size = self._media_max_page_size
    num_requested = limit - offset
    if num_requested > max_page_size:
        raise ValueError(f"Exceeded max page size of {self._media_max_page_size}")

    filter_by_status = media_status is not None
    if filter_by_status:
        _validate_media_status(media_status)

    client = self._client
    project_id = client._project_id
    assert project_id is not None

    dataset_id = client.get_project_property(project_id, "dataset_id")

    # Translate the metadata kwargs into a server-side filter, if any were given.
    metadata_filter_map: Dict[str, Any] = {}
    if metadata:
        metadata_mapping, _ = client.get_metadata_mappings(project_id)
        metadata_filter_map = _metadata_to_filter(metadata, metadata_mapping)

    column_filter_map: Dict[str, Any] = {}
    if filter_by_status:
        # A single status is filtered the same way as a one-element list.
        statuses = [media_status] if isinstance(media_status, str) else media_status
        column_filter_map = {
            "datasetContent": {"mediaStatus": {"CONTAINS_ANY": statuses}}
        }

    request_params = _build_list_media_request(
        limit,
        column_filter_map,
        dataset_id,
        metadata_filter_map,
        offset,
        project_id,
    )
    medias = client._api(MEDIA_LIST, params=request_params)["data"]
    fetched = len(medias)

    if fetched == self._media_max_page_size:
        _LOGGER.warning(f"fetched medias only up to {self._media_max_page_size}")

    return {
        "medias": medias,
        "num_requested": num_requested,
        "count": fetched,
        "offset": offset,
        "limit": limit,
    }

update_split_key(media_ids, split_key)

Update the split key for a list of medias on the LandingLens platform.

Parameters

media_ids: List[int] A list of media ids to update split key. split_key: str The split key to set for these medias. It can be 'train', 'dev', 'test' or '' (where '' represents Unassigned).

Example

client = Media(project_id, api_key) client.update_split_key(media_ids=[1001, 1002], split_key="test") # assign split key 'test' for media ids 1001 and 1002 client.update_split_key(media_ids=[1001, 1002], split_key="") # remove split key for media ids 1001 and 1002

Source code in landingai/data_management/media.py
def update_split_key(
    self,
    media_ids: List[int],
    split_key: str,
) -> None:
    """
    Assign a list of medias on the LandingLens platform to a split.

    Parameters
    ----------
    media_ids: List[int]
        A list of media ids to update split key.
    split_key: str
        The split key to set for these medias, it could be 'train', 'dev', 'test' or '' (where '' represents Unassigned).
        Leading/trailing whitespace is ignored and the key is matched case-insensitively.

    Raises
    ------
    ValueError
        If `split_key` is not one of the supported split keys.

    Example
    -------
    >>> client = Media(project_id, api_key)
    >>> client.update_split_key(media_ids=[1001, 1002], split_key="test")  # assign split key 'test' for media ids 1001 and 1002
    >>> client.update_split_key(media_ids=[1001, 1002], split_key="")    # remove split key for media ids 1001 and 1002

    """
    split_key = split_key.strip().lower()
    if split_key not in _SUPPORTED_KEYS:
        raise ValueError(
            f"Invalid split key: {split_key}. Supported split keys are: {_SUPPORTED_KEYS}"
        )

    client = self._client
    project_id = client._project_id

    if split_key == "":
        # 0 is Unassigned split
        split_id = 0
    else:
        # Resolve the split name to the id used by this project.
        splits_resp = client._api(GET_PROJECT_SPLIT, params={"projectId": project_id})
        split_name_to_id = {}
        for split_entry in splits_resp["data"]:
            split_name_to_id[split_entry["splitSetName"].lower()] = split_entry["id"]
        assert (
            split_key in split_name_to_id
        ), f"Split key {split_key} not found in project {project_id}. Available split keys in this project are: {split_name_to_id.keys()}"
        split_id = split_name_to_id[split_key]

    dataset_id = client.get_project_property(project_id)["datasetId"]
    update_params = {
        "projectId": project_id,
        "datasetId": dataset_id,
        "splitSet": split_id,
        "selectMediaOptions": json.dumps({"selectedMedia": media_ids}),
    }
    client._api(MEDIA_UPDATE_SPLIT, params=update_params)
    _LOGGER.info(
        f"Successfully updated split key to '{split_key}' for {len(media_ids)} medias with media ids: {media_ids}"
    )

upload(source, split='', classification_name=None, object_detection_xml=None, seg_mask=None, seg_defect_map=None, nothing_to_label=False, metadata_dict=None, validate_extensions=True, tolerate_duplicate_upload=True, tags=None)

Upload media to platform.

Parameters

source: Union[str, Path, Image] The image source to upload. It can be a path to the local image file, an image folder or a PIL Image object. For image files, the supported formats are jpg, jpeg, png, bmp and tiff. split: str Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned and is the default classification_name: str Set the media's classification if the project type is Classification or Anomaly Detection object_detection_xml: str Path to the Pascal VOC xml file for object detection project seg_mask: str Path to the segmentation mask file for segmentation project seg_defect_map: str Path to the segmentation defect_map.json file for segmentation project. To get this map, you can use the landingai.data_management.label.Label API. See below code as an example.

>>> client = Label(project_id, api_key)
>>> client.get_label_map()
>>> {'0': 'ok', '1': 'cat', '2': 'dog'}
nothing_to_label: bool Set the media's label as OK, valid for object detection and segmentation project metadata_dict: dict A dictionary of metadata to be updated or inserted. The key of the metadata needs to be created/registered (for the first time) on LandingLens before media uploading. validate_extensions: bool Defaults to True. Files other than jpg/jpeg/png/bmp will be skipped. If set to False, will try to upload all files. Behavior of platform for unexpected extensions may not be correct - for example, most likely file will be uploaded to s3, but won't show in data browser. tolerate_duplicate_upload: bool Whether to tolerate duplicate upload. A duplicate upload is identified by status code 409. The server returns a 409 status code if the same media file content exists in the project. Defaults to True. If set to False, will raise a landingai.exceptions.HttpError if it's a duplicate upload.

Returns

Dict[str, Any] The result from the upload().

# Example output
{
    "num_uploaded": 10,
    "skipped_count": 0,
    "error_count": 0,
    "medias": [...],
    "files_with_errors": {},
}

Source code in landingai/data_management/media.py
def upload(
    self,
    source: Union[str, Path, Image],
    split: str = "",
    classification_name: Optional[str] = None,
    object_detection_xml: Optional[str] = None,
    seg_mask: Optional[str] = None,
    seg_defect_map: Optional[str] = None,
    nothing_to_label: bool = False,
    metadata_dict: Optional[Dict[str, Any]] = None,
    validate_extensions: bool = True,
    tolerate_duplicate_upload: bool = True,
    tags: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Upload media to platform.

    Parameters
    ----------
    source: Union[str, Path, Image]
        The image source to upload. It can be a path to the local image file, an
        image folder or a PIL Image object. For image files, the supported formats
        are jpg, jpeg, png, bmp and tiff.
    split: str
        Set this media to one split ('train'/'dev'/'test'), '' represents Unassigned
        and is the default
    classification_name: str
        Set the media's classification if the project type is Classification or
        Anomaly Detection
    object_detection_xml: str
        Path to the Pascal VOC xml file for object detection project
    seg_mask: str
        Path to the segmentation mask file for segmentation project
    seg_defect_map: str
        Path to the segmentation defect_map.json file for segmentation project.
        To get this map, you can use the `landingai.data_management.label.Label` API.
        See below code as an example.
        ```python
        >>> client = Label(project_id, api_key)
        >>> client.get_label_map()
        >>> {'0': 'ok', '1': 'cat', '2': 'dog'}
        ```
    nothing_to_label: bool
        Set the media's label as OK, valid for object detection and segmentation
        project. When True it takes precedence over the other label arguments.
    metadata_dict: dict
        A dictionary of metadata to be updated or inserted. The key of the metadata
        needs to be created/registered (for the first time) on LandingLens before
        media uploading.
    validate_extensions: bool
        Defaults to True. Files other than jpg/jpeg/png/bmp will be skipped.
        If set to False, will try to upload all files. Behavior of platform
        for unexpected extensions may not be correct - for example, most likely file
        will be uploaded to s3, but won't show in data browser.
        For a single file (or PIL Image) source, a disallowed extension raises
        a ValueError instead of being skipped.
    tolerate_duplicate_upload: bool
        Whether to tolerate duplicate upload. A duplicate upload is identified by
        status code 409. The server returns a 409 status code if the same media file
        content exists in the project. Defaults to True. If set to False, will raise
        a `landingai.exceptions.HttpError` if it's a duplicate upload.
    tags: Optional[List[str]]
        Tags forwarded to the single-media upload call. Defaults to None.

    Notes
    -----
    When `source` is a folder, this method only forwards `validate_extensions`
    and `tolerate_duplicate_upload` to the folder upload helper; `split`, the
    label arguments, `metadata_dict` and `tags` are not passed along.

    Returns
    -------
    Dict[str, Any]
        The result from the upload().
        ```
        # Example output
        {
            "num_uploaded": 10,
            "skipped_count": 0,
            "error_count": 0,
            "medias": [...],
            "files_with_errors": {},
        }
        ```

    Raises
    ------
    ValueError
        If `source` is a path that does not exist, or if a single-file source has
        an extension that is not allowed while `validate_extensions` is True.
    DuplicateUploadError
        Re-raised for a single-media upload when `tolerate_duplicate_upload` is
        False. Any other exception raised by the single-media upload is recorded
        in `files_with_errors` instead of being propagated.
    """
    if isinstance(source, Path):
        source = str(source)
    if isinstance(source, str) and not os.path.exists(source):
        raise ValueError(
            f"file/folder does not exist at the specified path {source}"
        )

    project_id = self._client._project_id
    project = self._client.get_project_property(project_id)
    dataset_id = project.get("datasetId")
    label_type = project.get("labelType")

    # construct initial_label; only the first matching branch applies
    initial_label: Dict[str, Any] = {}
    if nothing_to_label:
        initial_label["unlabeledAsNothingToLabel"] = True
    elif (
        label_type == "classification" or label_type == "anomaly_detection"
    ) and classification_name is not None:
        initial_label["classification"] = classification_name
    elif label_type == "bounding_box" and object_detection_xml is not None:
        # Context manager guarantees the file handle is closed after reading.
        with open(object_detection_xml, "rb") as xml_file:
            xml_content = xml_file.read()
        initial_label["objectDetection"] = base64.b64encode(xml_content).decode(
            "utf-8"
        )
    elif (
        label_type == "segmentation"
        and seg_mask is not None
        and seg_defect_map is not None
    ):
        with open(seg_defect_map, "r") as defect_map_file:
            seg_defect_map_content = defect_map_file.read()
        with open(seg_mask, "rb") as mask_file:
            seg_mask_content = mask_file.read()
        initial_label["segMask"] = base64.b64encode(seg_mask_content).decode(
            "utf-8"
        )
        initial_label["segDefectMap"] = seg_defect_map_content

    # construct metadata; the mapping lookup is skipped when there is nothing to map
    metadata: Dict[str, Any] = {} if metadata_dict is None else metadata_dict
    if metadata:
        metadata_mapping, _ = self._client.get_metadata_mappings(project_id)
        metadata = metadata_to_ids(metadata, metadata_mapping)

    medias: List[Dict[str, Any]] = []
    skipped_count = 0
    error_count = 0
    medias_with_errors: Dict[str, Any] = {}

    assert isinstance(source, (str, Image))
    if isinstance(source, str) and os.path.isdir(source):
        (
            medias,
            skipped_count,
            error_count,
            medias_with_errors,
        ) = _upload_folder(
            self._client,
            dataset_id,
            source,
            project_id,
            validate_extensions,
            tolerate_duplicate_upload,
        )
    else:
        # Resolve filename and extension for _upload_media()
        if isinstance(source, Image):
            # In-memory images have no name, so a timestamped png name is generated.
            ext = "png"
            ts = int(datetime.now().timestamp() * 1000)
            filename = f"image_{ts}.{ext}"
        else:
            assert isinstance(source, str)
            filename = os.path.basename(source)
            ext = os.path.splitext(filename)[-1][1:]
        # Validate extension
        if validate_extensions and ext.upper() not in _ALLOWED_EXTENSIONS:
            raise ValueError(
                f"""Unexpected extension {ext}. Allowed extensions are: {_ALLOWED_EXTENSIONS}.
                If you want to attempt the upload anyway, set validate_extensions=False.
                This may result in an unexpected behavior - e.g. file not showing up in data browser."""
            )
        try:
            resp = _upload_media(
                self._client,
                dataset_id,
                filename,
                source,
                project_id,
                ext,
                split,
                initial_label,
                metadata,
                tags,
            )
            medias.append(resp)
        except DuplicateUploadError:
            if not tolerate_duplicate_upload:
                raise
            skipped_count = 1
        except Exception as e:
            # Best-effort: report the failure in the result rather than raising.
            error_count = 1
            medias_with_errors[filename] = str(e)

    return {
        "num_uploaded": len(medias),
        "skipped_count": skipped_count,
        "error_count": error_count,
        "medias": medias,
        "files_with_errors": medias_with_errors,
    }

Metadata

Metadata management API client. This class provides a set of APIs to manage the metadata of the medias (images) uploaded to LandingLens. For example, you can use this class to update the metadata of the uploaded medias.

Example

client = Metadata(project_id, api_key)
client.update([101, 102, 103], creator="tom")

Parameters

project_id: int LandingLens project id. Can override this default in individual commands. api_key: Optional[str] LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.

Source code in landingai/data_management/metadata.py
class Metadata:
    """Metadata management API client.
    This class provides a set of APIs to manage the metadata of the medias (images) uploaded to LandingLens.
    For example, you can use this class to update the metadata of the uploaded medias.

    Example
    -------
    >>> client = Metadata(project_id, api_key)
    >>> client.update([101, 102, 103], creator="tom")

    Parameters
    ----------
    project_id: int
        LandingLens project id.  Can override this default in individual commands.
    api_key: Optional[str]
        LandingLens API Key. If it's not provided, it will be read from the environment variable LANDINGAI_API_KEY, or from .env file on your project root directory.
    """

    def __init__(self, project_id: int, api_key: Optional[str] = None):
        # All API calls are delegated to a LandingLens client bound to this project.
        self._client = LandingLens(project_id=project_id, api_key=api_key)

    def update(
        self,
        media_ids: Union[int, List[int]],
        **input_metadata: Any,
    ) -> Dict[str, Any]:
        """Update or insert a dictionary of metadata for a set of medias.

        Parameters
        ----------
        media_ids
            Media ids to update. Either a single id or a list of ids.
        input_metadata
            Metadata passed as keyword arguments (e.g. ``creator="tom"``) to be
            updated or inserted. The key of the metadata needs to be
            created/registered (for the first time) on LandingLens before
            calling update().

        Returns
        -------
        Dict[str, Any]
            The result from the update(). The "metadata" entry is read from the
            first media in the server response.
            ```
            # Example output
            {
                "project_id": 12345,
                "metadata": [...],
                "media_ids": [123, 124],
            }
            ```

        Raises
        ------
        ValueError
            If `media_ids` is empty, falsy or a bool, or if no metadata keyword
            arguments are given.
        """
        project_id = self._client._project_id
        if (
            not media_ids
            or isinstance(media_ids, bool)
            or (not isinstance(media_ids, int) and len(media_ids) == 0)
        ):
            raise ValueError("Missing required flags: {'media_ids'}")

        if not input_metadata:
            raise ValueError("Missing required flags: {'metadata'}")

        dataset_id = self._client.get_project_property(project_id, "dataset_id")

        if isinstance(media_ids, int):
            media_ids = [media_ids]
        else:
            # to avoid errors due to things like numpy.int
            media_ids = list(map(int, media_ids))

        metadata_mapping, id_to_metadata = self._client.get_metadata_mappings(
            project_id
        )

        body = _MetadataUploadRequestBody(
            selectOption=_SelectOption(media_ids),
            project=_Project(project_id, dataset_id),
            metadata=metadata_to_ids(input_metadata, metadata_mapping),
        )

        resp = self._client._api(METADATA_UPDATE, data=obj_to_dict(body))
        resp_data = resp["data"]
        return {
            "project_id": project_id,
            # Only the first returned media's metadata is reported back.
            "metadata": ids_to_metadata(resp_data[0]["metadata"], id_to_metadata),
            "media_ids": [media["mediaId"] for media in resp_data],
        }

    def get(self, media_id: int) -> Dict[str, str]:
        """Return all the metadata associated with a given media.

        The keys of the response's "data" mapping are converted to int and
        translated through the id-to-metadata mapping of the client's project.
        """
        resp = self._client._api(
            METADATA_GET, params={"objectId": media_id, "objectType": "media"}
        )
        _, id_to_metadata = self._client.get_metadata_mappings(self._client._project_id)
        return {id_to_metadata[int(k)]: v for k, v in resp["data"].items()}

get(media_id)

Return all the metadata associated with a given media.

Source code in landingai/data_management/metadata.py
def get(self, media_id: int) -> Dict[str, str]:
    """Fetch the metadata stored for one media, keyed by translated metadata key.

    Queries the METADATA_GET route for the given media id, then converts each
    key of the response's "data" mapping to int and looks it up in the
    id-to-metadata mapping of the client's project.
    """
    client = self._client
    query = {"objectId": media_id, "objectType": "media"}
    resp = client._api(METADATA_GET, params=query)
    _, id_to_name = client.get_metadata_mappings(client._project_id)

    translated = {}
    for raw_id, value in resp["data"].items():
        translated[id_to_name[int(raw_id)]] = value
    return translated

update(media_ids, **input_metadata)

Update or insert a dictionary of metadata for a set of medias.

Parameters

media_ids Media ids to update. input_metadata A dictionary of metadata to be updated or inserted. The key of the metadata needs to be created/registered (for the first time) on LandingLens before calling update().

Returns

Dict[str, Any] The result from the update().

# Example output
{
    "project_id": 12345,
    "metadata": [...],
    "media_ids": [123, 124],
}

Source code in landingai/data_management/metadata.py
def update(
    self,
    media_ids: Union[int, List[int]],
    **input_metadata: Any,
) -> Dict[str, Any]:
    """Update or insert a dictionary of metadata for a set of medias.

    Parameters
    ----------
    media_ids
        Media ids to update. Either a single id or a list of ids.
    input_metadata
        Metadata passed as keyword arguments (e.g. ``creator="tom"``) to be
        updated or inserted. The key of the metadata needs to be
        created/registered (for the first time) on LandingLens before
        calling update().

    Returns
    -------
    Dict[str, Any]
        The result from the update(). The "metadata" entry is read from the
        first media in the server response.
        ```
        # Example output
        {
            "project_id": 12345,
            "metadata": [...],
            "media_ids": [123, 124],
        }
        ```

    Raises
    ------
    ValueError
        If `media_ids` is empty, falsy or a bool, or if no metadata keyword
        arguments are given.
    """
    project_id = self._client._project_id
    if (
        not media_ids
        or isinstance(media_ids, bool)
        or (not isinstance(media_ids, int) and len(media_ids) == 0)
    ):
        raise ValueError("Missing required flags: {'media_ids'}")

    if not input_metadata:
        raise ValueError("Missing required flags: {'metadata'}")

    dataset_id = self._client.get_project_property(project_id, "dataset_id")

    if isinstance(media_ids, int):
        media_ids = [media_ids]
    else:
        # to avoid errors due to things like numpy.int
        media_ids = list(map(int, media_ids))

    metadata_mapping, id_to_metadata = self._client.get_metadata_mappings(
        project_id
    )

    body = _MetadataUploadRequestBody(
        selectOption=_SelectOption(media_ids),
        project=_Project(project_id, dataset_id),
        metadata=metadata_to_ids(input_metadata, metadata_mapping),
    )

    resp = self._client._api(METADATA_UPDATE, data=obj_to_dict(body))
    resp_data = resp["data"]
    return {
        "project_id": project_id,
        # Only the first returned media's metadata is reported back.
        "metadata": ids_to_metadata(resp_data[0]["metadata"], id_to_metadata),
        "media_ids": [media["mediaId"] for media in resp_data],
    }

Encoder

Bases: JSONEncoder

JSON encoder that converts all keys to camel case

Source code in landingai/data_management/utils.py
class Encoder(json.JSONEncoder):
    """JSON encoder whose fallback hook camel-cases attribute names.

    Enum members are encoded as their member name. Any other object handed to
    `default` is encoded as a dict of its attributes with camel-cased keys.
    """

    def default(self, obj: object) -> Any:
        """Return a JSON-serializable stand-in for `obj`.

        NOTE(review): the json module only calls `default` for objects it cannot
        serialize natively, so plain dicts encoded through `json.dumps` presumably
        never reach the dict branch here - confirm whether that is intended.
        """
        if isinstance(obj, dict):
            attributes = obj
        elif isinstance(obj, Enum):
            return obj._name_
        else:
            attributes = obj.__dict__

        camel_cased = {}
        for key, value in attributes.items():
            camel_cased[to_camel_case(key)] = value
        return camel_cased

PrettyPrintable

A mix-in class that enables its subclass to be serialized into pretty printed string

Source code in landingai/data_management/utils.py
class PrettyPrintable:
    """Mix-in giving subclasses a pretty-printed string form of their attributes."""

    def __repr__(self) -> str:
        """Delegate to `to_str()` so `print` and `pprint` show the pretty form."""
        return self.to_str()

    def to_str(self) -> str:
        """Return the instance's attribute dictionary formatted by `pprint.pformat`."""
        attributes = self.__dict__
        return pprint.pformat(attributes)

__repr__()

For print and pprint

Source code in landingai/data_management/utils.py
def __repr__(self) -> str:
    """Use the `to_str()` rendering for `print` and `pprint`."""
    rendered = self.to_str()
    return rendered

to_str()

Returns the string representation of the model

Source code in landingai/data_management/utils.py
def to_str(self) -> str:
    """Format the model's attribute dictionary with `pprint.pformat`."""
    attributes = self.__dict__
    return pprint.pformat(attributes)

obj_to_dict(obj)

Convert an object to a json dictionary with camel case keys

Source code in landingai/data_management/utils.py
def obj_to_dict(obj: object) -> Dict[str, Any]:
    """Round-trip `obj` through JSON (using `Encoder`) and return the decoded value.

    The result is cast to a dict for the type checker; no runtime check is made.
    """
    serialized = Encoder().encode(obj)
    decoded = json.loads(serialized)
    return cast(Dict[str, Any], decoded)

obj_to_params(obj)

Convert an object to query parameters in dict format where the dict keys are in camel case.

Source code in landingai/data_management/utils.py
def obj_to_params(obj: object) -> Dict[str, Any]:
    """Build query parameters from an object's attributes, with camel-cased keys.

    List values are passed through unchanged; every other value is serialized
    to a JSON string with `Encoder`.
    """
    params = {}
    for attr_name, attr_value in obj.__dict__.items():
        param_name = to_camel_case(attr_name)
        if isinstance(attr_value, list):
            params[param_name] = attr_value
        else:
            params[param_name] = json.dumps(attr_value, cls=Encoder)
    return params

to_camel_case(snake_str)

Convert a snake case string to camel case

Source code in landingai/data_management/utils.py
def to_camel_case(snake_str: str) -> str:
    """Turn a snake_case string into camelCase.

    The text before the first underscore is kept as-is; every later segment is
    title-cased with `str.title` and appended.
    """
    head, *tail = snake_str.split("_")
    return head + "".join(map(str.title, tail))

validate_metadata(input_metadata, metadata_mapping)

Validate the input metadata against the metadata mapping. Raise ValueError if any metadata keys are not available.

Source code in landingai/data_management/utils.py
def validate_metadata(
    input_metadata: Dict[str, Any], metadata_mapping: Dict[str, Any]
) -> None:
    """Check that every key of `input_metadata` is a key of `metadata_mapping`.

    Raises
    ------
    ValueError
        If at least one input key is missing from the mapping. The message lists
        the offending keys and the available ones.
    """
    # TODO: Validate also values and maybe types. Or shouldn't it be the job of the server?
    unknown_keys = set(input_metadata) - set(metadata_mapping)
    if not unknown_keys:
        return
    raise ValueError(
        f"""Not allowed fields: {unknown_keys}.
Available fields are {metadata_mapping.keys()}.
If you want to add new fields, please add it to the associated project on the LandingLens platform."""
    )