import array
import codecs
import csv
import datetime
import struct
from collections import OrderedDict
from pathlib import Path, PurePath

import numpy as np


class VolFile:
    """Parser for Heyex Spectralis ``*.vol`` files.

    Constructing an instance reads the whole file and stores the result in
    ``self.wholefile``, an ``OrderedDict`` with the keys ``"header"``,
    ``"sloImage"``, ``"cScan"``, ``"slice-headers"``, ``"average-quality"``
    and, when segmentation parsing was requested, ``"segmentations"``.

    Side effect: every successful parse also records the header fields of the
    file as one row of ``vols.csv`` in the directory of the vol file (the CSV
    is created on first use and a file name is never added twice).
    """

    # The SLO image is read starting at this absolute byte offset.
    _SLO_OFFSET = 2048
    # The per-slice fields parsed below start this many bytes into a B-scan header.
    _BSCAN_FIELDS_OFFSET = 16
    # Segmentation offset used when a slice header reports ``offSeg == 0``.
    _DEFAULT_SEG_OFFSET = 256
    # Largest finite float32 (bytes FF FF 7F 7F); the file uses it to mark invalid samples.
    _INVALID = np.finfo(np.float32).max

    # (name, struct format) of every file-header field, in on-disk order.
    _HEADER_FIELDS = (
        ("version", "12s"),
        ("octSizeX", "I"),
        ("numBscan", "I"),
        ("octSizeZ", "I"),
        ("scaleX", "d"),
        ("distance", "d"),
        ("scaleZ", "d"),
        ("sizeXSlo", "I"),
        ("sizeYSlo", "I"),
        ("scaleXSlo", "d"),
        ("scaleYSlo", "d"),
        ("fieldSizeSlo", "I"),
        ("scanFocus", "d"),
        ("scanPos", "4s"),
        ("examTime", "q"),
        ("scanPattern", "I"),
        ("BscanHdrSize", "I"),
        ("ID", "16s"),
        ("ReferenceID", "16s"),
        ("PID", "I"),
        ("PatientID", "21s"),
        ("unknown2", "3s"),
        ("DOB", "d"),
        ("VID", "I"),
        ("VisitID", "24s"),
        ("VisitDate", "d"),
        ("GridType", "I"),
        ("GridOffset", "I"),
    )
    # "<" = little-endian, no padding: matches the original field-by-field reads.
    _HEADER_STRUCT = struct.Struct("<" + "".join(fmt for _name, fmt in _HEADER_FIELDS))

    # (name, struct format) of the parsed part of each B-scan header.
    _BSCAN_FIELDS = (
        ("startX", "d"),
        ("startY", "d"),
        ("endX", "d"),
        ("endY", "d"),
        ("numSeg", "I"),
        ("offSeg", "I"),
        ("quality", "f"),
        ("shift", "I"),
    )
    _BSCAN_STRUCT = struct.Struct("<" + "".join(fmt for _name, fmt in _BSCAN_FIELDS))

    # Column titles of vols.csv; all but the first and last are header keys.
    _CSV_COLUMNS = (
        "filename",
        "version",
        "numBscan",
        "octSizeX",
        "octSizeZ",
        "distance",
        "scaleX",
        "scaleZ",
        "sizeXSlo",
        "sizeYSlo",
        "scaleXSlo",
        "scaleYSlo",
        "fieldSizeSlo",
        "scanFocus",
        "scanPos",
        "examTime",
        "scanPattern",
        "BscanHdrSize",
        "ID",
        "ReferenceID",
        "PID",
        "PatientID",
        "DOB",
        "VID",
        "VisitID",
        "VisitDate",
        "GridType",
        "GridOffset",
        "Average Quality",
    )

    def __init__(self, filename, parse_seg=False):
        """
        Parses Heyex Spectralis *.vol files.

        Args:
            filename (str): Path to vol file
            parse_seg (bool): True additionally reads the segmentation lines of
                every B scan (needed for ``render_oct_scans(render_seg=True)``).

        Raises:
            struct.error: if the file is too short to contain the headers.
        """
        self.__parse_volfile(filename, parse_seg=parse_seg)

    @property
    def oct(self):
        """
        Retrieve OCT volume as a 3D numpy array.

        Returns:
            3D 'uint8' numpy array indexed as (B scan, z, x).
        """
        return self.wholefile["cScan"]

    @property
    def irslo(self):
        """
        Retrieve IR SLO image as 2D numpy array.

        Returns:
            2D 'uint8' numpy array of shape (sizeYSlo, sizeXSlo).
        """
        return self.wholefile["sloImage"]

    @property
    def grid(self):
        """
        Retrieve the IR SLO pixel coordinates for the B scan OCT slices.

        Returns:
            List with one ``[x_0, y_0, x_1, y_1]`` entry (ints) per B scan,
            defining the line of the B scan in pixel coordinates of the IR SLO
            image (slice coordinates divided by scaleXSlo / scaleYSlo).
        """
        header = self.wholefile["header"]
        scale_x = header["scaleXSlo"]
        scale_y = header["scaleYSlo"]
        return [
            [
                int(head["startX"] / scale_x),
                int(head["startY"] / scale_y),
                int(head["endX"] / scale_x),
                int(head["endY"] / scale_y),
            ]
            for head in self.wholefile["slice-headers"]
        ]

    def render_ir_slo(self, filename, render_grid=False):
        """
        Renders IR SLO image as an image file and optionally overlays the grid of B scans.

        Args:
            filename (str): filename to save IR SLO image
            render_grid (bool): True will render red lines for the location of the B scans.

        Returns:
            None
        """
        # Imported lazily so that Pillow is only required for rendering.
        from PIL import Image, ImageDraw

        slo = np.copy(self.wholefile["sloImage"])
        if not render_grid:
            Image.fromarray(slo).save(filename)
            return

        # Grayscale -> RGB so the overlay can be drawn in colour.
        image = Image.fromarray(np.stack((slo,) * 3, axis=-1))
        draw = ImageDraw.Draw(image)
        for x_0, y_0, x_1, y_1 in self.grid:
            draw.line((x_0, y_0, x_1, y_1), fill=(255, 0, 0), width=3)
        image.save(filename)

    def render_oct_scans(self, filepre="oct", render_seg=False):
        """
        Renders the OCT B scans as PNG files and optionally overlays segmentation lines.

        Args:
            filepre (str): filename prefix. Images are named "<prefix>_000.png",
                "<prefix>_001.png", ... (zero-based B scan index).
            render_seg (bool): True marks the segmentation lines on the B scans.
                Line ``k`` is written into colour channel ``k % 3``. Requires the
                file to have been parsed with ``parse_seg=True``.

        Returns:
            None

        Raises:
            KeyError: if render_seg is True but no segmentations were parsed.
        """
        wf = self.wholefile
        if render_seg and "segmentations" not in wf:
            raise KeyError(
                "segmentations were not parsed; create the VolFile with parse_seg=True"
            )

        # Imported lazily so that Pillow is only required for rendering.
        from PIL import Image

        for i, bscan in enumerate(wf["cScan"]):
            a = np.copy(bscan)
            if render_seg:
                a = np.stack((a,) * 3, axis=-1)
                for li, line in enumerate(wf["segmentations"][:, i, :]):
                    rows = np.trunc(np.asarray(line, dtype=np.float64))
                    cols = np.arange(rows.shape[0])
                    # Skip samples that do not fall inside the image (NaN, negative, too large).
                    valid = (
                        np.isfinite(rows)
                        & (rows >= 0)
                        & (rows < a.shape[0])
                        & (cols < a.shape[1])
                    )
                    a[rows[valid].astype(int), cols[valid], li % 3] = 255

            Image.fromarray(a).save("%s_%03d.png" % (filepre, i))

    def __parse_volfile(self, fn, parse_seg=False):
        """Read ``fn`` into ``self.wholefile`` and register it in ``vols.csv``."""
        print(fn)
        wholefile = OrderedDict()
        with open(fn, "rb") as fin:
            header = self._read_header(fin)
            wholefile["header"] = header
            wholefile["sloImage"] = self._read_slo(fin, header)
            bscans, bscan_headers, segmentations = self._read_bscans(fin, header, parse_seg)

        wholefile["cScan"] = np.array(bscans)
        if parse_seg:
            wholefile["segmentations"] = segmentations
        wholefile["slice-headers"] = bscan_headers
        wholefile["average-quality"] = np.mean([head["quality"] for head in bscan_headers])
        self.wholefile = wholefile
        self._append_to_csv(fn, wholefile)

    @classmethod
    def _read_header(cls, fin):
        """Read the file header at the current position into an OrderedDict."""
        values = cls._HEADER_STRUCT.unpack(fin.read(cls._HEADER_STRUCT.size))
        header = OrderedDict(zip((name for name, _fmt in cls._HEADER_FIELDS), values))

        # examTime is a Windows FILETIME: 100 ns ticks since 1601-01-01 (UTC).
        header["examTime"] = datetime.datetime(1601, 1, 1) + datetime.timedelta(
            microseconds=header["examTime"] // 10
        )
        # DOB / VisitDate are day counts; day 25569 corresponds to 1970-01-01.
        unix_epoch = datetime.datetime(1970, 1, 1)
        for key in ("DOB", "VisitDate"):
            header[key] = unix_epoch + datetime.timedelta(
                seconds=(header[key] - 25569) * 24 * 60 * 60
            )
        return header

    @classmethod
    def _read_slo(cls, fin, header):
        """Read the SLO image as a writable (sizeYSlo, sizeXSlo) uint8 array."""
        width = header["sizeXSlo"]
        height = header["sizeYSlo"]
        fin.seek(cls._SLO_OFFSET)
        pixels = np.frombuffer(fin.read(width * height), dtype=np.uint8)
        # Rows are stored one after another, so the row count (height) comes first.
        # copy(): frombuffer returns a read-only view of the bytes object.
        return pixels.reshape((height, width)).copy()

    @classmethod
    def _read_bscans(cls, fin, header, parse_seg):
        """Read all B scans.

        Returns:
            Tuple ``(images, headers, segmentations)``: a list of uint8 images of
            shape (octSizeZ, octSizeX), the list of per-slice header dicts, and
            either ``None`` (parse_seg False) or a float64 array of shape
            (numSeg, numBscan, octSizeX) with invalid samples set to 0.
        """
        width = header["octSizeX"]
        height = header["octSizeZ"]
        hdr_size = header["BscanHdrSize"]
        first_bscan = cls._SLO_OFFSET + header["sizeXSlo"] * header["sizeYSlo"]
        # Each B scan is a header followed by width * height float32 samples.
        stride = hdr_size + width * height * 4
        names = [name for name, _fmt in cls._BSCAN_FIELDS]

        images = []
        headers = []
        seg_lines = []
        for i in range(header["numBscan"]):
            start = first_bscan + i * stride

            fin.seek(start + cls._BSCAN_FIELDS_OFFSET)
            values = cls._BSCAN_STRUCT.unpack(fin.read(cls._BSCAN_STRUCT.size))
            bscan_head = OrderedDict(zip(names, values))
            headers.append(bscan_head)

            fin.seek(start + hdr_size)
            samples = np.frombuffer(fin.read(4 * width * height), dtype="<f4")
            images.append(cls._to_uint8(samples.reshape((height, width))))

            if parse_seg:
                num_seg = bscan_head["numSeg"]
                fin.seek(start + (bscan_head["offSeg"] or cls._DEFAULT_SEG_OFFSET))
                seg = np.frombuffer(fin.read(4 * width * num_seg), dtype="<f4")
                seg = seg.astype(np.float64)
                seg[seg == cls._INVALID] = 0.0
                seg_lines.append(seg.reshape((num_seg, width)))

        if not parse_seg:
            return images, headers, None
        if not seg_lines:
            return images, headers, np.zeros((0, 0, width))
        # Stack so that the segmentation line is the first axis and the B scan the second.
        return images, headers, np.stack(seg_lines, axis=1)

    @classmethod
    def _to_uint8(cls, samples):
        """Log-compress raw float32 intensities and scale them to 0..255."""
        u = samples.astype(np.float32)  # native-order, writable copy
        u[u == cls._INVALID] = 0
        u = np.log(10000 * u + 1)
        peak = np.max(u)
        if not peak > 0:
            # Nothing to normalise by (empty signal or NaN): avoid dividing by zero.
            return np.zeros(u.shape, dtype="uint8")
        return (255.0 * (np.clip(u, 0, peak) / peak)).astype("uint8")

    @staticmethod
    def _decode_text(raw):
        """Decode a NUL-padded byte field; undecodable bytes become U+FFFD."""
        return raw.decode("utf-8", errors="replace").rstrip("\x00")

    @classmethod
    def _append_to_csv(cls, fn, wholefile):
        """Add the header fields of ``fn`` to ``vols.csv`` next to it, once per file name."""
        name = PurePath(fn).name
        header = wholefile["header"]
        row = [name]
        for column in cls._CSV_COLUMNS[1:-1]:
            value = header[column]
            row.append(cls._decode_text(value) if isinstance(value, bytes) else value)
        row.append(wholefile["average-quality"])

        output_csv = Path(fn).parent / "vols.csv"
        if not output_csv.exists():
            print("Creating vols.csv as it does not exist.")
            with open(output_csv, "w", newline="") as file:
                csv.writer(file).writerow(cls._CSV_COLUMNS)

        with open(output_csv, "r", newline="") as file:
            for existing in csv.reader(file):
                # ``existing`` is empty for blank lines in the CSV.
                if existing and existing[0] == name:
                    print("Skipping,", name, "already present in vols.csv.")
                    return

        with open(output_csv, "a", newline="") as file:
            print("Adding", name, "to vols.csv.")
            csv.writer(file).writerow(row)

    @property
    def file_header(self):
        """
        Retrieve vol header fields.

        Returns:
            Dictionary with the following keys
              - version: version number of vol file definition
              - numBscan: number of B scan images in the volume
              - octSizeX: number of pixels in the width of the OCT B scan
              - octSizeZ: number of pixels in the height of the OCT B scan
              - distance: unknown
              - scaleX: resolution scaling factor of the width of the OCT B scan
              - scaleZ: resolution scaling factor of the height of the OCT B scan
              - sizeXSlo: number of pixels in the width of the IR SLO image
              - sizeYSlo: number of pixels in the height of the IR SLO image
              - scaleXSlo: resolution scaling factor of the width of the IR SLO image
              - scaleYSlo: resolution scaling factor of the height of the IR SLO image
              - fieldSizeSlo: field of view (FOV) of the retina in degrees
              - scanFocus: unknown
              - scanPos: Left or Right eye scanned
              - examTime: naive datetime of the scan, decoded as 100 ns ticks since 1601-01-01
              - scanPattern: unknown
              - BscanHdrSize: size of B scan header in bytes
              - ID: unknown
              - ReferenceID
              - PID: unknown
              - PatientID: Patient ID string
              - DOB: Date of birth
              - VID: unknown
              - VisitID: Visit ID string
              - VisitDate: Datetime of visit (needs to be checked)
              - GridType: unknown
              - GridOffset: unknown
              - unknown2: 3 raw bytes following PatientID

            Text fields are returned as raw, NUL-padded ``bytes``.
        """
        return self.wholefile["header"]

    def bscan_header(self, slicei):
        """
        Retrieve the B Scan header information per slice.

        Args:
            slicei (int): index of B scan

        Returns:
            Dictionary with the following keys
              - startX: x-coordinate for B scan on IR. (see grid)
              - startY: y-coordinate for B scan on IR. (see grid)
              - endX: x-coordinate for B scan on IR. (see grid)
              - endY: y-coordinate for B scan on IR. (see grid)
              - numSeg: number of segmentation lines for the B scan
              - offSeg: offset field read from the slice header; used to locate
                the segmentation lines within the B scan block
              - quality: OCT signal quality
              - shift: unknown
        """
        return self.wholefile["slice-headers"][slicei]

    def save_grid(self, outfn):
        """
        Saves the grid coordinates mapping OCT Bscans to the IR SLO image to a text file. The text file
        will be a tab-delimited file with 5 columns: The bscan number, x_0, y_0, x_1, y_1 in pixel space
        scaled to the resolution of the IR SLO image.

        Args:
            outfn (str): location of where to output the file

        Returns:
            None
        """
        with open(outfn, "w") as fout:
            fout.write("bscan\tx_0\ty_0\tx_1\ty_1\n")
            for ri, coords in enumerate(self.grid):
                fout.write("%s\n" % "\t".join(map(str, [ri] + coords)))
|