123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- #!/usr/bin/env python3
- # Copyright (c) Facebook, Inc. and its affiliates.
- # All rights reserved.
- #
- # This source code is licensed under the BSD-style license found in the
- # LICENSE file in the root directory of this source tree.
- import json
- from dataclasses import asdict, dataclass, field
- from enum import Enum
- from typing import Dict, Union, Optional
# Public names exported by a star-import of this module.
__all__ = [
    "EventSource",
    "Event",
    "NodeState",
    "RdzvEvent",
]

# Value types allowed in ``Event.metadata``; the alias is restricted to
# scalars and ``None`` (no nested containers).
EventMetadataValue = Union[str, int, float, bool, None]
class EventSource(str, Enum):
    """
    Identifiers for the known producers of events.

    Members are also ``str`` instances, so each one compares equal to its
    literal value.
    """

    # Each member's name and value are the same string, so lookup by name
    # (``EventSource["AGENT"]``) and by value (``EventSource("AGENT")``)
    # resolve to the same member.
    AGENT = "AGENT"
    WORKER = "WORKER"
@dataclass
class Event:
    """
    The class represents the generic event that occurs during the torchelastic
    job execution. The event can be any kind of meaningful action.

    Args:
        name: event name.
        source: the event producer, e.g. agent or worker
        timestamp: timestamp in milliseconds when event occurred.
        metadata: additional data that is associated with the event.
    """

    name: str
    source: EventSource
    timestamp: int = 0
    metadata: Dict[str, EventMetadataValue] = field(default_factory=dict)

    def __str__(self):
        """Return the JSON form produced by :meth:`serialize`."""
        return self.serialize()

    @staticmethod
    def deserialize(data: Union[str, "Event"]) -> "Event":
        """
        Build an ``Event`` from its JSON string form.

        Args:
            data: either an ``Event`` (returned unchanged) or a JSON string
                such as the one produced by :meth:`serialize`.

        Returns:
            The given ``Event`` instance, or a new one built from the JSON
            object's keys.

        Raises:
            TypeError: if ``data`` is neither an ``Event`` nor a ``str``.
            json.JSONDecodeError: if ``data`` is a string but not valid JSON.
            KeyError: if the decoded object has no ``"source"`` key or its
                value is not the name of an ``EventSource`` member.
        """
        if isinstance(data, Event):
            return data
        # Reject unsupported inputs explicitly rather than falling through
        # with no parsed dictionary to build the event from.
        if not isinstance(data, str):
            raise TypeError(
                f"Event.deserialize expects a str or Event, got {type(data).__name__}"
            )
        data_dict = json.loads(data)
        # JSON carries the source as a plain string; map it back to the enum
        # member by name.
        data_dict["source"] = EventSource[data_dict["source"]]
        return Event(**data_dict)

    def serialize(self) -> str:
        """Return this event's fields encoded as a JSON object string."""
        return json.dumps(asdict(self))
class NodeState(str, Enum):
    """
    States a node may be in during rendezvous.

    Members are also ``str`` instances, so each one compares equal to its
    literal value.
    """

    # Each member's name and value are the same string, so lookup by name
    # (``NodeState["INIT"]``) and by value (``NodeState("INIT")``) resolve
    # to the same member.
    INIT = "INIT"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
@dataclass
class RdzvEvent:
    """
    Dataclass to represent any rendezvous event.

    Args:
        name: Event name. (E.g. Current action being performed)
        run_id: The run id of the rendezvous
        message: The message describing the event
        hostname: Hostname of the node
        pid: The process id of the node
        node_state: The state of the node (INIT, RUNNING, SUCCEEDED, FAILED)
        master_endpoint: The master endpoint for the rendezvous store, if known
        rank: The rank of the node, if known
        local_id: The local_id of the node, if defined in dynamic_rendezvous.py
        error_trace: Error stack trace, if this is an error event.
    """

    name: str
    run_id: str
    message: str
    hostname: str
    pid: int
    node_state: NodeState
    master_endpoint: str = ""
    rank: Optional[int] = None
    local_id: Optional[int] = None
    error_trace: str = ""

    def __str__(self):
        """Return the JSON form produced by :meth:`serialize`."""
        return self.serialize()

    @staticmethod
    def deserialize(data: Union[str, "RdzvEvent"]) -> "RdzvEvent":
        """
        Build an ``RdzvEvent`` from its JSON string form.

        Args:
            data: either an ``RdzvEvent`` (returned unchanged) or a JSON
                string such as the one produced by :meth:`serialize`.

        Returns:
            The given ``RdzvEvent`` instance, or a new one built from the
            JSON object's keys.

        Raises:
            TypeError: if ``data`` is neither an ``RdzvEvent`` nor a ``str``.
            json.JSONDecodeError: if ``data`` is a string but not valid JSON.
            KeyError: if the decoded object has no ``"node_state"`` key or its
                value is not the name of a ``NodeState`` member.
        """
        if isinstance(data, RdzvEvent):
            return data
        # Reject unsupported inputs explicitly rather than falling through
        # with no parsed dictionary to build the event from.
        if not isinstance(data, str):
            raise TypeError(
                "RdzvEvent.deserialize expects a str or RdzvEvent, "
                f"got {type(data).__name__}"
            )
        data_dict = json.loads(data)
        # JSON carries the node state as a plain string; map it back to the
        # enum member by name.
        data_dict["node_state"] = NodeState[data_dict["node_state"]]
        return RdzvEvent(**data_dict)

    def serialize(self) -> str:
        """Return this event's fields encoded as a JSON object string."""
        return json.dumps(asdict(self))
|