12. Watch-Folder Automation
Manual document processing breaks down when new files arrive continuously. Watch-folder automation monitors directories and triggers processing automatically when changes occur. This chapter covers implementing reliable file watchers.
Watch-Folder Fundamentals
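The operating system notifies applications of filesystem changes through platform-specific APIs: inotify on Linux, FSEvents on macOS, kqueue on BSD, and ReadDirectoryChangesW on Windows. Python libraries such as watchdog wrap these APIs behind a single interface, so the same watcher code runs on every platform.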
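Without these notification APIs, a watcher has to poll. It takes a snapshot of the directory at intervals and diffs it against the previous one. watchdog ships such a fallback (PollingObserver, in watchdog.observers.polling) for filesystems where native events are not delivered, such as many network shares. The sketch below is a minimal stdlib-only illustration of the polling idea, not watchdog's implementation. The function names snapshot and diff_snapshots are hypothetical.

```python
import os


def snapshot(directory):
    """Map every regular file under directory to its (size, mtime_ns)."""
    state = {}
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            try:
                st = os.stat(path)
            except FileNotFoundError:
                continue  # deleted between listing and stat
            state[path] = (st.st_size, st.st_mtime_ns)
    return state


def diff_snapshots(old, new):
    """Return (created, deleted, modified) path sets between two snapshots."""
    created = new.keys() - old.keys()
    deleted = old.keys() - new.keys()
    modified = {p for p in old.keys() & new.keys() if old[p] != new[p]}
    return created, deleted, modified
```

A polling loop would call snapshot() every second or so and act on the diff. The cost of each pass grows with the number of files, which is why native notifications are preferred wherever they work.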
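Using the watchdog Library
The watchdog package (pip install watchdog) provides cross-platform filesystem monitoring. You subclass FileSystemEventHandler and override the on_* methods for the events you need. The handler below forwards new and modified PDFs to a processor object that exposes a queue(path) method: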
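```python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class DocumentHandler(FileSystemEventHandler):
    """Forward new or changed PDF files to processor.queue(path)."""

    def __init__(self, processor):
        self.processor = processor

    def _handle(self, event):
        if event.is_directory:
            return
        # Case-insensitive, so "SCAN.PDF" is picked up as well.
        if event.src_path.lower().endswith(".pdf"):
            self.processor.queue(event.src_path)

    def on_created(self, event):
        self._handle(event)

    def on_modified(self, event):
        # A single copy typically fires one created event and several modified
        # events, so the processor may receive the same path more than once.
        self._handle(event)
```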
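The handlers in this chapter only assume a processor object with a queue(path) method; how it works is left open. One possible shape is a single worker thread fed by queue.Queue that skips paths already waiting. This matters because one copy usually produces both created and modified events. This is a sketch under that assumption, not a prescribed design. The class name QueueProcessor and its handle parameter are hypothetical.

```python
import threading
from queue import Queue


class QueueProcessor:
    """Hypothetical processor: runs handle(path) for queued paths on one worker thread."""

    def __init__(self, handle):
        self.handle = handle          # callable doing the real work, e.g. PDF parsing
        self._queue = Queue()
        self._waiting = set()         # paths queued but not yet picked up
        self._lock = threading.Lock()
        threading.Thread(target=self._run, daemon=True).start()

    def queue(self, path):
        """Called from watchdog's thread; skips paths that are already waiting."""
        with self._lock:
            if path in self._waiting:
                return
            self._waiting.add(path)
        self._queue.put(path)

    def _run(self):
        while True:
            path = self._queue.get()
            with self._lock:
                self._waiting.discard(path)  # later events may queue it again
            try:
                self.handle(path)
            except Exception as exc:  # one bad file must not kill the worker
                print(f"Failed to process {path}: {exc}")
            finally:
                self._queue.task_done()

    def join(self):
        """Block until every queued path has been handled."""
        self._queue.join()
```

For example, DocumentHandler(QueueProcessor(handle=print)) would print each PDF path instead of processing it. Doing the work on the worker thread keeps slow processing from delaying watchdog's event delivery.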
The Race Condition Problem
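A file becomes visible, and fires its created event, as soon as the writer creates it, long before the copy finishes. Processing it at that point reads a truncated file and produces corrupted output or a parse error.
Handle this by waiting until the file's size stops changing before queuing it. Size stability is a heuristic, though: a writer that stalls for longer than the interval will still be caught mid-write, so choose an interval that comfortably exceeds the pauses of your slowest source: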
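```python
import threading
import time
from pathlib import Path


class StableFileHandler(DocumentHandler):
    def __init__(self, *args, stability_seconds=2, **kwargs):
        super().__init__(*args, **kwargs)
        self.stability_seconds = stability_seconds
        self.pending = set()          # paths with a stability check running
        self.lock = threading.Lock()  # pending is shared with checker threads

    def on_created(self, event):
        if event.is_directory or not event.src_path.lower().endswith(".pdf"):
            return
        with self.lock:
            if event.src_path in self.pending:
                return  # one running check per file is enough
            self.pending.add(event.src_path)
        self._check_stability(event.src_path)

    # Modified events arrive throughout the copy; route them through the same
    # check instead of the parent's immediate queue() call.
    on_modified = on_created

    def _check_stability(self, filepath):
        def delayed_process():
            last_size = -1
            try:
                # Re-read the size every interval until two readings match.
                while (current_size := Path(filepath).stat().st_size) != last_size:
                    last_size = current_size
                    time.sleep(self.stability_seconds)
            except FileNotFoundError:
                return  # deleted or renamed before it settled
            finally:
                with self.lock:
                    self.pending.discard(filepath)
            self.processor.queue(filepath)

        threading.Thread(target=delayed_process, daemon=True).start()
```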
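The size-stability idea also works as a plain synchronous check. A worker can run it just before opening a file, with an upper bound so a stalled transfer cannot block it forever. The sketch below assumes that setup. The function name wait_until_stable and its interval and timeout defaults are hypothetical choices, not watchdog features.

```python
import time
from pathlib import Path


def wait_until_stable(path, interval=2.0, timeout=60.0):
    """Return True once path's size is unchanged across one interval.

    Returns False if the file disappears, or is still changing when
    timeout seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    last_size = -1
    while time.monotonic() < deadline:
        try:
            size = Path(path).stat().st_size
        except FileNotFoundError:
            return False
        if size == last_size:
            return True
        last_size = size
        time.sleep(interval)
    return False
```

A worker would use it as `if wait_until_stable(path): process(path)`, where process stands in for your own processing function. A False result can be logged and the file retried later.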
Debouncing Rapid Changes
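Many applications generate several events for a single save. An editor that writes a temporary file and renames it over the original, for example, can produce create, modify, and move events in quick succession. Debouncing coalesces a burst of events for the same path into one callback. That callback fires only after the path has been quiet for debounce_seconds: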
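```python
import threading

from watchdog.events import FileSystemEventHandler


class DebouncedHandler(FileSystemEventHandler):
    def __init__(self, callback, debounce_seconds=1.0):
        self.callback = callback
        self.debounce_seconds = debounce_seconds
        self.timers = {}              # path -> pending threading.Timer
        self.lock = threading.Lock()

    def _trigger(self, filepath):
        with self.lock:
            # pop() instead of del: if an event arrives while an older timer is
            # already firing, both timers end up here and the second finds the
            # key gone.
            self.timers.pop(filepath, None)
        self.callback(filepath)       # run outside the lock

    def dispatch(self, event):
        # Overriding dispatch() sees every event before any on_*() method.
        if event.is_directory or event.event_type not in ("created", "modified", "moved"):
            return
        # After a move or rename, the file lives at dest_path.
        filepath = getattr(event, "dest_path", "") or event.src_path
        with self.lock:
            if filepath in self.timers:
                self.timers[filepath].cancel()  # restart the quiet period
            timer = threading.Timer(self.debounce_seconds, self._trigger, args=[filepath])
            self.timers[filepath] = timer
            timer.start()
```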
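The timer-restart technique does not depend on watchdog. Below it is stripped down to a hypothetical Debouncer class with a call(key) method, so it can be exercised without touching the filesystem. A burst of calls for one key produces a single callback once that key has been quiet for delay seconds.

```python
import threading


class Debouncer:
    """Call callback(key) once per burst, delay seconds after the last call."""

    def __init__(self, callback, delay):
        self.callback = callback
        self.delay = delay
        self._timers = {}             # key -> most recent threading.Timer
        self._lock = threading.Lock()

    def call(self, key):
        with self._lock:
            old = self._timers.get(key)
            if old is not None:
                old.cancel()          # restart the quiet period
            timer = threading.Timer(self.delay, self._fire, args=[key])
            timer.daemon = True       # don't keep the interpreter alive
            self._timers[key] = timer
            timer.start()

    def _fire(self, key):
        with self._lock:
            # A Timer runs its function in its own thread, so current_thread()
            # is the Timer itself. Only the newest timer for a key may fire.
            if self._timers.get(key) is not threading.current_thread():
                return
            del self._timers[key]
        self.callback(key)
```

The identity check lets a timer that was superseded while already firing exit quietly. That guarantees each burst yields exactly one callback, at the cost of one extra comparison per timer.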
Starting the Observer
Wire everything together by scheduling the debounced handler on an Observer and starting it:
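```python
from watchdog.observers import Observer


def start_watcher(directory, processor):
    # Coalesce bursts of events for the same path before queuing it.
    event_handler = DebouncedHandler(processor.queue, debounce_seconds=2.0)
    observer = Observer()
    # recursive=True also watches every subdirectory of directory.
    observer.schedule(event_handler, directory, recursive=True)
    observer.start()  # runs in a background thread and returns immediately
    print(f"Watching {directory} for changes...")
    return observer
```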
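With recursive=True, anything the processor writes beneath the watched directory generates new events and can be picked up again. This includes output files and temporary files. If outputs must live under the input tree, one option is to filter those paths before they reach the handler logic. A small sketch follows. The helper name is_ignored and the example directory layout are hypothetical.

```python
from pathlib import Path


def is_ignored(path, ignored_dirs):
    """True if path is one of ignored_dirs or lies inside one (after resolving)."""
    resolved = Path(path).resolve()
    for directory in ignored_dirs:
        root = Path(directory).resolve()
        if resolved == root or root in resolved.parents:
            return True
    return False
```

A handler would call it first, for example returning early from dispatch() when is_ignored(event.src_path, [output_dir]) is true. Comparing resolved Path.parents rather than string prefixes keeps a sibling such as processed-old from matching processed. Keeping the output directory outside the watched tree avoids the problem altogether.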
Graceful Shutdown
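Handle SIGTERM (sent by kill, systemctl stop, and docker stop) and SIGINT (Ctrl+C) so the observer thread stops cleanly before the process exits: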
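```python
import signal


def main(processor):
    # processor is any object with a queue(path) method, created by the caller.
    observer = start_watcher("/documents/incoming", processor)

    def shutdown(signum, frame):
        # Only request the stop here; the loop below does the joining.
        print("Shutting down watcher...")
        observer.stop()

    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)  # Ctrl+C

    # Join in short slices so the main thread keeps handling signals promptly.
    while observer.is_alive():
        observer.join(timeout=1)
```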
Exercise: Create a watch-folder script that monitors an input directory, moves processed files to an output directory with timestamp-based subfolders, logs all events to a file, and continues running indefinitely.
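One possible starting point for the script described above is sketched here under several assumptions:

- Files are filed into output/YYYY/MM/DD folders based on the processing time.
- Name collisions get a numeric suffix.
- Events are logged to a file through the standard logging module.

The names archive_file, run, and settle_seconds are hypothetical. run() needs watchdog and, for brevity, waits for stability inside the observer thread. Combining it with the stability and debouncing handlers from this chapter would avoid blocking event delivery.

```python
import logging
import shutil
import time
from datetime import datetime
from pathlib import Path

log = logging.getLogger("watchfolder")


def archive_file(src, output_root, now=None):
    """Move src into output_root/YYYY/MM/DD/ and return the new path."""
    now = now or datetime.now()
    target_dir = Path(output_root, f"{now:%Y}", f"{now:%m}", f"{now:%d}")
    target_dir.mkdir(parents=True, exist_ok=True)
    src = Path(src)
    target = target_dir / src.name
    counter = 1
    while target.exists():  # never overwrite an earlier file with the same name
        target = target_dir / f"{src.stem}_{counter}{src.suffix}"
        counter += 1
    shutil.move(str(src), str(target))
    log.info("Moved %s -> %s", src, target)
    return target


def run(input_dir, output_dir, log_file="watchfolder.log", settle_seconds=2):
    """Watch input_dir indefinitely, archiving each new file (requires watchdog)."""
    # Imported here so archive_file() works without watchdog installed.
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    logging.basicConfig(filename=log_file, level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")

    class ArchiveHandler(FileSystemEventHandler):
        def on_any_event(self, event):
            # dispatch() calls this for every event before the specific hook.
            log.info("%s: %s", event.event_type, event.src_path)

        def on_created(self, event):
            if event.is_directory:
                return
            path = Path(event.src_path)
            try:
                # Crude size-stability wait; it blocks the observer thread.
                size = -1
                while (current := path.stat().st_size) != size:
                    size = current
                    time.sleep(settle_seconds)
                archive_file(path, output_dir)
            except OSError:
                log.exception("Could not archive %s", path)

    observer = Observer()
    # Non-recursive: files in subfolders (including a nested output folder)
    # are not reported.
    observer.schedule(ArchiveHandler(), input_dir, recursive=False)
    observer.start()
    log.info("Watching %s", input_dir)
    try:
        while observer.is_alive():
            observer.join(timeout=1)
    finally:
        observer.stop()
        observer.join()
```

A script entry point would then call run("/documents/incoming", "/documents/processed") and block until interrupted.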