aegis.cpp
 All Classes Functions Variables Typedefs Enumerations Enumerator Friends Pages
futures.hpp
1 //
2 // futures.hpp
3 // ***********
4 //
5 // Copyright (c) 2019 Sharon W (sharon at aegis dot gg)
6 //
7 // Distributed under the MIT License. (See accompanying file LICENSE)
8 //
9 // Adapted from https://github.com/scylladb/seastar to support asio scheduling
10 
11 #pragma once
12 
13 #include "aegis/config.hpp"
14 #include "aegis/fwd.hpp"
15 #include "aegis/error.hpp"
16 #include <condition_variable>
17 #include <stdexcept>
18 #include <type_traits>
19 #include <memory>
20 #include <functional>
21 #include <cassert>
22 #include <chrono>
23 #include <thread>
24 #include <iostream>
25 #include <mutex>
26 #include <asio/io_context.hpp>
27 #include <asio/post.hpp>
28 #include <asio/bind_executor.hpp>
29 #include <asio/executor_work_guard.hpp>
30 
31 using namespace std::literals::chrono_literals;
32 
33 
34 namespace aegis
35 {
36 
/// Producer side of an asynchronous result. Declared here so the helpers
/// below can refer to it; the definition follows later in this header.
template <class T>
class promise;

/// Consumer side of an asynchronous result. Declared here so the helpers
/// below can refer to it; the definition follows later in this header.
template <class T>
class future;
42 
43 template <typename T>
45 
46 template <typename T, typename... A>
47 future<T> make_ready_future(A&&... value);
48 
49 template <typename T>
50 future<T> make_ready_future(T&& value);
51 
52 template <typename T = rest::rest_reply>
53 future<T> make_exception_future(std::exception_ptr value) noexcept;
54 
55 template<typename T>
56 struct add_future
57 {
58  using type = future<T>;
59 };
60 
61 template<typename T>
62 struct add_future<future<T>>
63 {
64  using type = future<T>;
65 };
66 
67 template<typename T>
69 {
70  using type = T;
71 };
72 
73 template<typename T>
75 {
76  using type = T;
77 };
78 
79 template<typename T>
80 struct is_future : std::false_type {};
81 
82 template<typename T>
83 struct is_future<future<T>> : std::true_type {};
84 
/// Result type of calling `F` with arguments of types `A...`.
/// Inherits from std::result_of, so `type` is simply absent when the call is
/// ill-formed and the trait stays usable in SFINAE contexts.
template<typename F, typename... A>
struct result_of : std::result_of<F(A...)> {};

/// A lone `void` argument list means "call `F` with no arguments".
template<typename F>
struct result_of<F, void> : std::result_of<F()> {};

/// Shorthand for `typename result_of<F, A...>::type`.
template<typename F, typename... A>
using result_of_t = typename result_of<F, A...>::type;
93 
94 
97 
98 template<typename T>
99 using add_future_t = typename add_future<T>::type;
100 
101 template<typename T>
102 using remove_future_t = typename remove_future<T>::type;
103 
104 namespace detail
105 {
107 template<typename T, typename Func, typename State>
108 add_future_t<T> call_state(Func&& func, State&& state);
109 
111 template<typename T, typename Func, typename Future>
112 add_future_t<T> call_future(Func&& func, Future&& fut) noexcept;
113 }
114 
116 template <typename T>
118 {
119  using type = T;
120  static constexpr bool copy_noexcept = std::is_nothrow_copy_constructible<T>::value;
121  static_assert(std::is_nothrow_move_constructible<T>::value,
122  "Types must be no-throw move constructible");
123  static_assert(std::is_nothrow_destructible<T>::value,
124  "Types must be no-throw destructible");
125  static_assert(std::is_nothrow_copy_constructible<std::exception_ptr>::value,
126  "std::exception_ptr's copy constructor must not throw");
127  static_assert(std::is_nothrow_move_constructible<std::exception_ptr>::value,
128  "std::exception_ptr's move constructor must not throw");
129  enum class state
130  {
131  invalid,
132  future,
133  result,
134  exception,
135  };// _state = state::future;
136  std::atomic<state> _state;
137  union any
138  {
139  any() {}
140  ~any() {}
141  T value;
142  std::exception_ptr ex;
143  } _u;
144  asio::io_context * _io_context = nullptr;
145  std::recursive_mutex * _global_m = nullptr;
146  future_state() noexcept {}
147  future_state(asio::io_context * _io_context, std::recursive_mutex * _global_m) noexcept
148  : _state(state::future)
149  , _io_context(_io_context)
150  , _global_m(_global_m)
151  {}
152  future_state(future_state&& x) noexcept
153  : _state(state::future)
154  , _io_context(x._io_context)
155  , _global_m(x._global_m)
156  {
157  std::atomic_thread_fence(std::memory_order_acquire);
158  state _x = x._state.load(std::memory_order_acquire);
159  state _s = _state.load(std::memory_order_acquire);
160  switch (_x)
161  {
162  case state::future:
163  break;
164  case state::result:
165  new (&_u.value) T(std::move(x._u.value));
166  x._u.value.~T();
167  break;
168  case state::exception:
169  new (&_u.ex) std::exception_ptr(std::move(x._u.ex));
170  x._u.ex.~exception_ptr();
171  break;
172  case state::invalid:
173  break;
174  default:
175  abort();
176  }
177  _state.store(_x, std::memory_order_release);
178  x._state.store(state::invalid, std::memory_order_release);
179  std::atomic_thread_fence(std::memory_order_release);
180  }
181  ~future_state() noexcept
182  {
183  std::atomic_thread_fence(std::memory_order_acquire);
184  switch (_state)
185  {
186  case state::invalid:
187  break;
188  case state::future:
189  break;
190  case state::result:
191  _u.value.~T();
192  break;
193  case state::exception:
194  _u.ex.~exception_ptr();
195  break;
196  default:
197  abort();
198  }
199  std::atomic_thread_fence(std::memory_order_release);
200  }
201  future_state& operator=(future_state&& x) noexcept
202  {
203  std::atomic_thread_fence(std::memory_order_acquire);
204  state _s = _state.load(std::memory_order_consume);
205  if (this != &x)
206  {
207  this->~future_state();
208  new (this) future_state(std::move(x));
209  }
210  std::atomic_thread_fence(std::memory_order_release);
211  return *this;
212  }
213  bool available() const noexcept
214  {
215  std::atomic_thread_fence(std::memory_order_acquire);
216  auto _s = _state.load(std::memory_order_consume);
217  std::atomic_thread_fence(std::memory_order_release);
218  return _s == state::result || _s == state::exception;
219  }
220  bool failed() const noexcept
221  {
222  return _state == state::exception;
223  }
224  void wait();
225  void set(const T& value) noexcept
226  {
227  state _s = _state.load(std::memory_order_acquire);
228  assert(_s == state::future);
229  new (&_u.value) T(value);
230  _state.store(state::result, std::memory_order_release);
231  }
232  void set(T&& value) noexcept
233  {
234  state _s = _state.load(std::memory_order_acquire);
235  assert(_s == state::future);
236  new (&_u.value) T(std::move(value));
237  _state.store(state::result, std::memory_order_release);
238  }
239  template <typename... A>
240  void set(A&&... a)
241  {
242  state _s = _state.load(std::memory_order_acquire);
243  assert(_s == state::future);
244  new (&_u.value) T(std::forward<A>(a)...);
245  _state.store(state::result, std::memory_order_release);
246  }
247  void set_exception(std::exception_ptr ex) noexcept
248  {
249  state _s = _state.load(std::memory_order_acquire);
250  assert(_s == state::future);
251  new (&_u.ex) std::exception_ptr(ex);
252  _state.store(state::exception, std::memory_order_release);
253  }
254  std::exception_ptr get_exception() && noexcept
255  {
256  state _s = _state.load(std::memory_order_acquire);
257  assert(_s == state::exception);
258  auto ex = std::move(_u.ex);
259  _u.ex.~exception_ptr();
260  _state.store(state::invalid, std::memory_order_release);
261  return ex;
262  }
263  std::exception_ptr get_exception() const& noexcept
264  {
265  state _s = _state.load(std::memory_order_consume);
266  assert(_s == state::exception);
267  return _u.ex;
268  }
269  auto get_value() && noexcept
270  {
271  state _s = _state.load(std::memory_order_acquire);
272  assert(_s == state::result);
273  _state.store(_s, std::memory_order_release);
274  return std::move(_u.value);
275  }
276  template<typename U = T>
277  std::enable_if_t<std::is_copy_constructible<U>::value, U> get_value() const& noexcept(copy_noexcept)
278  {
279  assert(_state == state::result);
280  return _u.value;
281  }
282  T get() &&
283  {
284  auto _s = _state.load(std::memory_order_acquire);
285  assert(_s != state::future);
286  if (_s == state::exception)
287  {
288  auto ex = std::move(_u.ex);
289  _u.ex.~exception_ptr();
290  _state.store(state::invalid, std::memory_order_release);
291  std::rethrow_exception(std::move(ex));
292  }
293  _state.store(_s, std::memory_order_release);
294  return std::move(_u.value);
295  }
296  T get() const&
297  {
298  state _s = _state.load(std::memory_order_consume);
299  assert(_s != state::future);
300  if (_s == state::exception)
301  {
302  std::rethrow_exception(_u.ex);
303  }
304  return _u.value;
305  }
306  void ignore() noexcept
307  {
308  state _s = _state.load(std::memory_order_acquire);
309  assert(_s != state::future);
310  this->~future_state();
311  _state.store(state::invalid, std::memory_order_release);
312  }
313  void forward_to(promise<T>& pr) noexcept
314  {
315  state _s = _state.load(std::memory_order_acquire);
316  assert(_s != state::future);
317  if (_s == state::exception)
318  {
319  pr.set_urgent_exception(std::move(_u.ex));
320  _u.ex.~exception_ptr();
321  }
322  else
323  {
324  pr.set_urgent_value(std::move(_u.value));
325  _u.value.~T();
326  }
327  _state.store(state::invalid, std::memory_order_release);
328  }
329 };
330 
332 template <>
333 struct future_state<void>
334 {
335  using type = void;
336  static_assert(std::is_nothrow_copy_constructible<std::exception_ptr>::value,
337  "std::exception_ptr's copy constructor must not throw");
338  static_assert(std::is_nothrow_move_constructible<std::exception_ptr>::value,
339  "std::exception_ptr's move constructor must not throw");
340  static constexpr bool copy_noexcept = true;
341  enum class state : uintptr_t
342  {
343  invalid = 0,
344  future = 1,
345  result = 2,
346  exception_min = 3
347  };
348  union any
349  {
350  any() { st = state::future; }
351  ~any() {}
352  std::atomic<state> st;
353  std::exception_ptr ex;
354  } _u;
355  std::recursive_mutex _m;
356  asio::io_context * _io_context = nullptr;
357  std::recursive_mutex * _global_m = nullptr;
358  future_state() noexcept {}
359  future_state(asio::io_context * _io_context, std::recursive_mutex * _global_m) noexcept
360  : _io_context(_io_context)
361  , _global_m(_global_m)
362  {} future_state(future_state&& x) noexcept
363  {
364  std::atomic_thread_fence(std::memory_order_acquire);
365  if (x._u.st < state::exception_min)
366  {
367  _u.st.store(x._u.st);
368  }
369  else
370  {
371  new (&_u.ex) std::exception_ptr(std::move(x._u.ex));
372  x._u.ex.~exception_ptr();
373  }
374  x._u.st.store(state::invalid);
375  std::atomic_thread_fence(std::memory_order_release);
376  }
377  ~future_state() noexcept
378  {
379  if (_u.st >= state::exception_min)
380  {
381  _u.ex.~exception_ptr();
382  }
383  }
384  future_state& operator=(future_state&& x) noexcept
385  {
386  std::atomic_thread_fence(std::memory_order_acquire);
387  if (this != &x)
388  {
389  this->~future_state();
390  new (this) future_state(std::move(x));
391  }
392  std::atomic_thread_fence(std::memory_order_release);
393  return *this;
394  }
395  bool available() const noexcept
396  {
397  return _u.st == state::result || _u.st >= state::exception_min;
398  }
399  bool failed() const noexcept
400  {
401  return _u.st >= state::exception_min;
402  }
403  void set()
404  {
405  assert(_u.st == state::future);
406  _u.st.store(state::result, std::memory_order_release);
407  }
408  void set_exception(std::exception_ptr ex) noexcept
409  {
410  assert(_u.st == state::future);
411  new (&_u.ex) std::exception_ptr(ex);
412  assert(_u.st >= state::exception_min);
413  }
414  void get() &&
415  {
416  assert(_u.st != state::future);
417  if (_u.st >= state::exception_min)
418  {
419  std::rethrow_exception(std::move(_u.ex));
420  }
421  }
422  void get() const&
423  {
424  assert(_u.st != state::future);
425  if (_u.st >= state::exception_min)
426  {
427  std::rethrow_exception(_u.ex);
428  }
429  }
430  void ignore() noexcept
431  {
432  assert(available());
433  this->~future_state();
434  _u.st.store(state::invalid, std::memory_order_release);
435  }
436  std::exception_ptr get_exception() && noexcept
437  {
438  assert(_u.st >= state::exception_min);
439  return std::move(_u.ex);
440  }
441  std::exception_ptr get_exception() const& noexcept
442  {
443  assert(_u.st >= state::exception_min);
444  return _u.ex;
445  }
446  void get_value() const noexcept
447  {
448  assert(_u.st == state::result);
449  }
450  void forward_to(promise<void>& pr) noexcept;
451 };
452 
/// Abstract unit of work: something that can be run exactly through `run()`.
/// The virtual destructor allows deletion through a `task` pointer.
class task
{
public:
    virtual ~task() = default;
    /// Executes the work; implementations must not throw.
    virtual void run() noexcept = 0;
};
460 
462 template <typename T>
463 class continuation_base : public task
464 {
465 protected:
466  future_state<T> _state;
467  using future_type = future<T>;
468  using promise_type = promise<T>;
469 public:
470  continuation_base() = default;
471  explicit continuation_base(asio::io_context * _io_context, std::recursive_mutex * _global_m)
472  : _state(_io_context, _global_m) {}
473 
474  explicit continuation_base(future_state<T>&& state) : _state(std::move(state)) {}
475 
476  void set_state(T&& state)
477  {
478  _state.set(std::move(state));
479  }
480  void set_state(future_state<T>&& state)
481  {
482  _state = std::move(state);
483  }
484  future_state<T>* state() noexcept
485  {
486  return &_state;
487  }
488 
489  friend class promise<T>;
490  friend class future<T>;
491 };
492 
494 template <>
495 class continuation_base<void> : public task
496 {
497 protected:
498  future_state<void> _state;
499  using future_type = future<void>;
500  using promise_type = promise<void>;
501 public:
502  continuation_base() = default;
503  explicit continuation_base(asio::io_context * _io_context, std::recursive_mutex * _global_m)
504  : _state(_io_context, _global_m) {}
505  explicit continuation_base(future_state<void>&& state) : _state(std::move(state)) {}
506  void set_state(future_state<void>&& state)
507  {
508  _state = std::move(state);
509  }
510  future_state<void>* state() noexcept
511  {
512  return &_state;
513  }
514 
515  friend class promise<void>;
516  friend class future<void>;
517 };
518 
520 template <typename Func, typename T>
522 {
523  continuation(Func&& func, future_state<T>&& state) : continuation_base<T>(std::move(state)), _func(std::move(func)) {}
524  continuation(Func&& func, asio::io_context * _io_context, std::recursive_mutex * _global_m)
525  : continuation_base<T>(_io_context, _global_m)
526  , _func(std::move(func))
527  {}
528  virtual void run() noexcept override
529  {
530  _func(std::move(this->_state));
531  }
532  Func _func;
533 };
534 using task_ptr = std::unique_ptr<task>;
535 
536 
538 template <typename T>
539 class promise
540 {
541  enum class urgent { no, yes };
542  future<T>* _future = nullptr;
543  future_state<T> _local_state;
544  future_state<T>* _state;
545  std::unique_ptr<continuation_base<T>> _task;
546  std::recursive_mutex _m;
547  asio::io_context * _io_context = nullptr;
548  std::recursive_mutex * _global_m = nullptr;
549  static constexpr bool copy_noexcept = future_state<T>::copy_noexcept;
550 public:
551  promise(asio::io_context * _io_context, std::recursive_mutex * _global_m) noexcept
552  : _local_state(future_state<T>(_io_context, _global_m))
553  , _state(&_local_state)
554  , _io_context(_io_context)
555  , _global_m(_global_m)
556  {}
557 
558  promise(promise&& x) noexcept
559  : _local_state(future_state<T>(x._io_context, x._global_m))
560  , _io_context(x._io_context)
561  , _global_m(x._global_m)
562  {
563  std::atomic_thread_fence(std::memory_order_acquire);
564  std::lock_guard<std::recursive_mutex> gl(*_global_m);
565  if (x._future)
566  {
567  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
568  std::unique_lock<std::recursive_mutex> l2(x._future->_m, std::defer_lock);
569  std::lock(l, l2);
570  if (!x._future)
571  goto NOTREALLYTHERE;
572  _future = x._future;
573  _future->_promise = this;
574  _state = x._state;
575  _task = std::move(x._task);
576  if (_state == &x._local_state)
577  {
578  _state = &_local_state;
579  _local_state = std::move(x._local_state);
580  }
581  x._future = nullptr;
582  x._state = nullptr;
583  }
584  else
585  {
586  NOTREALLYTHERE:;
587  std::lock_guard<std::recursive_mutex> l(_m);
588  _state = x._state;
589  _task = std::move(x._task);
590  if (_state == &x._local_state)
591  {
592  _state = &_local_state;
593  _local_state = std::move(x._local_state);
594  }
595  x._state = nullptr;
596  }
597  std::atomic_thread_fence(std::memory_order_release);
598  }
599  promise(const promise&) = delete;
600  ~promise() noexcept
601  {
602  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
603  std::unique_lock<std::recursive_mutex> l2(*_global_m, std::defer_lock);
604  std::lock(l, l2);
605  abandoned();
606  }
607  promise& operator=(promise&& x) noexcept
608  {
609  std::atomic_thread_fence(std::memory_order_acquire);
610  if (this != &x)
611  {
612  this->~promise();
613  new (this) promise(std::move(x));
614  }
615  std::atomic_thread_fence(std::memory_order_release);
616  return *this;
617  }
618  void operator=(const promise&) = delete;
619 
620  future<T> get_future() noexcept;
621 
622  template <typename... A>
623  void set_value(A&&... a) noexcept
624  {
625  assert(_state);
626  _state->set(std::forward<A>(a)...);
627  make_ready<urgent::no>();
628  }
629 
630  void set_exception(std::exception_ptr ex) noexcept
631  {
632  do_set_exception<urgent::no>(std::move(ex));
633  }
634 
635  template<typename Exception>
636  void set_exception(Exception&& e) noexcept
637  {
638  set_exception(make_exception_ptr(std::forward<Exception>(e)));
639  }
640 private:
641 
642  template<urgent Urgent, typename... A>
643  void do_set_value(A... a) noexcept
644  {
645  assert(_state);
646  _state->set(std::move(a)...);
647  make_ready<Urgent>();
648  }
649 
650  template<typename... A>
651  void set_urgent_value(A&&... a) noexcept
652  {
653  set_value(std::forward<A>(a)...);
654  }
655 
656  template<urgent Urgent>
657  void do_set_exception(std::exception_ptr ex) noexcept
658  {
659  assert(_state);
660  _state->set_exception(std::move(ex));
661  make_ready<Urgent>();
662  }
663 
664  void set_urgent_exception(std::exception_ptr ex) noexcept
665  {
666  do_set_exception<urgent::yes>(std::move(ex));
667  }
668 private:
669  template <typename Func>
670  void schedule(Func&& func)
671  {
672  auto tws = std::make_unique<continuation<Func, T>>(std::move(func), _io_context, _global_m);
673  _state = &tws->_state;
674  _task = std::move(tws);
675  }
676  template<urgent Urgent>
677  void make_ready() noexcept;
678  void abandoned() noexcept;
679 
680  template <typename U>
681  friend class future;
682 
683  friend struct future_state<T>;
684  friend struct future_state<void>;
685 };
686 
688 template <typename T>
689 class future
690 {
691  promise<T>* _promise;
692  future_state<T> _local_state;
693  mutable std::recursive_mutex _m;
694  asio::io_context * _io_context = nullptr;
695  std::recursive_mutex * _global_m = nullptr;
696  static constexpr bool copy_noexcept = future_state<T>::copy_noexcept;
697 private:
698  future(promise<T>* pr) noexcept
699  : _local_state(future_state<T>(pr->_io_context, pr->_global_m))
700  {
701  _io_context = pr->_io_context;
702  _global_m = pr->_global_m;
703  std::atomic_thread_fence(std::memory_order_acquire);
704  std::unique_lock<std::recursive_mutex> l(*_global_m, std::defer_lock);
705  std::unique_lock<std::recursive_mutex> l2(_m, std::defer_lock);
706  std::unique_lock<std::recursive_mutex> l3(pr->_m, std::defer_lock);
707  std::lock(l, l2, l3);
708 
709  _promise = pr;
710  _promise->_future = this;
711  std::atomic_thread_fence(std::memory_order_release);
712  }
713  template <typename... A>
714  future(ready_future_marker, A&&... a)
715  : _promise(nullptr)
716  , _local_state(future_state<T>(nullptr, nullptr))
717  {
718  _local_state.set(std::forward<A>(a)...);
719  }
720  future(exception_future_marker, std::exception_ptr ex) noexcept
721  : _promise(nullptr)
722  , _local_state(future_state<T>(nullptr, nullptr))
723  {
724  _local_state.set_exception(std::move(ex));
725  }
726  explicit future(future_state<T>&& state, asio::io_context * _io_context, std::recursive_mutex * _global_m) noexcept
727  : _local_state(future_state<T>(_io_context, _global_m))
728  , _io_context(_io_context)
729  , _global_m(_global_m)
730  {
731  std::atomic_thread_fence(std::memory_order_acquire);
732  _local_state = std::move(state);
733  _promise = nullptr;
734  std::atomic_thread_fence(std::memory_order_release);
735  }
736  future_state<T> * state() noexcept
737  {
738  std::atomic_thread_fence(std::memory_order_acquire);
739  if (_promise)
740  {
741  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
742  std::unique_lock<std::recursive_mutex> l2(*_global_m, std::defer_lock);
743  std::lock(l, l2);
744 
745  //std::lock_guard<std::recursive_mutex> l(_promise->_m);
746  future_state<T> * _st = _promise->_state;
747  std::atomic_thread_fence(std::memory_order_release);
748  return _st;
749  }
750  else
751  {
752  future_state<T> * _st = &_local_state;
753  std::atomic_thread_fence(std::memory_order_release);
754  return _st;
755  }
756  }
757  const future_state<T> * state() const noexcept
758  {
759  std::atomic_thread_fence(std::memory_order_acquire);
760  if (_promise)
761  {
762  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
763  std::unique_lock<std::recursive_mutex> l2(*_global_m, std::defer_lock);
764  std::lock(l, l2);
765 
766  //std::lock_guard<std::recursive_mutex> l(_promise->_m);
767  const future_state<T> * _st = _promise->_state;
768  std::atomic_thread_fence(std::memory_order_release);
769  return _st;
770  }
771  else
772  {
773  const future_state<T> * _st = &_local_state;
774  std::atomic_thread_fence(std::memory_order_release);
775  return _st;
776  }
777  }
778  template <typename Func>
779  void schedule(Func&& func)
780  {
781  std::atomic_thread_fence(std::memory_order_acquire);
782  future_state<T> * _st = state();
783  if (state()->available())
784  {
785  asio::post(*_io_context, [func = std::move(func), _state = std::move(*state())]() mutable
786  {
787  func(std::move(_state));
788  });
789  }
790  else
791  {
792  std::lock_guard<std::recursive_mutex> gl(*_global_m);
793  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
794  std::unique_lock<std::recursive_mutex> l2(_promise->_m, std::defer_lock);
795  std::lock(l, l2);
796  assert(_promise);
797  _promise->schedule(std::move(func));
798  _promise->_future = nullptr;
799  _promise = nullptr;
800  }
801  std::atomic_thread_fence(std::memory_order_release);
802  }
803  future_state<T> get_available_state() noexcept
804  {
805  auto st = state();
806  if (_promise)
807  {
808  std::lock_guard<std::recursive_mutex> gl(*_global_m);
809  std::lock_guard<std::recursive_mutex> l(_promise->_m);
810  if (!_promise)
811  goto NOTREALLYTHERE;
812  _promise->_future = nullptr;
813  _promise = nullptr;
814  }
815  NOTREALLYTHERE:;
816  return std::move(*st);
817  }
818 
819  future<T> rethrow_with_nested()
820  {
821  if (!failed())
822  {
823  return make_exception_future<T>(std::current_exception());
824  }
825  else
826  {
827  std::nested_exception f_ex;
828  try
829  {
830  get();
831  }
832  catch (...)
833  {
834  std::throw_with_nested(f_ex);
835  }
836  throw std::runtime_error("unreachable");
837  }
838  }
839 
840  template<typename U>
841  friend class shared_future;
842 public:
843  using value_type = T;
844  using promise_type = promise<T>;
845  future(future&& x) noexcept
846  : _local_state(future_state<T>(x._io_context, x._global_m))
847  , _io_context(x._io_context)
848  , _global_m(x._global_m)
849  {
850  std::atomic_thread_fence(std::memory_order_acquire);
851  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
852  std::unique_lock<std::recursive_mutex> l2(x._m, std::defer_lock);
853  std::lock(l, l2);
854 
855  _promise = x._promise;
856  if (!_promise)
857  {
858  _local_state = std::move(x._local_state);
859  }
860  x._promise = nullptr;
861  if (_promise)
862  {
863  std::lock_guard<std::recursive_mutex> gl(*_global_m);
864  std::lock_guard<std::recursive_mutex> l(_promise->_m);
865  _promise->_future = this;
866  }
867  std::atomic_thread_fence(std::memory_order_release);
868  }
869  future(const future&) = delete;
870  future& operator=(future&& x) noexcept
871  {
872  std::atomic_thread_fence(std::memory_order_acquire);
873  if (this != &x)
874  {
875  this->~future();
876  new (this) future(std::move(x));
877  }
878  std::atomic_thread_fence(std::memory_order_release);
879  return *this;
880  }
881  void operator=(const future&) = delete;
882  ~future()
883  {
884  std::atomic_thread_fence(std::memory_order_acquire);
885  if (_promise)
886  {
887  std::lock_guard<std::recursive_mutex> l(*_global_m);
888  if (_promise)
889  {
890  std::lock_guard<std::recursive_mutex> l(_promise->_m);
891  _promise->_future = nullptr;
892  }
893  }
894  /*if (failed())
895  {
896  }*/
897  std::atomic_thread_fence(std::memory_order_release);
898  }
899 
900  T get()
901  {
902  {
903  std::atomic_thread_fence(std::memory_order_acquire);
904  future_state<T> * _st = state();
905  std::atomic_thread_fence(std::memory_order_release);
906  if (!_st->available())
907  {
908  do_wait();
909  }
910  }
911 
912  {
913  std::atomic_thread_fence(std::memory_order_acquire);
914 
915  std::unique_lock<std::recursive_mutex> l(_m, std::defer_lock);
916  std::unique_lock<std::recursive_mutex> l2(*_global_m, std::defer_lock);
917  std::lock(l, l2);
918 
919  future_state<T> _st(get_available_state());
920  std::atomic_thread_fence(std::memory_order_release);
921  return _st.get();
922  }
923  }
924 
925  std::exception_ptr get_exception()
926  {
927  std::atomic_thread_fence(std::memory_order_acquire);
928  future_state<T> _st(get_available_state());
929  std::atomic_thread_fence(std::memory_order_release);
930  return _st.get_exception();
931  }
932 
933  void wait() const noexcept
934  {
935  std::atomic_thread_fence(std::memory_order_acquire);
936  const future_state<T> * _st = state();
937  std::atomic_thread_fence(std::memory_order_release);
938  if (!_st->available())
939  {
940  do_wait();
941  }
942  }
943 private:
944  void do_wait() const noexcept
945  {
946  // fake wait
947  // maybe execute something in the asio queue?
948 
949  while (!available())
950  std::this_thread::sleep_for(std::chrono::nanoseconds(1));
951  }
952 
953 public:
954  bool available() const noexcept
955  {
956  std::atomic_thread_fence(std::memory_order_acquire);
957  const future_state<T> * _st = state();
958  assert(_st);
959  auto res = _st->available();
960  std::atomic_thread_fence(std::memory_order_release);
961  return res;
962  }
963 
964  bool failed() const noexcept
965  {
966  std::atomic_thread_fence(std::memory_order_acquire);
967  auto _st = state();
968  if (!_st)
969  return false;
970  bool f = _st->failed();
971  std::atomic_thread_fence(std::memory_order_release);
972  return f;
973  }
974 
975  template <typename Func, typename Result = result_of_t<Func, T>>
976  add_future_t<Result> then(Func&& func) noexcept
977  {
978  using inner_type = remove_future_t<Result>;
979  std::atomic_thread_fence(std::memory_order_acquire);
980  if (available())
981  {
982  if (failed())
983  {
984  return make_exception_future<inner_type>(get_exception());
985  }
986  else
987  {
988  return detail::call_state<inner_type>(std::forward<Func>(func), get_available_state());
989  }
990  }
991  std::atomic_thread_fence(std::memory_order_release);
992  promise<inner_type> pr(_io_context, _global_m);
993  auto fut = pr.get_future();
994  try
995  {
996  this->schedule([pr = std::move(pr), func = std::forward<Func>(func)](future_state<T> && state) mutable {
997  std::atomic_thread_fence(std::memory_order_acquire);
998  if (state.failed())
999  {
1000  pr.set_exception(std::move(state).get_exception());
1001  }
1002  else
1003  {
1004  detail::call_state<inner_type>(std::forward<Func>(func), std::move(state)).forward_to(std::move(pr));
1005  }
1006  std::atomic_thread_fence(std::memory_order_release);
1007  });
1008  }
1009  catch (...)
1010  {
1011  abort();
1012  }
1013  return fut;
1014  }
1015 
1016  template <typename Func, typename Result = std::result_of_t<Func(future)>>
1017  add_future_t<Result> then_wrapped(Func&& func) noexcept
1018  {
1019  using inner_type = remove_future_t<Result>;
1020  std::atomic_thread_fence(std::memory_order_acquire);
1021  if (available())
1022  {
1023  return detail::call_future<inner_type>(std::forward<Func>(func), future(get_available_state(), _io_context, _global_m));
1024  }
1025  std::atomic_thread_fence(std::memory_order_release);
1026  promise<inner_type> pr(_io_context, _global_m);
1027  auto fut = pr.get_future();
1028  try
1029  {
1030  this->schedule([pr = std::move(pr), func = std::forward<Func>(func), this](auto&& state) mutable {
1031  std::atomic_thread_fence(std::memory_order_acquire);
1032  detail::call_future<inner_type>(std::forward<Func>(func), future(std::move(state), _io_context, _global_m)).forward_to(std::move(pr));
1033  std::atomic_thread_fence(std::memory_order_release);
1034  });
1035  }
1036  catch (...)
1037  {
1038  abort();
1039  }
1040  return fut;
1041  }
1042 
1043  void forward_to(promise<T>&& pr) noexcept
1044  {
1045  std::atomic_thread_fence(std::memory_order_acquire);
1046  if (state()->available())
1047  {
1048  state()->forward_to(pr);
1049  }
1050  else
1051  {
1052  std::lock_guard<std::recursive_mutex> gl(*_global_m);
1053  _promise->_future = nullptr;
1054  *_promise = std::move(pr);
1055  _promise = nullptr;
1056  }
1057  std::atomic_thread_fence(std::memory_order_release);
1058  }
1059 
1060  template <typename Func>
1061  future<T> finally(Func&& func) noexcept
1062  {
1063  return then_wrapped(finally_body<Func, is_future<std::result_of_t<Func()>>::value>(std::forward<Func>(func)));
1064  }
1065 
1066  template <typename Func, bool FuncReturnsFuture>
1068 
1069  template <typename Func>
1070  struct finally_body<Func, true>
1071  {
1072  Func _func;
1073 
1074  using inner_type = remove_future_t<std::result_of_t<Func(T)>>;
1075 
1076  finally_body(Func&& func) : _func(std::forward<Func>(func))
1077  {
1078  }
1079 
1080  future<T> operator()(future<T>&& result)
1081  {
1082  return detail::call_state<inner_type>(_func).then_wrapped([result = std::move(result)](auto f_res) mutable {
1083  if (!f_res.failed())
1084  {
1085  return std::move(result);
1086  }
1087  else
1088  {
1089  try
1090  {
1091  f_res.get();
1092  }
1093  catch (...)
1094  {
1095  return result.rethrow_with_nested();
1096  }
1097  throw std::runtime_error("unreachable");
1098  }
1099  });
1100  }
1101  };
1102 
1103  template <typename Func>
1104  struct finally_body<Func, false>
1105  {
1106  Func _func;
1107 
1108  finally_body(Func&& func) : _func(std::forward<Func>(func))
1109  {
1110  }
1111 
1112  future<T> operator()(future<T>&& result)
1113  {
1114  try
1115  {
1116  _func();
1117  return std::move(result);
1118  }
1119  catch (...)
1120  {
1121  return result.rethrow_with_nested();
1122  }
1123  };
1124  };
1125 
1126 
1127  template <typename Func>
1128  future<T> handle_exception(Func&& func) noexcept
1129  {
1130  using func_ret = std::result_of_t<Func(std::exception_ptr)>;
1131  return then_wrapped([func = std::forward<Func>(func)]
1132  (auto&& fut) mutable->future<T> {
1133  if (!fut.failed())
1134  return make_ready_future<T>(fut.get());
1135  else
1136  return detail::call_future<func_ret>(func, fut.get_exception());
1137  });
1138  }
1139 
1140  void ignore_ready_future() noexcept
1141  {
1142  state()->ignore();
1143  }
1144 
1145 private:
1146  template <typename U>
1147  friend class promise;
1148  template <typename U, typename... A>
1149  friend future<U> make_ready_future(A&&... value);
1150  template <typename U>
1151  friend future<U> make_ready_future(U&& value);
1152  template <typename U>
1153  friend future<U> make_exception_future(std::exception_ptr ex) noexcept;
1154 };
1155 
1157 template <typename T>
1159 {
1160  assert(!_future && _state && !_task);
1161  return future<T>(this);
1162 }
1163 
1165 template <typename T>
1166 template<typename promise<T>::urgent Urgent>
1167 inline void promise<T>::make_ready() noexcept
1168 {
1169  if (_task)
1170  {
1171  _state = nullptr;
1172  if (Urgent == urgent::yes)
1173  {
1174  _task->run();
1175  }
1176  else
1177  {
1178  asio::post(*_io_context, [_task = std::move(_task)]
1179  {
1180  _task->run();
1181  });
1182  }
1183  }
1184 }
1185 
1187 template <typename T>
1188 inline void promise<T>::abandoned() noexcept
1189 {
1190  std::atomic_thread_fence(std::memory_order_acquire);
1191  if (_future)
1192  {
1193  std::lock_guard<std::recursive_mutex> l(_future->_m);
1194  assert(_state);
1195  assert(_state->available() || !_task);
1196  _future->_local_state = std::move(*_state);
1197  _future->_promise = nullptr;
1198  }
1199  else if (_state && _state->failed())
1200  {
1201  }
1202  std::atomic_thread_fence(std::memory_order_release);
1203 }
1204 
1206 template <typename T, typename... A>
1207 inline future<T> make_ready_future(A&&... value)
1208 {
1209  return { ready_future_marker(), std::forward<A>(value)... };
1210 }
1211 
1213 template <typename T>
1214 inline future<T> make_ready_future(T&& value)
1215 {
1216  return { ready_future_marker(), std::forward<T>(value) };
1217 }
1218 
1220 template <typename T>
1221 inline future<T> make_exception_future(std::exception_ptr ex) noexcept
1222 {
1223  return { exception_future_marker(), std::move(ex) };
1224 }
1225 
1226 
1227 namespace detail
1228 {
1230 template<typename T>
1231 inline add_future_t<T> to_future(T&& value)
1232 {
1233  return make_ready_future<T>(std::forward<T>(value));
1234 }
1235 
1237 template<typename T>
1238 inline future<T> to_future(future<T>&& fut)
1239 {
1240  return std::move(fut);
1241 }
1242 
1244 template<typename T, typename Func, typename... A, typename Ret = std::result_of_t<Func(A&&...)>>
1245 inline std::enable_if_t<is_future<Ret>::value, add_future_t<Ret>> call_function(std::true_type, Func&& func, A&&... args)
1246 {
1247  try
1248  {
1249  return func(std::forward<A>(args)...);
1250  }
1251  catch (...)
1252  {
1253  return make_exception_future<T>(std::current_exception());
1254  }
1255 }
1256 
1258 template<typename T, typename Func, typename... A, typename Ret = std::result_of_t<Func(A&&...)>>
1259 inline std::enable_if_t<!is_future<Ret>::value, add_future_t<T>> call_function(std::true_type, Func&& func, A&&... args)
1260 {
1261  try
1262  {
1263  func(std::forward<A>(args)...);
1264  return make_ready_future<T>();
1265  }
1266  catch (...)
1267  {
1268  return make_exception_future<T>(std::current_exception());
1269  }
1270 }
1271 
1273 template<typename T, typename Func, typename... A>
1274 inline add_future_t<T> call_function(std::false_type, Func&& func, A&&... args)
1275 {
1276  try
1277  {
1278  return to_future(func(std::forward<A>(args)...));
1279  }
1280  catch (...)
1281  {
1282  return make_exception_future<T>(std::current_exception());
1283  }
1284 }
1285 
1287 template<typename T, typename Func, typename State>
1288 inline add_future_t<T> call_from_state(std::true_type, Func&& func, State&&)
1289 {
1290  return call_function<T>(std::is_void<T>{}, std::forward<Func>(func));
1291 }
1292 
1294 template<typename T, typename Func, typename State>
1295 inline add_future_t<T> call_from_state(std::false_type, Func&& func, State&& state)
1296 {
1297  return call_function<T>(std::is_void<T>{}, std::forward<Func>(func), std::forward<State>(state).get_value());
1298 }
1299 
1301 template<typename T, typename Func, typename State>
1302 inline add_future_t<T> call_state(Func&& func, State&& state)
1303 {
1304  return call_from_state<T>(std::is_void<typename State::type>{}, std::forward<Func>(func), std::forward<State>(state));
1305 }
1306 
1308 template<typename T, typename Func, typename Future>
1309 inline add_future_t<T> call_future(Func&& func, Future&& fut) noexcept
1310 {
1311  return call_function<T>(std::is_void<T>{}, std::forward<Func>(func), std::forward<Future>(fut));
1312 }
1313 }
1314 
1316 inline void future_state<void>::forward_to(promise<void>& pr) noexcept
1317 {
1318  std::atomic_thread_fence(std::memory_order_acquire);
1319  assert(available());
1320  if (_u.st == state::exception_min)
1321  {
1322  pr.set_urgent_exception(std::move(_u.ex));
1323  }
1324  else
1325  {
1326  pr.set_urgent_value();
1327  }
1328  _u.st = state::invalid;
1329  std::atomic_thread_fence(std::memory_order_release);
1330 }
1331 
1333 template <typename T = rest::rest_reply>
1334 inline future<T> make_exception_future(aegis::error ec)
1335 {
1336  return aegis::make_exception_future<T>(std::make_exception_ptr(aegis::exception(make_error_code(ec))));
1337 }
1338 
1339 }
Definition: futures.hpp:96
Definition: futures.hpp:80
Definition: futures.hpp:68
promise
Definition: futures.hpp:38
Definition: futures.hpp:95
Definition: futures.hpp:56
Definition: futures.hpp:1067
future_state
Definition: futures.hpp:117
Definition: futures.hpp:86
Definition: error.hpp:176
continuation_base
Definition: futures.hpp:463
task
Definition: futures.hpp:454
Definition: futures.hpp:44
future
Definition: futures.hpp:41
continuation
Definition: futures.hpp:521
future_state
Definition: futures.hpp:333