Skip to content

landingai.pipeline

The vision pipeline abstraction helps chain image processing operations as a sequence of steps. Each step consumes and produces a FrameSet, which typically contains a source image plus derived metadata and images.

The vision pipeline abstraction helps chain image processing operations as a sequence of steps. Each step consumes and produces a FrameSet, which typically contains a source image plus derived metadata and images.

Frame

Bases: BaseModel

A Frame stores a main image, its metadata and potentially other derived images.

Source code in landingai/pipeline/frameset.py
class Frame(BaseModel):
    """A Frame stores a main image, its metadata and potentially other derived images.

    Most image operations mutate the frame in place and return `self` so calls can be chained.
    """

    image: Image.Image
    """Main image generated typically at the beginning of a pipeline"""

    other_images: Dict[str, Image.Image] = {}
    """Other derivative images associated with the main image. For example: `FrameSet.overlay_predictions` will store the resulting image on `Frame.other_images["overlay"]`"""

    predictions: PredictionList = PredictionList([])
    """List of predictions for the main image"""

    metadata: Dict[str, Any] = {}
    """An optional collection of metadata"""

    @property
    def frames(self) -> List["Frame"]:
        """Returns a list with a single frame (deprecated compatibility shim)"""
        warnings.warn(
            "frame.frames[x].<method> is deprecated and will be removed in future versions. "
            "Use frame.<method> instead"
        )
        return [self]

    @classmethod
    def from_image(
        cls, uri: str, metadata: Optional[Dict[str, Any]] = None
    ) -> "Frame":
        """Creates a Frame from an image file

        Parameters
        ----------
        uri : URI to file (local or remote)
        metadata : Optional metadata to attach to the frame. `None` (the default) means no metadata.

        Returns
        -------
        Frame : New Frame enclosing the image
        """

        image = Image.open(str(fetch_from_uri(uri)))
        # Build a fresh dict per call: avoids a shared mutable default and
        # makes the `None` allowed by the annotation a valid argument.
        return cls(image=image, metadata={} if metadata is None else metadata)

    @classmethod
    def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "Frame":
        """Creates a Frame from an image encoded as an ndarray

        Parameters
        ----------
        array : np.ndarray
            Image
        is_bgr : bool, optional
            Assume OpenCV's BGR channel ordering? Defaults to True.
            Ignored for 2-D (single channel) arrays, which have no channel order.

        Returns
        -------
        Frame
        """
        # TODO: Make is_bgr an enum and support grayscale, rgba (what can PIL autodetect?)
        # Only multi-channel arrays have a channel order to swap; a 2-D
        # grayscale array would make cv2.cvtColor(BGR2RGB) raise.
        if is_bgr and np.ndim(array) == 3:
            array = cv2.cvtColor(array, cv2.COLOR_BGR2RGB)
        im = Image.fromarray(array)
        return cls(image=im)

    def run_predict(
        self, predictor: Predictor, reuse_session: bool = True, **kwargs: Any
    ) -> "Frame":
        """Run a cloud inference model and store the result on `self.predictions`

        Parameters
        ----------
        predictor: the model to be invoked.
        reuse_session
            Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
        kwargs: keyword arguments to forward to `predictor`.
        """
        self.predictions = PredictionList(
            predictor.predict(self.image, reuse_session=reuse_session, **kwargs)
        )
        return self

    def overlay_predictions(self, options: Optional[Dict[str, Any]] = None) -> "Frame":
        """Draw the predictions over the main image and store the result on `other_images["overlay"]`

        Parameters
        ----------
        options: Optional settings forwarded to the module-level `overlay_predictions` helper.
        """
        self.other_images["overlay"] = overlay_predictions(
            cast(List[Prediction], self.predictions), self.image, options
        )
        return self

    def crop_predictions(self) -> "FrameSet":
        """Crops from this frame regions with predictions and returns a FrameSet with the cropped Frames

        Predictions without a bounding box are skipped. Each returned frame carries only the prediction it was cropped from.
        """
        pred_frames = []
        for pred in self.predictions:
            bounding_box = get_prediction_bounding_box(pred)
            if bounding_box is None:
                continue
            new_frame = self.copy()
            # pydantic's copy() is shallow by default, so give every crop its
            # own containers; otherwise writing e.g. other_images["overlay"]
            # on one crop would also change this frame and its sibling crops.
            new_frame.other_images = dict(self.other_images)
            new_frame.metadata = dict(self.metadata)
            new_frame.predictions = PredictionList([pred])
            new_frame.crop(bounding_box)
            pred_frames.append(new_frame)
        return FrameSet(frames=pred_frames)

    def to_numpy_array(
        self,
        image_src: str = "",
        *,
        include_predictions: bool = False,
    ) -> np.ndarray:
        """Return a numpy array using RGB channel ordering. If this array is passed to OpenCV, you will need to convert it to BGR

        Parameters
        ----------
        image_src (deprecated): if empty the source image will be returned. Otherwise the image will be selected from `other_images`
        include_predictions: If the image has predictions, should it be overlaid on top of the image?
        """
        if image_src:
            warnings.warn(
                "image_src keyword on Frame.to_numpy_array is deprecated. Use include_predictions instead."
            )
            if image_src == "overlay":
                include_predictions = True
        if include_predictions:
            image_src = "overlay"
        # Falls back to the main image when the requested derived image does not exist
        img = (
            self.image
            if image_src == "" or image_src not in self.other_images
            else self.other_images[image_src]
        )
        return np.asarray(img)

    def show_image(
        self,
        image_src: str = "",
        clear_nb_cell: bool = False,
        *,
        include_predictions: bool = False,
    ) -> "Frame":
        """Display the image, inline when running in a notebook or with PIL's viewer otherwise.

        Parameters
        ----------
        image_src (deprecated): if empty the source image will be displayed. Otherwise the image will be selected from `other_images`
        clear_nb_cell: When running in a notebook, clear the cell output before displaying the image.
        include_predictions: If the image has predictions, should it be overlaid on top of the image?

        Raises
        ------
        KeyError: if the selected image is not present in `other_images`
        """
        if image_src:
            warnings.warn(
                "image_src keyword on Frame.show_image is deprecated. Use include_predictions instead."
            )
            if image_src == "overlay":
                include_predictions = True
        if include_predictions:
            image_src = "overlay"
        # TODO: Should show be a end leaf?
        # Check if we are on a notebook context
        if is_running_in_notebook():
            from IPython import display

            if clear_nb_cell:
                display.clear_output(wait=True)
            if image_src == "":
                display.display(self.image)
            else:
                display.display(self.other_images[image_src])
        else:
            # Use PIL's implementation
            if image_src == "":
                self.image.show()
            else:
                self.other_images[image_src].show()
        return self

    def save_image(
        self, path: str, format: str = "png", *, include_predictions: bool = False
    ) -> None:
        """Save the image to path

        Parameters
        ----------
        path: File path for the output image
        format: File format for the output image. Defaults to "png"
        include_predictions: If the image has predictions, should it be overlaid on top of the image?

        Raises
        ------
        KeyError: if `include_predictions` is set but `other_images["overlay"]` does not exist
        """
        if include_predictions:
            img = self.other_images["overlay"]
        else:
            img = self.image
        img.save(path, format=format.upper())

    def resize(
        self, width: Optional[int] = None, height: Optional[int] = None
    ) -> "Frame":
        """Resizes the frame to the given dimensions. If width or height is missing the resize will preserve the aspect ratio.

        Parameters
        ----------
        width: The requested width in pixels.
        height: The requested height in pixels.
        """
        if width is None and height is None:  # No resize needed
            return self

        if width is None:
            width = int(height * float(self.image.size[0] / self.image.size[1]))  # type: ignore
        if height is None:
            height = int(width * float(self.image.size[1] / self.image.size[0]))
        self.image = self.image.resize((width, height))
        return self

    def downsize(
        self, width: Optional[int] = None, height: Optional[int] = None
    ) -> "Frame":
        """Resize only if the image is larger than the expected dimensions.

        When either side of the image exceeds the target, the image is resized to exactly (width, height).

        Parameters
        ----------
        width: The requested width in pixels.
        height: The requested height in pixels.
        """
        if width is None and height is None:  # No resize needed
            return self
        # Fill in the missing dimension from the current aspect ratio
        if width is None:
            width = int(height * float(self.image.size[0] / self.image.size[1]))  # type: ignore
        if height is None:
            height = int(width * float(self.image.size[1] / self.image.size[0]))
        if self.image.size[0] > width or self.image.size[1] > height:
            self.image = self.image.resize((width, height))
        return self

    def crop(self, bbox: BoundingBox) -> "Frame":
        """Crop the image based on the bounding box

        Parameters
        ----------
        bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)
        """
        self.image = self.image.crop(bbox)
        return self

    def adjust_sharpness(self, factor: float) -> "Frame":
        """Adjust the sharpness of the image

        Parameters
        ----------
        factor: The enhancement factor
        """
        return self._apply_enhancement(ImageEnhance.Sharpness, factor)

    def adjust_brightness(self, factor: float) -> "Frame":
        """Adjust the brightness of the image

        Parameters
        ----------
        factor: The enhancement factor
        """
        return self._apply_enhancement(ImageEnhance.Brightness, factor)

    def adjust_contrast(self, factor: float) -> "Frame":
        """Adjust the contrast of the image

        Parameters
        ----------
        factor: The enhancement factor
        """
        return self._apply_enhancement(ImageEnhance.Contrast, factor)

    def adjust_color(self, factor: float) -> "Frame":
        """Adjust the color of the image

        Parameters
        ----------
        factor: The enhancement factor
        """
        return self._apply_enhancement(ImageEnhance.Color, factor)

    def _apply_enhancement(
        self, enhancement: Type[ImageEnhance._Enhance], factor: float
    ) -> "Frame":
        """Replace the main image with the output of the given enhancer class applied with `factor`"""
        enhancer = enhancement(self.image)  # type: ignore
        self.image = enhancer.enhance(factor)
        return self

    class Config:
        # Required so pydantic accepts the PIL image fields
        arbitrary_types_allowed = True

frames: List[Frame] property

Returns a list with a single frame

image: Image.Image instance-attribute

Main image generated typically at the beginning of a pipeline

metadata: Dict[str, Any] = {} class-attribute instance-attribute

An optional collection of metadata

other_images: Dict[str, Image.Image] = {} class-attribute instance-attribute

Other derivative images associated with the main image. For example: `FrameSet.overlay_predictions` will store the resulting image on `Frame.other_images["overlay"]`

predictions: PredictionList = PredictionList([]) class-attribute instance-attribute

List of predictions for the main image

adjust_brightness(factor)

Adjust the brightness of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_brightness(self, factor: float) -> "Frame":
    """Change the brightness of the main image in place.

    Parameters
    ----------
    factor: The enhancement factor handed to the brightness enhancer

    Returns
    -------
    Frame : whatever `_apply_enhancement` returns
    """
    return self._apply_enhancement(enhancement=ImageEnhance.Brightness, factor=factor)

adjust_color(factor)

Adjust the color of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_color(self, factor: float) -> "Frame":
    """Change the color balance of the main image in place.

    Parameters
    ----------
    factor: The enhancement factor handed to the color enhancer

    Returns
    -------
    Frame : whatever `_apply_enhancement` returns
    """
    return self._apply_enhancement(enhancement=ImageEnhance.Color, factor=factor)

adjust_contrast(factor)

Adjust the contrast of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_contrast(self, factor: float) -> "Frame":
    """Change the contrast of the main image in place.

    Parameters
    ----------
    factor: The enhancement factor handed to the contrast enhancer

    Returns
    -------
    Frame : whatever `_apply_enhancement` returns
    """
    return self._apply_enhancement(enhancement=ImageEnhance.Contrast, factor=factor)

adjust_sharpness(factor)

Adjust the sharpness of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_sharpness(self, factor: float) -> "Frame":
    """Change the sharpness of the main image in place.

    Parameters
    ----------
    factor: The enhancement factor handed to the sharpness enhancer

    Returns
    -------
    Frame : whatever `_apply_enhancement` returns
    """
    return self._apply_enhancement(enhancement=ImageEnhance.Sharpness, factor=factor)

crop(bbox)

Crop the image based on the bounding box

Parameters

bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)

Source code in landingai/pipeline/frameset.py
def crop(self, bbox: BoundingBox) -> "Frame":
    """Replace the main image with the region inside the bounding box.

    Parameters
    ----------
    bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)

    Returns
    -------
    Frame : this same frame, to allow chaining
    """
    cropped = self.image.crop(bbox)
    self.image = cropped
    return self

crop_predictions()

Crops from this frame regions with predictions and returns a FrameSet with the cropped Frames

Source code in landingai/pipeline/frameset.py
def crop_predictions(self) -> "FrameSet":
    """Crops from this frame regions with predictions and returns a FrameSet with the cropped Frames

    Predictions without a bounding box are skipped. Each returned frame carries only the prediction it was cropped from.
    """
    pred_frames = []
    for pred in self.predictions:
        bounding_box = get_prediction_bounding_box(pred)
        if bounding_box is None:
            continue
        new_frame = self.copy()
        # pydantic's copy() is shallow by default, so give every crop its own
        # containers; otherwise writing e.g. other_images["overlay"] on one
        # crop would also change this frame and its sibling crops.
        new_frame.other_images = dict(self.other_images)
        new_frame.metadata = dict(self.metadata)
        new_frame.predictions = PredictionList([pred])
        new_frame.crop(bounding_box)
        pred_frames.append(new_frame)
    return FrameSet(frames=pred_frames)

downsize(width=None, height=None)

Resize only if the image is larger than the expected dimensions, Parameters


width: The requested width in pixels. height: The requested height in pixels.

Source code in landingai/pipeline/frameset.py
def downsize(
    self, width: Optional[int] = None, height: Optional[int] = None
) -> "Frame":
    """Shrink the main image, but only when it exceeds the requested dimensions.

    A missing dimension is derived from the current aspect ratio. If either
    side of the image is larger than its target, the image is resized to
    exactly (width, height); otherwise it is left untouched.

    Parameters
    ----------
    width: The requested width in pixels.
    height: The requested height in pixels.
    """
    if width is None and height is None:
        # Nothing was requested
        return self
    current_w, current_h = self.image.size
    if width is None:
        width = int(height * float(current_w / current_h))  # type: ignore
    if height is None:
        height = int(width * float(current_h / current_w))
    fits_already = current_w <= width and current_h <= height
    if not fits_already:
        self.image = self.image.resize((width, height))
    return self

from_array(array, is_bgr=True) classmethod

Creates a Frame from an image encoded as an ndarray

Parameters

array : np.ndarray Image is_bgr : bool, optional Assume OpenCV's BGR channel ordering? Defaults to True

Returns

Frame

Source code in landingai/pipeline/frameset.py
@classmethod
def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "Frame":
    """Creates a Frame from an image encoded as an ndarray

    Parameters
    ----------
    array : np.ndarray
        Image
    is_bgr : bool, optional
        Assume OpenCV's BGR channel ordering? Defaults to True.
        Ignored for 2-D (single channel) arrays, which have no channel order.

    Returns
    -------
    Frame
    """
    # TODO: Make is_bgr an enum and support grayscale, rgba (what can PIL autodetect?)
    # Only multi-channel arrays have a channel order to swap; a 2-D grayscale
    # array would make cv2.cvtColor(BGR2RGB) raise.
    if is_bgr and np.ndim(array) == 3:
        array = cv2.cvtColor(array, cv2.COLOR_BGR2RGB)
    im = Image.fromarray(array)
    return cls(image=im)

from_image(uri, metadata={}) classmethod

Creates a Frame from an image file

Parameters

uri : URI to file (local or remote)

Returns

Frame : New Frame enclosing the image

Source code in landingai/pipeline/frameset.py
@classmethod
def from_image(cls, uri: str, metadata: Optional[Dict[str, Any]] = None) -> "Frame":
    """Creates a Frame from an image file

    Parameters
    ----------
    uri : URI to file (local or remote)
    metadata : Optional metadata to attach to the frame. `None` (the default) means no metadata.

    Returns
    -------
    Frame : New Frame enclosing the image
    """

    image = Image.open(str(fetch_from_uri(uri)))
    # Build a fresh dict per call: avoids a shared mutable default and makes
    # the `None` allowed by the annotation a valid argument.
    return cls(image=image, metadata={} if metadata is None else metadata)

resize(width=None, height=None)

Resizes the frame to the given dimensions. If width or height is missing the resize will preserve the aspect ratio. Parameters


width: The requested width in pixels. height: The requested height in pixels.

Source code in landingai/pipeline/frameset.py
def resize(
    self, width: Optional[int] = None, height: Optional[int] = None
) -> "Frame":
    """Resize the main image in place to the requested dimensions.

    When only one of width or height is given, the other one is derived from
    the current aspect ratio. When neither is given the frame is unchanged.

    Parameters
    ----------
    width: The requested width in pixels.
    height: The requested height in pixels.
    """
    if width is None and height is None:
        # Nothing was requested
        return self

    current_w, current_h = self.image.size
    if width is None:
        width = int(height * float(current_w / current_h))  # type: ignore
    if height is None:
        height = int(width * float(current_h / current_w))
    target_size = (width, height)
    self.image = self.image.resize(target_size)
    return self

run_predict(predictor, reuse_session=True, **kwargs)

Run a cloud inference model Parameters


predictor: the model to be invoked. reuse_session Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False. kwargs: keyword arguments to forward to predictor.

Source code in landingai/pipeline/frameset.py
def run_predict(
    self, predictor: Predictor, reuse_session: bool = True, **kwargs: Any
) -> "Frame":
    """Send the main image to a cloud inference model and keep the result on `self.predictions`

    Parameters
    ----------
    predictor: the model to be invoked.
    reuse_session
        Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
    kwargs: keyword arguments to forward to `predictor`.

    Returns
    -------
    Frame : this same frame, to allow chaining
    """
    raw_predictions = predictor.predict(
        self.image, reuse_session=reuse_session, **kwargs
    )
    self.predictions = PredictionList(raw_predictions)
    return self

save_image(path, format='png', *, include_predictions=False)

Save the image to path

Parameters

path: File path for the output image format: File format for the output image. Defaults to "png" include_predictions: If the image has predictions, should it be overlaid on top of the image?

Source code in landingai/pipeline/frameset.py
def save_image(
    self, path: str, format: str = "png", *, include_predictions: bool = False
) -> None:
    """Write the main image, or its prediction overlay, to disk.

    Parameters
    ----------
    path: File path for the output image
    format: File format for the output image. Defaults to "png"
    include_predictions: When set, `other_images["overlay"]` is saved instead of the main image

    Raises
    ------
    KeyError: if `include_predictions` is set but no "overlay" image exists
    """
    source = self.other_images["overlay"] if include_predictions else self.image
    source.save(path, format=format.upper())

show_image(image_src='', clear_nb_cell=False, *, include_predictions=False)

Open a window and display all the images. Parameters


image_src (deprecated): if empty the source image will be displayed. Otherwise the image will be selected from other_images include_predictions: If the image has predictions, should it be overlaid on top of the image?

Source code in landingai/pipeline/frameset.py
def show_image(
    self,
    image_src: str = "",
    clear_nb_cell: bool = False,
    *,
    include_predictions: bool = False,
) -> "Frame":
    """Display the image, inline when running in a notebook or with PIL's viewer otherwise.

    Parameters
    ----------
    image_src (deprecated): if empty the source image will be displayed. Otherwise the image will be selected from `other_images`
    clear_nb_cell: When running in a notebook, clear the cell output before displaying the image.
    include_predictions: If the image has predictions, should it be overlaid on top of the image?

    Raises
    ------
    KeyError: if the selected image is not present in `other_images`
    """
    if image_src:
        warnings.warn(
            "image_src keyword on Frame.show_image is deprecated. Use include_predictions instead."
        )
        include_predictions = include_predictions or image_src == "overlay"
    if include_predictions:
        image_src = "overlay"
    # TODO: Should show be a end leaf?
    in_notebook = is_running_in_notebook()
    if in_notebook:
        from IPython import display

        if clear_nb_cell:
            display.clear_output(wait=True)
    # Select the image only after the optional clear, so a missing key fails at the same point as before
    chosen = self.image if image_src == "" else self.other_images[image_src]
    if in_notebook:
        display.display(chosen)
    else:
        # Outside a notebook rely on PIL's own viewer
        chosen.show()
    return self

to_numpy_array(image_src='', *, include_predictions=False)

Return a numpy array using RGB channel ordering. If this array is passed to OpenCV, you will need to convert it to BGR

Parameters

image_src (deprecated): if empty the source image will be displayed. Otherwise the image will be selected from other_images include_predictions: If the image has predictions, should it be overlaid on top of the image?

Source code in landingai/pipeline/frameset.py
def to_numpy_array(
    self,
    image_src: str = "",
    *,
    include_predictions: bool = False,
) -> np.ndarray:
    """Convert the selected image into a numpy array (RGB channel ordering).

    If this array is passed to OpenCV, you will need to convert it to BGR.
    When the requested derived image does not exist, the main image is used.

    Parameters
    ----------
    image_src (deprecated): if empty the source image will be returned. Otherwise the image will be selected from `other_images`
    include_predictions: If the image has predictions, should it be overlaid on top of the image?
    """
    if image_src:
        warnings.warn(
            "image_src keyword on Frame.to_numpy_array is deprecated. Use include_predictions instead."
        )
        include_predictions = include_predictions or image_src == "overlay"
    if include_predictions:
        image_src = "overlay"
    if image_src != "" and image_src in self.other_images:
        selected = self.other_images[image_src]
    else:
        selected = self.image
    return np.asarray(selected)

FrameSet

Bases: BaseModel

A FrameSet is a collection of frames (in order). Typically a FrameSet will include a single image but there are circumstances where other images will be extracted from the initial one. For example: we may want to identify vehicles on an initial image and then extract sub-images for each of the vehicles.

Source code in landingai/pipeline/frameset.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
class FrameSet(BaseModel):
    """
    A FrameSet is a collection of frames (in order). Typically a FrameSet
    will include a single image but there are circumstances where other images
    will be extracted from the initial one. For example: we may want to
    identify vehicles on an initial image and then extract sub-images for
    each of the vehicles.
    """

    frames: List[Frame] = []  # Start with empty frame set

    @classmethod
    def from_image(
        cls, uri: str, metadata: Optional[Dict[str, Any]] = None
    ) -> "FrameSet":
        """Creates a FrameSet from an image file

        Parameters
        ----------
        uri : URI to file (local or remote)
        metadata : Optional metadata to attach to the frame. `None` (the default) means no metadata.

        Returns
        -------
        FrameSet : New FrameSet containing a single image
        """
        # Always hand Frame.from_image a fresh dict: avoids a shared mutable
        # default and keeps `None` from reaching the Frame constructor.
        frame_metadata: Dict[str, Any] = {} if metadata is None else metadata
        return cls(frames=[Frame.from_image(uri=uri, metadata=frame_metadata)])

    @classmethod
    def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "FrameSet":
        """Build a single-frame FrameSet from an image encoded as an ndarray

        Parameters
        ----------
        array : np.ndarray
            Image
        is_bgr : bool, optional
            Assume OpenCV's BGR channel ordering? Defaults to True

        Returns
        -------
        FrameSet
        """
        only_frame = Frame.from_array(array=array, is_bgr=is_bgr)
        return cls(frames=[only_frame])

    # TODO: Is it worth to emulate a full container? - https://docs.python.org/3/reference/datamodel.html#emulating-container-types
    def __getitem__(self, key: int) -> Frame:
        """Return the frame stored at position `key`"""
        frame_list = self.frames
        return frame_list[key]

    def __iter__(self) -> Iterable[Frame]:  # type: ignore
        """Iterate over the frames in order"""
        yield from self.frames

    def __len__(self) -> int:
        """Number of frames held by this FrameSet"""
        frame_count = len(self.frames)
        return frame_count

    def _repr_pretty_(self, pp, cycle) -> str:  # type: ignore
        """Pretty-printer hook: writes this FrameSet as indented JSON through `pp.text`

        Enables a pretty output on Jupyter notebooks' `display()` function.
        """
        # An exclude such as {"frames": {"__all__": {"image", "other_images"}}}
        # was considered here but is currently disabled.
        as_json = self.json(indent=2)
        printed = pp.text(as_json)
        return str(printed)

    @property
    def predictions(self) -> PredictionList:
        """Returns the predictions from all the frames in the FrameSet, concatenated in frame order"""
        combined = PredictionList()
        for frame in self.frames:
            combined.extend(frame.predictions)
        return combined

    def is_empty(self) -> bool:
        """Check if the FrameSet is empty

        Returns
        -------
        bool
            True if there are no Frames on the FrameSet
        """
        return len(self.frames) == 0

    def run_predict(self, predictor: Predictor, num_workers: int = 1) -> "FrameSet":
        """Run a cloud inference model on every frame

        Parameters
        ----------
        predictor: the model to be invoked.
        num_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.

        Raises
        ------
        Any exception raised by `Frame.run_predict`, in both the sequential and the threaded mode.
        """

        if num_workers > 1:
            # Remember that run_predict will retry indefinitely on 429 (with a 60 second delay). This logic is still ok for a multi-threaded context.
            with ThreadPoolExecutor(
                max_workers=num_workers
            ) as executor:  # TODO: make this configurable
                futures = [
                    executor.submit(frame.run_predict, predictor, reuse_session=False)
                    for frame in self.frames
                ]
                wait(futures)
            # wait() only blocks; it never re-raises. Surface worker failures
            # the same way the sequential branch does instead of dropping them.
            for future in futures:
                future.result()
        else:
            for frame in self.frames:
                frame.run_predict(predictor)
        return self

    def overlay_predictions(
        self, options: Optional[Dict[str, Any]] = None
    ) -> "FrameSet":  # TODO: Optional where to store
        """Call `overlay_predictions(options)` on every frame and return this FrameSet"""
        for current in self.frames:
            current.overlay_predictions(options)
        return self

    def resize(
        self, width: Union[int, None] = None, height: Union[int, None] = None
    ) -> "FrameSet":  # TODO: Optional where to store
        """Resize every frame by delegating to `Frame.resize`. When neither width nor height is given nothing happens.

        Parameters
        ----------
        width: The requested width in pixels.
        height: The requested height in pixels.
        """
        resize_requested = not (width is None and height is None)
        if resize_requested:
            for current in self.frames:
                current.resize(width, height)
        return self

    def downsize(
        self, width: Union[int, None] = None, height: Union[int, None] = None
    ) -> "FrameSet":  # TODO: Optional where to store
        """Downsize every frame by delegating to `Frame.downsize`

        Parameters
        ----------
        width: The requested width in pixels.
        height: The requested height in pixels.
        """
        for current in self.frames:
            current.downsize(width, height)
        return self

    def crop(self, bbox: BoundingBox) -> "FrameSet":
        """Crop every frame to the same bounding box

        Parameters
        ----------
        bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)
        """
        for current in self.frames:
            current.crop(bbox)
        return self

    def adjust_sharpness(self, factor: float) -> "FrameSet":
        """Adjust the sharpness of every frame

        Parameters
        ----------
        factor: The enhancement factor
        """
        for current in self.frames:
            current.adjust_sharpness(factor)
        return self

    def adjust_brightness(self, factor: float) -> "FrameSet":
        """Adjust the brightness of every frame

        Parameters
        ----------
        factor: The enhancement factor
        """
        for current in self.frames:
            current.adjust_brightness(factor)
        return self

    def adjust_contrast(self, factor: float) -> "FrameSet":
        """Adjust the contrast of every frame

        Parameters
        ----------
        factor: The enhancement factor
        """
        for current in self.frames:
            current.adjust_contrast(factor)
        return self

    def adjust_color(self, factor: float) -> "FrameSet":
        """Adjust the color of every frame

        Parameters
        ----------
        factor: The enhancement factor
        """
        for current in self.frames:
            current.adjust_color(factor)
        return self

    def copy(self, *args: Any, **kwargs: Any) -> "FrameSet":
        """Returns a copy of this FrameSet whose `frames` list holds a `copy()` of each frame

        All arguments are forwarded to the parent class `copy`.
        """
        duplicate = super().copy(*args, **kwargs)
        # Replace the frame list so the duplicate does not reference this set's Frame objects
        duplicate.frames = [original.copy() for original in self.frames]
        return duplicate

    def save_image(
        self,
        filename_prefix: str,
        image_src: str = "",  # TODO: remove this parameter in next major version
        format: str = "png",
        *,
        include_predictions: bool = False,
    ) -> "FrameSet":
        """Save all the images on the FrameSet to disk

        A FrameSet with a single frame is written to `<filename_prefix>.<format>`.
        Otherwise each frame is written to
        `<filename_prefix>_<timestamp>_<image_src>_<index>.<format>`.

        Parameters
        ----------
        filename_prefix : path and name prefix for the image file
        image_src: (deprecated) if empty the source image will be saved. Otherwise the image will be selected from `other_images`
        format: File format for the output images. Defaults to "png"
        include_predictions: If the image has predictions, should it be overlaid on top of the image?

        Raises
        ------
        KeyError: if the selected image is not present in a frame's `other_images`
        """
        if image_src:
            warnings.warn(
                "image_src keyword on FrameSet.save_image is deprecated. Use include_predictions instead."
            )
        if include_predictions:
            image_src = "overlay"
        extension = format.lower()
        pil_format = format.upper()
        if len(self.frames) == 1:
            # If there is only one frame, save it with the given prefix without timestamp
            targets = [f"{filename_prefix}.{extension}"]
        else:
            # TODO: deprecate this behavior. Using timestamp here makes it really hard
            # to find the images later. We should probably use a counter instead (like "prefix_{i}.png")
            timestamp = datetime.now().strftime(
                "%Y%m%d-%H%M%S"
            )  # TODO saving faster than 1 sec will cause image overwrite
            targets = [
                f"{filename_prefix}_{timestamp}_{image_src}_{index}.{extension}"
                for index in range(len(self.frames))
            ]
        # Both paths pick the image the same way, so the deprecated `image_src`
        # is honored for a single frame too (it used to be silently ignored).
        for frame, target in zip(self.frames, targets):
            img = frame.image if image_src == "" else frame.other_images[image_src]
            img.save(target, format=pil_format)
        return self

    def save_video(
        self,
        video_file_path: str,
        video_fps: Optional[int] = None,
        video_length_sec: Optional[float] = None,
        image_src: str = "",  # TODO: remove this parameter in next major version
        include_predictions: bool = False,
    ) -> "FrameSet":
        """Save the FrameSet as an mp4 video file. The following example shows how to use save_video to save a clip from a live RTSP source.
        ```python
            video_len_sec=10
            fps=4
            img_src = NetworkedCamera(stream_url, fps=fps)
            frs = FrameSet()
            for i,frame in enumerate(img_src):
                if i>=video_len_sec*fps: # Limit capture time
                    break
                frs.extend(frame)
            frs.save_video("sample_images/test.mp4",video_fps=fps)
        ```

        Parameters
        ----------
        video_file_path : str
            Path and filename with extension of the video file. Only `.mp4` is supported.
        video_fps : Optional[int]
            The number of frames per second for the output video file.
            Either the `video_fps` or `video_length_sec` should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value.
        video_length_sec : Optional[float]
            The total number of seconds for the output video file. Must be greater than zero.
            If it is larger than the number of frames (which would need less than 1 fps) it is ignored and the default of at most 2 fps is used.
            Either the `video_fps` or `video_length_sec` should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value.
        image_src : str, optional
            (deprecated) only the value "overlay" has an effect: it turns on `include_predictions`
        include_predictions : bool, optional
            Forwarded to `to_numpy_array` of every frame when producing the video images

        Returns
        -------
        FrameSet : this same FrameSet. Nothing is written when the FrameSet has no frames.

        Raises
        ------
        NotImplementedError : if `video_file_path` does not end with `.mp4`
        ValueError : if both `video_fps` and `video_length_sec` are set, or if `video_length_sec` is not greater than zero
        """
        if not video_file_path.lower().endswith(".mp4"):
            raise NotImplementedError("Only .mp4 is supported")

        if image_src:
            warnings.warn(
                "image_src keyword on FrameSet.save_video is deprecated. Use include_predictions instead."
            )
            if image_src == "overlay":
                include_predictions = True

        total_frames = len(self.frames)
        if total_frames == 0:
            return self

        if video_fps is not None and video_length_sec is not None:
            raise ValueError(
                "The 'video_fps' and 'video_length_sec' arguments cannot be set at the same time"
            )
        # Reject durations that would otherwise divide by zero or yield a negative frame rate
        if video_length_sec is not None and video_length_sec <= 0:
            raise ValueError("The 'video_length_sec' argument must be greater than zero")

        # Try to tune FPS based on parameters or pick a reasonable number. The goal is to produce a video
        # that lasts a couple of seconds even when there are few frames. A resulting fps below 1 is avoided
        # (OpenCV used to silently fail and not create a file in that case).
        if video_length_sec is not None and video_length_sec <= total_frames:
            video_fps = int(total_frames / video_length_sec)
        elif video_fps is None:
            video_fps = min(2, total_frames)

        writer = imageio.get_writer(video_file_path, fps=video_fps)
        try:
            for fr in self.frames:
                writer.append_data(
                    fr.to_numpy_array(include_predictions=include_predictions)
                )
        finally:
            # Always release the writer, even when a frame fails to convert or encode
            writer.close()

        # NOTE: a previous OpenCV (cv2.VideoWriter) implementation was dropped: it required guessing an
        # installed codec and did not work on windows because of wurlitzer
        # (an alternative would be https://github.com/greg-hellings/stream-redirect)
        return self

    def show_image(
        self,
        image_src: str = "",
        clear_nb_cell: bool = False,
        *,
        include_predictions: bool = False,
    ) -> "FrameSet":
        """Display every image of the FrameSet, one frame at a time

        Parameters
        ----------
        image_src (deprecated): only the value "overlay" has an effect, it turns on `include_predictions`
        clear_nb_cell: forwarded unchanged to `show_image` of every frame
        include_predictions: If the image has predictions, should it be overlaid on top of the image?

        Returns
        -------
        FrameSet : this same FrameSet, so calls can be chained
        """
        if image_src:
            warnings.warn(
                "image_src keyword on FrameSet.show_image is deprecated. Use include_predictions instead."
            )
            if image_src == "overlay":
                include_predictions = True

        # The same display options apply to every frame
        display_options = {
            "clear_nb_cell": clear_nb_cell,
            "include_predictions": include_predictions,
        }
        for current in self.frames:
            current.show_image(**display_options)

        # TODO: Implement image stacking when we have multiple frames
        # (https://answers.opencv.org/question/175912/how-to-display-multiple-images-in-one-window/)
        # An earlier OpenCV window based implementation was abandoned because of window handling issues
        # (see https://stackoverflow.com/questions/6116564/destroywindow-does-not-close-window-on-mac-using-python-and-opencv)
        return self

    def extend(self, frs: "FrameSet") -> "FrameSet":
        """Add all the Frames from `frs` at the end of this FrameSet

        Parameters
        ----------
        frs : FrameSet
            FrameSet whose frames are appended, in order, to the current one

        Returns
        -------
        FrameSet
            This same FrameSet, so calls can be chained
        """
        incoming = frs.frames
        self.frames.extend(incoming)
        return self

    def append(self, fr: Frame) -> None:
        """Add a Frame into this FrameSet

        Parameters
        ----------
        fr : Frame
            Frame to be added at the end of the current FrameSet

        Returns
        -------
        None
            The FrameSet is modified in place; unlike `extend`, nothing is returned
        """
        self.frames.append(fr)

    def apply(self, function: Callable[[Frame], Frame] = lambda f: f) -> "FrameSet":
        """Replace every frame with the result of calling `function` on it

        Parameters
        ----------
        function: callable that takes an individual frame and returns the updated frame. The default returns the frame unchanged
        """
        for position, current in enumerate(self.frames):
            self.frames[position] = function(current)
        return self

    def filter(self, function: Callable[[Frame], bool] = lambda f: True) -> "FrameSet":
        """Keep only the frames for which `function` returns a true value

        Parameters
        ----------
        function : callable that gets invoked on every Frame. If it returns False, the Frame will be deleted
        """
        # Walk from the last frame to the first so deletions do not shift pending indexes
        position = len(self.frames) - 1
        while position >= 0:
            if not function(self.frames[position]):
                del self.frames[position]
            position -= 1
        return self

    class Config:
        # NOTE(review): presumably the pydantic model configuration of the enclosing class
        # (the class header is outside this view) — confirm.
        # Add some encoders to prevent large structures from being printed
        json_encoders = {
            # Arrays are rendered as a short placeholder carrying only their shape
            np.ndarray: lambda a: f"<np.ndarray: {a.shape}>",
            # Images are rendered as a short placeholder carrying only their size
            Image.Image: lambda i: f"<Image.Image: {i.size}>",
        }

predictions: PredictionList property

Returns the predictions from all the frames in the FrameSet

adjust_brightness(factor)

Adjust the brightness of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_brightness(self, factor: float) -> "FrameSet":
    """Apply a brightness adjustment to every frame of this FrameSet

    Parameters
    ----------
    factor: The enhancement factor, forwarded unchanged to each frame

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for frame in self.frames:
        frame.adjust_brightness(factor)
    return self

adjust_color(factor)

Adjust the color of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_color(self, factor: float) -> "FrameSet":
    """Apply a color adjustment to every frame of this FrameSet

    Parameters
    ----------
    factor: The enhancement factor, forwarded unchanged to each frame

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for frame in self.frames:
        frame.adjust_color(factor)
    return self

adjust_contrast(factor)

Adjust the contrast of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_contrast(self, factor: float) -> "FrameSet":
    """Apply a contrast adjustment to every frame of this FrameSet

    Parameters
    ----------
    factor: The enhancement factor, forwarded unchanged to each frame

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for frame in self.frames:
        frame.adjust_contrast(factor)
    return self

adjust_sharpness(factor)

Adjust the sharpness of the image

Parameters

factor: The enhancement factor

Source code in landingai/pipeline/frameset.py
def adjust_sharpness(self, factor: float) -> "FrameSet":
    """Apply a sharpness adjustment to every frame of this FrameSet

    Parameters
    ----------
    factor: The enhancement factor, forwarded unchanged to each frame

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for frame in self.frames:
        frame.adjust_sharpness(factor)
    return self

append(fr)

Add a Frame into this FrameSet

Parameters

fr : Frame Frame to be added at the end of the current one

Returns

None (the FrameSet is modified in place)

Source code in landingai/pipeline/frameset.py
def append(self, fr: Frame) -> None:
    """Add a Frame into this FrameSet

    Parameters
    ----------
    fr : Frame
        Frame to be added at the end of the current FrameSet

    Returns
    -------
    None
        The FrameSet is modified in place; unlike `extend`, nothing is returned
    """
    self.frames.append(fr)

apply(function=lambda f: f)

Apply a function to all frames

Parameters

function: lambda function that takes an individual frame and returns the updated frame

Source code in landingai/pipeline/frameset.py
def apply(self, function: Callable[[Frame], Frame] = lambda f: f) -> "FrameSet":
    """Replace every frame with the result of calling `function` on it

    Parameters
    ----------
    function: callable that takes an individual frame and returns the updated frame. The default returns the frame unchanged
    """
    for position, current in enumerate(self.frames):
        self.frames[position] = function(current)
    return self

copy(*args, **kwargs)

Returns a copy of this FrameSet, with all the frames copied

Source code in landingai/pipeline/frameset.py
def copy(self, *args: Any, **kwargs: Any) -> "FrameSet":
    """Duplicate this FrameSet together with each of its frames

    All positional and keyword arguments are forwarded to the parent class `copy`.
    The frame list of the result is then replaced with the outcome of calling
    `copy()` on every frame of this FrameSet.
    """
    duplicate = super().copy(*args, **kwargs)
    duplicate.frames = [original.copy() for original in self.frames]
    return duplicate

crop(bbox)

Crop the images based on the bounding box

Parameters

bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)

Source code in landingai/pipeline/frameset.py
def crop(self, bbox: BoundingBox) -> "FrameSet":
    """Crop every frame of this FrameSet with the same bounding box

    Parameters
    ----------
    bbox: A tuple with the bounding box coordinates (xmin, ymin, xmax, ymax)

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for current in self.frames:
        current.crop(bbox)
    return self

downsize(width=None, height=None)

Resize only if the image is larger than the expected dimensions.

Parameters

width: The requested width in pixels. height: The requested height in pixels.

Source code in landingai/pipeline/frameset.py
def downsize(
    self, width: Union[int, None] = None, height: Union[int, None] = None
) -> "FrameSet":  # TODO: Optional where to store
    """Resize only if the image is larger than the expected dimensions

    Both values are forwarded unchanged to `downsize` of every frame.

    Parameters
    ----------
    width: The requested width in pixels.
    height: The requested height in pixels.

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    for frame in self.frames:
        frame.downsize(width, height)
    return self

extend(frs)

Add all the Frames from frs into this FrameSet

Parameters

frs : FrameSet — FrameSet to be added at the end of the current one

Returns

FrameSet

Source code in landingai/pipeline/frameset.py
def extend(self, frs: "FrameSet") -> "FrameSet":
    """Add all the Frames from `frs` at the end of this FrameSet

    Parameters
    ----------
    frs : FrameSet
        FrameSet whose frames are appended, in order, to the current one

    Returns
    -------
    FrameSet
        This same FrameSet, so calls can be chained
    """
    incoming = frs.frames
    self.frames.extend(incoming)
    return self

filter(function=lambda f: True)

Evaluate a function on every frame and keep or remove

Parameters

function : lambda function that gets invoked on every Frame. If it returns False, the Frame will be deleted

Source code in landingai/pipeline/frameset.py
def filter(self, function: Callable[[Frame], bool] = lambda f: True) -> "FrameSet":
    """Keep only the frames for which `function` returns a true value

    Parameters
    ----------
    function : callable that gets invoked on every Frame. If it returns False, the Frame will be deleted
    """
    # Walk from the last frame to the first so deletions do not shift pending indexes
    position = len(self.frames) - 1
    while position >= 0:
        if not function(self.frames[position]):
            del self.frames[position]
        position -= 1
    return self

from_array(array, is_bgr=True) classmethod

Creates a FrameSet from an image encoded as an ndarray

Parameters

array : np.ndarray Image is_bgr : bool, optional Assume OpenCV's BGR channel ordering? Defaults to True

Returns

FrameSet

Source code in landingai/pipeline/frameset.py
@classmethod
def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "FrameSet":
    """Build a FrameSet holding a single Frame created from an ndarray image

    Parameters
    ----------
    array : np.ndarray
        Image
    is_bgr : bool, optional
        Assume OpenCV's BGR channel ordering? Defaults to True

    Returns
    -------
    FrameSet
        New FrameSet whose only frame comes from `Frame.from_array`
    """
    single_frame = Frame.from_array(array=array, is_bgr=is_bgr)
    return cls(frames=[single_frame])

from_image(uri, metadata={}) classmethod

Creates a FrameSet from an image file

Parameters

uri : URI to file (local or remote)

Returns

FrameSet : New FrameSet containing a single image

Source code in landingai/pipeline/frameset.py
@classmethod
def from_image(
    cls, uri: str, metadata: Optional[Dict[str, Any]] = None
) -> "FrameSet":
    """Creates a FrameSet from an image file

    Parameters
    ----------
    uri : URI to file (local or remote)
    metadata : optional metadata handed to the created Frame. When omitted (or None) a new empty dict is used

    Returns
    -------
    FrameSet : New FrameSet containing a single image
    """
    # Build a fresh dict per call: a `{}` default argument is one object shared by every call
    frame_metadata = {} if metadata is None else metadata
    return cls(frames=[Frame.from_image(uri=uri, metadata=frame_metadata)])

is_empty()

Check if the FrameSet is empty.

Returns

bool: True if there are no Frames in the FrameSet

Source code in landingai/pipeline/frameset.py
def is_empty(self) -> bool:
    """Tell whether this FrameSet holds no frames

    Returns
    -------
    bool
        True if there are no Frames in the FrameSet
    """
    frame_count = len(self.frames)
    return frame_count == 0

resize(width=None, height=None)

Resizes every image of the FrameSet. If width or height is missing the resize will preserve the aspect ratio.

Parameters

width: The requested width in pixels. height: The requested height in pixels.

Source code in landingai/pipeline/frameset.py
def resize(
    self, width: Union[int, None] = None, height: Union[int, None] = None
) -> "FrameSet":  # TODO: Optional where to store
    """Resize every frame of this FrameSet. If width or height is missing the resize will preserve the aspect ratio

    Both values are forwarded unchanged to `resize` of every frame. When neither is
    given no frame is touched.

    Parameters
    ----------
    width: The requested width in pixels.
    height: The requested height in pixels.

    Returns
    -------
    FrameSet : this same FrameSet (not a copy), so calls can be chained
    """
    if width is None and height is None:  # No resize needed
        return self
    for frame in self.frames:
        frame.resize(width, height)
    return self

run_predict(predictor, num_workers=1)

Run a cloud inference model.

Parameters


predictor: the model to be invoked. num_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.

Source code in landingai/pipeline/frameset.py
def run_predict(self, predictor: Predictor, num_workers: int = 1) -> "FrameSet":
    """Run a cloud inference model on every frame

    Parameters
    ----------
    predictor: the model to be invoked.
    num_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained

    Raises
    ------
    Any exception raised by `run_predict` of a frame, both in the sequential and in the parallel mode
    """

    if num_workers > 1:
        # Remember that run_predict will retry indefinitely on 429 (with a 60 second delay). This logic is still ok for a multi-threaded context.
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [
                executor.submit(frame.run_predict, predictor, reuse_session=False)
                for frame in self.frames
            ]
            wait(futures)
        # wait() only blocks until completion; result() re-raises a worker failure so that
        # errors are not silently dropped (the sequential branch below raises as well)
        for future in futures:
            future.result()
    else:
        for frame in self.frames:
            frame.run_predict(predictor)
    return self

save_image(filename_prefix, image_src='', format='png', *, include_predictions=False)

Save all the images on the FrameSet to disk (as PNG)

Parameters

filename_prefix : path and name prefix for the image file image_src: (deprecated) if empty the source image will be saved. Otherwise the image will be selected from other_images include_predictions: If the image has predictions, should it be overlaid on top of the image?

Source code in landingai/pipeline/frameset.py
def save_image(
    self,
    filename_prefix: str,
    image_src: str = "",  # TODO: remove this parameter in next major version
    format: str = "png",
    *,
    include_predictions: bool = False,
) -> "FrameSet":
    """Save all the images on the FrameSet to disk

    A FrameSet with exactly one frame is written to `<filename_prefix>.<format>`.
    Otherwise one file per frame is written, named
    `<filename_prefix>_<timestamp>_<image_src>_<index>.<format>`.

    Parameters
    ----------
    filename_prefix : path and name prefix for the image file
    image_src: (deprecated) if empty the source image will be saved. Otherwise the image will be selected from `other_images`
    format: image format (defaults to "png"). It is lower-cased for the file extension and upper-cased when passed to the image writer
    include_predictions: If the image has predictions, should it be overlaid on top of the image?

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    if image_src:
        warnings.warn(
            "image_src keyword on FrameSet.save_image is deprecated. Use include_predictions instead."
        )
        # Same backward-compatible mapping applied by save_video and show_image:
        # asking for the "overlay" image means "include the predictions".
        if image_src == "overlay":
            include_predictions = True
    if include_predictions:
        image_src = "overlay"
    extension = format.lower()
    encoder = format.upper()
    # If there is only one frame, save it with the given prefix without timestamp
    if len(self.frames) == 1:
        self.frames[0].save_image(
            f"{filename_prefix}.{extension}",
            format=encoder,
            include_predictions=include_predictions,
        )
        return self
    # TODO: deprecate this behavior. Using timestamp here makes it really hard
    # to find the images later. We should probably use a counter instead (like "prefix_{i}.png")
    timestamp = datetime.now().strftime(
        "%Y%m%d-%H%M%S"
    )  # TODO saving faster than 1 sec will cause image overwrite
    for index, frame in enumerate(self.frames):
        img = frame.image if image_src == "" else frame.other_images[image_src]
        img.save(
            f"{filename_prefix}_{timestamp}_{image_src}_{index}.{extension}",
            format=encoder,
        )
    return self

save_video(video_file_path, video_fps=None, video_length_sec=None, image_src='', include_predictions=False)

Save the FrameSet as an mp4 video file. The following example shows how to use save_video to save a clip from a live RTSP source.

    video_len_sec=10
    fps=4
    img_src = NetworkedCamera(stream_url, fps=fps)
    frs = FrameSet()
    for i,frame in enumerate(img_src):
        if i>=video_len_sec*fps: # Limit capture time
            break
        frs.extend(frame)
    frs.save_video("sample_images/test.mp4",video_fps=fps)

Parameters

video_file_path : str Path and filename with extension of the video file video_fps : Optional[int] The number of frames per second for the output video file. Either the video_fps or video_length_sec should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value. video_length_sec : Optional[float] The total number of seconds for the output video file. Either the video_fps or video_length_sec should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value. image_src : str, optional if empty the source image will be used. Otherwise the image will be selected from other_images

Source code in landingai/pipeline/frameset.py
def save_video(
    self,
    video_file_path: str,
    video_fps: Optional[int] = None,
    video_length_sec: Optional[float] = None,
    image_src: str = "",  # TODO: remove this parameter in next major version
    include_predictions: bool = False,
) -> "FrameSet":
    """Save the FrameSet as an mp4 video file. The following example shows how to use save_video to save a clip from a live RTSP source.
    ```python
        video_len_sec=10
        fps=4
        img_src = NetworkedCamera(stream_url, fps=fps)
        frs = FrameSet()
        for i,frame in enumerate(img_src):
            if i>=video_len_sec*fps: # Limit capture time
                break
            frs.extend(frame)
        frs.save_video("sample_images/test.mp4",video_fps=fps)
    ```

    Parameters
    ----------
    video_file_path : str
        Path and filename with extension of the video file. Only `.mp4` is supported.
    video_fps : Optional[int]
        The number of frames per second for the output video file.
        Either the `video_fps` or `video_length_sec` should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value.
    video_length_sec : Optional[float]
        The total number of seconds for the output video file. Must be greater than zero.
        If it is larger than the number of frames (which would need less than 1 fps) it is ignored and the default of at most 2 fps is used.
        Either the `video_fps` or `video_length_sec` should be provided to assemble the video. if none of the two are provided, the method will try to set a "reasonable" value.
    image_src : str, optional
        (deprecated) only the value "overlay" has an effect: it turns on `include_predictions`
    include_predictions : bool, optional
        Forwarded to `to_numpy_array` of every frame when producing the video images

    Returns
    -------
    FrameSet : this same FrameSet. Nothing is written when the FrameSet has no frames.

    Raises
    ------
    NotImplementedError : if `video_file_path` does not end with `.mp4`
    ValueError : if both `video_fps` and `video_length_sec` are set, or if `video_length_sec` is not greater than zero
    """
    if not video_file_path.lower().endswith(".mp4"):
        raise NotImplementedError("Only .mp4 is supported")

    if image_src:
        warnings.warn(
            "image_src keyword on FrameSet.save_video is deprecated. Use include_predictions instead."
        )
        if image_src == "overlay":
            include_predictions = True

    total_frames = len(self.frames)
    if total_frames == 0:
        return self

    if video_fps is not None and video_length_sec is not None:
        raise ValueError(
            "The 'video_fps' and 'video_length_sec' arguments cannot be set at the same time"
        )
    # Reject durations that would otherwise divide by zero or yield a negative frame rate
    if video_length_sec is not None and video_length_sec <= 0:
        raise ValueError("The 'video_length_sec' argument must be greater than zero")

    # Try to tune FPS based on parameters or pick a reasonable number. The goal is to produce a video
    # that lasts a couple of seconds even when there are few frames. A resulting fps below 1 is avoided
    # (OpenCV used to silently fail and not create a file in that case).
    if video_length_sec is not None and video_length_sec <= total_frames:
        video_fps = int(total_frames / video_length_sec)
    elif video_fps is None:
        video_fps = min(2, total_frames)

    writer = imageio.get_writer(video_file_path, fps=video_fps)
    try:
        for fr in self.frames:
            writer.append_data(
                fr.to_numpy_array(include_predictions=include_predictions)
            )
    finally:
        # Always release the writer, even when a frame fails to convert or encode
        writer.close()

    # NOTE: a previous OpenCV (cv2.VideoWriter) implementation was dropped: it required guessing an
    # installed codec and did not work on windows because of wurlitzer
    # (an alternative would be https://github.com/greg-hellings/stream-redirect)
    return self

show_image(image_src='', clear_nb_cell=False, *, include_predictions=False)

Open a window and display all the images.

Parameters


image_src (deprecated): if empty the source image will be displayed. Otherwise the image will be selected from other_images include_predictions: If the image has predictions, should it be overlaid on top of the image?

Source code in landingai/pipeline/frameset.py
def show_image(
    self,
    image_src: str = "",
    clear_nb_cell: bool = False,
    *,
    include_predictions: bool = False,
) -> "FrameSet":
    """Display every image of the FrameSet, one frame at a time

    Parameters
    ----------
    image_src (deprecated): only the value "overlay" has an effect, it turns on `include_predictions`
    clear_nb_cell: forwarded unchanged to `show_image` of every frame
    include_predictions: If the image has predictions, should it be overlaid on top of the image?

    Returns
    -------
    FrameSet : this same FrameSet, so calls can be chained
    """
    if image_src:
        warnings.warn(
            "image_src keyword on FrameSet.show_image is deprecated. Use include_predictions instead."
        )
        if image_src == "overlay":
            include_predictions = True

    # The same display options apply to every frame
    display_options = {
        "clear_nb_cell": clear_nb_cell,
        "include_predictions": include_predictions,
    }
    for current in self.frames:
        current.show_image(**display_options)

    # TODO: Implement image stacking when we have multiple frames
    # (https://answers.opencv.org/question/175912/how-to-display-multiple-images-in-one-window/)
    # An earlier OpenCV window based implementation was abandoned because of window handling issues
    # (see https://stackoverflow.com/questions/6116564/destroywindow-does-not-close-window-on-mac-using-python-and-opencv)
    return self

PredictionList

Bases: List[Union[ClassificationPrediction, OcrPrediction]]

A list of predictions from LandingLens, with some helper methods to filter and check prediction results.

This class inherits from list, so it can be used as a list. For example, you can iterate over the predictions, or use len() to get the number of predictions. Some operations are overridden to make it easier to work with predictions. For example, you can use the in operator to check if a label is in the prediction list:

    >>> "label-that-exists" in frameset.predictions
    True
    >>> "not-found-label" in frameset.predictions
    False

Source code in landingai/pipeline/frameset.py
class PredictionList(List[Union[ClassificationPrediction, OcrPrediction]]):
    """
    A list of predictions from LandingLens, with some helper methods to filter and check prediction results.

    This class inherits from `list`, so it can be used as a list. For example, you can iterate over the predictions, or use `len()` to get the number of predictions.
    Some operations are overridden to make it easier to work with predictions. For example, you can use the `in` operator to check if a label is in the prediction list:

    >>> "label-that-exists" in frameset.predictions
    True
    >>> "not-found-label" in frameset.predictions
    False
    """

    def __init__(self, *args: Any) -> None:
        """Build the list and verify that its elements are homogeneous.

        Parameters
        ----------
        *args: Forwarded unchanged to the `list` constructor (typically a single iterable of predictions)

        Raises
        ------
        ValueError : If any element is not an instance of the first element's type
        """
        # TODO: This is a hack to support OCR predictions. We should probably have different PredictionList
        #  classes for each type of prediction, so we don't have to do conditional checks everywhere and `cast`
        # or "type: ignore".
        super().__init__(*args)
        for p in self:
            if not isinstance(p, type(self[0])):
                raise ValueError("All elements should be of the same type")

    @property
    def _inner_type(self) -> str:
        """Class name of the first element. Raises IndexError when the list is empty, so callers must check for emptiness first."""
        return type(self[0]).__name__

    def __contains__(self, key: object) -> bool:
        """Support `key in predictions`.

        For a string key: OCR predictions are searched by substring in the space-joined text of all
        predictions; any other prediction type is matched by exact `label_name`.
        For a non-string key the regular `list` membership test is used.
        """
        if not self:
            return False
        if isinstance(key, str):
            if self._inner_type == "OcrPrediction":
                # For OCR predictions, check if the key is in the full text
                full_text = " ".join(cast(OcrPrediction, p).text for p in self)
                return key in full_text
            # Test the label comparison itself rather than the truthiness of the matching element
            return any(
                cast(ClassificationPrediction, p).label_name == key for p in self
            )
        return super().__contains__(key)

    def filter_threshold(self, min_score: float) -> "PredictionList":
        """Return a new PredictionList with only the predictions that have a score greater than or equal to the threshold

        Parameters
        ----------
        min_score: The threshold to filter predictions out

        Returns
        -------
        PredictionList : A new instance of PredictionList containing only predictions with `score >= min_score`
        """
        return PredictionList((p for p in self if p.score >= min_score))

    def filter_label(self, label: str) -> "PredictionList":
        """Return a new PredictionList with only the predictions that have the specified label

        Parameters
        ----------
        label: The label name to filter for

        Returns
        -------
        PredictionList : A new instance of PredictionList containing only the filtered labels

        Raises
        ------
        TypeError : If the list holds OCR predictions, which have no `label_name` attribute
        """
        if not self:
            # `_inner_type` indexes the first element, so an empty list used to fail with IndexError.
            # Filtering nothing simply yields nothing.
            return PredictionList([])
        if self._inner_type == "OcrPrediction":
            raise TypeError(
                "You can't filter by labels if type of prediction doesn't have `label_name` attribute"
            )
        return PredictionList(
            (p for p in self if cast(ClassificationPrediction, p).label_name == label)
        )

filter_label(label)

Return a new PredictionList with only the predictions that have the specified label

Parameters

label: The label name to filter for

Returns


PredictionList : A new instance of PredictionList containing only the filtered labels

Source code in landingai/pipeline/frameset.py
def filter_label(self, label: str) -> "PredictionList":
    """Return a new PredictionList with only the predictions that have the specified label

    Parameters
    ----------
    label: The label name to filter for

    Returns
    -------
    PredictionList : A new instance of PredictionList containing only the filtered labels

    Raises
    ------
    TypeError : If the list holds OCR predictions, which have no `label_name` attribute
    """
    if not self:
        # `_inner_type` is consulted below; on an empty list there is no element to take
        # the type from, so return an empty result up front instead of failing.
        return PredictionList([])
    if self._inner_type == "OcrPrediction":
        raise TypeError(
            "You can't filter by labels if type of prediction doesn't have `label_name` attribute"
        )
    return PredictionList(
        (p for p in self if cast(ClassificationPrediction, p).label_name == label)
    )

filter_threshold(min_score)

Return a new PredictionList with only the predictions that have a score greater than or equal to the threshold

Parameters

min_score: The threshold to filter predictions out

Returns

PredictionList : A new instance of PredictionList containing only predictions above min_score

Source code in landingai/pipeline/frameset.py
def filter_threshold(self, min_score: float) -> "PredictionList":
    """Build a new PredictionList holding the predictions whose score reaches the threshold

    Parameters
    ----------
    min_score: Lowest score a prediction may have and still be kept (the comparison is `score >= min_score`)

    Returns
    -------
    PredictionList : A new instance of PredictionList with the surviving predictions, in their original order
    """
    kept = [prediction for prediction in self if prediction.score >= min_score]
    return PredictionList(kept)

A module that provides a set of abstractions and APIs for reading images from different sources.

ImageFolder

Bases: ImageSourceBase

The ImageFolder class is an image source that reads images from a folder path.

Example 1:

folder = ImageFolder("/home/user/images")
for image_batch in folder:
    print(image_batch[0].image.size)

Example 2:

# Read all jpg files in the folder (including nested files)
folder = ImageFolder(glob_pattern="/home/user/images/**/*.jpg")
for image_batch in folder:
    print(image_batch[0].image.size)

Source code in landingai/pipeline/image_source.py
class ImageFolder(ImageSourceBase):
    """
    The `ImageFolder` class is an image source that reads images from a folder path.

    Example 1:
    ```python
    folder = ImageFolder("/home/user/images")
    for image_batch in folder:
        print(image_batch[0].image.size)
    ```

    Example 2:
    ```python
    # Read all jpg files in the folder (including nested files)
    folder = ImageFolder(glob_pattern="/home/user/images/**/*.jpg")
    for image_batch in folder:
        print(image_batch[0].image.size)
    ```
    """

    def __init__(
        self,
        source: Union[Path, str, List[str], None] = None,
        glob_pattern: Union[str, List[str], None] = None,
    ) -> None:
        """Constructor for ImageFolder.

        Parameters
        ----------
        source
            A list of file paths or the path to the folder path that contains the images.
            A folder path can be either an absolute or relative folder path, in `str` or `Path` type. E.g. "/home/user/images".
            If you provide a folder path, all the files directly within the folder will be read (including non-image files).
            Nested files and sub-directories will be ignored.
            Consider using `glob_pattern` if you need to:
              1. filter out unwanted files, e.g. your folder has both image and non-image files
              2. read nested image files, e.g. `/home/user/images/**/*.jpg`.
            The ordering of images is based on the file name alphabetically if source is a folder path.
            If source is a list of files, the order of the input files is preserved.
            Currently only local file paths are supported.
        glob_pattern
            One or more python glob pattern(s) to grab only wanted files in the folder. E.g. "/home/user/images/*.jpg".
            NOTE: If `glob_pattern` is provided, the `source` parameter is ignored.
            For more information about glob pattern, see https://docs.python.org/3/library/glob.html

        Raises
        ------
        ValueError
            If neither `source` nor `glob_pattern` is provided, or if the folder path does not exist.
        """
        self._source = source
        self._image_paths: List[str] = []

        if source is None and glob_pattern is None:
            raise ValueError("Either 'source' or 'glob_pattern' must be provided.")
        if glob_pattern is not None:
            if isinstance(glob_pattern, str):
                glob_pattern = [glob_pattern]
            for pattern in glob_pattern:
                self._image_paths.extend(glob.glob(pattern, recursive=True))
            self._image_paths.sort()
        elif isinstance(source, list):
            # Keep a private copy so that later mutation of the caller's list
            # cannot silently change the contents (or length) of this image source.
            self._image_paths = list(source)
        else:
            assert isinstance(source, str) or isinstance(source, Path)
            p = Path(source)
            if not p.exists():
                raise ValueError(f"Path '{p}' does not exist.")
            # Only direct children that are regular files; sub-directories are skipped
            self._image_paths = [str(x) for x in p.glob("*") if x.is_file()]
            self._image_paths.sort()

    def __iter__(self) -> IteratorType[Frame]:
        """Yield one `Frame` per image path, recording the path under the `image_path` metadata key."""
        for img_path in self._image_paths:
            meta = {"image_path": img_path}
            yield Frame.from_image(str(img_path), metadata=meta)

    def __len__(self) -> int:
        """Number of image paths held by this source."""
        return len(self._image_paths)

    def __repr__(self) -> str:
        """String form of the list of image paths."""
        return str(self._image_paths)

    @property
    def image_paths(self) -> List[str]:
        """Returns a list of image paths."""
        return self._image_paths

image_paths: List[str] property

Returns a list of image paths.

__init__(source=None, glob_pattern=None)

Constructor for ImageFolder.

Parameters

source A list of file paths or the path to the folder path that contains the images. A folder path can be either an absolute or relative folder path, in str or Path type. E.g. "/home/user/images". If you provide a folder path, all the files directly within the folder will be read (including non-image files). Nested files and sub-directories will be ignored. Consider using glob_pattern if you need to: 1. filter out unwanted files, e.g. your folder has both image and non-image files 2. read nested image files, e.g. /home/user/images/**/*.jpg. The ordering of images is based on the file name alphabetically if source is a folder path. If source is a list of files, the order of the input files is preserved. Currently only local file paths are supported. glob_pattern One or more python glob pattern(s) to grab only wanted files in the folder. E.g. "/home/user/images/*.jpg". NOTE: If glob_pattern is provided, the source parameter is ignored. For more information about glob pattern, see https://docs.python.org/3/library/glob.html

Source code in landingai/pipeline/image_source.py
def __init__(
    self,
    source: Union[Path, str, List[str], None] = None,
    glob_pattern: Union[str, List[str], None] = None,
) -> None:
    """Constructor for ImageFolder.

    Parameters
    ----------
    source
        A list of file paths or the path to the folder path that contains the images.
        A folder path can be either an absolute or relative folder path, in `str` or `Path` type. E.g. "/home/user/images".
        If you provide a folder path, all the files directly within the folder will be read (including non-image files).
        Nested files and sub-directories will be ignored.
        Consider using `glob_pattern` if you need to:
          1. filter out unwanted files, e.g. your folder has both image and non-image files
          2. read nested image files, e.g. `/home/user/images/**/*.jpg`.
        The ordering of images is based on the file name alphabetically if source is a folder path.
        If source is a list of files, the order of the input files is preserved.
        Currently only local file paths are supported.
    glob_pattern
        One or more python glob pattern(s) to grab only wanted files in the folder. E.g. "/home/user/images/*.jpg".
        NOTE: If `glob_pattern` is provided, the `source` parameter is ignored.
        For more information about glob pattern, see https://docs.python.org/3/library/glob.html

    Raises
    ------
    ValueError
        If neither `source` nor `glob_pattern` is provided, or if the folder path does not exist.
    """
    self._source = source
    self._image_paths: List[str] = []

    if source is None and glob_pattern is None:
        raise ValueError("Either 'source' or 'glob_pattern' must be provided.")
    if glob_pattern is not None:
        if isinstance(glob_pattern, str):
            glob_pattern = [glob_pattern]
        for pattern in glob_pattern:
            self._image_paths.extend(glob.glob(pattern, recursive=True))
        self._image_paths.sort()
    elif isinstance(source, list):
        # Keep a private copy so that later mutation of the caller's list
        # cannot silently change the contents (or length) of this image source.
        self._image_paths = list(source)
    else:
        assert isinstance(source, str) or isinstance(source, Path)
        p = Path(source)
        if not p.exists():
            raise ValueError(f"Path '{p}' does not exist.")
        # Only direct children that are regular files; sub-directories are skipped
        self._image_paths = [str(x) for x in p.glob("*") if x.is_file()]
        self._image_paths.sort()

ImageSourceBase

Bases: Iterator

The base class for all image sources.

Source code in landingai/pipeline/image_source.py
class ImageSourceBase(Iterator):
    """Common base for every image source.

    Provides a no-op `close()` and context-manager support that calls `close()` on exit.
    Subclasses are expected to supply the actual iteration logic.
    """

    def close(self) -> None:
        """Release whatever resources the image source holds (nothing at this level)."""
        return None

    def __next__(self) -> Frame:
        """Produce the next frame; the base class has none to offer."""
        raise NotImplementedError()

    def __enter__(self) -> "ImageSourceBase":
        """Enter the `with` block, handing back the source itself."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Literal[False]:
        """Close the source when the `with` block ends; exceptions are never suppressed."""
        self.close()
        return False

close()

Free up any resource used by the image source

Source code in landingai/pipeline/image_source.py
def close(self) -> None:
    """Release whatever resources the image source holds (nothing at this level)."""
    return None

NetworkedCamera

Bases: BaseModel, ImageSourceBase

The NetworkedCamera class can connect to RTSP and other live video sources in order to grab frames. The main concern is to be able to consume frames at the source speed and drop them as needed to ensure the application always gets the latest frame.

Source code in landingai/pipeline/image_source.py
class NetworkedCamera(BaseModel, ImageSourceBase):
    """The NetworkedCamera class can connect to RTSP and other live video sources in order to grab frames.

    The main concern is to be able to consume frames at the source speed and drop them as needed to ensure
    the application always gets the latest frame. A background daemon thread keeps grabbing frames from the
    source, while `get_latest_frame()` decodes the most recently grabbed one on demand.
    """

    # URL of the video source (or the webcam index passed to the constructor)
    stream_url: str
    # Percentage of changed pixels required to accept a frame; 0 disables motion detection
    motion_detection_threshold: int
    # Minimum number of seconds between two returned frames; None means no throttling
    capture_interval: Union[float, None] = None
    # Grayscale, blurred copy of the last frame accepted by motion detection
    previous_frame: Union[np.ndarray, None] = None
    # Time of the last successful retrieve (initialised to the construction time)
    _last_capture_time: datetime = PrivateAttr()
    _cap: Any = PrivateAttr()  # cv2.VideoCapture
    _t: Any = PrivateAttr()  # threading.Thread
    _t_lock: Any = PrivateAttr()  # threading.Lock
    # True while the reader thread should keep running
    _t_running: bool = PrivateAttr()

    def __init__(
        self,
        stream_url: Union[str, int],
        motion_detection_threshold: int = 0,
        capture_interval: Optional[float] = None,
        fps: Optional[int] = None,
    ) -> None:
        """
        Parameters
        ----------
        stream_url : url to video source, or a number indicating the webcam index
        motion_detection_threshold : If set to zero then motion detection is disabled. Any other value (0-100) will make the camera drop all images that don't have significant changes
        capture_interval : Number of seconds to wait in between frames. If set to None, the NetworkedCamera will acquire images as fast as the source permits.
        fps: Capture speed in frames per second. If set to None, the NetworkedCamera will acquire images as fast as the source permits.

        Raises
        ------
        ValueError : If both `fps` and `capture_interval` are given, or if the resulting rate exceeds 30 frames per second
        Exception : If the stream cannot be opened
        """
        if fps is not None and capture_interval is not None:
            raise ValueError(
                "The fps and capture_interval arguments cannot be set at the same time"
            )
        elif fps is not None:
            # Express the requested rate as a delay between frames
            capture_interval = 1 / fps

        if capture_interval is not None and capture_interval < 1 / 30:
            raise ValueError(
                "The resulting fps cannot be more than 30 frames per second"
            )
        cap = cv2.VideoCapture(stream_url)
        if not cap.isOpened():
            cap.release()
            raise Exception(f"Could not open stream ({stream_url})")
        cap.set(
            cv2.CAP_PROP_BUFFERSIZE, 2
        )  # Limit buffering to 2 frames in order to avoid backlog (i.e. lag)

        super().__init__(
            stream_url=stream_url,
            motion_detection_threshold=motion_detection_threshold,
            capture_interval=capture_interval,
        )
        self._last_capture_time = datetime.now()
        self._cap = cap
        self._t_lock = threading.Lock()
        # Background reader that keeps grabbing frames; daemon so it does not block interpreter exit
        self._t = threading.Thread(target=self._reader)
        self._t.daemon = True
        self._t_running = False
        self._t.start()

    def __del__(self) -> None:
        """Warn and close the camera if it is still running when the object is finalized."""
        if self._t_running:
            # TODO: deprecate the __del__ method in future versions.
            warnings.warn(
                "NetworkedCamera object should be closed explicitly with close(), or using with statement."
            )
            self.close()

    def close(self) -> None:
        """Stop the reader thread (waiting up to 10 seconds for it to finish) and release the capture."""
        self._t_running = False
        self._t.join(timeout=10)
        self._cap.release()

    # grab frames as soon as they are available
    def _reader(self) -> None:
        """Reader thread body: repeatedly grab a frame from the source until `_t_running` is cleared.

        Each grab happens while holding `_t_lock`; an exception is raised if a grab fails.
        """
        self._t_running = True
        # inter_frame_interval = 1 / self._cap.get(cv2.CAP_PROP_FPS)  # Get the source's framerate (FPS = 1/X)
        inter_frame_interval = (
            1 / 30
        )  # Some sources misreport their framerate so we use a conservative number
        while self._t_running:
            with self._t_lock:
                ret = self._cap.grab()  # non-blocking call
                if not ret:
                    raise Exception(f"Connection to camera broken ({self.stream_url})")
            time.sleep(inter_frame_interval)  # Limit acquisition speed

    # retrieve latest frame
    def get_latest_frame(self) -> Optional["Frame"]:
        """Return the most up to date frame by dropping all but the latest frame. This function is blocking.

        When `capture_interval` is set, the call first sleeps until that many seconds have elapsed since the
        previous capture. Returns None when motion detection is enabled and the frame did not change enough.
        Raises an Exception if the frame cannot be retrieved from the source.
        """
        if self.capture_interval is not None:
            t = datetime.now()
            delta = (t - self._last_capture_time).total_seconds()
            if delta <= self.capture_interval:
                # Wait out the remainder of the interval
                time.sleep(self.capture_interval - delta)
        with self._t_lock:
            ret, frame = self._cap.retrieve()  # non-blocking call
            if not ret:
                raise Exception(f"Connection to camera broken ({self.stream_url})")
        self._last_capture_time = datetime.now()
        if self.motion_detection_threshold > 0:
            if self._detect_motion(frame):
                return Frame.from_array(frame)
            else:
                return None  # Empty frame

        return Frame.from_array(frame)

    def _detect_motion(self, frame: np.ndarray) -> bool:  # TODO Needs test cases
        """Return True if `frame` differs enough from the last accepted frame.

        The frame is converted to grayscale and blurred, then compared with `previous_frame`. The percentage
        of pixels whose difference exceeds 20 (out of 255) is compared with `motion_detection_threshold`.
        The first frame is always accepted. `previous_frame` is only replaced when a frame is accepted.
        """
        # Prepare image; grayscale and blur
        prepared_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        prepared_frame = cv2.GaussianBlur(src=prepared_frame, ksize=(5, 5), sigmaX=0)

        if self.previous_frame is None:
            # Save the result for the next invocation
            self.previous_frame = prepared_frame
            return True  # First frame; there is no previous one yet

        # calculate difference and update previous frame TODO: don't assume the processed image is cached
        diff_frame = cv2.absdiff(src1=self.previous_frame, src2=prepared_frame)
        # Only take different areas that are different enough (>20 / 255)
        thresh_frame = cv2.threshold(
            src=diff_frame, thresh=20, maxval=255, type=cv2.THRESH_BINARY
        )[1]
        # Share of pixels (in percent) that changed beyond the per-pixel threshold
        change_percentage = (
            100 * cv2.countNonZero(thresh_frame) / (frame.shape[0] * frame.shape[1])
        )
        # print(f"Image change {change_percentage:.2f}%")
        if change_percentage > self.motion_detection_threshold:
            self.previous_frame = prepared_frame
            return True
        return False

    # Make the class iterable. TODO Needs test cases
    def __iter__(self) -> Any:
        """Return the camera itself as the iterator."""
        return self

    def __next__(self) -> "Frame":
        """Return the next frame, retrying until `get_latest_frame()` yields a real Frame."""
        latest_frame = self.get_latest_frame()
        while latest_frame is None:
            # If motion detection is set, we may get empty frames.
            # We should not yield then, though: we should wait for the next frame
            # until we get a real Frame.
            latest_frame = self.get_latest_frame()
        return latest_frame

    # Model configuration: allow field types that pydantic does not know how to validate
    class Config:
        arbitrary_types_allowed = True

__init__(stream_url, motion_detection_threshold=0, capture_interval=None, fps=None)

Parameters

stream_url : url to video source, or a number indicating the webcam index motion_detection_threshold : If set to zero then motion detection is disabled. Any other value (0-100) will make the camera drop all images that don't have significant changes capture_interval : Number of seconds to wait in between frames. If set to None, the NetworkedCamera will acquire images as fast as the source permits. fps: Capture speed in frames per second. If set to None, the NetworkedCamera will acquire images as fast as the source permits.

Source code in landingai/pipeline/image_source.py
def __init__(
    self,
    stream_url: Union[str, int],
    motion_detection_threshold: int = 0,
    capture_interval: Optional[float] = None,
    fps: Optional[int] = None,
) -> None:
    """
    Open the video source and start the background thread that keeps grabbing frames.

    Parameters
    ----------
    stream_url : url to video source, or a number indicating the webcam index
    motion_detection_threshold : Zero disables motion detection. Any other value (0-100) makes the camera drop all images that don't have significant changes
    capture_interval : Number of seconds to wait in between frames. With None, images are acquired as fast as the source permits.
    fps: Capture speed in frames per second. With None, images are acquired as fast as the source permits.

    Raises
    ------
    ValueError : If both `fps` and `capture_interval` are given, or if the resulting rate exceeds 30 frames per second
    Exception : If the stream cannot be opened
    """
    if fps is not None:
        if capture_interval is not None:
            raise ValueError(
                "The fps and capture_interval arguments cannot be set at the same time"
            )
        # A frame rate is just another way to spell the delay between frames
        capture_interval = 1 / fps

    if capture_interval is not None and capture_interval < 1 / 30:
        raise ValueError("The resulting fps cannot be more than 30 frames per second")

    capture = cv2.VideoCapture(stream_url)
    if not capture.isOpened():
        capture.release()
        raise Exception(f"Could not open stream ({stream_url})")
    # Limit buffering to 2 frames in order to avoid backlog (i.e. lag)
    capture.set(cv2.CAP_PROP_BUFFERSIZE, 2)

    super().__init__(
        stream_url=stream_url,
        motion_detection_threshold=motion_detection_threshold,
        capture_interval=capture_interval,
    )
    self._last_capture_time = datetime.now()
    self._cap = capture
    self._t_lock = threading.Lock()
    # Daemon reader thread; the running flag is reset before the thread is launched
    self._t = threading.Thread(target=self._reader, daemon=True)
    self._t_running = False
    self._t.start()

get_latest_frame()

Return the most up to date frame by dropping all but the latest frame. This function is blocking.

Source code in landingai/pipeline/image_source.py
def get_latest_frame(self) -> Optional["Frame"]:
    """Fetch the newest frame from the source, discarding everything older. This call blocks.

    With `capture_interval` set, the call sleeps until that many seconds have passed since the
    previous capture. Returns None when motion detection is enabled and the frame is rejected.
    Raises an Exception if the frame cannot be retrieved.
    """
    interval = self.capture_interval
    if interval is not None:
        elapsed = (datetime.now() - self._last_capture_time).total_seconds()
        if elapsed <= interval:
            time.sleep(interval - elapsed)

    with self._t_lock:
        ok, raw_frame = self._cap.retrieve()  # non-blocking call
        if not ok:
            raise Exception(f"Connection to camera broken ({self.stream_url})")
    self._last_capture_time = datetime.now()

    # Motion detection only runs when a positive threshold is configured
    if self.motion_detection_threshold > 0 and not self._detect_motion(raw_frame):
        return None  # Empty frame
    return Frame.from_array(raw_frame)

Screenshot

Bases: ImageSourceBase

Take a screenshot from the screen as an image source.

The screenshot will be taken at each iteration when looping over this object. For example:

for frameset in Screenshot():
    # `frameset` will contain a single frame with the screenshot here
    time.sleep(1)

Source code in landingai/pipeline/image_source.py
class Screenshot(ImageSourceBase):
    """
    An image source that captures the screen.

    A new screenshot is grabbed on every iteration step, so looping over this object never ends by itself. For example:
    ```
    for frameset in Screenshot():
        # `frameset` will contain a single frame with the screenshot here
        time.sleep(1)
    ```
    """

    def __iter__(self) -> Iterator:
        """Return the source itself as the iterator."""
        return self

    def __next__(self) -> Frame:
        """Grab the screen and wrap the pixels in a Frame (the array is passed as non-BGR)."""
        return Frame.from_array(np.asarray(ImageGrab.grab()), is_bgr=False)

VideoFile

Bases: ImageSourceBase

The VideoFile class is an image source that samples frames from a video file.

Example:

    import landingai.pipeline as pl

    img_src = pl.image_source.VideoFile("sample_images/surfers.mp4", samples_per_second=1)
    frs = pl.FrameSet()
    for i,frame in enumerate(img_src):
        if i>=3: # Fetch only 3 frames
            break
        frs.extend(
            frame.run_predict(predictor=surfer_model)
            .overlay_predictions()
        )
    print(pl.postprocessing.get_class_counts(frs))

Source code in landingai/pipeline/image_source.py
class VideoFile(ImageSourceBase):
    """
    The `VideoFile` class is an image source that samples frames from a video file.

    Example:
    ```python
        import landingai.pipeline as pl

        img_src = pl.image_source.VideoFile("sample_images/surfers.mp4", samples_per_second=1)
        frs = pl.FrameSet()
        for i,frame in enumerate(img_src):
            if i>=3: # Fetch only 3 frames
                break
            frs.extend(
                frame.run_predict(predictor=surfer_model)
                .overlay_predictions()
            )
        print(pl.postprocessing.get_class_counts(frs))
    ```
    """

    def __init__(self, uri: str, samples_per_second: float = 1) -> None:
        """Constructor for VideoFile.

        Parameters
        ----------
        uri : str
            URI to the video file. This could be a local file or a URL that serves the video file in bytes.
        samples_per_second : float, optional
            The number of images to sample per second (by default 1). If set to zero, it disables sampling
        """
        self._video_file = str(fetch_from_uri(uri))
        # Temporary directory handed to the frame sampler; removed by `close()`
        self._local_cache_dir = Path(tempfile.mkdtemp())
        self._samples_per_second = samples_per_second
        cap = cv2.VideoCapture(self._video_file)
        self._src_fps = cap.get(cv2.CAP_PROP_FPS)
        if self._src_fps == 0:
            _LOGGER.warning(
                f"Could not read FPS from video file ({self._video_file}). Set src FPS to {_DEFAULT_FPS}"
            )
            self._src_fps = _DEFAULT_FPS
        self._src_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
        # Compute video properties (i.e. the target number of samples and FPS based on the user provided `samples_per_second`)
        if self._samples_per_second != 0:
            # Adjust the video frames and fps based on sampling rate
            self._target_total_frames = int(
                self._src_total_frames * self._samples_per_second / self._src_fps
            )
            self._target_fps = self._samples_per_second
        else:
            self._target_total_frames = self._src_total_frames
            self._target_fps = self._src_fps

    def properties(self) -> Tuple[float, int, float, int]:
        """Return properties of the source file and the resulting FrameSet

        Returns
        -------
        Tuple[float, int, float, int]
            Properties:
            0. Source file FPS (frames per second)
            1. Source file total number of frames
            2. Resulting FPS after applying sampling rate
            3. Number of frames after applying sampling rate
        """
        return (
            self._src_fps,
            self._src_total_frames,
            self._target_fps,
            self._target_total_frames,
        )

    def __iter__(self) -> IteratorType[Frame]:
        """Yield one `Frame` per image sampled from the video."""
        for img_path in sample_images_from_video(
            self._video_file, self._local_cache_dir, self._samples_per_second
        ):
            yield Frame.from_image(img_path)

    def __del__(self) -> None:
        """Warn and clean up if the cache directory still exists when the object is finalized."""
        # `__init__` may have failed before the cache directory attribute was set;
        # in that case there is nothing to clean up and nothing to warn about.
        cache_dir = getattr(self, "_local_cache_dir", None)
        if cache_dir is not None and os.path.exists(cache_dir):
            # TODO: deprecate the __del__ method in future versions.
            warnings.warn(
                "VideoFile object should be closed explicitly with close(), or using 'with' statement."
            )
            self.close()

    def close(self) -> None:
        """Remove the temporary cache directory. Calling it again after the directory is gone is a no-op."""
        # Guard so that a second close() does not fail on an already removed directory
        if os.path.exists(self._local_cache_dir):
            shutil.rmtree(self._local_cache_dir)

__init__(uri, samples_per_second=1)

Constructor for VideoFile.

Parameters

uri : str URI to the video file. This could be a local file or a URL that serves the video file in bytes. samples_per_second : float, optional The number of images to sample per second (by default 1). If set to zero, it disables sampling

Source code in landingai/pipeline/image_source.py
def __init__(self, uri: str, samples_per_second: float = 1) -> None:
    """Set up a VideoFile source and precompute its frame statistics.

    Parameters
    ----------
    uri : str
        Location of the video file: either a local file or a URL that serves the video bytes.
    samples_per_second : float, optional
        How many images to sample per second of video (1 by default). Zero turns sampling off.
    """
    self._video_file = str(fetch_from_uri(uri))
    self._local_cache_dir = Path(tempfile.mkdtemp())
    self._samples_per_second = samples_per_second

    # Read the source frame rate and frame count, then let go of the capture
    capture = cv2.VideoCapture(self._video_file)
    self._src_fps = capture.get(cv2.CAP_PROP_FPS)
    if self._src_fps == 0:
        _LOGGER.warning(
            f"Could not read FPS from video file ({self._video_file}). Set src FPS to {_DEFAULT_FPS}"
        )
        self._src_fps = _DEFAULT_FPS
    self._src_total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    capture.release()

    if self._samples_per_second == 0:
        # Sampling disabled: the output mirrors the source
        self._target_total_frames = self._src_total_frames
        self._target_fps = self._src_fps
        return

    # Sampling enabled: scale the frame count by the ratio of sampling rate to source rate
    self._target_total_frames = int(
        self._src_total_frames * self._samples_per_second / self._src_fps
    )
    self._target_fps = self._samples_per_second

properties()

Return properties of the source file and the resulting FrameSet

Returns

Tuple[float, int, float, int] Properties: 0. Source file FPS (frames per second) 1. Source file total number of frames 2. Resulting FPS after applying sampling rate 3. Number of frames after applying sampling rate

Source code in landingai/pipeline/image_source.py
def properties(self) -> Tuple[float, int, float, int]:
    """Report the frame statistics of the source video and of the sampled output

    Returns
    -------
    Tuple[float, int, float, int]
        In order:
        0. FPS (frames per second) of the source file
        1. Total number of frames in the source file
        2. FPS that results from applying the sampling rate
        3. Number of frames that results from applying the sampling rate
    """
    source_stats = (self._src_fps, self._src_total_frames)
    sampled_stats = (self._target_fps, self._target_total_frames)
    return source_stats + sampled_stats

Webcam

Bases: NetworkedCamera

The Webcam class can connect to a local webcam in order to iterate over captured frames.

This leverages the NetworkedCamera implementation, with the constructor receiving the webcam ID to be sent to OpenCV (instead of a stream URL). Note that it doesn't work with Colab or remote Jupyter notebooks (yet). In this case, use landingai.image_source_ops.take_photo_from_webcam() instead.

Source code in landingai/pipeline/image_source.py
class Webcam(NetworkedCamera):
    """
    The Webcam class can connect to a local webcam in order to iterate over captured frames.

    This leverages the NetworkedCamera implementation, with the constructor
    receiving the webcam ID to be sent to OpenCV (instead of a stream URL).
    Note that it doesn't work with Colab or remote Jupyter notebooks (yet). In this case,
    use `landingai.image_source_ops.take_photo_from_webcam()` instead.
    """

    def __init__(
        self,
        webcam_source: int = 0,
        motion_detection_threshold: int = 0,
        capture_interval: Optional[float] = None,
        fps: Optional[int] = None,
    ) -> None:
        """Create a Webcam by delegating to the NetworkedCamera constructor.

        Parameters
        ----------
        webcam_source : int, optional
            Webcam ID, passed as the first positional argument to
            `NetworkedCamera.__init__` in place of a stream URL. Defaults to 0.
        motion_detection_threshold : int, optional
            Forwarded unchanged to `NetworkedCamera.__init__`; see that class
            for its exact meaning. Defaults to 0.
        capture_interval : Optional[float], optional
            Forwarded unchanged to `NetworkedCamera.__init__`; see that class
            for its exact meaning. Defaults to None.
        fps : Optional[int], optional
            Forwarded unchanged to `NetworkedCamera.__init__`; see that class
            for its exact meaning. Defaults to None.
        """
        # No webcam-specific setup: every argument is handed to the parent as-is.
        super().__init__(
            webcam_source,
            motion_detection_threshold=motion_detection_threshold,
            capture_interval=capture_interval,
            fps=fps,
        )

get_class_counts(frs, add_id_to_classname=False)

This method returns the number of occurrences of each detected class in the FrameSet.

Parameters

add_id_to_classname : bool, optional By default, detections with the same class names and different defect id will be counted as the same. Set to True if you want to count them separately

Returns

Dict[str, int] A dictionary with the counts

Example:
    {
        "cat": 10,
        "dog": 3
    }

Source code in landingai/pipeline/postprocessing.py
def get_class_counts(
    frs: FrameSet, add_id_to_classname: bool = False
) -> Dict[str, int]:
    """Tally how many times each detected class occurs across all frames of a FrameSet.

    Parameters
    ----------
    frs : FrameSet
        The FrameSet whose per-frame predictions are counted.
    add_id_to_classname : bool, optional
        By default, detections that share a class name but have different
        defect ids are merged into a single entry. Set to True to count them
        separately; the key then becomes `"<class name>_<id>"`.

    Returns
    -------
    Dict[str, int]
        A dictionary mapping each class name to its total count, e.g.
        ```
        {
            "cat": 10,
            "dog": 3
        }
        ```

    Raises
    ------
    TypeError
        If a frame's predictions have the inner type "OcrPrediction".
    """
    totals = {}
    for frame in frs.frames:
        if frame.predictions._inner_type == "OcrPrediction":
            raise TypeError("Can't count classes for OcrPredictor")
        # class_counts returns a mapping of id -> (count, class name), for example:
        # {1: (3, 'Heart'), 3: (3, 'Club'), 4: (3, 'Spade'), 2: (3, 'Diamond')}
        per_frame = class_counts(
            cast(Sequence[ClassificationPrediction], frame.predictions)
        )
        for class_id, info in per_frame.items():
            # Appending the id keeps classes apart when their names are not unique.
            key = f"{info[1]}_{class_id}" if add_id_to_classname else info[1]
            totals[key] = totals.get(key, 0) + info[0]
    return totals