12. Watch-Folder Automation

Chapter 12 of 18 · 20 min

Manual document processing breaks down when new files arrive continuously. Watch-folder automation monitors directories and triggers processing automatically when changes occur. This chapter covers implementing reliable file watchers.

Watch-Folder Fundamentals

The operating system notifies applications of filesystem changes through inotify (Linux), FSEvents (macOS), or ReadDirectoryChangesW (Windows). Python wrappers such as watchdog abstract these platform differences.
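To make "detecting a change" concrete, here is a minimal sketch of the polling approach a watcher can fall back on when native notifications are unavailable: take a snapshot of the directory, take another one later, and compare. The function names snapshot and diff_snapshots are illustrative assumptions, not part of any library, and the sketch only looks at the top level of one directory.

```python
import os

def snapshot(directory):
    """Map each regular file in `directory` to its (mtime_ns, size) pair."""
    state = {}
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_file():
                info = entry.stat()
                state[entry.path] = (info.st_mtime_ns, info.st_size)
    return state

def diff_snapshots(before, after):
    """Return (created, modified, deleted) path lists between two snapshots."""
    created = sorted(p for p in after if p not in before)
    deleted = sorted(p for p in before if p not in after)
    modified = sorted(
        p for p in after if p in before and after[p] != before[p]
    )
    return created, modified, deleted
```

Calling snapshot on a timer and diffing consecutive results yields the same three kinds of events a native backend reports, at the cost of latency and repeated directory scans.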

Using the watchdog Library

watchdog provides cross-platform file system monitoring:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class DocumentHandler(FileSystemEventHandler):
    """Queue every new or changed PDF for processing."""

    def __init__(self, processor):
        super().__init__()
        self.processor = processor

    def _queue_if_pdf(self, event):
        # Ignore directory events and non-PDF files (case-insensitive match).
        if event.is_directory:
            return
        if event.src_path.lower().endswith(".pdf"):
            self.processor.queue(event.src_path)

    def on_created(self, event):
        self._queue_if_pdf(event)

    def on_modified(self, event):
        self._queue_if_pdf(event)
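
The handler relies on a processor object that exposes a queue(path) method, which this chapter does not define. As one possible shape for it, the sketch below hands paths to a single background worker so the watcher thread never blocks on document processing. The class name QueueProcessor, the handle_file callable, and wait_until_idle are assumptions made for illustration.

```python
import queue
import threading

class QueueProcessor:
    """Hypothetical processor: a FIFO queue drained by one worker thread."""

    def __init__(self, handle_file):
        self.handle_file = handle_file  # callable invoked with each path
        self._jobs = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def queue(self, filepath):
        """Called from the watcher thread; returns immediately."""
        self._jobs.put(filepath)

    def _run(self):
        while True:
            filepath = self._jobs.get()
            try:
                self.handle_file(filepath)
            except Exception as exc:
                # One bad file should not kill the worker.
                print(f"Failed to process {filepath}: {exc}")
            finally:
                self._jobs.task_done()

    def wait_until_idle(self):
        """Block until every queued path has been handled."""
        self._jobs.join()
```

With this in place, DocumentHandler(QueueProcessor(my_function)) would process files in arrival order, one at a time, where my_function stands in for whatever conversion or extraction step you run on each document.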

The Race Condition Problem

A created event fires as soon as a file appears, which is often before the writer has finished. Processing a file while it is still being copied produces truncated or corrupted output.

Handle this by waiting for file stability:

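import threading
import time
from pathlib import Path

class StableFileHandler(DocumentHandler):
    """Queue a PDF only after its size has stopped changing."""

    def __init__(self, *args, stability_seconds=2, **kwargs):
        super().__init__(*args, **kwargs)
        self.stability_seconds = stability_seconds
        self.pending = set()
        self.lock = threading.Lock()

    def on_created(self, event):
        if event.is_directory or not event.src_path.endswith(".pdf"):
            return
        self._check_stability(event.src_path)

    def on_modified(self, event):
        # Route modifications through the same check instead of queueing at once.
        self.on_created(event)

    def _check_stability(self, filepath):
        with self.lock:
            if filepath in self.pending:
                return  # a checker thread is already watching this file
            self.pending.add(filepath)

        def delayed_process():
            try:
                last_size = -1
                while True:
                    current_size = Path(filepath).stat().st_size
                    if current_size == last_size:
                        break  # unchanged for a full interval: treat as complete
                    last_size = current_size
                    time.sleep(self.stability_seconds)
                self.processor.queue(filepath)
            except FileNotFoundError:
                pass  # the file was removed before it stabilized
            finally:
                with self.lock:
                    self.pending.discard(filepath)

        threading.Thread(target=delayed_process, daemon=True).start()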
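
Waiting for stability is a heuristic: a stalled network copy can look finished. When you control the program that writes into the watched folder, a complementary approach is to write under a temporary name and rename the file into place once it is complete. The sketch below assumes the temporary file and the final path are on the same filesystem; publish_atomically and the ".part" suffix are illustrative choices, not an established convention of this chapter.

```python
import os
import tempfile

def publish_atomically(data: bytes, final_path: str) -> None:
    """Write `data` to a temp file, then rename it to `final_path`."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temp file next to the target so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before renaming
        # os.replace swaps the name in one step; readers never see a partial file.
        os.replace(tmp_path, final_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Because the handlers above only react to paths ending in ".pdf", the ".part" file is ignored while it is being written. Note that a rename inside the watched tree typically arrives as a moved event rather than a created one, so a handler used with this pattern would also need an on_moved method that inspects event.dest_path.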

Debouncing Rapid Changes

Some applications trigger multiple events during saves. Debouncing coalesces repeated events into a single trigger:

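import threading
from watchdog.events import FileSystemEventHandler

class DebouncedHandler(FileSystemEventHandler):
    """Call `callback` once per file after events for it go quiet."""

    def __init__(self, callback, debounce_seconds=1.0):
        super().__init__()
        self.callback = callback
        self.debounce_seconds = debounce_seconds
        self.timers = {}
        self.lock = threading.Lock()

    def _trigger(self, filepath):
        with self.lock:
            # pop() with a default avoids a KeyError if the entry is already gone.
            self.timers.pop(filepath, None)
        self.callback(filepath)

    def dispatch(self, event):
        # Overriding dispatch() intercepts every event type before the on_* hooks.
        # Skip directories and deletions: there is no file left to process.
        if event.is_directory or event.event_type == "deleted":
            return
        filepath = event.src_path
        with self.lock:
            if filepath in self.timers:
                self.timers[filepath].cancel()  # restart the quiet period
            timer = threading.Timer(self.debounce_seconds, self._trigger, args=[filepath])
            timer.daemon = True
            self.timers[filepath] = timer
            timer.start()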
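
The coalescing logic can be tried in isolation, without watchdog or a real directory. The following sketch is a standalone variant under assumed names (Debouncer, notify); it also guards against a timer that fires just as a newer event replaces it, by letting only the most recently registered timer run the callback.

```python
import threading

class Debouncer:
    """Run `callback(key)` once after `delay` seconds without new notifications."""

    def __init__(self, callback, delay):
        self.callback = callback
        self.delay = delay
        self._timers = {}
        self._lock = threading.Lock()

    def notify(self, key):
        with self._lock:
            previous = self._timers.get(key)
            if previous is not None:
                previous.cancel()  # restart the quiet period for this key
            timer = threading.Timer(self.delay, self._fire, args=[key])
            timer.daemon = True
            self._timers[key] = timer
            timer.start()

    def _fire(self, key):
        with self._lock:
            # A Timer runs its function in its own thread, so current_thread()
            # is the Timer itself. Bail out if a newer timer has replaced it.
            if self._timers.get(key) is not threading.current_thread():
                return
            del self._timers[key]
        self.callback(key)
```

For example, calling notify("a.pdf") five times in quick succession results in a single callback for "a.pdf" once the delay has passed after the last call, while notifications for other keys are tracked independently.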

Starting the Observer

Wire everything together:

def start_watcher(directory, processor):
    event_handler = DebouncedHandler(processor.queue, debounce_seconds=2.0)
    observer = Observer()
    observer.schedule(event_handler, directory, recursive=True)
    observer.start()
    print(f"Watching {directory} for changes...")
    return observer

Graceful Shutdown

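Handle SIGTERM (and Ctrl+C) to stop cleanly:

import signal
import threading

def main(processor):
    observer = start_watcher("/documents/incoming", processor)
    stop_requested = threading.Event()

    def shutdown(signum, frame):
        # Keep the handler minimal: just flag the main thread to exit.
        stop_requested.set()

    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)  # Ctrl+C

    # Wait with a timeout so the main thread stays responsive to signals.
    while not stop_requested.wait(timeout=1.0):
        pass

    print("Shutting down watcher...")
    observer.stop()
    observer.join()

EXERCISE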

Create a watch-folder script that monitors an input directory, moves processed files to an output directory with timestamp-based subfolders, logs all events to a file, and continues running indefinitely.