Tags down


Why does Netty use `Multiple producer single consumer` Queue?

By : LucPaulLabonté
Date : October 14 2020, 02:22 PM
With these it helps Because you can submit work from out side of the EventLoop. For example if you call Channel.write(...) from another thread it will be dispatched to the EventLoop for processing. Which means it will need to be put into the Queue, which requires it to be MPSC at least.
code :

Share : facebook icon twitter icon

Does a multiple producer single consumer lock-free queue exist for c++?

By : Bobby Kissinger
Date : March 29 2020, 07:55 AM
Hope that helps You may want to check disruptor; it's available in C++ here: http://lmax-exchange.github.io/disruptor/
You can also find explanation how it works here on stackoverflow Basically it's circular buffer with no locking, optimized for passing FIFO messages between threads in a fixed-size slots.
code :
    total=1000000 samples, avg=0.24us
    50%=0.214us, avg=0.093us
    90%=0.23us, avg=0.151us
    99%=0.322us, avg=0.159us
    99.9%=15.566us, avg=0.173us
    total=1500000 samples, avg=0.07us
    50%=0us, avg=0us
    90%=0.155us, avg=0.016us
    99%=0.361us, avg=0.038us
    99.9%=8.723us, avg=0.044us
// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <core/api.hxx>
#include <core/wheel/exception.hxx>

#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>

namespace core { namespace wheel
  struct bad_size : core::exception
    template<typename T> explicit bad_size(const T&, size_t m)
      : core::exception(std::string("Slot capacity exceeded, sizeof(")
                  + typeid(T).name()
                  + ") = "
                  + boost::lexical_cast<std::string>(sizeof(T))
                  + ", capacity = "
                  + boost::lexical_cast<std::string>(m)

  // inspired by Disruptor
  template <typename Header>
  class wheel : boost::noncopyable
    struct slot_detail
      // slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
      // slot read:  (memory barrier in wheel) > read_done > (memory barrier in wheel)

      // done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
      template <bool Writing>
      void done(wheel* w)
        if (Writing)

      // cache line for sequence number and header
      long long sequence;
      Header header;

      // there is no such thing as data type with variable size, but we need it to avoid thrashing
      // cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
      // This is well into UB territory! Using template parameter for this is not good, since it
      // results in this small implementation detail leaking to all possible user interfaces.
      char data[8];

    // use this as a storage space for slot_detail, to guarantee 64 byte alignment
    struct slot_block { long long padding[8]; };

    // wrap slot data to outside world
    template <bool Writable>
    class slot
      template<typename> friend class wheel;

      slot& operator=(const slot&); // moveable but non-assignable

      // may only be constructed by wheel
      slot(slot_detail* impl, wheel<Header>* w, size_t c)
        : slot_(impl) , wheel_(w) , capacity_(c)

      slot(slot&& s)
        : slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
        s.slot_ = NULL;

        if (slot_)

      // slot accessors - use Header to store information on what type is actually stored in data
      bool empty() const          { return !slot_; }
      long long sequence() const  { return slot_->sequence; }
      Header& header()            { return slot_->header; }
      char* data()                { return slot_->data; }

      template <typename T> T& cast()
        static_assert(boost::is_pod<T>::value, "Data type must be POD");
        if (sizeof(T) > capacity_)
          throw bad_size(T(), capacity_);
        if (empty())
          throw no_data();
        return *((T*) data());

      slot_detail*    slot_;
      wheel<Header>*  wheel_;
      const size_t    capacity_;

    // dynamic size of slot, with extra capacity, expressed in 64 byte blocks
    static size_t sizeof_slot(size_t s)
      size_t m = sizeof(slot_detail);
      // add capacity less 8 bytes already within sizeof(slot_detail)
      m += max(8, s) - 8;
      // round up to 64 bytes, i.e. alignment of slot_detail
      size_t r = m & ~(unsigned int)63;
      if (r < m)
        r += 64;
      r /= 64;
      return r;

    // calculate actual slot capacity back from number of 64 byte blocks
    static size_t slot_capacity(size_t s)
      return s*64 - sizeof(slot_detail) + 8;

    // round up to power of 2
    static size_t round_size(size_t s)
      // enfore minimum size
      if (s <= min_size)
        return min_size;

      // find rounded value
      size_t r = 1;
      while (s)
        s >>= 1;
        r <<= 1;
      return r;

    slot_detail& at(long long sequence)
      // find index from sequence number and return slot at found index of the wheel
      return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);

    wheel(size_t capacity, size_t size)
      : head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
      , blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
      static_assert(boost::is_pod<Header>::value, "Header type must be POD");
      static_assert(sizeof(slot_block) == 64, "This was unexpected");

      wheel_ = new slot_block[size_ * blocks_];
      // all slots must be initialised to 0
      memset(wheel_, 0, size_ * 64 * blocks_);
      active_ = 1;

      delete[] wheel_;

    // all accessors needed
    size_t capacity() const { return capacity_; }   // capacity of a single slot
    size_t size() const     { return size_; }       // number of slots available
    size_t queue() const    { return (size_t)head_ - (size_t)tail_; }
    bool active() const     { return active_ == 1; }

    // enough to call it just once, to fine tune slot capacity
    template <typename T>
    void check() const
      static_assert(boost::is_pod<T>::value, "Data type must be POD");
      if (sizeof(T) > capacity_)
        throw bad_size(T(), capacity_);

    // stop the wheel - safe to execute many times
    size_t stop()
      InterlockedExchange(&active_, 0);
      // must wait for current read to complete
      while (rdng_ != tail_)

      return size_t(head_ - tail_);

    // return first available slot for write
    slot<true> post()
      if (!active_)
        throw stopped();

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
        if (InterlockedDecrement64(&head_) == h - 1)
          throw overflowing();

        // protection against case of race condition when we are overflowing
        // and two or more threads try to post and two or more messages are read,
        // all at the same time. If this happens we must re-try, otherwise we
        // could have skipped a sequence number - causing infinite wait in post_done
        h = InterlockedIncrement64(&head_);

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);

    // return first available slot for write, nothrow variant
    slot<true> post(std::nothrow_t)
      if (!active_)
        return slot<true>(NULL, this, capacity_);

      // the only memory barrier on head seq. number we need, if not overflowing
      long long h = InterlockedIncrement64(&head_);
      while(h - (long long) size_ > tail_)
        if (InterlockedDecrement64(&head_) == h - 1)
          return slot<true>(NULL, this, capacity_);

        // must retry if race condition described above
        h = InterlockedIncrement64(&head_);

      slot_detail& r = at(h);
      r.sequence = h;

      // wrap in writeable slot
      return slot<true>(&r, this, capacity_);

    // read first available slot for read
    slot<false> read()
      slot_detail* r = NULL;
      // compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
      if (active_ && rdng_ < wrtn_)
        // the only memory barrier on reading seq. number we need
        const long long h = InterlockedIncrement64(&rdng_);
        // check if this slot has been written, step back if not
        if (h > wrtn_)
          r = &at(h);

      // wrap in readable slot
      return slot<false>(r , this, capacity_);

    // waiting for new post, to be used by non-polling clients
    void acquire()

    bool try_acquire()
      return event_.try_acquire();

    bool try_acquire(unsigned long timeout)
      return event_.try_acquire(timeout);

    void release()

    void post_done(long long sequence)
      const long long t = sequence - 1;

      // the only memory barrier on written seq. number we need
      while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)

      // this is outside of critical path for polling clients

    void read_done()
      // the only memory barrier on tail seq. number we need

    // each in its own cache line
    // head_ - wrtn_ = no. of messages being written at this moment
    // rdng_ - tail_ = no. of messages being read at the moment
    // head_ - tail_ = no. of messages to read (including those being written and read)
    // wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
    __declspec(align(64)) volatile long long head_; // currently writing or written
    __declspec(align(64)) volatile long long wrtn_; // written
    __declspec(align(64)) volatile long long rdng_; // currently reading or read
    __declspec(align(64)) volatile long long tail_; // read
    __declspec(align(64)) volatile long active_;    // flag switched to 0 when stopped

    api::event event_;          // set when new message is posted
    const size_t blocks_;       // number of 64-byte blocks in a single slot_detail
    const size_t capacity_;     // capacity of data() section per single slot. Initialisation depends on blocks_
    const size_t size_;         // number of slots available, always power of 2
    slot_block* wheel_;
  while (wheel.active())
    core::wheel::wheel<int>::slot<false> slot = wheel.read();
    if (!slot.empty())
      Data& d = slot.cast<Data>();
      // do work
    // uncomment below for waiting consumer, saving CPU cycles
    // else
    //   wheel.try_acquire(10);

Producer-Consumer wtih remote message queue as producer and ExecutorService as local consumer

By : TFung
Date : March 29 2020, 07:55 AM
like below fixes the issue If you use the ThreadPoolExecutor implementation of the ExecutorService then you'll have access to the information you need that's missing from the ExecutorService (e.g. getActiveCount() and getTaskCount())

Multi producer single consumer queue without dedicated consumer thread

By : user3824596
Date : March 29 2020, 07:55 AM
Any of those help Your code is correct, but problem is not seen as standard: usually, having background thread is not so expensive, as:

Single Producer Multiple Consumer - queue contains null

By : Alex Mazurov
Date : March 29 2020, 07:55 AM
help you fix your problem You need to use while(isBufferEmpty()) instead of just if (and the same for full). Since all consumers (and producers) get signaled at the same time, you have to recheck to make sure that the other ones haven't already processed the elements added in the queue.

why does producer consumer queue with single producer/consumer doesn't need mutex?

By : S.Avery
Date : March 29 2020, 07:55 AM
it fixes the issue Because such queue will usually be implemented as a circular queue. Producer will be writing to the tail of the queue, while consumer reads from the head. They never access the same memory at the same time.
The idea here is that both consumer and producer can track the position of the tail/head independently.
code :
int producerPtr = 0, consumerPtr = 0;

void putItemIntoBuffer(Item item)
     data[producerPtr] = item;
     producerPtr = (producerPtr  + 1) % BUFFER_SIZE;

Item removeItemFromBuffer(void)
     Item item = data[consumerPtr ];
     consumerPtr = (consumerPtr + 1) % BUFFER_SIZE;
     return item;
Related Posts Related Posts :
  • Is there such a thing as a filename that is too long?
  • Karate - [#document: null] in output
  • SAP Introspection: Resolve ForeignKey
  • Flask: Trouble resolving endpoint locations in package
  • Metadata in DynamoDB stream event for delete operation?
  • Create waf size feature
  • Floating decimal point type in Haxe
  • Installing Spyder (updated for 2018)
  • How to use Active Directory Authentication in ASP.NET Core?
  • ABAP: from get_auth_values() result to SQL query
  • AOSP build for Samsung Galaxy Tab A
  • Allow to find only users members of a specific group in 1 query
  • How can I easily label my data in Power BI?
  • Use or not, of lambda to define a function in Racket
  • Can I use GitHub's Linguist as a replacement to Rouge in Jekyll
  • Storing streamed tweets in a list for further analysis
  • Swagger permanent authorization token
  • Output index of ELKI
  • Diverts deleted when restarting ActiveMQ Artemis
  • Sum-up and then calculate vs. calculate and then sum-up (SSAS-MDX)
  • xQuery - fill custom array
  • Issue with javax.mail and attached file
  • How to change the theme colors in Vuetify in standalone/CDN mode?
  • WildFly 10.0 port offset "9" can't connect to CLI at 9999
  • 401 Error when sending data to Stripe `Customers` API
  • When would a linked list be preferred over a circular buffer?
  • How to get Facebook page feed and Filter its fields as Json using Google App script
  • How to delete or set lifespan to zero - Dialogflow Agent Context
  • How to update Mat-Input Place Holder on Focus
  • How to show the Systray Icon and also the corresponding executable in the taskmanager by default when we run the install
  • Google Smart Home Agent responded Empty JSON
  • Using conditionalpanel in shiny where input is a vector
  • How many images are generated by keras fit_generator?
  • feed data to fitDataset()
  • Storm simple jdbc mapper write array to phoenix db not supported?
  • Unable to install the printer driver . Operation could not be completed (error 0x0000007e)
  • What is causing my ToDataSourceResult error in my KendoUI Core Application?
  • Questions abous blockchain and ethereum
  • SUMO: How to add new routing algorithm
  • Automation Anywhere. Using variables to select window in object cloning command
  • Numerical issues integrating a pulse signal that is delayed (fixedDelay)
  • JavaFX Boolean Binding and TableView Binding multiple
  • Does RavenDb's `Include` support constructing a document id, not just selecting one?
  • Extract tokens from grammar
  • How to install only test dependencies with Zef
  • Nexus Repository Manager 3.14 with Ceph blobstore performance
  • Implement custom RDF4J function for GraphDB
  • gRPC Java Client - hasNext during onNext?
  • Make InfluxDB/Grafana cumulative function that resets daily (sawtooth graph)
  • Using newer version of nodejs in a ruby project with cloud foundary
  • Downsides of using Shade plugin relocation feature
  • How to add JMS Message Header Property from WLST
  • GetDateFormatEx format string and custom text
  • File not found in Gitlab-CI
  • Using collision to pick up items
  • Polygon on Bing Map using Local GeoJSON Object results in wrong location
  • Searching for a Blame
  • Outlook Add In Recipient Not Being Returned
  • Verify the signing of manifest and application files
  • Gulp 4 watch gulp.series only run one time
  • shadow
    Privacy Policy - Terms - Contact Us © voile276.org