Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::flow::interface11::limiter_node< T, DecrementType > Class Template Reference

Forwards messages only if the threshold has not been reached. More...

#include <flow_graph.h>

Collaboration diagram for tbb::flow::interface11::limiter_node< T, DecrementType >:

Public Types

typedef T input_type
 
typedef T output_type
 
typedef receiver< input_type >::predecessor_type predecessor_type
 
typedef sender< output_type >::successor_type successor_type
 

Public Member Functions

 limiter_node (graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
 Constructor. More...
 
 limiter_node (const limiter_node &src)
 Copy constructor. More...
 
bool register_successor (successor_type &r) __TBB_override
 Replace the current successor with this new successor. More...
 
bool remove_successor (successor_type &r) __TBB_override
 Removes a successor from this node. More...
 
bool register_predecessor (predecessor_type &src) __TBB_override
 Adds src to the list of cached predecessors. More...
 
bool remove_predecessor (predecessor_type &src) __TBB_override
 Removes src from the list of cached predecessors. More...
 

Public Attributes

internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
 The internal receiver< DecrementType > that decrements the count. More...
 

Protected Member Functions

tasktry_put_task (const T &t) __TBB_override
 Puts an item to this receiver. More...
 
graphgraph_reference () const __TBB_override
 
void reset_receiver (reset_flags) __TBB_override
 
void reset_node (reset_flags f) __TBB_override
 

Private Member Functions

bool check_conditions ()
 
taskforward_task ()
 
void forward ()
 
taskdecrement_counter (long long delta)
 
void initialize ()
 

Private Attributes

size_t my_threshold
 
size_t my_count
 
size_t my_tries
 
internal::reservable_predecessor_cache< T, spin_mutexmy_predecessors
 
spin_mutex my_mutex
 
internal::broadcast_cache< T > my_successors
 

Friends

class internal::forward_task_bypass< limiter_node< T, DecrementType > >
 
class internal::decrementer< limiter_node< T, DecrementType >, DecrementType >
 
template<typename R , typename B >
class run_and_put_task
 
template<typename X , typename Y >
class internal::broadcast_cache
 
template<typename X , typename Y >
class internal::round_robin_cache
 

Detailed Description

template<typename T, typename DecrementType = continue_msg>
class tbb::flow::interface11::limiter_node< T, DecrementType >

Forwards messages only if the threshold has not been reached.

This node forwards items until its threshold is reached. It contains no buffering. If the downstream node rejects, the message is dropped.

Definition at line 121 of file flow_graph.h.

Member Typedef Documentation

◆ input_type

template<typename T , typename DecrementType = continue_msg>
typedef T tbb::flow::interface11::limiter_node< T, DecrementType >::input_type

Definition at line 2987 of file flow_graph.h.

◆ output_type

template<typename T , typename DecrementType = continue_msg>
typedef T tbb::flow::interface11::limiter_node< T, DecrementType >::output_type

Definition at line 2988 of file flow_graph.h.

◆ predecessor_type

template<typename T , typename DecrementType = continue_msg>
typedef receiver<input_type>::predecessor_type tbb::flow::interface11::limiter_node< T, DecrementType >::predecessor_type

Definition at line 2989 of file flow_graph.h.

◆ successor_type

template<typename T , typename DecrementType = continue_msg>
typedef sender<output_type>::successor_type tbb::flow::interface11::limiter_node< T, DecrementType >::successor_type

Definition at line 2990 of file flow_graph.h.

Constructor & Destructor Documentation

◆ limiter_node() [1/2]

template<typename T , typename DecrementType = continue_msg>
tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node ( graph g,
__TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0)   
)
inline

Constructor.

Definition at line 3110 of file flow_graph.h.

3112  : graph_node(g), my_threshold(threshold), my_count(0),
3114  my_tries(0), decrement(),
3115  init_decrement_predecessors(num_decrement_predecessors),
3116  decrement(num_decrement_predecessors)) {
3117  initialize();
3118  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::initialize().

Here is the call graph for this function:

◆ limiter_node() [2/2]

template<typename T , typename DecrementType = continue_msg>
tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node ( const limiter_node< T, DecrementType > &  src)
inline

Copy constructor.

Definition at line 3129 of file flow_graph.h.

3129  :
3130  graph_node(src.my_graph), receiver<T>(), sender<T>(),
3131  my_threshold(src.my_threshold), my_count(0),
3133  my_tries(0), decrement(),
3134  init_decrement_predecessors(src.init_decrement_predecessors),
3135  decrement(src.init_decrement_predecessors)) {
3136  initialize();
3137  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::initialize().

Here is the call graph for this function:

Member Function Documentation

◆ check_conditions()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions ( )
inlineprivate

◆ decrement_counter()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::decrement_counter ( long long  delta)
inlineprivate

Definition at line 3076 of file flow_graph.h.

3076  {
3077  {
3079  if( delta > 0 && size_t(delta) > my_count )
3080  my_count = 0;
3081  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
3083  else
3084  my_count -= size_t(delta); // absolute value of delta is sufficiently small
3085  }
3086  return forward_task();
3087  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold.

Here is the call graph for this function:

◆ forward()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::forward ( )
inlineprivate

Definition at line 3071 of file flow_graph.h.

3071  {
3072  __TBB_ASSERT(false, "Should never be called");
3073  return;
3074  }

References __TBB_ASSERT.

◆ forward_task()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task ( )
inlineprivate

Definition at line 3018 of file flow_graph.h.

3018  {
3019  input_type v;
3020  task *rval = NULL;
3021  bool reserved = false;
3022  {
3024  if ( check_conditions() )
3025  ++my_tries;
3026  else
3027  return NULL;
3028  }
3029 
3030  //SUCCESS
3031  // if we can reserve and can put, we consume the reservation
3032  // we increment the count and decrement the tries
3033  if ( (my_predecessors.try_reserve(v)) == true ){
3034  reserved=true;
3035  if ( (rval = my_successors.try_put_task(v)) != NULL ){
3036  {
3038  ++my_count;
3039  --my_tries;
3040  my_predecessors.try_consume();
3041  if ( check_conditions() ) {
3042  if ( internal::is_graph_active(this->my_graph) ) {
3043  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3044  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3046  }
3047  }
3048  }
3049  return rval;
3050  }
3051  }
3052  //FAILURE
3053  //if we can't reserve, we decrement the tries
3054  //if we can reserve but can't put, we decrement the tries and release the reservation
3055  {
3057  --my_tries;
3058  if (reserved) my_predecessors.try_release();
3059  if ( check_conditions() ) {
3060  if ( internal::is_graph_active(this->my_graph) ) {
3061  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3062  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3063  __TBB_ASSERT(!rval, "Have two tasks to handle");
3064  return rtask;
3065  }
3066  }
3067  return rval;
3068  }
3069  }

References __TBB_ASSERT, tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions(), tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::decrement_counter().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ graph_reference()

template<typename T , typename DecrementType = continue_msg>
graph& tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference ( ) const
inlineprotected

Definition at line 3260 of file flow_graph.h.

3260 { return my_graph; }

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::forward_task(), tbb::flow::interface11::limiter_node< T, DecrementType >::register_predecessor(), and tbb::flow::interface11::limiter_node< T, DecrementType >::register_successor().

Here is the caller graph for this function:

◆ initialize()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::initialize ( )
inlineprivate

Definition at line 3089 of file flow_graph.h.

3089  {
3090  my_predecessors.set_owner(this);
3091  my_successors.set_owner(this);
3092  decrement.set_owner(this);
3094  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
3095  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
3096  static_cast<sender<output_type> *>(this)
3097  );
3098  }

References CODEPTR, tbb::flow::interface11::limiter_node< T, DecrementType >::decrement, tbb::internal::fgt_node(), tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors.

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::limiter_node().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ register_predecessor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::register_predecessor ( predecessor_type src)
inline

Adds src to the list of cached predecessors.

Definition at line 3210 of file flow_graph.h.

3210  {
3212  my_predecessors.add( src );
3213  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
3214  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3215  internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3217  }
3218  return true;
3219  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Here is the call graph for this function:

◆ register_successor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::register_successor ( successor_type r)
inline

Replace the current successor with this new successor.

Definition at line 3146 of file flow_graph.h.

3146  {
3148  bool was_empty = my_successors.empty();
3149  my_successors.register_successor(r);
3150  //spawn a forward task if this is the only successor
3151  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
3152  if ( internal::is_graph_active(this->my_graph) ) {
3153  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3154  internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
3156  }
3157  }
3158  return true;
3159  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::graph_reference(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries, and tbb::flow::interface11::internal::spawn_in_graph_arena().

Here is the call graph for this function:

◆ remove_predecessor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::remove_predecessor ( predecessor_type src)
inline

Removes src from the list of cached predecessors.

Definition at line 3222 of file flow_graph.h.

3222  {
3223  my_predecessors.remove( src );
3224  return true;
3225  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::my_predecessors.

◆ remove_successor()

template<typename T , typename DecrementType = continue_msg>
bool tbb::flow::interface11::limiter_node< T, DecrementType >::remove_successor ( successor_type r)
inline

Removes a successor from this node.

r.remove_predecessor(*this) is also called.

Definition at line 3163 of file flow_graph.h.

3163  {
3164  r.remove_predecessor(*this);
3165  my_successors.remove_successor(r);
3166  return true;
3167  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors.

◆ reset_node()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::reset_node ( reset_flags  f)
inlineprotected

◆ reset_receiver()

template<typename T , typename DecrementType = continue_msg>
void tbb::flow::interface11::limiter_node< T, DecrementType >::reset_receiver ( reset_flags  )
inlineprotected

Definition at line 3262 of file flow_graph.h.

3262  {
3263  __TBB_ASSERT(false,NULL); // should never be called
3264  }

References __TBB_ASSERT.

◆ try_put_task()

template<typename T , typename DecrementType = continue_msg>
task* tbb::flow::interface11::limiter_node< T, DecrementType >::try_put_task ( const T &  t)
inlineprotected

Puts an item to this receiver.

Definition at line 3233 of file flow_graph.h.

3233  {
3234  {
3236  if ( my_count + my_tries >= my_threshold )
3237  return NULL;
3238  else
3239  ++my_tries;
3240  }
3241 
3242  task * rtask = my_successors.try_put_task(t);
3243 
3244  if ( !rtask ) { // try_put_task failed.
3246  --my_tries;
3247  if (check_conditions() && internal::is_graph_active(this->my_graph)) {
3248  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
3249  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
3250  }
3251  }
3252  else {
3254  ++my_count;
3255  --my_tries;
3256  }
3257  return rtask;
3258  }

References tbb::flow::interface11::limiter_node< T, DecrementType >::check_conditions(), tbb::flow::interface11::internal::is_graph_active(), lock, tbb::flow::interface11::limiter_node< T, DecrementType >::my_count, tbb::flow::interface11::limiter_node< T, DecrementType >::my_mutex, tbb::flow::interface11::limiter_node< T, DecrementType >::my_successors, tbb::flow::interface11::limiter_node< T, DecrementType >::my_threshold, and tbb::flow::interface11::limiter_node< T, DecrementType >::my_tries.

Here is the call graph for this function:

Friends And Related Function Documentation

◆ internal::broadcast_cache

template<typename T , typename DecrementType = continue_msg>
template<typename X , typename Y >
friend class internal::broadcast_cache
friend

Definition at line 3230 of file flow_graph.h.

◆ internal::decrementer< limiter_node< T, DecrementType >, DecrementType >

template<typename T , typename DecrementType = continue_msg>
friend class internal::decrementer< limiter_node< T, DecrementType >, DecrementType >
friend

Definition at line 3005 of file flow_graph.h.

◆ internal::forward_task_bypass< limiter_node< T, DecrementType > >

template<typename T , typename DecrementType = continue_msg>
friend class internal::forward_task_bypass< limiter_node< T, DecrementType > >
friend

Definition at line 3005 of file flow_graph.h.

◆ internal::round_robin_cache

template<typename T , typename DecrementType = continue_msg>
template<typename X , typename Y >
friend class internal::round_robin_cache
friend

Definition at line 3231 of file flow_graph.h.

◆ run_and_put_task

template<typename T , typename DecrementType = continue_msg>
template<typename R , typename B >
friend class run_and_put_task
friend

Definition at line 3229 of file flow_graph.h.

Member Data Documentation

◆ decrement

template<typename T , typename DecrementType = continue_msg>
internal::decrementer< limiter_node<T, DecrementType>, DecrementType > tbb::flow::interface11::limiter_node< T, DecrementType >::decrement

The internal receiver< DecrementType > that decrements the count.

Definition at line 3101 of file flow_graph.h.

Referenced by tbb::flow::interface11::limiter_node< T, DecrementType >::initialize(), and tbb::flow::interface11::limiter_node< T, DecrementType >::reset_node().

◆ my_count

◆ my_mutex

◆ my_predecessors

◆ my_successors

◆ my_threshold

◆ my_tries


The documentation for this class was generated from the following file:
internal::forward_task_bypass
A task that calls a node's forward_task function.
Definition: _flow_graph_body_impl.h:328
tbb::flow::interface11::internal::is_graph_active
bool is_graph_active(tbb::flow::interface10::graph &g)
Definition: _flow_graph_impl.h:494
tbb::flow::interface11::limiter_node::input_type
T input_type
Definition: flow_graph.h:2987
tbb::flow::interface11::internal::spawn_in_graph_arena
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
Definition: _flow_graph_impl.h:521
CODEPTR
#define CODEPTR()
Definition: _flow_graph_trace_impl.h:297
__TBB_ASSERT
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
__TBB_DEPRECATED_LIMITER_ARG4
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
Definition: _flow_graph_impl.h:54
tbb::flow::interface11::limiter_node::check_conditions
bool check_conditions()
Definition: flow_graph.h:3013
tbb::flow::interface11::limiter_node::my_successors
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:3005
tbb::flow::interface11::limiter_node::my_tries
size_t my_tries
Definition: flow_graph.h:3002
tbb::spin_mutex::scoped_lock
friend class scoped_lock
Definition: spin_mutex.h:179
lock
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
Definition: ittnotify_static.h:121
tbb::flow::interface11::limiter_node::my_predecessors
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:3003
tbb::flow::interface11::limiter_node::my_count
size_t my_count
Definition: flow_graph.h:3001
tbb::flow::interface11::limiter_node::initialize
void initialize()
Definition: flow_graph.h:3089
task
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
Definition: ittnotify_static.h:119
tbb::flow::interface11::limiter_node::my_mutex
spin_mutex my_mutex
Definition: flow_graph.h:3004
tbb::flow::interface11::rf_clear_edges
@ rf_clear_edges
Definition: _flow_graph_impl.h:161
tbb::internal::fgt_node
static void fgt_node(void *, string_index, void *, void *)
Definition: _flow_graph_trace_impl.h:326
tbb::flow::interface11::limiter_node::my_threshold
size_t my_threshold
Definition: flow_graph.h:3000
tbb::flow::interface11::limiter_node::forward_task
task * forward_task()
Definition: flow_graph.h:3018
tbb::flow::interface11::limiter_node::decrement
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:3101
tbb::flow::interface11::limiter_node::graph_reference
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:3260

Copyright © 2005-2020 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.