123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- // Copyright (C) 2004-2006 The Trustees of Indiana University.
- // Use, modification and distribution is subject to the Boost Software
- // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- // Authors: Douglas Gregor
- // Andrew Lumsdaine
- #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
- #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
- #ifndef BOOST_GRAPH_USE_MPI
- #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
- #endif
- #include <boost/graph/parallel/process_group.hpp>
- #include <boost/optional.hpp>
- #include <boost/shared_ptr.hpp>
- #include <vector>
- namespace boost { namespace graph { namespace distributed {
- /// A unary predicate that always returns "true".
- struct always_push
- {
- template<typename T> bool operator()(const T&) const { return true; }
- };
- /** A distributed queue adaptor.
- *
- * Class template @c distributed_queue implements a distributed queue
- * across a process group. The distributed queue is an adaptor over an
- * existing (local) queue, which must model the @ref Buffer
- * concept. Each process stores a distinct copy of the local queue,
- * from which it draws or removes elements via the @ref pop and @ref
- * top members.
- *
- * The value type of the local queue must be a model of the @ref
- * GlobalDescriptor concept. The @ref push operation of the
- * distributed queue passes (via a message) the value to its owning
- * processor. Thus, the elements within a particular local queue are
- * guaranteed to have the process owning that local queue as an owner.
- *
- * Synchronization of distributed queues occurs in the @ref empty and
- * @ref size functions, which will only return "empty" values (true or
- * 0, respectively) when the entire distributed queue is empty. If the
- * local queue is empty but the distributed queue is not, the
- * operation will block until either condition changes. When the @ref
- * size function of a nonempty queue returns, it returns the size of
- * the local queue. These semantics were selected so that sequential
- * code that processes elements in the queue via the following idiom
- * can be parallelized via introduction of a distributed queue:
- *
- * distributed_queue<...> Q;
- * Q.push(x);
- * while (!Q.empty()) {
- * // do something, that may push a value onto Q
- * }
- *
- * In the parallel version, the initial @ref push operation will place
- * the value @c x onto its owner's queue. All processes will
- * synchronize at the call to empty, and only the process owning @c x
- * will be allowed to execute the loop (@ref Q.empty() returns
- * false). This iteration may in turn push values onto other remote
- * queues, so when that process finishes execution of the loop body
- * and all processes synchronize again in @ref empty, more processes
- * may have nonempty local queues to execute. Once all local queues
- * are empty, @ref Q.empty() returns @c false for all processes.
- *
- * The distributed queue can receive messages at two different times:
- * during synchronization and when polling @ref empty. Messages are
- * always received during synchronization, to ensure that accurate
- * local queue sizes can be determines. However, whether @ref empty
- * should poll for messages is specified as an option to the
- * constructor. Polling may be desired when the order in which
- * elements in the queue are processed is not important, because it
- * permits fewer synchronization steps and less communication
- * overhead. However, when more strict ordering guarantees are
- * required, polling may be semantically incorrect. By disabling
- * polling, one ensures that parallel execution using the idiom above
- * will not process an element at a later "level" before an earlier
- * "level".
- *
- * The distributed queue nearly models the @ref Buffer
- * concept. However, the @ref push routine does not necessarily
- * increase the result of @c size() by one (although the size of the
- * global queue does increase by one).
- */
- template<typename ProcessGroup, typename OwnerMap, typename Buffer,
- typename UnaryPredicate = always_push>
- class distributed_queue
- {
- typedef distributed_queue self_type;
- enum {
- /** Message indicating a remote push. The message contains a
- * single value x of type value_type that is to be pushed on the
- * receiver's queue.
- */
- msg_push,
- /** Push many elements at once. */
- msg_multipush
- };
- public:
- typedef ProcessGroup process_group_type;
- typedef Buffer buffer_type;
- typedef typename buffer_type::value_type value_type;
- typedef typename buffer_type::size_type size_type;
- /** Construct a new distributed queue.
- *
- * Build a new distributed queue that communicates over the given @p
- * process_group, whose local queue is initialized via @p buffer and
- * which may or may not poll for messages.
- */
- explicit
- distributed_queue(const ProcessGroup& process_group,
- const OwnerMap& owner,
- const Buffer& buffer,
- bool polling = false);
- /** Construct a new distributed queue.
- *
- * Build a new distributed queue that communicates over the given @p
- * process_group, whose local queue is initialized via @p buffer and
- * which may or may not poll for messages.
- */
- explicit
- distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
- const OwnerMap& owner = OwnerMap(),
- const Buffer& buffer = Buffer(),
- const UnaryPredicate& pred = UnaryPredicate(),
- bool polling = false);
- /** Construct a new distributed queue.
- *
- * Build a new distributed queue that communicates over the given @p
- * process_group, whose local queue is default-initalized and which
- * may or may not poll for messages.
- */
- distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
- const UnaryPredicate& pred, bool polling = false);
- /** Virtual destructor required with virtual functions.
- *
- */
- virtual ~distributed_queue() {}
- /** Push an element onto the distributed queue.
- *
- * The element will be sent to its owner process to be added to that
- * process's local queue. If polling is enabled for this queue and
- * the owner process is the current process, the value will be
- * immediately pushed onto the local queue.
- *
- * Complexity: O(1) messages of size O(sizeof(value_type)) will be
- * transmitted.
- */
- void push(const value_type& x);
- /** Pop an element off the local queue.
- *
- * @p @c !empty()
- */
- void pop() { buffer.pop(); }
- /**
- * Return the element at the top of the local queue.
- *
- * @p @c !empty()
- */
- value_type& top() { return buffer.top(); }
- /**
- * \overload
- */
- const value_type& top() const { return buffer.top(); }
- /** Determine if the queue is empty.
- *
- * When the local queue is nonempty, returns @c true. If the local
- * queue is empty, synchronizes with all other processes in the
- * process group until either (1) the local queue is nonempty
- * (returns @c true) (2) the entire distributed queue is empty
- * (returns @c false).
- */
- bool empty() const;
- /** Determine the size of the local queue.
- *
- * The behavior of this routine is equivalent to the behavior of
- * @ref empty, except that when @ref empty returns true this
- * function returns the size of the local queue and when @ref empty
- * returns false this function returns zero.
- */
- size_type size() const;
- // private:
- /** Synchronize the distributed queue and determine if all queues
- * are empty.
- *
- * \returns \c true when all local queues are empty, or false if at least
- * one of the local queues is nonempty.
- * Defined as virtual for derived classes like depth_limited_distributed_queue.
- */
- virtual bool do_synchronize() const;
- private:
- // Setup triggers
- void setup_triggers();
- // Message handlers
- void
- handle_push(int source, int tag, const value_type& value,
- trigger_receive_context);
- void
- handle_multipush(int source, int tag, const std::vector<value_type>& values,
- trigger_receive_context);
- mutable ProcessGroup process_group;
- OwnerMap owner;
- mutable Buffer buffer;
- UnaryPredicate pred;
- bool polling;
- typedef std::vector<value_type> outgoing_buffer_t;
- typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
- shared_ptr<outgoing_buffers_t> outgoing_buffers;
- };
- /// Helper macro containing the normal names for the template
- /// parameters to distributed_queue.
- #define BOOST_DISTRIBUTED_QUEUE_PARMS \
- typename ProcessGroup, typename OwnerMap, typename Buffer, \
- typename UnaryPredicate
- /// Helper macro containing the normal template-id for
- /// distributed_queue.
- #define BOOST_DISTRIBUTED_QUEUE_TYPE \
- distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
- /** Synchronize all processes involved with the given distributed queue.
- *
- * This function will synchronize all of the local queues for a given
- * distributed queue, by ensuring that no additional messages are in
- * transit. It is rarely required by the user, because most
- * synchronization of distributed queues occurs via the @c empty or @c
- * size methods.
- */
- template<BOOST_DISTRIBUTED_QUEUE_PARMS>
- inline void
- synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
- { Q.do_synchronize(); }
- /// Construct a new distributed queue.
- template<typename ProcessGroup, typename OwnerMap, typename Buffer>
- inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
- make_distributed_queue(const ProcessGroup& process_group,
- const OwnerMap& owner,
- const Buffer& buffer,
- bool polling = false)
- {
- typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
- return result_type(process_group, owner, buffer, polling);
- }
- } } } // end namespace boost::graph::distributed
- #include <boost/graph/distributed/detail/queue.ipp>
- #undef BOOST_DISTRIBUTED_QUEUE_TYPE
- #undef BOOST_DISTRIBUTED_QUEUE_PARMS
- #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
|