This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
defdetect_atomic_save(events:list[FileEvent])->FileOperation|None:"""Detect if events represent an atomic save operation."""fromprovide.foundation.file.operations.detectors.orchestratorimportOperationDetectordetector=OperationDetector()operations=detector.detect(events)returnnext((opforopinoperationsifop.operation_type==OperationType.ATOMIC_SAVE),None)
defextract_original_path(temp_path:Path)->Path|None:"""Extract the original filename from a temp file path."""fromprovide.foundation.file.operations.detectors.helpersimportextract_base_namebase_name=extract_base_name(temp_path)ifbase_name:returntemp_path.parent/base_nameelse:# If no temp pattern matches, return the original pathreturntemp_path
defgroup_related_events(events:list[FileEvent],time_window_ms:int=500)->list[list[FileEvent]]:"""Group events that occur within a time window."""fromprovide.foundation.file.operations.detectors.orchestratorimportOperationDetectorconfig=DetectorConfig(time_window_ms=time_window_ms)detector=OperationDetector(config)returndetector._group_events_by_time(sorted(events,key=lambdae:e.timestamp))
defis_temp_file(path:Path)->bool:"""Check if a path represents a temporary file."""fromprovide.foundation.file.operations.detectors.helpersimportis_temp_fileashelper_is_temp_filereturnhelper_is_temp_file(path)