feat(test): 在 FileWatcher 中添加 force_polling 参数以增强文件监控功能

This commit is contained in:
DrSmoothl
2026-03-09 14:42:03 +08:00
parent 5b7945ac7b
commit 426cbc6190
2 changed files with 4 additions and 2 deletions

View File

@@ -123,7 +123,7 @@ async def test_add_callback_while_watcher_running(tmp_path: Path):
dirs.mkdir(exist_ok=True)
file = (dirs / "a.toml").resolve()
file.touch()
watcher = FileWatcher(paths=[dirs], debounce_ms=200)
watcher = FileWatcher(paths=[dirs], debounce_ms=200, force_polling=True)
calls = 0

View File

@@ -55,9 +55,11 @@ class FileWatcher:
callback_timeout_s: float = 10.0,
callback_failure_threshold: int = 3,
callback_cooldown_s: float = 30.0,
force_polling: bool = False,
) -> None:
self._paths = [path.resolve() for path in paths]
self._debounce_ms = debounce_ms
self._force_polling = force_polling
self._callback_timeout_s = callback_timeout_s
self._callback_failure_threshold = callback_failure_threshold
self._callback_cooldown_s = callback_cooldown_s
@@ -136,7 +138,7 @@ class FileWatcher:
async def _run(self) -> None:
while self._running:
try:
async for changes in awatch(*self._paths, debounce=self._debounce_ms):
async for changes in awatch(*self._paths, debounce=self._debounce_ms, force_polling=self._force_polling):
if not self._running:
break
normalized_changes = self._normalize_changes(changes)