sync worked

This commit is contained in:
2026-02-25 15:20:26 +03:00
parent 6d068b7cea
commit 82331d3454
6 changed files with 319 additions and 61 deletions

View File

@@ -75,10 +75,9 @@ class TaskManager:
# @POST: Logs are batch-written to database every LOG_FLUSH_INTERVAL seconds.
def _flusher_loop(self):
"""Background thread that flushes log buffer to database."""
with belief_scope("_flusher_loop"):
while not self._flusher_stop_event.is_set():
self._flush_logs()
self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL)
while not self._flusher_stop_event.is_set():
self._flush_logs()
self._flusher_stop_event.wait(self.LOG_FLUSH_INTERVAL)
# [/DEF:_flusher_loop:Function]
# [DEF:_flush_logs:Function]
@@ -87,24 +86,24 @@ class TaskManager:
# @POST: All buffered logs are written to task_logs table.
def _flush_logs(self):
"""Flush all buffered logs to the database."""
with belief_scope("_flush_logs"):
with self._log_buffer_lock:
task_ids = list(self._log_buffer.keys())
for task_id in task_ids:
with self._log_buffer_lock:
task_ids = list(self._log_buffer.keys())
logs = self._log_buffer.pop(task_id, [])
for task_id in task_ids:
with self._log_buffer_lock:
logs = self._log_buffer.pop(task_id, [])
if logs:
try:
self.log_persistence_service.add_logs(task_id, logs)
except Exception as e:
logger.error(f"Failed to flush logs for task {task_id}: {e}")
# Re-add logs to buffer on failure
with self._log_buffer_lock:
if task_id not in self._log_buffer:
self._log_buffer[task_id] = []
self._log_buffer[task_id].extend(logs)
if logs:
try:
self.log_persistence_service.add_logs(task_id, logs)
logger.debug(f"Flushed {len(logs)} logs for task {task_id}")
except Exception as e:
logger.error(f"Failed to flush logs for task {task_id}: {e}")
# Re-add logs to buffer on failure
with self._log_buffer_lock:
if task_id not in self._log_buffer:
self._log_buffer[task_id] = []
self._log_buffer[task_id].extend(logs)
# [/DEF:_flush_logs:Function]
# [DEF:_flush_task_logs:Function]