#!/usr/bin/env python3

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import logging
import os
import time
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Event
from typing import Dict, List, TextIO

__all__ = ["tail_logfile", "TailLog"]

log = logging.getLogger(__name__)


def tail_logfile(
    header: str, file: str, dst: TextIO, finished: Event, interval_sec: float
):
    """
    Tails ``file`` and writes each line to ``dst``, prefixed with ``header``.

    Polls every ``interval_sec`` seconds until ``file`` is created, then keeps
    reading until ``finished`` is set and EOF has been reached.
    """

    while not os.path.exists(file):
        if finished.is_set():
            return
        time.sleep(interval_sec)

    with open(file, "r") as fp:
        while True:
            line = fp.readline()

            if line:
                dst.write(f"{header}{line}")
            else:  # reached EOF
                if finished.is_set():
                    # log line producer is finished
                    break
                else:
                    # log line producer is still going
                    # wait for a bit before looping again
                    time.sleep(interval_sec)


class TailLog:
    """
    Tails the given log files.

    The log files do not have to exist when the ``start()`` method is called.
    The tailer will gracefully wait until the log files are created by the
    producer and will tail their contents until the ``stop()`` method is called.

    .. warning:: ``TailLog`` will wait indefinitely for the log file to be created!

    Each log file's line will be prefixed with a header of the form:
    ``[{name}{idx}]:``, where ``name`` is user-provided and ``idx`` is the index
    of the log file in the ``log_files`` mapping.

    Usage:

    ::

     log_files = {0: "/tmp/0_stdout.log", 1: "/tmp/1_stdout.log"}
     tailer = TailLog("trainer", log_files, sys.stdout).start()
     # actually run the trainers to produce 0_stdout.log and 1_stdout.log
     run_trainers()
     tailer.stop()

     # once run_trainers() starts writing the ##_stdout.log files
     # the tailer will print to sys.stdout:
     # >>> [trainer0]:log_line1
     # >>> [trainer1]:log_line1
     # >>> [trainer0]:log_line2
     # >>> [trainer0]:log_line3
     # >>> [trainer1]:log_line2

    .. note:: Due to buffering, log lines between files may not necessarily be
              printed out in order. You should configure your application's
              logger to suffix each log line with a proper timestamp.

    """

    def __init__(
        self,
        name: str,
        log_files: Dict[int, str],
        dst: TextIO,
        interval_sec: float = 0.1,
    ):
        n = len(log_files)
        self._threadpool = None
        if n > 0:
            self._threadpool = ThreadPoolExecutor(
                max_workers=n,
                thread_name_prefix=f"{self.__class__.__qualname__}_{name}",
            )

        self._name = name
        self._dst = dst
        self._log_files = log_files
        self._finished_events: Dict[int, Event] = {
            local_rank: Event() for local_rank in log_files.keys()
        }
        self._futs: List[Future] = []
        self._interval_sec = interval_sec
        self._stopped = False

    def start(self) -> "TailLog":
        if not self._threadpool:
            return self

        for local_rank, file in self._log_files.items():
            self._futs.append(
                self._threadpool.submit(
                    tail_logfile,
                    header=f"[{self._name}{local_rank}]:",
                    file=file,
                    dst=self._dst,
                    finished=self._finished_events[local_rank],
                    interval_sec=self._interval_sec,
                )
            )
        return self

    def stop(self) -> None:
        for finished in self._finished_events.values():
            finished.set()

        # futures were submitted in the iteration order of self._log_files,
        # so pair them back with the same keys to report the correct local rank
        # (the keys are not necessarily 0..n-1)
        for local_rank, f in zip(self._log_files, self._futs):
            try:
                f.result()
            except Exception as e:
                log.error(
                    f"error in log tailer for {self._name}{local_rank}."
                    f" {e.__class__.__qualname__}: {e}",
                )

        if self._threadpool:
            self._threadpool.shutdown(wait=True)

        self._stopped = True

    def stopped(self) -> bool:
        return self._stopped
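

# ---------------------------------------------------------------------------
# The block below is not part of the library; it is a minimal, hypothetical
# sketch of how ``TailLog`` is typically wired up, following the usage shown
# in the class docstring. A background thread stands in for the worker
# processes that would normally produce the log files; the file names and the
# ``write_fake_logs`` helper are made up for illustration only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import sys
    import tempfile
    import threading

    def write_fake_logs(files: Dict[int, str]) -> None:
        # stand-in for the real log producers (e.g. trainer processes):
        # each "rank" appends a few lines to its own log file
        for i in range(3):
            for local_rank, path in files.items():
                with open(path, "a") as fp:
                    fp.write(f"rank {local_rank} line {i}\n")
                time.sleep(0.05)

    with tempfile.TemporaryDirectory() as tmpdir:
        log_files = {
            0: os.path.join(tmpdir, "0_stdout.log"),
            1: os.path.join(tmpdir, "1_stdout.log"),
        }

        # start tailing before the files exist; TailLog waits for them
        tailer = TailLog("trainer", log_files, sys.stdout).start()

        producer = threading.Thread(target=write_fake_logs, args=(log_files,))
        producer.start()
        producer.join()

        # signal the tailers that the producers are done and drain the rest;
        # lines are printed as e.g. "[trainer0]:rank 0 line 0"
        tailer.stop()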