RDKit
Open-source cheminformatics and machine learning.
ConcurrentQueue.h
Go to the documentation of this file.
1 //
2 // Copyright (C) 2020 Shrey Aryan
3 //
4 // @@ All Rights Reserved @@
5 // This file is part of the RDKit.
6 // The contents are covered by the terms of the BSD license
7 // which is included in the file license.txt, found at the root
8 // of the RDKit source tree.
9 //
10 #ifdef RDK_THREADSAFE_SSS
11 #ifndef CONCURRENT_QUEUE
12 #define CONCURRENT_QUEUE
13 #include <condition_variable>
14 #include <thread>
15 #include <vector>
16 
17 namespace RDKit {
18 template <typename E>
19 class ConcurrentQueue {
20  private:
21  unsigned int d_capacity;
22  bool d_done;
23  std::vector<E> d_elements;
24  unsigned int d_head, d_tail;
25  mutable std::mutex d_lock;
26  std::condition_variable d_notEmpty, d_notFull;
27 
28  private:
29  ConcurrentQueue<E>(const ConcurrentQueue<E>&);
30  ConcurrentQueue<E>& operator=(const ConcurrentQueue<E>&);
31 
32  public:
33  ConcurrentQueue<E>(unsigned int capacity)
34  : d_capacity(capacity), d_done(false), d_head(0), d_tail(0) {
35  std::vector<E> elements(capacity);
36  d_elements = elements;
37  }
38 
39  //! tries to push an element into the queue if it is not full without
40  //! modifying the variable element, if the queue is full then pushing an
41  //! element will result in blocking
42  void push(const E& element);
43 
44  //! tries to pop an element from the queue if it is not empty and not done
45  //! the boolean value indicates the whether popping is successful, if the
46  //! queue is empty and not done then popping an element will result in
47  //! blocking
48  bool pop(E& element);
49 
50  //! checks whether the ConcurrentQueue is empty
51  bool isEmpty() const;
52 
53  //! returns the value of the variable done
54  bool getDone() const;
55 
56  //! sets the variable d_done = true
57  void setDone();
58 
59  //! clears the vector
60  void clear();
61 };
62 
63 template <typename E>
64 void ConcurrentQueue<E>::push(const E& element) {
65  std::unique_lock<std::mutex> lk(d_lock);
66  //! concurrent queue is full so we wait until
67  //! it is not full
68  while (d_head + d_capacity == d_tail) {
69  d_notFull.wait(lk);
70  }
71  bool wasEmpty = (d_head == d_tail);
72  d_elements.at(d_tail % d_capacity) = element;
73  d_tail++;
74  //! if the concurrent queue was empty before
75  //! then it is not any more since we have "pushed" an element
76  //! thus we notify all the consumer threads
77  if (wasEmpty) {
78  d_notEmpty.notify_all();
79  }
80 }
81 
82 template <typename E>
83 bool ConcurrentQueue<E>::pop(E& element) {
84  std::unique_lock<std::mutex> lk(d_lock);
85  //! concurrent queue is empty so we wait until
86  //! it is not empty
87  while (d_head == d_tail) {
88  if (d_done) {
89  return false;
90  }
91  d_notEmpty.wait(lk);
92  }
93  bool wasFull = (d_head + d_capacity == d_tail);
94  element = d_elements.at(d_head % d_capacity);
95  d_head++;
96  //! if the concurrent queue was full before
97  //! then it is not any more since we have "popped" an element
98  //! thus we notify all producer threads
99  if (wasFull) {
100  d_notFull.notify_all();
101  }
102  return true;
103 }
104 
105 template <typename E>
106 bool ConcurrentQueue<E>::isEmpty() const {
107  std::unique_lock<std::mutex> lk(d_lock);
108  return (d_head == d_tail);
109 }
110 
111 template <typename E>
112 bool ConcurrentQueue<E>::getDone() const {
113  std::unique_lock<std::mutex> lk(d_lock);
114  return d_done;
115 }
116 
117 template <typename E>
118 void ConcurrentQueue<E>::setDone() {
119  std::unique_lock<std::mutex> lk(d_lock);
120  d_done = true;
121  d_notEmpty.notify_all();
122 }
123 
124 template <typename E>
125 void ConcurrentQueue<E>::clear() {
126  std::unique_lock<std::mutex> lk(d_lock);
127  d_elements.clear();
128 }
129 
130 } // namespace RDKit
131 #endif
132 #endif
Std stuff.
Definition: Abbreviations.h:17