import cv2
import numpy as np
from videoslicer.utils import *
[docs]class VideoSlicer(object):
'''VideoSlicer class
Reads dimensions from a video file and allows for slicing and
iterating over the video data in arbitrary axis. Any slicing or
iteration action will result in a VideoSlice object targeted at a
specific part of the video data.
Slicing or iterating over the time dimension is both optimized for
speed and memory usage. Slicing and iterating over spatial
dimensions can be either optimized for speed (run over video once,
collecting all data), or for memory usage (run over video multiple
times, each time collecting the required amount of data).
Examples
--------
>>> slicer = VideoSlicer('movie.avi')
>>> view = slicer[:10,::10,::10]
>>> for frame in view:
frame.save('frame{:06d}.jpg'.format(frame.index))
>>> slicer = VideoSlicer('movie.avi')
>>> frame = slicer[10,...]
>>> frame.save('frame.jpg')
>>> frame.plot()
>>> slicer = VideoSlicer('movie.avi', axis=2) # loop over horizontal dimension
>>> for frame in slicer[:,:,::10]:
frame.T.save('timestack{:06d}.jpg'.format(frame.index)) # transpose to have time on the horizontal axis
See Also
--------
VideoView
'''
[docs] def __init__(self, filename, axis=0, optimize='memory'):
'''Initialization
Parameters
----------
filename : str
Path to video file
axis : int, optional
Iteration axis (default: 0)
optimize : str, optional
Optimization method (speed or memory; default: memory)
'''
self.filename = filename
self.axis = axis
self.optimize = optimize
self.buffer = cv2.VideoCapture(filename)
if not self.buffer.isOpened():
raise IOError('Cannot open video file: {}'.format(filename))
self.fps = self.buffer.get(cv2.CAP_PROP_FPS)
self.nt = int(self.buffer.get(cv2.CAP_PROP_FRAME_COUNT))
self.ny = int(self.buffer.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.nx = int(self.buffer.get(cv2.CAP_PROP_FRAME_WIDTH))
self.nd = 3
[docs] def __repr__(self):
return '{}(frames={:d}, shape={:d}x{:d}px, depth={:d}, fps={:0.0f}, axis={:d})'.format(
self.__class__.__name__, *self.shape, self.fps, self.axis
)
[docs] def __str__(self):
return self.__repr__()
def __enter__(self):
pass
def __exit__(self):
self.close()
def __iter__(self):
for frame in self.get_view():
yield frame
[docs] def __getitem__(self, s):
'''Returns a VideoView object given the provided slicing
See `get_view` for details.
'''
s = preprocess_getitem_args(s, self.shape)
if (len(s) < 3 or len(s) > 4):
raise IndexError('Invalid dimensions for video data ({:d}), '
'should be 3 or 4.'.format(len(s)))
return self.get_view(s)
[docs] def close(self):
'''Release video file handler'''
if self.buffer.isOpened():
self.buffer.release()
[docs] def get_view(self, slices=()):
'''Returns a VideoView object given the provided slicing
Parameters
----------
slices : tuple, optional
Tuple with slice objects and/or indices that define the
required data view
Returns
-------
VideoView
VideoView object that adheres to the provided slices
See Also
--------
VideoView
'''
return VideoView(self.buffer, self.shape, slices,
fps=self.fps, axis=self.axis, optimize=self.optimize)
@property
def time(self):
'''Returns the time axis in seconds for the open video file'''
return self.get_view().time
@property
def shape(self):
'''Returns the shape of the open video file'''
return (
self.nt,
self.ny,
self.nx,
self.nd
)
[docs]class VideoView(object):
'''VideoView class
Provides a generator over a specific part of a video file. The
generator iterates over a given axis and returns individual frames
in the remaining dimensions. By default, the generator iterates
over time and returns standard video frames. Alternatively, the
generator can iterate over space or depth, returning timestacks
from the video file.
If the slicing is chosen such that there is only one frame in the
iteration axis, the VideoView falls back to being a VideoFrame.
Slicing or iterating over the time dimension is both optimized for
speed and memory usage. Slicing and iterating over spatial
dimensions can be either optimized for speed (run over video once,
collecting all data), or for memory usage (run over video multiple
times, each time collecting the required amount of data).
See Also
--------
VideoSlicer
VideoFrame
'''
[docs] def __new__(cls, buffer, shape, slices, fps=None, axis=0, optimize='memory'):
'''Constructor
Returns a VideoView object, unless the view consists of a
single frame in the iteration axis, then it falls back to
being a VideoFrame object.
Parameters
----------
buffer : int
Open video file buffer
shape : tuple
Shape of video file
slices : tuple
Tuple of slices definign the view
fps : int, optional
Frame rate of video
axis : int, optional
Iteration axis (default: 0)
optimize : str, optional
Optimization method (speed or memory; default: memory)
'''
if len(slices) > len(shape):
raise ValueError('Number of slices should not exceed number of dimensions.')
obj = super(VideoView, cls).__new__(cls)
obj.buffer = buffer
obj._shape = shape
obj.axis = axis
obj.optimize = optimize
obj.fps = fps
obj.slices = [slices[i] if len(slices)>i else None
for i in range(len(shape))]
# if the slice in the iteration axis only contains a single
# frame, fall back to being a VideoFrame object
primary_slice = obj.iterable_slices()[axis]
if len(primary_slice) == 1:
return VideoFrame(list(obj)[0], index=primary_slice[0])
else:
return obj
[docs] def __repr__(self):
return '{}(frames={:d}, shape={:d}x{:d}px, depth={:d})'.format(self.__class__.__name__, *self.shape)
[docs] def __str__(self):
return self.__repr__()
def __iter__(self):
for frame in self.generator():
yield frame
[docs] def generator(self, slices=None, axis=None):
'''Generator method that provides individual VideoFrame objects along a given axis
Parameters
----------
slices : tuple, optional
Tuple with slices to overwrite the object's default slicing
axis : int, optional
Iteration axis to overwrite the object's default iteration
axis
See Also
--------
VideoFrame
'''
if slices is None:
slices = self.slices
if axis is None:
axis = self.axis
indices = self.iterable_slices(slices)
if axis == 0:
# iteration over time
for i in indices[0]:
self.buffer.set(cv2.CAP_PROP_POS_FRAMES, i)
rc, frame = self.buffer.read()
if rc:
# sequentially apply slicing
frame = frame[indices[1],:,:]
frame = frame[:,indices[2],:]
frame = frame[:,:,indices[3]]
yield VideoFrame(frame, index=i)
else:
yield VideoFrame(self._empty(*indices[1:]), index=i)
elif axis > 0:
# iteration in space
if self.optimize == 'speed':
# speed optimized: first collect all data by iterating
# over time, and then construct the generator
frames = np.asarray(list(self.generator(axis=0)))
indices2 = [slice(None)] * 4
for i,n in enumerate(indices[axis]):
indices2[axis] = i
yield VideoFrame(frames[indices2], index=n)
elif self.optimize == 'memory':
# memory optimized: iterate over time, collecting only
# the data required for the current video frame
for i in indices[axis]:
slices[axis] = i
yield VideoFrame(list(self.generator(slices=slices, axis=0)), index=i)
else:
raise ValueError('Optimization not supported: {}'.format(self.optimize))
[docs] def iterable_slices(self, slices=None):
'''Converts arbitrary slices into iterable lists, given the video dimensions
Parameters
----------
slices : tuple, optional
Tuple of slices to overwrite the object's default slicing
Returns
-------
list
List with lists with indices corresponding to the provided
slices and shape of the video file
'''
if slices is None:
slices = self.slices
return [self._iterable_slice(s1, s2)
for s1, s2 in zip(self._shape, slices)]
@property
def time(self):
'''Returns the time axis in seconds for the video view'''
return np.asarray(self.iterable_slices()[0]) / self.fps
@property
def shape(self):
'''Returns the shape of the video view'''
return tuple([len(x)
for x in self.iterable_slices()
if not isinstance(x, int)])
@staticmethod
def _empty(*args):
'''Creates and empty frame'''
return np.nan + np.zeros([len(a) for a in args])
@staticmethod
def _iterable_slice(n, s):
'''Returns an iterable from a slice and a length
Parameters
----------
n : int
Length
s : slice
Slice
Returns
-------
slice
'''
i = np.asarray(list(range(n)))
if s is not None:
i = i[s]
try:
return list(i)
except TypeError:
return [int(i)]
[docs]class VideoFrame(np.ndarray):
'''VideoFrame class
A `numpy.ndarray` with video frame specific methods: `T`
(transpose), `resize`, `plot` and `save`. The dimensions of the
array need to be valid image dimensions (i.e. 2 or 3) and the
depth dimension need to be 1, 3 or 4 depending on the number of
color and alpha channels.
With respect to the original `numpy.ndarray` an additional
attribute `index` contains the index of the current frame in the
original video file.
See Also
--------
VideoView
'''
[docs] def __new__(cls, arr, index=None, *args, **kwargs):
'''Constructor
Parameters
----------
arr : iterable
Image data
index : int, optional
Index of video frame in original video file
'''
arr = np.asarray(arr).squeeze()
if arr.ndim < 2 or arr.ndim > 3:
raise TypeError('Invalid dimensions for image data ({:d}), '
'should have 2 or 3 non-unity dimensions.'.format(arr.ndim))
if arr.ndim == 3 and arr.shape[-1] not in [1, 3, 4]:
raise TypeError('Invalid last dimension for image data ({:d}), '
'should be 1, 3 or 4.'.format(arr.shape[-1]))
obj = arr.view(cls, *args, **kwargs)
obj.index = index
return obj
[docs] def set_index(self, index):
'''Set index of video frame in original video file'''
self.index = index
[docs] def resize(self, shape):
'''Resize video frame to given dimensions
Parameters
----------
shape : tuple
Target image shape
'''
arr = cv2.resize(self, shape[::-1])
obj = self.__class__(arr)
obj.__array_finalize__(self)
return obj
[docs] @classmethod
def read(cls, filename, **kwargs):
'''Initialize VideoFrame object from image file
Parameters
----------
filename : str
Path to image file
Returns
-------
VideoFrame
VideoFrame object with image data
'''
return cls(cv2.imread(filename), **kwargs)
[docs] def write(self, filename):
'''Write VideoFrame to image file
Parameters
----------
filename : str
Path to image file
'''
cv2.imwrite(filename, self)
[docs] def plot(self, ax=None):
'''Plot image
Parameters
----------
ax : matplotlib.pyplot.Axes, optional
Axes to plot onto
Returns
-------
ax : matplotlib.pyplot.Axes
Axes containing plot
'''
import matplotlib.pyplot as plt
if ax is None: fig, ax = plt.subplots()
ax.imshow(cv2.cvtColor(self, cv2.COLOR_BGR2RGB))
return ax
@property
def T(self):
'''Returns the transposed image'''
return np.swapaxes(self, 0, 1)
@property
def shape(self):
'''Returns the shape of the image (always 3 dimensions)'''
shp = super(VideoFrame, self).shape
if len(shp) < 3:
return shp + (1,)
else:
return shp
[docs] def __array_finalize__(self, obj):
if obj is None: return
self.index = getattr(obj, 'index', None)