Skip to content

landingai.storage

Third-party storage integrations.

download_file(url, file_output_path=None)

Download a file from a public url. This function will follow HTTP redirects

Parameters

url : str Source url file_output_path : Optional[Path], optional The local output file path for the downloaded file. If no path is provided, the file will be saved into a temporary directory provided by the OS (which could get deleted after reboot), and when possible the extension of the downloaded file will be included in the output file path.

Returns

str Path to the downloaded file

Source code in landingai/storage/data_access.py
def download_file(
    url: str,
    file_output_path: Optional[Path] = None,
) -> str:
    """Download a file from a public url. This function will follow HTTP redirects

    Parameters
    ----------
    url : str
        Source url
    file_output_path : Optional[Path], optional
        The local output file path for the downloaded file. If no path is provided, the file will be saved into a temporary directory provided by the OS (which could get deleted after reboot), and when possible the extension of the downloaded file will be included in the output file path.

    Returns
    -------
    str
        Path to the downloaded file
    """
    # TODO: It would be nice for this function to not re-download if the src has not been updated
    ret = read_file(url)  # Fetch the file
    if file_output_path is not None:
        with open(str(file_output_path), "wb") as f:
            f.write(ret["content"])
        return f.name  # type: ignore

    if "filename" in ret:
        # use filename provided by server
        name = str(ret["filename"])
    else:
        # try to get the name from the URL
        name = unquote(urlparse(url).path)
    # The name comes from the remote side (header or URL). Keep only its last
    # path component so a value containing directory separators cannot make the
    # temporary file land outside the temp directory (or fail to be created).
    suffix = "--" + os.path.basename(name)
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
        f.write(ret["content"])
    return f.name  # type: ignore

fetch_from_uri(uri, **kwargs)

Check if the URI is local and fetch it if it is not

Parameters

uri : str Supported URIs - local paths - file:// - http:// - https://

Returns

Path Path to a local resource

Source code in landingai/storage/data_access.py
def fetch_from_uri(uri: str, **kwargs) -> Path:  # type: ignore
    """Check if the URI is local and fetch it if it is not

    Parameters
    ----------
    uri : str
        Supported URIs
        - local paths
        - file://
        - http://
        - https://
    **kwargs
        Currently unused by this function.

    Returns
    -------
    Path
        Path to a local resource

    Raises
    ------
    ValueError
        If the URI scheme is not one of the supported ones.
    """
    # TODO support other URIs
    # snowflake://stage/filename  (credentials will be passed on kwargs)
    from urllib.request import url2pathname

    r = urlparse(uri)
    if r.scheme == "file":
        # Strip the scheme and decode percent-escapes so the result is a usable
        # filesystem path rather than the literal "file:..." string.
        return Path(url2pathname(r.path))
    # Match local unix and windows paths (e.g. C:\)
    if r.scheme == "" or len(r.scheme) == 1:
        # The file is already local
        return Path(uri)
    if r.scheme in ("http", "https"):
        # Fetch the file from the web
        return Path(download_file(uri))
    raise ValueError(f"URI not supported {uri}")

read_file(url)

Read bytes from a URL. Typically, the URL is a presigned URL (for example, from Amazon S3 or Snowflake) that points to a video or image file.

Returns

Dict[str, Any] Returns the content under "content". Optionally may return "filename" in case the server provided it.

Source code in landingai/storage/data_access.py
def read_file(url: str) -> Dict[str, Any]:
    """Fetch the raw bytes behind a URL.

    Typically, the URL is a presigned URL (for example, from Amazon S3 or Snowflake) that points to a video or image file.

    Returns
    -------
    Dict[str, Any]
        The payload is stored under "content". A "filename" entry is added when the server sent one in its content-disposition header.

    Raises
    ------
    FileNotFoundError
        If the server answers with status code 404.
    ValueError
        For any other HTTP error status, or any remaining status code >= 300.
    """
    response = requests.get(url, allow_redirects=True)  # True is the default behavior
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        reason = f"{e.response.text} (status code: {e.response.status_code})"
        msg_prefix = f"Failed to read from url ({url}) due to {reason}"
        # 404 is the only status mapped to a different exception type.
        if response.status_code == 404:
            raise FileNotFoundError(
                f"{msg_prefix}. Please double check the file exists and the url is well-formed."
            ) from e
        if response.status_code == 403:
            error_msg = f"{msg_prefix}. Please double check the url is not expired and it's well-formed."
        else:
            error_msg = f"{msg_prefix}. Please try again later or reach out to us via our LandingAI platform."
        raise ValueError(error_msg) from e

    # raise_for_status() lets non-error statuses through; reject anything that is not 1xx/2xx.
    if not response.status_code < 300:
        raise ValueError(
            f"Failed to read from url ({url}) due to {response.text} (status code: {response.status_code})"
        )

    ret = {"content": response.content}
    # Check if server returned the file name
    disposition = response.headers.get("content-disposition")
    if disposition is not None:
        # Only the first filename occurrence in the header is used.
        first_match = re.search("filename=[\"']*([^;\"']+)", disposition)
        if first_match is not None:
            ret["filename"] = first_match.group(1)
    _LOGGER.info(
        f"Received content with length {len(response.content)}, type {response.headers.get('Content-Type')}"
    )

    return ret

SnowflakeCredential

Bases: BaseSettings

Snowflake API credential. It's used to connect to Snowflake. It supports loading from environment variables or .env files.

The supported names of the environment variables are (case-insensitive):

- SNOWFLAKE_USER
- SNOWFLAKE_PASSWORD
- SNOWFLAKE_ACCOUNT

Environment variables will always take priority over values loaded from a dotenv file.

Source code in landingai/storage/snowflake.py
class SnowflakeCredential(BaseSettings):
    """Credential used to connect to the Snowflake API.

    Values can be loaded from environment variables or from a .env file.

    Supported environment variable names (case-insensitive):
    - SNOWFLAKE_USER
    - SNOWFLAKE_PASSWORD
    - SNOWFLAKE_ACCOUNT

    Environment variables will always take priority over values loaded from a dotenv file.
    """

    # Each field is populated from the SNOWFLAKE_-prefixed variable of the same name.
    user: str
    password: str
    account: str

    class Config:
        # Field names are prefixed with this to form the variable name.
        env_prefix = "SNOWFLAKE_"
        # Variable names are matched without regard to case.
        case_sensitive = False
        # dotenv file consulted in addition to the environment.
        env_file = ".env"

SnowflakeDBConfig

Bases: BaseSettings

Snowflake connection config. It supports loading from environment variables or .env files.

The supported names of the environment variables are (case-insensitive):

- SNOWFLAKE_WAREHOUSE
- SNOWFLAKE_DATABASE
- SNOWFLAKE_SCHEMA

Environment variables will always take priority over values loaded from a dotenv file.

Source code in landingai/storage/snowflake.py
class SnowflakeDBConfig(BaseSettings):
    """Connection settings (warehouse, database, schema) for Snowflake.

    Values can be loaded from environment variables or from a .env file.

    Supported environment variable names (case-insensitive):
    - SNOWFLAKE_WAREHOUSE
    - SNOWFLAKE_DATABASE
    - SNOWFLAKE_SCHEMA

    Environment variables will always take priority over values loaded from a dotenv file.
    """

    warehouse: str
    database: str
    # NOTE: the name "schema" is reserved by pydantic, so the field is called
    # "snowflake_schema" and its environment variable name is given explicitly.
    snowflake_schema: str = Field(..., env="SNOWFLAKE_SCHEMA")

    class Config:
        # Field names are prefixed with this to form the variable name.
        env_prefix = "SNOWFLAKE_"
        # Variable names are matched without regard to case.
        case_sensitive = False
        # dotenv file consulted in addition to the environment.
        env_file = ".env"

get_snowflake_presigned_url(remote_filename, stage_name, *, credential=None, connection_config=None)

Get a presigned URL for a file stored in Snowflake. NOTE: Snowflake returns a valid URL even if the file doesn't exist. So the downstream needs to check if the file exists first.

Source code in landingai/storage/snowflake.py
def get_snowflake_presigned_url(
    remote_filename: str,
    stage_name: str,
    *,
    credential: Optional[SnowflakeCredential] = None,
    connection_config: Optional[SnowflakeDBConfig] = None,
) -> str:
    """Get a presigned URL for a file stored in Snowflake.
    NOTE: Snowflake returns a valid URL even if the file doesn't exist.
          So the downstream needs to check if the file exists first.

    Parameters
    ----------
    remote_filename : str
        Name of the file inside the stage.
    stage_name : str
        Name of the Snowflake stage that holds the file.
    credential : Optional[SnowflakeCredential], optional
        Credential used to connect. When omitted, a SnowflakeCredential() is constructed with no arguments.
    connection_config : Optional[SnowflakeDBConfig], optional
        Warehouse/database/schema to connect to. When omitted, a SnowflakeDBConfig() is constructed with no arguments.

    Returns
    -------
    str
        The presigned URL returned by Snowflake.

    Raises
    ------
    ValueError
        If either query returns no result object.
    FileNotFoundError
        If the presigned-url query returns no rows.
    """
    import snowflake.connector  # type: ignore

    if credential is None:
        credential = SnowflakeCredential()
    if connection_config is None:
        connection_config = SnowflakeDBConfig()

    ctx = snowflake.connector.connect(
        user=credential.user,
        password=credential.password,
        account=credential.account,
        warehouse=connection_config.warehouse,
        database=connection_config.database,
        schema=connection_config.snowflake_schema,
    )
    # Always release the cursor and the connection, including when a query
    # fails or one of the checks below raises.
    try:
        cur = ctx.cursor()
        try:
            # The stage listing is only used for debug logging.
            exec_res = cur.execute(f"LIST @{stage_name}")
            if exec_res is None:
                raise ValueError(f"Failed to list files in stage: {stage_name}")
            files = exec_res.fetchall()
            _LOGGER.debug(f"Files in stage {stage_name}: {files}")
            # NOTE(review): stage_name and remote_filename are interpolated
            # directly into the SQL text; callers must not pass untrusted values.
            exec_res = cur.execute(
                f"SELECT get_presigned_url(@{stage_name}, '{remote_filename}') as url"
            )
            if exec_res is None:
                raise ValueError(
                    f"Failed to get presigned url for file: {remote_filename} in stage: {stage_name}"
                )
            result = exec_res.fetchall()
        finally:
            cur.close()
    finally:
        ctx.close()

    if len(result) == 0 or len(result[0]) == 0:
        raise FileNotFoundError(
            f"File ({remote_filename}) not found in stage {stage_name}. Please double check the file exists in the expected location, stage: {stage_name}, db config: {connection_config}."
        )
    result_url: str = result[0][0]
    _LOGGER.info(f"Result url: {result_url}")
    return result_url

save_remote_file_to_local(remote_filename, stage_name, *, local_output=None, credential=None, connection_config=None)

Save a file stored in Snowflake to local disk. If local_output is not provided, a temporary directory will be created and used. If credential or connection_config is not provided, it will be read from environment variables or a .env file instead.

Source code in landingai/storage/snowflake.py
def save_remote_file_to_local(
    remote_filename: str,
    stage_name: str,
    *,
    local_output: Optional[Path] = None,
    credential: Optional[SnowflakeCredential] = None,
    connection_config: Optional[SnowflakeDBConfig] = None,
) -> Path:
    """Download a file held in a Snowflake stage onto the local disk.

    When local_output is omitted, a fresh temporary directory is created and used as the output directory.
    credential and connection_config are passed through unchanged to get_snowflake_presigned_url.

    Returns the local path of the saved file (the output directory joined with remote_filename).
    """
    # Resolve the download URL first; nothing is created locally if this fails.
    url = get_snowflake_presigned_url(
        remote_filename,
        stage_name,
        connection_config=connection_config,
        credential=credential,
    )
    output_dir = Path(tempfile.mkdtemp()) if local_output is None else local_output
    file_path = output_dir / remote_filename
    # remote_filename may contain sub-directories, so create the parents as needed.
    file_path.parent.mkdir(exist_ok=True, parents=True)
    download_file(url, file_output_path=file_path)
    _LOGGER.info(f"Saved file {remote_filename} to {file_path}")
    return file_path