def stream_slot(
    self,
    descriptor: SlotDescriptor,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> Generator[bytes | memoryview, None, None]:
    """Stream slot data in chunks."""
    offset = descriptor.offset
    remaining = descriptor.size
    while remaining > 0:
        to_read = min(chunk_size, remaining)
        chunk = self.read_at(offset, to_read)
        yield chunk
        offset += to_read
        remaining -= to_read
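A minimal usage sketch of the generator above: hashing a large slot without materializing it in memory. The `checksum_slot` helper, the use of `hashlib`, and the 1 MiB chunk size are illustrative assumptions, not part of the backend API.

import hashlib

def checksum_slot(backend, descriptor, chunk_size: int = 1024 * 1024) -> str:
    """Hash a slot incrementally; each chunk may be bytes or a memoryview."""
    digest = hashlib.sha256()
    for chunk in backend.stream_slot(descriptor, chunk_size=chunk_size):
        digest.update(chunk)  # update() accepts any buffer-protocol object
    return digest.hexdigest()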
def __init__(self) -> None:
    self.file: BinaryIO | None = None
    self.path: Path | None = None
    self._cache: dict[tuple[int, int], bytes] = {}  # Simple cache for frequently accessed regions
def open(self, path: Path) -> None:
    """Open file with buffered I/O."""
    start_time = time.perf_counter()
    self.path = path
    file_size = path.stat().st_size
    logger.debug(
        "Opening buffered file backend",
        path=str(path),
        size_bytes=file_size,
        buffer_size=64 * 1024,
    )
    # Use buffered I/O for better performance
    self.file = path.open("rb", buffering=64 * 1024)
    time.perf_counter() - start_time  # Elapsed time is computed but the result is not currently used
def read_at(self, offset: int, size: int) -> bytes:
    """Read data at specific offset."""
    start_time = time.perf_counter()
    if not self.file:
        logger.error("Backend not opened")
        raise RuntimeError("Backend not opened")
    # Check cache first
    cache_key = (offset, size)
    if cache_key in self._cache:
        logger.debug("Cache hit", offset=offset, size=size)
        return self._cache[cache_key]
    # Read from file
    self.file.seek(offset)
    data = self.file.read(size)
    # Cache small reads
    if size <= 4096:
        self._cache[cache_key] = data
        # Limit cache size
        if len(self._cache) > 100:
            # Remove oldest entries (simple FIFO)
            evicted = 0
            for _ in range(20):
                self._cache.pop(next(iter(self._cache)))
                evicted += 1
            logger.debug("Cache eviction", evicted=evicted, remaining=len(self._cache))
    elapsed = time.perf_counter() - start_time
    logger.debug(
        "File read_at",
        offset=offset,
        size=size,
        elapsed_us=elapsed * 1000000,
        cached=size <= 4096,
    )
    return data
def close(self) -> None:
    """Close mappings and file."""
    # Release all memory views first
    self._views.clear()
    if self.header_mmap:
        with suppress(BufferError):
            # If views still exist, just clear our reference
            self.header_mmap.close()
        self.header_mmap = None
    if self.file:
        self.file.close()
        self.file = None
def open(self, path: Path) -> None:
    """Open with partial memory mapping."""
    self.path = path
    self.file = path.open("rb")
    # Get file size
    file_size = path.stat().st_size
    # Memory-map just the header region
    map_size = min(self.header_size, file_size)
    self.header_mmap = mmap.mmap(self.file.fileno(), map_size, access=mmap.ACCESS_READ)
def read_at(self, offset: int, size: int) -> bytes | memoryview:
    """Read using mmap for header, file I/O for rest."""
    if not self.file:
        raise RuntimeError("Backend not opened")
    # Use mmap for header region
    if self.header_mmap and offset + size <= len(self.header_mmap):
        view = memoryview(self.header_mmap)[offset:offset + size]
        self._views.append(view)  # Track for cleanup
        return view
    # Use file I/O for slot data
    self.file.seek(offset)
    return self.file.read(size)
def read_slot(self, descriptor: SlotDescriptor) -> bytes | memoryview:
    """Read slot using appropriate method."""
    return self.read_at(descriptor.offset, descriptor.size)
def close(self) -> None:
    """Close memory map and file."""
    logger.debug(
        "Closing mmap backend",
        path=str(self.path) if self.path else None,
        tracked_views=len(self._views),
    )
    # Release all memory views first
    self._views.clear()
    if self.mmap:
        with suppress(BufferError):
            # BufferError expected if external code holds memoryview references
            # The mmap will be cleaned up by Python's GC when all references are released
            self.mmap.close()
        self.mmap = None
    if self.file:
        self.file.close()
        self.file = None
def prefetch(self, offset: int, size: int) -> None:
    """Hint to OS to prefetch pages."""
    logger.debug(
        "Prefetching pages",
        offset=offset,
        size=size,
        pages=size // DEFAULT_PAGE_SIZE,
    )
    if hasattr(os, "posix_fadvise") and hasattr(os, "POSIX_FADV_WILLNEED") and self.file:
        # Linux: hint that we'll need this data soon
        os.posix_fadvise(self.file.fileno(), offset, size, os.POSIX_FADV_WILLNEED)  # type: ignore[attr-defined]
    elif sys.platform == "win32" and self.mmap:
        # Windows: touch pages to load them
        # This is less efficient but works
        view = memoryview(self.mmap)[offset:offset + 1]
        _ = view[0]  # Touch first byte to trigger page load
    else:
        logger.debug("Prefetch not available on this platform")
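One plausible way to combine the hint with streaming (a sketch only, assuming the backend also inherits `stream_slot` from the base class; `warm_then_stream` is a hypothetical helper):

def warm_then_stream(backend, descriptor, chunk_size: int = 1024 * 1024):
    """Ask the OS to start paging the slot in, then stream it.
    prefetch() is only a hint; streaming behaves the same without it."""
    backend.prefetch(descriptor.offset, descriptor.size)
    yield from backend.stream_slot(descriptor, chunk_size)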
def read_at(self, offset: int, size: int) -> memoryview:
    """Return a memory view without copying data."""
    start_time = time.perf_counter()
    if not self.mmap:
        logger.error("Backend not opened")
        raise RuntimeError("Backend not opened")
    # Validate bounds
    if offset < 0:
        logger.error("Invalid offset", offset=offset)
        raise ValueError(f"Negative offset not allowed: {offset}")
    if size < 0:
        logger.error("Invalid size", size=size)
        raise ValueError(f"Negative size not allowed: {size}")
    if offset + size > len(self.mmap):
        logger.error(
            "Read beyond bounds",
            offset=offset,
            size=size,
            file_size=len(self.mmap),
        )
        raise ValueError(
            f"Read beyond file bounds: offset={offset}, size={size}, file_size={len(self.mmap)}"
        )
    # Return a view into the mapped memory (zero-copy)
    view = memoryview(self.mmap)[offset:offset + size]
    self._views.append(view)  # Track for cleanup
    elapsed = time.perf_counter() - start_time
    logger.debug(
        "MMap read_at",
        offset=offset,
        size=size,
        elapsed_us=elapsed * 1000000,
        zero_copy=True,
    )
    return view
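Because `read_at` returns a `memoryview`, callers can parse fixed-size fields straight out of the mapping. A sketch assuming a hypothetical little-endian uint32 at byte offset 8; the real header layout is defined elsewhere in the format.

import struct

def read_slot_count(backend) -> int:
    """Parse a hypothetical uint32 field without copying the underlying bytes."""
    view = backend.read_at(8, 4)               # zero-copy view into the mapping
    (count,) = struct.unpack_from("<I", view)  # reads straight from the buffer
    view.release()                             # drop the view so close() can unmap cleanly
    return count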
def view_at(self, offset: int, size: int) -> memoryview:
    """Get a zero-copy view of data at offset (same as read_at for mmap)."""
    return self.read_at(offset, size)
def read_at(self, offset: int, size: int) -> bytes:
    """Read data at specific offset - limited to chunk size."""
    if not self.file:
        raise RuntimeError("Backend not opened")
    # Limit read size for streaming
    read_size = min(size, self.chunk_size)
    self.file.seek(offset)
    return self.file.read(read_size)
def read_slot(self, descriptor: SlotDescriptor) -> bytes:
    """Read only first chunk of slot for streaming."""
    # For streaming, we don't read the whole slot at once
    return self.read_at(descriptor.offset, min(descriptor.size, self.chunk_size))
def stream_slot(
    self,
    descriptor: SlotDescriptor,
    chunk_size: int | None = None,
) -> Generator[bytes | memoryview, None, None]:
    """Stream slot data in chunks."""
    chunk_size = chunk_size or self.chunk_size
    return super().stream_slot(descriptor, chunk_size)
def create_backend(mode: int = ACCESS_AUTO, path: Path | None = None) -> Backend:
    """Factory function to create the appropriate backend."""
    if mode == ACCESS_AUTO:
        # Auto-select based on file size and platform
        if path and path.exists():
            file_size = path.stat().st_size
            # Check the very-large-file case first so it is not shadowed by the
            # general mmap threshold below
            if file_size > 100 * 1024 * 1024 and sys.platform == "win32":
                # Use streaming for very large files on limited memory
                mode = ACCESS_STREAM
                logger.debug(
                    "Auto-selected stream backend",
                    file_size_mb=file_size / 1024 / 1024,
                    platform=sys.platform,
                )
            elif file_size > 1024 * 1024:
                # Use mmap for files over 1MB
                mode = ACCESS_MMAP
                logger.debug(
                    "Auto-selected mmap backend",
                    file_size_mb=file_size / 1024 / 1024,
                )
            else:
                mode = ACCESS_FILE
                logger.debug("Auto-selected file backend", file_size_kb=file_size / 1024)
        else:
            mode = ACCESS_FILE
            logger.debug("Default to file backend", path_exists=False)
    # Create the appropriate backend
    if mode == ACCESS_MMAP:
        return MMapBackend()
    elif mode == ACCESS_STREAM:
        return StreamBackend()
    elif mode == ACCESS_FILE:
        return FileBackend()
    else:
        # Default to hybrid for unknown modes
        return HybridBackend()
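An end-to-end sketch of using the factory: auto-select a backend for a file, read one slot, and clean up. `dump_slot` is a hypothetical helper, and the descriptor is assumed to come from the format's header parsing.

from pathlib import Path

def dump_slot(path: Path, descriptor) -> bytes:
    """Open with the auto-selected backend, copy one slot out, and close."""
    backend = create_backend(ACCESS_AUTO, path)
    backend.open(path)
    try:
        data = backend.read_slot(descriptor)
        return bytes(data)  # copy in case the backend returned a memoryview
    finally:
        backend.close()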