Spaces:
Running
on
L4
Running
on
L4
import struct | |
import numpy as np | |
import png | |
import re | |
import sys | |
import csv | |
from PIL import Image | |
import h5py | |
# First 4 bytes of a .flo file interpreted as a float; compared against when READING.
FLO_TAG_FLOAT = 202021.25
# The same 4 header bytes as ASCII text; emitted when WRITING.
FLO_TAG_STRING = "PIEH"
# Magnitudes above this threshold mark an unknown flow value in the .flo format.
FLO_UNKNOWN_FLOW_THRESH = 1e9
# Placeholder stored in a .flo file for an unknown flow value.
FLO_UNKNOWN_FLOW = 1e10
def readFlowFile(filepath):
    """load an optical flow file, picking the reader from the file extension.
    Handles flo (Sintel), png (KITTI), npy (numpy), pfm (FlyingThings3D) and flo5 (Spring).
    The flo and png readers mark positions without groundtruth as np.nan.
    filepath: path to the flow file
    returns: the array produced by the selected reader (flow with shape height x width x 2)
    raises: ValueError if the extension matches none of the supported formats
    """
    # Each guard hands off to the format-specific reader and returns immediately.
    if filepath.endswith(".flo"):
        return readFloFlow(filepath)
    if filepath.endswith(".png"):
        return readPngFlow(filepath)
    if filepath.endswith(".npy"):
        return readNpyFlow(filepath)
    if filepath.endswith(".pfm"):
        return readPfmFlow(filepath)
    if filepath.endswith(".flo5"):
        return readFlo5Flow(filepath)
    raise ValueError(f"readFlowFile: Unknown file format for {filepath}")
def writeFlowFile(flow, filepath):
    """store optical flow on disk, picking the writer from the file extension.
    Handles flo (Sintel), png (KITTI), npy (numpy) and flo5 file format.
    flow: optical flow with shape height x width x 2. Invalid values should be represented as np.nan
    filepath: file path where to write the flow
    raises: ValueError for an empty path or unknown extension, IOError for a wrongly shaped flow
    """
    if not filepath:
        raise ValueError("writeFlowFile: empty filepath")

    shape_ok = len(flow.shape) == 3 and flow.shape[2] == 2
    if not shape_ok:
        raise IOError(
            f"writeFlowFile {filepath}: expected shape height x width x 2 but received {flow.shape}"
        )

    # Taller-than-wide input is unusual for images; only warn, do not fail.
    if flow.shape[0] > flow.shape[1]:
        print(
            f"write flo file {filepath}: Warning: Are you writing an upright image? Expected shape height x width x 2, got {flow.shape}"
        )

    if filepath.endswith(".flo"):
        return writeFloFlow(flow, filepath)
    if filepath.endswith(".png"):
        return writePngFlow(flow, filepath)
    if filepath.endswith(".npy"):
        return writeNpyFile(flow, filepath)
    if filepath.endswith(".flo5"):
        return writeFlo5File(flow, filepath)
    raise ValueError(f"writeFlowFile: Unknown file format for {filepath}")
def readFloFlow(filepath):
    """read optical flow from file stored in .flo file format as used in the Sintel dataset (Butler et al., 2012)
    filepath: path to file where to read from
    returns: flow as a float64 numpy array with shape height x width x 2; unknown values are np.nan
    raises: IOError if the path is None, lacks the .flo extension, or the file is
            truncated, oversized, or carries an invalid header
    ---
    ".flo" file format used for optical flow evaluation
    Stores 2-band float image for horizontal (u) and vertical (v) flow components.
    Floats are stored in little-endian order.
    A flow value is considered "unknown" if either |u| or |v| is greater than 1e9.
    bytes contents
    0-3 tag: "PIEH" in ASCII, which in little endian happens to be the float 202021.25
    (just a sanity check that floats are represented correctly)
    4-7 width as an integer
    8-11 height as an integer
    12-end data (width*height*2*4 bytes total)
    the float values for u and v, interleaved, in row order, i.e.,
    u[row0,col0], v[row0,col0], u[row0,col1], v[row0,col1], ...
    """
    if filepath is None:
        raise IOError("read flo file: empty filename")
    if not filepath.endswith(".flo"):
        raise IOError(f"read flo file ({filepath}): extension .flo expected")

    header_size = 12
    n_bands = 2
    with open(filepath, "rb") as stream:
        header = stream.read(header_size)
        # read() returns fewer bytes (never None) at end of file, so check the length.
        if len(header) != header_size:
            raise IOError(f"read flo file({filepath}): file is too short")
        # "<" pins little-endian as required by the format, independent of the host CPU.
        tag, width, height = struct.unpack("<fii", header)
        if tag != FLO_TAG_FLOAT:  # simple test for correct endian-ness
            raise IOError(
                f"read flo file({filepath}): wrong tag (possibly due to big-endian machine?)"
            )
        # another sanity check to see that integers were read correctly (99999 should do the trick...)
        if width < 1 or width > 99999:
            raise IOError(f"read flo file({filepath}): illegal width {width}")
        if height < 1 or height > 99999:
            raise IOError(f"read flo file({filepath}): illegal height {height}")

        # Compare the real payload size with the header's claim before reading,
        # so a corrupt header cannot trigger a huge allocation.
        expected_bytes = height * width * n_bands * 4
        stream.seek(0, 2)  # 2 == seek relative to end of file
        available_bytes = stream.tell() - header_size
        if available_bytes < expected_bytes:
            raise IOError(f"read flo file({filepath}): file is too short")
        if available_bytes > expected_bytes:
            raise IOError(f"read flo file({filepath}): file is too long")
        stream.seek(header_size)
        payload = stream.read(expected_bytes)
        if len(payload) != expected_bytes:
            raise IOError(f"read flo file({filepath}): file is too short")

    # Decode all float32 values at once; astype yields a writable float64 copy.
    flow = np.frombuffer(payload, dtype="<f4").astype(np.float64)
    flow = flow.reshape((height, width, n_bands))
    # unknown values are set to nan
    flow[np.abs(flow) > FLO_UNKNOWN_FLOW_THRESH] = np.nan
    return flow
def writeFloFlow(flow, filepath):
    """
    write optical flow in .flo format to file as used in the Sintel dataset (Butler et al., 2012)
    flow: optical flow with shape height x width x 2; np.nan entries are stored as unknown flow
    filepath: optical flow file path to be saved
    raises: IOError if the last dimension is not 2 or if a write comes up short
    ---
    ".flo" file format used for optical flow evaluation
    Stores 2-band float image for horizontal (u) and vertical (v) flow components.
    Floats are stored in little-endian order.
    A flow value is considered "unknown" if either |u| or |v| is greater than 1e9.
    bytes contents
    0-3 tag: "PIEH" in ASCII, which in little endian happens to be the float 202021.25
    (just a sanity check that floats are represented correctly)
    4-7 width as an integer
    8-11 height as an integer
    12-end data (width*height*2*4 bytes total)
    the float values for u and v, interleaved, in row order, i.e.,
    u[row0,col0], v[row0,col0], u[row0,col1], v[row0,col1], ...
    """
    height, width, n_bands = flow.shape
    # The format is strictly 2-band; validate before touching the file system.
    if n_bands != 2:
        raise IOError(
            f"write flo file {filepath}: expected shape height x width x 2 but received {flow.shape}"
        )

    # np.array copies, so the caller's array is never modified; "<f4" pins
    # little-endian float32 as required by the format.
    data = np.array(flow, dtype="<f4")
    data[np.isnan(data)] = FLO_UNKNOWN_FLOW
    payload = data.tobytes()  # C order: rows, then columns, then (u, v)

    with open(filepath, "wb") as f:
        # write header
        written = f.write(FLO_TAG_STRING.encode("ascii"))
        written += f.write(struct.pack("<ii", width, height))
        if written != 12:
            raise IOError(f"write flo file {filepath}: problem writing header")
        # write content
        if f.write(payload) != len(payload):
            raise IOError(f"write flo file {filepath}: problem writing flow data")
def readPngFlow(filepath):
    """read optical flow from file stored in png file format as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.
    The first two channels hold the flow as (value - 2**15) / 64; the third channel is the validity flag.
    filepath: path to file where to read from
    returns: flow as a numpy array with shape height x width x 2. Invalid values are represented as np.nan
    """
    # adapted from https://github.com/liruoteng/OpticalFlowToolkit
    reader = png.Reader(filename=filepath)
    direct = reader.asDirect()
    rows = list(direct[2])
    width, height = direct[3]["size"]

    decoded = np.zeros((height, width, 3), dtype=np.float64)
    # Each row is a flat sequence with the three channels interleaved.
    for row_idx, row in enumerate(rows):
        for channel in range(3):
            decoded[row_idx, :, channel] = row[channel::3]

    # A zero in the third channel marks a pixel without valid flow.
    invalid = decoded[:, :, 2] == 0
    decoded[:, :, 0:2] = (decoded[:, :, 0:2] - 2**15) / 64.0
    decoded[invalid, 0] = np.nan
    decoded[invalid, 1] = np.nan
    return decoded[:, :, :2]
def writePngFlow(flow, filename):
    """write optical flow to file png file format as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.
    Flow is encoded as 64 * value + 2**15, clipped to the 16-bit range; a third channel stores validity (1 valid, 0 invalid).
    flow: optical flow in shape height x width x 2, invalid values should be represented as np.nan
    filename: path to file where to write to
    """
    encoded = 64.0 * flow + 2**15
    height, width = encoded.shape[0], encoded.shape[1]

    # A pixel is invalid as soon as either flow component is nan.
    invalid = np.isnan(encoded[:, :, 0]) | np.isnan(encoded[:, :, 1])
    validity = np.ones([height, width, 1])
    validity[invalid] = 0

    encoded = np.concatenate([np.nan_to_num(encoded), validity], axis=-1)
    encoded = np.clip(encoded, 0, 2**16 - 1).astype(np.uint16)
    # One flat row of interleaved channel values per image row.
    png_rows = np.reshape(encoded, (-1, width * 3))

    with open(filename, "wb") as f:
        png_writer = png.Writer(
            width=width, height=height, bitdepth=16, greyscale=False
        )
        png_writer.write(f, png_rows)
def readNpyFlow(filepath):
    """load an array stored with numpy's own file format (thin wrapper around np.load).
    filepath: file to read from
    returns: the loaded numpy array
    """
    loaded = np.load(filepath)
    return loaded
def writeNpyFile(arr, filepath):
    """store an array in numpy's own file format (thin wrapper around np.save).
    arr: numpy array to write
    filepath: file to write to
    """
    np.save(file=filepath, arr=arr)
def writeFlo5File(flow, filename):
    """write optical flow to an HDF5 file (flo5) as a gzip-compressed dataset named "flow".
    flow: array to store
    filename: path of the file to create (an existing file is overwritten)
    """
    dataset_options = {"compression": "gzip", "compression_opts": 5}
    with h5py.File(filename, "w") as h5_file:
        h5_file.create_dataset("flow", data=flow, **dataset_options)
def readFlo5Flow(filename):
    """read the "flow" dataset from an HDF5 file (flo5).
    filename: path to the file to read from
    returns: full contents of the "flow" dataset
    raises: IOError if the file holds no "flow" key
    """
    with h5py.File(filename, "r") as h5_file:
        if "flow" in h5_file.keys():
            # [()] pulls the whole dataset into memory before the file closes.
            return h5_file["flow"][()]
        raise IOError(
            f"File {filename} does not have a 'flow' key. Is this a valid flo5 file?"
        )
def readPfmFlow(filepath):
    """read optical flow from file stored in pfm file format as used in the FlyingThings3D (Mayer et al., 2016) dataset.
    filepath: path to file where to read from
    returns: flow as a numpy array with shape height x width x 2 (first two of the three stored channels)
    raises: IOError if the file content is not a 3-dimensional, 3-channel array
    """
    pfm_data = readPfmFile(filepath)
    dims = pfm_data.shape
    has_three_channels = len(dims) == 3 and dims[2] == 3
    if not has_three_channels:
        raise IOError(
            f"read pfm flow: PFM file has wrong shape (assumed to be w x h x 3): {pfm_data.shape}"
        )
    # remove third channel -> is all zeros
    return pfm_data[:, :, :2]
def readPfmFile(filepath):
    """read an image stored in pfm (portable float map) file format.
    adapted from https://lmb.informatik.uni-freiburg.de/resources/datasets/SceneFlowDatasets.en.html
    filepath: path to file where to read from
    returns: numpy array with shape height x width x 3 for colour files ("PF" header)
             or height x width for greyscale files ("Pf" header); rows are flipped
             vertically relative to their order in the file
    raises: Exception if the type line or the dimension line of the header is malformed
    """
    # The context manager guarantees the handle is closed, including on the error paths.
    with open(filepath, "rb") as file:
        header = file.readline().rstrip().decode("ascii")
        if header == "PF":
            color = True
        elif header == "Pf":
            color = False
        else:
            raise Exception("Not a PFM file.")

        dim_match = re.match(r"^(\d+)\s(\d+)\s$", file.readline().decode("ascii"))
        if not dim_match:
            raise Exception("Malformed PFM header.")
        width, height = map(int, dim_match.groups())

        # Only the sign of the scale line is used: negative means little-endian data.
        scale = float(file.readline().decode("ascii").rstrip())
        endian = "<" if scale < 0 else ">"

        data = np.fromfile(file, endian + "f")

    shape = (height, width, 3) if color else (height, width)
    data = np.reshape(data, shape)
    return np.flipud(data)
def writePfmFile(image, filepath):
    """write an image to file in pfm (portable float map) file format.
    adapted from https://lmb.informatik.uni-freiburg.de/resources/datasets/SceneFlowDatasets.en.html
    image: float32 numpy array with shape H x W x 3 (colour), H x W x 1 or H x W (greyscale)
    filepath: path to file where to write to
    raises: Exception if the dtype is not float32 or the shape is unsupported
    """
    # Validate first so that invalid input never creates or truncates the target file.
    if image.dtype.name != "float32":
        raise Exception("Image dtype must be float32.")

    if len(image.shape) == 3 and image.shape[2] == 3:  # color image
        color = True
    elif len(image.shape) == 2 or (
        len(image.shape) == 3 and image.shape[2] == 1
    ):  # greyscale
        color = False
    else:
        raise Exception("Image must have H x W x 3, H x W x 1 or H x W dimensions.")

    # Rows are written in vertically flipped order.
    image = np.flipud(image)

    # A negative scale in the header signals little-endian sample data.
    scale = 1
    endian = image.dtype.byteorder
    if endian == "<" or (endian == "=" and sys.byteorder == "little"):
        scale = -scale

    with open(filepath, "wb") as file:
        # Bytes literals throughout: the file is opened in binary mode.
        file.write(b"PF\n" if color else b"Pf\n")
        file.write(b"%d %d\n" % (image.shape[1], image.shape[0]))
        file.write(b"%f\n" % scale)
        file.write(image.tobytes())
def readDispFile(filepath):
    """load a disparity (or disparity change) file, picking the reader from the file extension.
    Handles png (KITTI), npy (numpy), pfm (FlyingThings3D) and dsp5 file format.
    The png reader marks positions without groundtruth as np.nan.
    filepath: path to the disparity file
    returns: the array produced by the selected reader (disparity with shape height x width)
    raises: ValueError if the extension matches none of the supported formats
    """
    if filepath.endswith(".png"):
        return readPngDisp(filepath)
    if filepath.endswith(".npy"):
        # npy files are plain arrays, so the generic numpy reader is reused.
        return readNpyFlow(filepath)
    if filepath.endswith(".pfm"):
        return readPfmDisp(filepath)
    if filepath.endswith(".dsp5"):
        return readDsp5Disp(filepath)
    raise ValueError(f"readDispFile: Unknown file format for {filepath}")
def readPngDisp(filepath):
    """read disparity from file stored in png file format as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.
    Stored values are divided by 256; a stored value of 0 marks an invalid pixel.
    filepath: path to file where to read from
    returns: disparity as a numpy array with shape height x width. Invalid values are represented as np.nan
    raises: IOError if the image does not have exactly one channel
    """
    # adapted from https://github.com/liruoteng/OpticalFlowToolkit
    reader = png.Reader(filename=filepath)
    direct = reader.asDirect()
    rows = list(direct[2])
    width, height = direct[3]["size"]

    # Channel count is inferred from the length of the first flat row.
    n_channels = len(rows[0]) // width
    if n_channels != 1:
        raise IOError("read png disp: assumed channels to be 1!")

    disparity = np.zeros((height, width), dtype=np.float64)
    for row_idx, row in enumerate(rows):
        disparity[row_idx, :] = row[:]
    disparity[disparity == 0] = np.nan
    return disparity[:, :] / 256.0
def readPfmDisp(filepath):
    """read disparity or disparity change from file stored in pfm file format as used in the FlyingThings3D (Mayer et al., 2016) dataset.
    filepath: path to file where to read from
    returns: disparity as a numpy array with shape height x width, exactly as delivered by readPfmFile
    raises: IOError if the file content is not 2-dimensional
    """
    pfm_data = readPfmFile(filepath)
    if len(pfm_data.shape) == 2:
        return pfm_data
    raise IOError(
        f"read pfm disp: PFM file has wrong shape (assumed to be w x h): {pfm_data.shape}"
    )
def writePngDisp(disp, filepath):
    """write disparity to png file format as used in the KITTI 12 (Geiger et al., 2012) and KITTI 15 (Menze et al., 2015) dataset.
    Values are stored as 256 * disparity, clipped to the 16-bit range; nan becomes 0.
    disp: disparity in shape height x width, invalid values should be represented as np.nan
    filepath: path to file where to write to
    """
    scaled = 256 * disp
    height, width = scaled.shape[0], scaled.shape[1]

    clipped = np.clip(scaled, 0, 2**16 - 1)
    # nan survives the clip and is turned into 0 here.
    pixels = np.nan_to_num(clipped).astype(np.uint16)
    png_rows = np.reshape(pixels, (-1, width))

    with open(filepath, "wb") as f:
        png_writer = png.Writer(
            width=width, height=height, bitdepth=16, greyscale=True
        )
        png_writer.write(f, png_rows)
def writeDsp5File(disp, filename):
    """write disparity to an HDF5 file (dsp5) as a gzip-compressed dataset named "disparity".
    disp: array to store
    filename: path of the file to create (an existing file is overwritten)
    """
    dataset_options = {"compression": "gzip", "compression_opts": 5}
    with h5py.File(filename, "w") as h5_file:
        h5_file.create_dataset("disparity", data=disp, **dataset_options)
def readDsp5Disp(filename):
    """read the "disparity" dataset from an HDF5 file (dsp5).
    filename: path to the file to read from
    returns: full contents of the "disparity" dataset
    raises: IOError if the file holds no "disparity" key
    """
    with h5py.File(filename, "r") as h5_file:
        if "disparity" in h5_file.keys():
            # [()] pulls the whole dataset into memory before the file closes.
            return h5_file["disparity"][()]
        raise IOError(
            f"File {filename} does not have a 'disparity' key. Is this a valid dsp5 file?"
        )
def writeDispFile(disp, filepath):
    """write disparity to file. Supports png (KITTI), npy (numpy) and dsp5 file format.
    disp: disparity with shape height x width. Invalid values should be represented as np.nan
    filepath: file path where to write the disparity
    raises: ValueError for an empty path or an unknown extension,
            IOError if disp is not 2-dimensional
    """
    if not filepath:
        raise ValueError("writeDispFile: empty filepath")
    if len(disp.shape) != 2:
        raise IOError(
            f"writeDispFile {filepath}: expected shape height x width but received {disp.shape}"
        )
    # Taller-than-wide input is unusual for images; only warn, do not fail.
    if disp.shape[0] > disp.shape[1]:
        print(
            f"writeDispFile {filepath}: Warning: Are you writing an upright image? Expected shape height x width, got {disp.shape}"
        )
    if filepath.endswith(".png"):
        writePngDisp(disp, filepath)
    elif filepath.endswith(".npy"):
        writeNpyFile(disp, filepath)
    elif filepath.endswith(".dsp5"):
        writeDsp5File(disp, filepath)
    else:
        # Fail loudly instead of silently writing nothing, in line with
        # writeFlowFile and readDispFile.
        raise ValueError(f"writeDispFile: Unknown file format for {filepath}")
def readKITTIObjMap(filepath):
    """read a png map and binarize it.
    filepath: path to a .png file (checked with an assert)
    returns: boolean numpy array that is True wherever the pixel value is greater than 0
    """
    assert filepath.endswith(".png")
    map_image = Image.open(filepath)
    pixel_values = np.asarray(map_image)
    return pixel_values > 0
def readKITTIIntrinsics(filepath, image=2):
    """read camera intrinsics from a space-separated calibration text file.
    Looks for the row whose first field is "K_<image as two digits>:" and reads
    the following nine values as a row-major 3 x 3 matrix K.
    filepath: path to a .txt file (checked with an assert)
    image: camera index used to build the row key (default 2 -> "K_02:")
    returns: float32 numpy array [K[0, 0], K[1, 1], K[0, 2], K[1, 2]]
    raises: ValueError if no row with the requested key exists
    """
    assert filepath.endswith(".txt")
    key = f"K_{image:02d}:"
    kvec = None
    with open(filepath) as f:
        for row in csv.reader(f, delimiter=" "):
            # Blank lines yield empty rows; skip them together with non-matching keys.
            if not row or row[0] != key:
                continue
            # Repeated or trailing spaces produce empty fields; drop them before parsing.
            values = [field for field in row[1:] if field]
            K = np.array(values, dtype=np.float32).reshape(3, 3)
            kvec = np.array([K[0, 0], K[1, 1], K[0, 2], K[1, 2]])
    if kvec is None:
        raise ValueError(f"readKITTIIntrinsics: no entry {key} found in {filepath}")
    return kvec
def writePngMapFile(map_, filename):
    """save an array as an image file via PIL.
    map_: array to convert with Image.fromarray
    filename: path to file where to write to
    """
    map_image = Image.fromarray(map_)
    map_image.save(filename)