Source code for d3d.dataset.zip

"""
This moduled provide patched version of builtin Zipfile class as in https://github.com/ThomasPinna/python_zipfile_improvement
You can have better speed when read several files from a zip file containing a large number of files
Modifications are marked with '===== PATCH ====='
"""
import io
import struct
from zipfile import *
from zipfile import (_CD_COMMENT_LENGTH, _CD_EXTRA_FIELD_LENGTH,
                     _CD_FILENAME_LENGTH, _CD_LOCAL_HEADER_OFFSET,
                     _CD_SIGNATURE, _ECD_COMMENT, _ECD_LOCATION, _ECD_OFFSET,
                     _ECD_SIGNATURE, _ECD_SIZE, _EndRecData, sizeCentralDir,
                     sizeEndCentDir64, sizeEndCentDir64Locator,
                     stringCentralDir, stringEndArchive64, structCentralDir, MAX_EXTRACT_VERSION)
from typing import Union, List

__all__ = ["PatchedZipFile"]

[docs]class PatchedZipFile(ZipFile): ''' This class is based on build-in ZipFile class, which is further patched for better reading speed. The improvement is achieved by skip reading metadata of files not interested in. :param to_extract: specify the path (inside zip) of files to be extracted ''' def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=True, to_extract: Union[List[str], str] = []): if not isinstance(to_extract, (list, tuple, set)): to_extract = [to_extract] self.to_extract = set(str(p) for p in to_extract) super().__init__(file=file, mode=mode, compression=compression, allowZip64=allowZip64) # patched implementation to reduce open time def _RealGetContents(self): """Read in the table of contents for the ZIP file.""" fp = self.fp try: endrec = _EndRecData(fp) except OSError: raise BadZipFile("File is not a zip file") if not endrec: raise BadZipFile("File is not a zip file") if self.debug > 1: print(endrec) size_cd = endrec[_ECD_SIZE] # bytes in central directory offset_cd = endrec[_ECD_OFFSET] # offset of central directory self._comment = endrec[_ECD_COMMENT] # archive comment # "concat" is zero, unless zip was concatenated to another file concat = endrec[_ECD_LOCATION] - size_cd - offset_cd if endrec[_ECD_SIGNATURE] == stringEndArchive64: # If Zip64 extension structures are present, account for them concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator) if self.debug > 2: inferred = concat + offset_cd print("given, inferred, offset", offset_cd, inferred, concat) # self.start_dir: Position of start of central directory self.start_dir = offset_cd + concat fp.seek(self.start_dir, 0) data = fp.read(size_cd) fp = io.BytesIO(data) total = 0 while total < size_cd: centdir = fp.read(sizeCentralDir) if len(centdir) != sizeCentralDir: raise BadZipFile("Truncated central directory. This error " "might indicate that some files specified " "in `to_extract` are not found!") centdir = struct.unpack(structCentralDir, centdir) if centdir[_CD_SIGNATURE] != stringCentralDir: raise BadZipFile("Bad magic number for central directory") if self.debug > 2: print(centdir) filename = fp.read(centdir[_CD_FILENAME_LENGTH]) flags = centdir[5] if flags & 0x800: # UTF-8 file names extension filename = filename.decode('utf-8') else: # Historical ZIP filename encoding filename = filename.decode('cp437') # ===== PATCH ===== if filename not in self.to_extract: fp.seek(centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH], 1) continue # ================= # Create ZipInfo instance to store file information x = ZipInfo(filename) x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH]) x.comment = fp.read(centdir[_CD_COMMENT_LENGTH]) x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET] (x.create_version, x.create_system, x.extract_version, x.reserved, x.flag_bits, x.compress_type, t, d, x.CRC, x.compress_size, x.file_size) = centdir[1:12] if x.extract_version > MAX_EXTRACT_VERSION: raise NotImplementedError("zip file version %.1f" % (x.extract_version / 10)) x.volume, x.internal_attr, x.external_attr = centdir[15:18] # Convert date/time code to (year, month, day, hour, min, sec) x._raw_time = t x.date_time = ( (d>>9)+1980, (d>>5)&0xF, d&0x1F, t>>11, (t>>5)&0x3F, (t&0x1F) * 2 ) x._decodeExtra() x.header_offset = x.header_offset + concat self.filelist.append(x) self.NameToInfo[x.filename] = x # update total bytes read from central directory total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH] + centdir[_CD_EXTRA_FIELD_LENGTH] + centdir[_CD_COMMENT_LENGTH]) if self.debug > 2: print("total", total) # ===== PATCH ===== self.to_extract.remove(filename) if not len(self.to_extract): break
# =================