tail_log.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python3
  2. # Copyright (c) Facebook, Inc. and its affiliates.
  3. # All rights reserved.
  4. #
  5. # This source code is licensed under the BSD-style license found in the
  6. # LICENSE file in the root directory of this source tree.
  7. import logging
  8. import os
  9. import time
  10. from concurrent.futures._base import Future
  11. from concurrent.futures.thread import ThreadPoolExecutor
  12. from threading import Event
  13. from typing import Dict, List, TextIO
# Public names of this module (what ``from <module> import *`` exposes).
__all__ = ["tail_logfile", "TailLog"]
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
  16. def tail_logfile(
  17. header: str, file: str, dst: TextIO, finished: Event, interval_sec: float
  18. ):
  19. while not os.path.exists(file):
  20. if finished.is_set():
  21. return
  22. time.sleep(interval_sec)
  23. with open(file, "r") as fp:
  24. while True:
  25. line = fp.readline()
  26. if line:
  27. dst.write(f"{header}{line}")
  28. else: # reached EOF
  29. if finished.is_set():
  30. # log line producer is finished
  31. break
  32. else:
  33. # log line producer is still going
  34. # wait for a bit before looping again
  35. time.sleep(interval_sec)
  36. class TailLog:
  37. """
  38. Tails the given log files. The log files do not have to exist when the
  39. ``start()`` method is called. The tail-er will gracefully wait until the
  40. log files are created by the producer and will tail the contents of the
  41. log files until the ``stop()`` method is called.
  42. .. warning:: ``TailLog`` will wait indefinitely for the log file to be created!
  43. Each log file's line will be suffixed with a header of the form: ``[{name}{idx}]:``,
  44. where the ``name`` is user-provided and ``idx`` is the index of the log file
  45. in the ``log_files`` mapping.
  46. Usage:
  47. ::
  48. log_files = {0: "/tmp/0_stdout.log", 1: "/tmp/1_stdout.log"}
  49. tailer = TailLog("trainer", log_files, sys.stdout).start()
  50. # actually run the trainers to produce 0_stdout.log and 1_stdout.log
  51. run_trainers()
  52. tailer.stop()
  53. # once run_trainers() start writing the ##_stdout.log files
  54. # the tailer will print to sys.stdout:
  55. # >>> [trainer0]:log_line1
  56. # >>> [trainer1]:log_line1
  57. # >>> [trainer0]:log_line2
  58. # >>> [trainer0]:log_line3
  59. # >>> [trainer1]:log_line2
  60. .. note:: Due to buffering log lines between files may not necessarily
  61. be printed out in order. You should configure your application's
  62. logger to suffix each log line with a proper timestamp.
  63. """
  64. def __init__(
  65. self,
  66. name: str,
  67. log_files: Dict[int, str],
  68. dst: TextIO,
  69. interval_sec: float = 0.1,
  70. ):
  71. n = len(log_files)
  72. self._threadpool = None
  73. if n > 0:
  74. self._threadpool = ThreadPoolExecutor(
  75. max_workers=n,
  76. thread_name_prefix=f"{self.__class__.__qualname__}_{name}",
  77. )
  78. self._name = name
  79. self._dst = dst
  80. self._log_files = log_files
  81. self._finished_events: Dict[int, Event] = {
  82. local_rank: Event() for local_rank in log_files.keys()
  83. }
  84. self._futs: List[Future] = []
  85. self._interval_sec = interval_sec
  86. self._stopped = False
  87. def start(self) -> "TailLog":
  88. if not self._threadpool:
  89. return self
  90. for local_rank, file in self._log_files.items():
  91. self._futs.append(
  92. self._threadpool.submit(
  93. tail_logfile,
  94. header=f"[{self._name}{local_rank}]:",
  95. file=file,
  96. dst=self._dst,
  97. finished=self._finished_events[local_rank],
  98. interval_sec=self._interval_sec,
  99. )
  100. )
  101. return self
  102. def stop(self) -> None:
  103. for finished in self._finished_events.values():
  104. finished.set()
  105. for local_rank, f in enumerate(self._futs):
  106. try:
  107. f.result()
  108. except Exception as e:
  109. log.error(
  110. f"error in log tailor for {self._name}{local_rank}."
  111. f" {e.__class__.__qualname__}: {e}",
  112. )
  113. if self._threadpool:
  114. self._threadpool.shutdown(wait=True)
  115. self._stopped = True
  116. def stopped(self) -> bool:
  117. return self._stopped