#!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. from datetime import timedelta from typing import List def get_all(store, rank: int, prefix: str, size: int): r""" Given a store and a prefix, the method goes through the array of keys of the following format: ``{prefix}{idx}``, where idx is in a range from 0 to size, and tries to retrieve the data. The Rank0 process waits at the end to make sure all other processes finished the procedure before exiting. Usage :: values = get_all(store, 'torchelastic/data', 3) value1 = values[0] # retrieves the data for key torchelastic/data0 value2 = values[1] # retrieves the data for key torchelastic/data1 value3 = values[2] # retrieves the data for key torchelastic/data2 """ data_arr = [] for idx in range(size): data = store.get(f"{prefix}{idx}") data_arr.append(data) store.set(f"{prefix}{rank}.FIN", b"FIN") if rank == 0: # Rank0 runs the TCPStore daemon, as a result it needs to exit last. # Otherwise, the barrier may timeout if rank0 process finished the work # before other processes finished `get_all` method for node_rank in range(size): store.get(f"{prefix}{node_rank}.FIN") return data_arr def synchronize( store, data: bytes, rank: int, world_size: int, key_prefix: str, barrier_timeout: float = 300, ) -> List[bytes]: """ Synchronizes ``world_size`` agents between each other using the underlying c10d store. The ``data`` will be available on each of the agents. Note: The data on the path is not deleted, as a result there can be stale data if you use the same key_prefix twice. """ store.set_timeout(timedelta(seconds=barrier_timeout)) store.set(f"{key_prefix}{rank}", data) agent_data = get_all(store, rank, key_prefix, world_size) return agent_data def barrier( store, rank: int, world_size: int, key_prefix: str, barrier_timeout: float = 300 ) -> None: """ A global lock between agents. Note: Since the data is not removed from the store, the barrier can be used once per unique ``key_prefix``. """ data = f"{rank}".encode(encoding="UTF-8") synchronize(store, data, rank, world_size, key_prefix, barrier_timeout)