landingai.predict

Predictor

A class that calls your inference endpoint on the LandingLens platform.

Source code in landingai/predict/cloud.py
class Predictor:
    """A class that calls your inference endpoint on the LandingLens platform."""

    _url: str = "https://predict.app.landing.ai/inference/v1/predict"
    _num_retry: int = 3
    _session: Session

    def __init__(
        self,
        endpoint_id: str,
        *,
        api_key: Optional[str] = None,
        check_server_ready: bool = True,
    ) -> None:
        """Predictor constructor

        Parameters
        ----------
        endpoint_id
            A unique string that identifies your inference endpoint.
            This string can be found in the URL of your inference endpoint.
            Example: "9f237028-e630-4576-8826-f35ab9000abc" is the endpoint id in this URL:
            https://predict.app.landing.ai/inference/v1/predict?endpoint_id=9f237028-e630-4576-8826-f35ab9000abc
        api_key
            The API Key of your LandingLens organization.
            If not provided, it will try to load from the environment variable
            LANDINGAI_API_KEY or from the .env file.
        check_server_ready : bool, optional
            Check if the cloud inference service is reachable, by default True
        """
        # Check if the cloud inference service is reachable
        if check_server_ready and not self._check_connectivity(url=self._url):
            raise ConnectionError(
                f"Failed to connect to the cloud inference service. Check that {self._url} is accesible from this device"
            )

        self._endpoint_id = endpoint_id
        self._api_credential = self._load_api_credential(api_key)
        extra_x_event = {
            "endpoint_id": self._endpoint_id,
            "model_type": "fast_and_easy",
        }
        headers = self._build_default_headers(self._api_credential, extra_x_event)
        self._session = create_requests_session(self._url, self._num_retry, headers)
        # performance_metrics keeps performance metrics for the last call to _do_inference()
        self._performance_metrics: Dict[str, float] = {}

    def _load_api_credential(self, api_key: Optional[str]) -> Optional[APIKey]:
        """
        Simple wrapper to load the API key from given string or env var.

        This wrapper is useful to allow subclasses of `Predictor` to override the behavior
        of loading the API key.
        For example: SnowflakeNativeAppPredictor doesn't use APIKey at all,
        so it can override this method to return None.
        """
        return load_api_credential(api_key)

    def _check_connectivity(
        self, url: Optional[str] = None, host: Optional[Tuple[str, int]] = None
    ) -> bool:
        if url:
            parsed_url = urlparse(url)
            if parsed_url.port:
                port = parsed_url.port
            elif parsed_url.scheme == "https":
                port = 443
            elif parsed_url.scheme == "http":
                port = 80
            else:
                port = socket.getservbyname(parsed_url.scheme)
            host = (parsed_url.hostname, port)  # type: ignore

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        result = sock.connect_ex(host)  # type: ignore
        # print(f"Checking if {host[0]}:{host[1]} is open (res={result})")
        sock.close()
        return result == 0

    def _build_default_headers(
        self, api_key: Optional[APIKey], extra_x_event: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """Build the HTTP headers for the request to the Cloud inference endpoint(s)."""
        tracked_properties = get_runtime_environment_info()
        if extra_x_event:
            tracked_properties.update(extra_x_event)
        tracking_data = {
            "event": "inference",
            "action": "POST",
            "properties": tracked_properties,
        }
        header = {
            "contentType": "multipart/form-data",
            "X-event": json.dumps(tracking_data),
        }
        if api_key is not None:
            header["apikey"] = api_key.api_key
        return header

    @retry(
        # All customers have a quota of images per minute. If the server returns a 429, we wait 60 seconds and retry. Note that we retry forever on 429s, which is fine since the rate limiter will eventually allow the request through.
        retry=retry_if_exception_type(RateLimitExceededError),
        wait=wait_fixed(60),
        before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
    )
    @Timer(name="Predictor.predict")
    def predict(
        self,
        image: Union[np.ndarray, PIL.Image.Image],
        metadata: Optional[InferenceMetadata] = None,
        **kwargs: Any,
    ) -> List[Prediction]:
        """Call the inference endpoint and return the prediction result.

        Parameters
        ----------
        image
            The input image to be predicted. The image should be in the RGB format if it has three channels.
        metadata
            The (optional) metadata associated with this inference/image.
            Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.

            See `landingai.common.InferenceMetadata` for more information.

        Returns
        -------
        The inference result as a list of dictionaries
            Each dictionary is a prediction result.
            The inference result has been filtered by the confidence threshold set in LandingLens and sorted by confidence score in descending order.
        """
        buffer_bytes = serialize_image(image)
        files = {"file": buffer_bytes}
        query_params = {
            "endpoint_id": self._endpoint_id,
        }
        data = {"metadata": metadata.json()} if metadata else None
        (preds, self._performance_metrics) = get_cloudinference_prediction(
            self._session,
            self._url,
            files,
            query_params,
            _CloudPredictionExtractor,
            data=data,
        )
        return preds

    def get_metrics(self) -> Dict[str, float]:
        """
        Return the performance metrics for the last inference call.

        Returns:
            A dictionary containing the performance metrics.
            Example:
            {
                "decoding_s": 0.0084266,
                "infer_s": 3.3537345,
                "postprocess_s": 0.0255059,
                "preprocess_s": 0.0124037,
                "waiting_s": 0.0001487
            }
        """
        return self._performance_metrics
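
A minimal usage sketch, assuming a valid endpoint ID and API key (both values below are placeholders); predict() accepts either a NumPy array or a PIL image:

from PIL import Image
from landingai.predict import Predictor

predictor = Predictor(
    endpoint_id="9f237028-e630-4576-8826-f35ab9000abc",  # placeholder endpoint ID
    api_key="land_sk_...",  # placeholder; or set the LANDINGAI_API_KEY environment variable
)
image = Image.open("input.jpg")  # hypothetical input file
predictions = predictor.predict(image)
for pred in predictions:
    print(pred)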

__init__(endpoint_id, *, api_key=None, check_server_ready=True)

Predictor constructor

Parameters

endpoint_id
    A unique string that identifies your inference endpoint. This string can be found in the URL of your inference endpoint. Example: "9f237028-e630-4576-8826-f35ab9000abc" is the endpoint ID in this URL:
    https://predict.app.landing.ai/inference/v1/predict?endpoint_id=9f237028-e630-4576-8826-f35ab9000abc
api_key
    The API Key of your LandingLens organization. If not provided, it will try to load from the environment variable LANDINGAI_API_KEY or from the .env file.
check_server_ready : bool, optional
    Check if the cloud inference service is reachable, by default True

Source code in landingai/predict/cloud.py
def __init__(
    self,
    endpoint_id: str,
    *,
    api_key: Optional[str] = None,
    check_server_ready: bool = True,
) -> None:
    """Predictor constructor

    Parameters
    ----------
    endpoint_id
        A unique string that identifies your inference endpoint.
        This string can be found in the URL of your inference endpoint.
        Example: "9f237028-e630-4576-8826-f35ab9000abc" is the endpoint id in this URL:
        https://predict.app.landing.ai/inference/v1/predict?endpoint_id=9f237028-e630-4576-8826-f35ab9000abc
    api_key
        The API Key of your LandingLens organization.
        If not provided, it will try to load from the environment variable
        LANDINGAI_API_KEY or from the .env file.
    check_server_ready : bool, optional
        Check if the cloud inference service is reachable, by default True
    """
    # Check if the cloud inference service is reachable
    if check_server_ready and not self._check_connectivity(url=self._url):
        raise ConnectionError(
            f"Failed to connect to the cloud inference service. Check that {self._url} is accesible from this device"
        )

    self._endpoint_id = endpoint_id
    self._api_credential = self._load_api_credential(api_key)
    extra_x_event = {
        "endpoint_id": self._endpoint_id,
        "model_type": "fast_and_easy",
    }
    headers = self._build_default_headers(self._api_credential, extra_x_event)
    self._session = create_requests_session(self._url, self._num_retry, headers)
    # performance_metrics keeps performance metrics for the last call to _do_inference()
    self._performance_metrics: Dict[str, float] = {}

get_metrics()

Return the performance metrics for the last inference call.

Returns:

Dict[str, float]
    A dictionary containing the performance metrics. Example:

    {
        "decoding_s": 0.0084266,
        "infer_s": 3.3537345,
        "postprocess_s": 0.0255059,
        "preprocess_s": 0.0124037,
        "waiting_s": 0.0001487
    }

Source code in landingai/predict/cloud.py
def get_metrics(self) -> Dict[str, float]:
    """
    Return the performance metrics for the last inference call.

    Returns:
        A dictionary containing the performance metrics.
        Example:
        {
            "decoding_s": 0.0084266,
            "infer_s": 3.3537345,
            "postprocess_s": 0.0255059,
            "preprocess_s": 0.0124037,
            "waiting_s": 0.0001487
        }
    """
    return self._performance_metrics
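
Continuing the usage sketch above, the timing breakdown of the most recent request can be read back (the metric keys follow the example in the docstring):

predictions = predictor.predict(image)
metrics = predictor.get_metrics()
print(metrics.get("infer_s"))  # server-side inference time, in seconds, for the last call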

predict(image, metadata=None, **kwargs)

Call the inference endpoint and return the prediction result.

Parameters

image
    The input image to be predicted. The image should be in the RGB format if it has three channels.
metadata
    The (optional) metadata associated with this inference/image. Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.

    See `landingai.common.InferenceMetadata` for more information.

Returns

The inference result as a list of dictionaries
    Each dictionary is a prediction result. The inference result has been filtered by the confidence threshold set in LandingLens and sorted by confidence score in descending order.

Source code in landingai/predict/cloud.py
@retry(
    # All customers have a quota of images per minute. If the server returns a 429, we wait 60 seconds and retry. Note that we retry forever on 429s, which is fine since the rate limiter will eventually allow the request through.
    retry=retry_if_exception_type(RateLimitExceededError),
    wait=wait_fixed(60),
    before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
)
@Timer(name="Predictor.predict")
def predict(
    self,
    image: Union[np.ndarray, PIL.Image.Image],
    metadata: Optional[InferenceMetadata] = None,
    **kwargs: Any,
) -> List[Prediction]:
    """Call the inference endpoint and return the prediction result.

    Parameters
    ----------
    image
        The input image to be predicted. The image should be in the RGB format if it has three channels.
    metadata
        The (optional) metadata associated with this inference/image.
        Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.

        See `landingai.common.InferenceMetadata` for more information.

    Returns
    -------
    The inference result as a list of dictionaries
        Each dictionary is a prediction result.
        The inference result has been filtered by the confidence threshold set in LandingLens and sorted by confidence score in descending order.
    """
    buffer_bytes = serialize_image(image)
    files = {"file": buffer_bytes}
    query_params = {
        "endpoint_id": self._endpoint_id,
    }
    data = {"metadata": metadata.json()} if metadata else None
    (preds, self._performance_metrics) = get_cloudinference_prediction(
        self._session,
        self._url,
        files,
        query_params,
        _CloudPredictionExtractor,
        data=data,
    )
    return preds
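
A sketch of attaching metadata to an inference call, continuing the example above. The InferenceMetadata field names below are assumptions; see landingai.common.InferenceMetadata for the authoritative schema:

from landingai.common import InferenceMetadata

metadata = InferenceMetadata(
    imageId="frame-0001",             # assumed field name, placeholder value
    inspectionStationId="station-1",  # assumed field name, placeholder value
    locationId="line-a",              # assumed field name, placeholder value
)
predictions = predictor.predict(image, metadata=metadata)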

EdgePredictor

Bases: Predictor

EdgePredictor runs local inference by connecting to an edge inference service (e.g. LandingEdge)

Source code in landingai/predict/edge.py
class EdgePredictor(Predictor):
    """`EdgePredictor` runs local inference by connecting to an edge inference service (e.g. LandingEdge)"""

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8000,
        check_server_ready: bool = True,
    ) -> None:
        """By default the inference service runs on `localhost:8000`

        Parameters
        ----------
        host : str, optional
            Hostname or IP, by default "localhost"
        port : int, optional
            Port, by default 8000
        check_server_ready : bool, optional
            Check if the inference server is running, by default True
        """
        self._url = f"http://{host}:{port}/images"
        # Check if the inference server is reachable
        if check_server_ready and not self._check_connectivity(host=(host, port)):
            raise ConnectionError(
                f"Failed to connect to the model server. Please check if the server is running and the connection url ({self._url})."
            )
        self._session = create_requests_session(
            self._url,
            0,
            {
                "contentType": "multipart/form-data"
            },  # No retries for the inference service
        )

    @Timer(name="EdgePredictor.predict")
    def predict(
        self,
        image: Union[np.ndarray, PIL.Image.Image],
        metadata: Optional[InferenceMetadata] = None,
        reuse_session: bool = True,
        **kwargs: Any,
    ) -> List[Prediction]:
        """Run Edge inference on the input image and return the prediction result.

        Parameters
        ----------
        image
            The input image to be predicted
        metadata
            The (optional) metadata associated with this inference/image.
            Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.
            Note: The metadata is not reported back to LandingLens by default unless the edge inference server (i.e. ModelRunner) enables the feature of reporting historical inference results.

            See `landingai.common.InferenceMetadata` for more details.
        reuse_session
            Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
        Returns
        -------
        List[Prediction]
            A list of prediction results.
        """
        buffer_bytes = serialize_image(image)
        files = {"file": buffer_bytes}
        data = {"metadata": metadata.json()} if metadata else None
        if reuse_session:
            session = self._session
        else:
            session = create_requests_session(
                self._url,
                0,
                {
                    "contentType": "multipart/form-data"
                },  # No retries for the inference service
            )
        (preds, self._performance_metrics) = get_cloudinference_prediction(
            session, self._url, files, {}, _EdgeExtractor, data=data
        )
        return preds
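
A minimal sketch, assuming a LandingEdge/ModelRunner inference service is already listening on localhost:8000:

from landingai.predict import EdgePredictor

edge = EdgePredictor(host="localhost", port=8000)
predictions = edge.predict(image)  # `image` is a NumPy array or PIL image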

__init__(host='localhost', port=8000, check_server_ready=True)

By default the inference service runs on localhost:8000

Parameters

host : str, optional
    Hostname or IP, by default "localhost"
port : int, optional
    Port, by default 8000
check_server_ready : bool, optional
    Check if the inference server is running, by default True

Source code in landingai/predict/edge.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 8000,
    check_server_ready: bool = True,
) -> None:
    """By default the inference service runs on `localhost:8000`

    Parameters
    ----------
    host : str, optional
        Hostname or IP, by default "localhost"
    port : int, optional
        Port, by default 8000
    check_server_ready : bool, optional
        Check if the inference server is running, by default True
    """
    self._url = f"http://{host}:{port}/images"
    # Check if the inference server is reachable
    if check_server_ready and not self._check_connectivity(host=(host, port)):
        raise ConnectionError(
            f"Failed to connect to the model server. Please check if the server is running and the connection url ({self._url})."
        )
    self._session = create_requests_session(
        self._url,
        0,
        {
            "contentType": "multipart/form-data"
        },  # No retries for the inference service
    )

predict(image, metadata=None, reuse_session=True, **kwargs)

Run Edge inference on the input image and return the prediction result.

Parameters

image
    The input image to be predicted
metadata
    The (optional) metadata associated with this inference/image. Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.
    Note: The metadata is not reported back to LandingLens by default unless the edge inference server (i.e. ModelRunner) enables the feature of reporting historical inference results.

    See `landingai.common.InferenceMetadata` for more details.
reuse_session
    Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve performance on high-latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.

Returns

List[Prediction]
    A list of prediction results.

Source code in landingai/predict/edge.py
@Timer(name="EdgePredictor.predict")
def predict(
    self,
    image: Union[np.ndarray, PIL.Image.Image],
    metadata: Optional[InferenceMetadata] = None,
    reuse_session: bool = True,
    **kwargs: Any,
) -> List[Prediction]:
    """Run Edge inference on the input image and return the prediction result.

    Parameters
    ----------
    image
        The input image to be predicted
    metadata
        The (optional) metadata associated with this inference/image.
        Metadata is helpful for attaching additional information to the inference result so you can later filter the historical inference results by your custom values in LandingLens.
        Note: The metadata is not reported back to LandingLens by default unless the edge inference server (i.e. ModelRunner) enables the feature of reporting historical inference results.

        See `landingai.common.InferenceMetadata` for more details.
    reuse_session
        Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
    Returns
    -------
    List[Prediction]
        A list of prediction results.
    """
    buffer_bytes = serialize_image(image)
    files = {"file": buffer_bytes}
    data = {"metadata": metadata.json()} if metadata else None
    if reuse_session:
        session = self._session
    else:
        session = create_requests_session(
            self._url,
            0,
            {
                "contentType": "multipart/form-data"
            },  # No retries for the inference service
        )
    (preds, self._performance_metrics) = get_cloudinference_prediction(
        session, self._url, files, {}, _EdgeExtractor, data=data
    )
    return preds
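
When sending requests from multiple threads, the docstring above advises reuse_session=False so each request builds its own session instead of sharing one requests.Session. A sketch, where `frames` is an assumed list of input images:

from concurrent.futures import ThreadPoolExecutor

def infer(frame):
    # A fresh session per request avoids sharing one Session object across threads
    return edge.predict(frame, reuse_session=False)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(infer, frames))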

OcrPredictor

Bases: Predictor

A class that calls your OCR inference endpoint on the LandingLens platform.

Source code in landingai/predict/ocr.py
class OcrPredictor(Predictor):
    """A class that calls your OCR inference endpoint on the LandingLens platform."""

    _url: str = "https://app.landing.ai/ocr/v1/detect-text"

    def __init__(
        self,
        threshold: float = 0.5,
        *,
        language: Literal["en", "ch"] = "ch",
        api_key: Optional[str] = None,
    ) -> None:
        """OCR Predictor constructor

        Parameters
        ----------
        threshold:
            The minimum confidence threshold of the prediction to keep, by default 0.5
        api_key
            The API Key of your LandingLens organization.
            If not provided, it will try to load from the environment variable
            LANDINGAI_API_KEY or from the .env file.
        language:
            Specifies the character set to use. Can either be `"en"` for English
            or `"ch"` for Chinese and English (default).
        """
        self._threshold = threshold
        self._language = language
        self._api_credential = load_api_credential(api_key)
        extra_x_event = {
            "model_type": "ocr",
        }
        headers = self._build_default_headers(self._api_credential, extra_x_event)
        self._session = create_requests_session(self._url, self._num_retry, headers)

    @retry(
        # All customers have a quota of images per minute. If the server returns a 429, we wait 60 seconds and retry
        retry=retry_if_exception_type(RateLimitExceededError),
        wait=wait_fixed(60),
        before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
    )
    @Timer(name="OcrPredictor.predict")
    def predict(  # type: ignore
        self, image: Union[np.ndarray, PIL.Image.Image], **kwargs: Any
    ) -> List[Prediction]:
        """Run OCR on the input image and return the prediction result.

        Parameters
        ----------
        image:
            The input image to be predicted
        mode:
            The mode of this prediction. It can be either "multi-text" (default) or "single-text".
            In "multi-text" mode, the predictor will detect multiple lines of text in the image.
            In "single-text" mode, the predictor will detect a single line of text in the image.
        regions_of_interest:
            A list of region-of-interest quadrilaterals. Each quadrilateral is a list of 4 points (x, y).
            In "single-text" mode, the caller must provide the quadrilateral(s) that cover the text in the image;
            each quadrilateral should cover a single line of text.
            In "multi-text" mode, regions_of_interest is not required. If it is None, the whole image will be used as the region of interest.

        Returns
        -------
        List[OcrPrediction]
            A list of OCR prediction results.
        """

        buffer_bytes = serialize_image(image)
        files = {"images": buffer_bytes}
        mode: str = kwargs.get("mode", "multi-text")
        if mode not in ["multi-text", "single-text"]:
            raise ValueError(
                f"mode must be either 'multi-text' or 'single-text', but got: {mode}"
            )
        if mode == "single-text" and "regions_of_interest" not in kwargs:
            raise ValueError(
                "regions_of_interest parameter must be provided in single-text mode."
            )
        data: Dict[str, Any]
        data = {"language": self._language}
        if rois := kwargs.get("regions_of_interest", []):
            data["rois"] = serialize_rois(rois, mode)

        (preds, self._performance_metrics) = get_cloudinference_prediction(
            self._session,
            self._url,
            files,
            {},
            _OcrExtractor,
            data=data,
        )
        return [pred for pred in preds if pred.score >= self._threshold]
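
A minimal sketch of the default "multi-text" mode; the API key is a placeholder, and the `text` attribute on the returned predictions is an assumption about OcrPrediction (the `score` attribute is confirmed by the thresholding code above):

from landingai.predict import OcrPredictor

ocr = OcrPredictor(threshold=0.5, language="en", api_key="land_sk_...")
predictions = ocr.predict(image)  # "multi-text" mode by default
for pred in predictions:
    print(pred.text, pred.score)  # `text` is an assumed attribute name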

__init__(threshold=0.5, *, language='ch', api_key=None)

OCR Predictor constructor

Parameters

threshold
    The minimum confidence threshold of the prediction to keep, by default 0.5
api_key
    The API Key of your LandingLens organization. If not provided, it will try to load from the environment variable LANDINGAI_API_KEY or from the .env file.
language
    Specifies the character set to use. Can either be "en" for English or "ch" for Chinese and English (default).

Source code in landingai/predict/ocr.py
def __init__(
    self,
    threshold: float = 0.5,
    *,
    language: Literal["en", "ch"] = "ch",
    api_key: Optional[str] = None,
) -> None:
    """OCR Predictor constructor

    Parameters
    ----------
    threshold:
        The minimum confidence threshold of the prediction to keep, by default 0.5
    api_key
        The API Key of your LandingLens organization.
        If not provided, it will try to load from the environment variable
        LANDINGAI_API_KEY or from the .env file.
    language:
        Specifies the character set to use. Can either be `"en"` for English
        or `"ch"` for Chinese and English (default).
    """
    self._threshold = threshold
    self._language = language
    self._api_credential = load_api_credential(api_key)
    extra_x_event = {
        "model_type": "ocr",
    }
    headers = self._build_default_headers(self._api_credential, extra_x_event)
    self._session = create_requests_session(self._url, self._num_retry, headers)

predict(image, **kwargs)

Run OCR on the input image and return the prediction result.

Parameters

image
    The input image to be predicted
mode
    The mode of this prediction. It can be either "multi-text" (default) or "single-text". In "multi-text" mode, the predictor detects multiple lines of text in the image. In "single-text" mode, the predictor detects a single line of text in the image.
regions_of_interest
    A list of region-of-interest quadrilaterals. Each quadrilateral is a list of 4 points (x, y). In "single-text" mode, the caller must provide the quadrilateral(s) that cover the text, each covering a single line of text in the image. In "multi-text" mode, regions_of_interest is not required; if it is None, the whole image will be used as the region of interest.

Returns

List[OcrPrediction]
    A list of OCR prediction results.

Source code in landingai/predict/ocr.py
@retry(
    # All customers have a quota of images per minute. If the server returns a 429, we wait 60 seconds and retry
    retry=retry_if_exception_type(RateLimitExceededError),
    wait=wait_fixed(60),
    before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
)
@Timer(name="OcrPredictor.predict")
def predict(  # type: ignore
    self, image: Union[np.ndarray, PIL.Image.Image], **kwargs: Any
) -> List[Prediction]:
    """Run OCR on the input image and return the prediction result.

    Parameters
    ----------
    image:
        The input image to be predicted
    mode:
        The mode of this prediction. It can be either "multi-text" (default) or "single-text".
        In "multi-text" mode, the predictor will detect multiple lines of text in the image.
        In "single-text" mode, the predictor will detect a single line of text in the image.
    regions_of_interest:
        A list of region-of-interest quadrilaterals. Each quadrilateral is a list of 4 points (x, y).
        In "single-text" mode, the caller must provide the quadrilateral(s) that cover the text in the image;
        each quadrilateral should cover a single line of text.
        In "multi-text" mode, regions_of_interest is not required. If it is None, the whole image will be used as the region of interest.

    Returns
    -------
    List[OcrPrediction]
        A list of OCR prediction results.
    """

    buffer_bytes = serialize_image(image)
    files = {"images": buffer_bytes}
    mode: str = kwargs.get("mode", "multi-text")
    if mode not in ["multi-text", "single-text"]:
        raise ValueError(
            f"mode must be either 'multi-text' or 'single-text', but got: {mode}"
        )
    if mode == "single-text" and "regions_of_interest" not in kwargs:
        raise ValueError(
            "regions_of_interest parameter must be provided in single-text mode."
        )
    data: Dict[str, Any]
    data = {"language": self._language}
    if rois := kwargs.get("regions_of_interest", []):
        data["rois"] = serialize_rois(rois, mode)

    (preds, self._performance_metrics) = get_cloudinference_prediction(
        self._session,
        self._url,
        files,
        {},
        _OcrExtractor,
        data=data,
    )
    return [pred for pred in preds if pred.score >= self._threshold]
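
In "single-text" mode, each region of interest is one quadrilateral of four (x, y) points covering a single line of text. A sketch with hypothetical coordinates, continuing the example above:

roi = [(50, 100), (400, 100), (400, 160), (50, 160)]  # four (x, y) corners of one text line
predictions = ocr.predict(
    image,
    mode="single-text",
    regions_of_interest=[roi],
)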

Snowflake-specific adapters and helpers

SnowflakeNativeAppPredictor

Bases: Predictor

Snowflake Native App Predictor, which is basically a regular cloud predictor with a different auth mechanism.

Source code in landingai/predict/snowflake.py
class SnowflakeNativeAppPredictor(Predictor):
    """Snowflake Native App Predictor, which is basically a regular cloud predictor with a different auth mechanism."""

    # For how long can we reuse the auth token before having to fetch a new one
    AUTH_TOKEN_MAX_AGE = datetime.timedelta(minutes=5)

    def __init__(
        self,
        endpoint_id: str,
        *,
        snowflake_account: str,
        snowflake_user: str,
        snowflake_password: Optional[str] = None,
        snowflake_private_key: Optional[str] = None,
        snowflake_authenticator: Optional[str] = None,
        native_app_url: str,
        # TODO: Remove this once we remove the API key auth from snowflake
        api_key: Optional[str] = None,
        check_server_ready: bool = True,
    ) -> None:
        assert (
            snowflake_password is not None or snowflake_private_key is not None
        ), "You must provide either `snowflake_password` or `snowflake_public_key`."
        super().__init__(
            endpoint_id, api_key=api_key, check_server_ready=check_server_ready
        )
        self._url = urljoin(native_app_url, "/inference/v1/predict")
        self.snowflake_account = snowflake_account
        self.snowflake_user = snowflake_user
        self.snowflake_password = snowflake_password
        self.snowflake_private_key = snowflake_private_key
        self.snowflake_authenticator = snowflake_authenticator

        self._auth_token = None
        self._last_auth_token_fetch: Optional[datetime.datetime] = None

    def _load_api_credential(self, api_key: Optional[str]) -> Optional[APIKey]:
        # Snowflake Native App does not use API Key, so we ignore it.
        # Once we remove the API key auth from snowflake, we can always return None here.
        if api_key is None:
            return None
        return super()._load_api_credential(api_key)

    def _get_auth_token(self) -> str:
        try:
            import snowflake.connector  # type: ignore
            from cryptography.hazmat.backends import default_backend
            from cryptography.hazmat.primitives import serialization
        except ImportError:
            raise ImportError(
                "In order to use snowflake.NativeAppPredictor, you must install snowflake optionals. "
                "Please, run: pip install landingai[snowflake]"
            )

        # Reuse the token if it's not too old
        if self._auth_token is not None and (
            datetime.datetime.now() - self._last_auth_token_fetch
            < self.AUTH_TOKEN_MAX_AGE
        ):
            return self._auth_token
        connect_params: Dict[str, Any] = dict(
            user=self.snowflake_user,
            account=self.snowflake_account,
            session_parameters={"PYTHON_CONNECTOR_QUERY_RESULT_FORMAT": "json"},
        )
        if self.snowflake_password is not None:
            connect_params["password"] = self.snowflake_password
        if self.snowflake_private_key is not None:
            p_key = serialization.load_pem_private_key(
                self.snowflake_private_key.encode("ascii"),
                password=None,
                backend=default_backend(),
            )
            connect_params["private_key"] = p_key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
        if self.snowflake_authenticator is not None:
            connect_params["authenticator"] = self.snowflake_authenticator

        ctx = snowflake.connector.connect(**connect_params)
        ctx._all_async_queries_finished = lambda: False  # type: ignore
        token_data = ctx._rest._token_request("ISSUE")  # type: ignore
        self._auth_token = token_data["data"]["sessionToken"]
        self._last_auth_token_fetch = datetime.datetime.now()
        return cast(str, self._auth_token)

    @property
    def _session(self) -> Session:
        extra_x_event = {
            "endpoint_id": self._endpoint_id,
            "model_type": "fast_and_easy",
        }
        headers = self._build_default_headers(self._api_credential, extra_x_event)
        headers["Authorization"] = f'Snowflake Token="{self._get_auth_token()}"'

        return create_requests_session(
            url=self._url,
            num_retry=self._num_retry,
            headers=headers,
        )

    @_session.setter
    def _session(self, value: Session) -> None:
        """Ignore setting the session. We always create a new session when needed."""
        pass
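
A construction sketch; the import path follows the source file location above, and every credential value shown is a placeholder:

# Requires the snowflake extras: pip install landingai[snowflake]
from landingai.predict.snowflake import SnowflakeNativeAppPredictor

predictor = SnowflakeNativeAppPredictor(
    endpoint_id="9f237028-e630-4576-8826-f35ab9000abc",  # placeholder
    snowflake_account="myorg-myaccount",                 # placeholder
    snowflake_user="INFERENCE_USER",                     # placeholder
    snowflake_password="...",                            # or pass snowflake_private_key (a PEM string)
    native_app_url="https://<native-app-host>",          # placeholder
)
predictions = predictor.predict(image)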

Module for making predictions on LandingLens models.

PredictionExtractor

The base class for all extractors. This is useful for type checking.

Source code in landingai/predict/utils.py
class PredictionExtractor:
    """The base class for all extractors. This is useful for type checking."""

    @staticmethod
    def extract_prediction(response: Any) -> List[Prediction]:
        raise NotImplementedError()

create_requests_session(url, num_retry, headers)

Create a requests session with retry

Source code in landingai/predict/utils.py
def create_requests_session(
    url: str, num_retry: int, headers: Dict[str, str]
) -> Session:
    """Create a requests session with retry"""
    session = Session()
    retries = Retry(
        # TODO: make them configurable
        # The 5XX retry scheme needs to account for the circuit breaker, which will shut down a service for 10 seconds
        total=num_retry,  # Defaults to 3
        backoff_factor=7,  # This is the number of seconds used for the retry backoff (i.e. 0, 7, 21). The first retry is immediate.
        raise_on_redirect=True,
        raise_on_status=False,  # We are already raising exceptions during backend invocations
        allowed_methods=["GET", "POST", "PUT"],
        status_forcelist=[
            # 408 Request Timeout, 413 Content Too Large
            # 429,  # Too Many Requests (i.e. rate limiter). This is handled externally
            # 500 Internal Server Error -> We don't retry here since it tends to reflect deterministic software bugs
            502,  # Bad Gateway
            503,  # Service Unavailable (include cloud circuit breaker)
            504,  # Gateway Timeout
        ],
    )
    session.mount(
        url, HTTPAdapter(max_retries=retries if num_retry > 0 else num_retry)
    )  # Since POST is not idempotent, we will only retry on this specific API
    session.headers.update(headers)
    return session
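
A usage sketch mirroring how the predictors above call this helper; the header value is a placeholder:

session = create_requests_session(
    url="https://predict.app.landing.ai/inference/v1/predict",
    num_retry=3,  # retries 502/503/504 with backoff; 0 disables retries
    headers={"apikey": "land_sk_..."},  # placeholder
)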

get_cloudinference_prediction(session, endpoint_url, files, params, extractor_class, *, data=None)

Call the inference endpoint and extract the prediction result.

Source code in landingai/predict/utils.py
@Timer(name="_do_inference", log_fn=_LOGGER.debug)
def get_cloudinference_prediction(
    session: Session,
    endpoint_url: str,
    files: Dict[str, Any],
    params: Dict[str, Any],
    extractor_class: Type[PredictionExtractor],
    *,
    data: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Prediction], Dict[str, float]]:
    """Call the inference endpoint and extract the prediction result."""
    try:
        resp = session.post(endpoint_url, files=files, params=params, data=data)
    except requests.exceptions.ConnectionError as e:
        raise ConnectionError(
            f"Failed to connect to the model server. Please double check the model server url ({endpoint_url}) is correct.\nException detail: {e}"
        ) from e
    response = HttpResponse.from_response(resp)
    _LOGGER.debug("Response: %s", response)
    response.raise_for_status()
    json_dict = response.json()
    # OCR response is a list of list of predictions
    if isinstance(json_dict, list):
        return (extractor_class.extract_prediction(json_dict), {})
    # Save performance metrics for debugging
    performance_metrics = json_dict.get("latency", {})
    return (extractor_class.extract_prediction(json_dict), performance_metrics)

serialize_rois(rois, mode)

Serialize the regions of interest into a JSON string.

Source code in landingai/predict/utils.py
def serialize_rois(rois: List[List[Tuple[int, int]]], mode: str) -> str:
    """Serialize the regions of interest into a JSON string."""
    rois_payload = [
        {
            "location": [{"x": coord[0], "y": coord[1]} for coord in roi],
            "mode": mode,
        }
        for roi in rois
    ]
    return json.dumps([rois_payload])
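
A worked example of the payload this produces, derived from the function body (output reformatted across lines for readability):

roi = [(10, 20), (110, 20), (110, 60), (10, 60)]  # one quadrilateral
print(serialize_rois([roi], mode="single-text"))
# [[{"location": [{"x": 10, "y": 20}, {"x": 110, "y": 20},
#                 {"x": 110, "y": 60}, {"x": 10, "y": 60}],
#    "mode": "single-text"}]]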