带翻转缓冲器的无锁容器

对于我的一个必须支持并发读取和写入的项目,我需要一个能够缓冲项的容器,直到使用者一次获取每个当前缓冲的项。由于生产者应该能够生成数据,而不管消费者是否读取当前缓冲区,我提出了一个自定义实现,在的帮助下,将每个条目添加到支持中,直到执行翻转,这会导致当前条目被返回,同时存储具有空队列和元数据的新条目以原子方式存储在该中。AtomicReferenceConcurrentLinkedQueueAtomicReference


我想出了一个解决方案,例如


public class FlippingDataContainer<E> {


  private final AtomicReference<FlippingDataContainerEntry<E>> dataObj = new AtomicReference<>();


  public FlippingDataContainer() {

    dataObj.set(new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0));

  }


  public FlippingDataContainerEntry<E> put(E value) {

    if (null != value) {

      while (true) {

        FlippingDataContainerEntry<E> data = dataObj.get();

        FlippingDataContainerEntry<E> updated = FlippingDataContainerEntry.from(data, value);

        if (dataObj.compareAndSet(data, updated)) {

          return merged;

        }

      }

    }

    return null;

  }


  public FlippingDataContainerEntry<E> flip() {

    FlippingDataContainerEntry<E> oldData;

    FlippingDataContainerEntry<E> newData = new FlippingDataContainerEntry<>(new ConcurrentLinkedQueue<>(), 0, 0, 0);

    while (true) {

      oldData = dataObj.get();

      if (dataObj.compareAndSet(oldData, newData)) {

        return oldData;

      }

    }

  }


  public boolean isEmptry() {

    return dataObj.get().getQueue().isEmpty();

  }

}


由于其他线程在此线程可以执行更新之前更新值可能会导致可能的重试,因此我需要在每次写入尝试时复制实际队列,否则即使原子引用无法更新,也会将条目添加到共享队列中。因此,只需将值添加到共享队列中,就可能导致值条目多次添加到队列中,而实际上它只应出现一次。


虽然与复制队列的代码相比,这允许运行测试的速度快10倍以上,但它也经常无法通过消耗测试,因为现在,在消费者线程翻转队列并处理数据之后,可能会立即将值元素添加到队列中,因此并非所有项目似乎都被消耗了。


现在的实际问题是,是否可以避免复制后备队列以获得性能提升,同时仍然允许使用无锁算法原子地更新队列的内容,从而避免中途丢失一些条目?


梦里花落0921
浏览 130回答 2
2回答

三国纷争

首先,让我们陈述一个显而易见的事实 - 最好的解决方案是避免编写任何此类自定义类。也许像java.util.concurrent.LinkedTransferQueue这样简单的东西也可以工作,并且不那么容易出错。如果 a 不起作用,那么&nbsp;LMAX 干扰器或类似的东西呢?您是否看过现有的解决方案?LinkedTransferQueue如果你仍然需要/想要一个自定义解决方案,那么我有一个稍微不同的方法的草图,一个可以避免复制的方法:这个想法是让操作围绕一些原子变量旋转,试图设置它。如果线程设法设置了它,那么它将获得对当前队列的独占访问权限,这意味着它可以追加到它。追加后,它会重置原子变量以允许其他线程追加。它基本上是一个旋转锁。这样,线程之间的争用就会在追加到队列之前发生,而不是在追加到队列之后。put

MYYA

我需要在每次写入尝试时复制实际队列你的想法听起来像RCU(https://en.wikipedia.org/wiki/Read-copy-update)。Java被垃圾回收通过为您解决解除分配问题使RCU变得更加容易(我认为)。如果我从你的问题的快速浏览中正确理解,你的“读者”实际上想为自己“声称”容器的整个当前内容。这也使它们有效地成为编写者,但是它们不是读取+复制,而是可以构造一个空容器,并原子地交换顶级引用以指向该容器。(因此,声明旧容器进行独占访问。RCU的一大好处是容器数据结构本身不必到处都是原子的。一旦你有对它的引用,就没有其他人在修改它。唯一棘手的部分是当作家想要将新内容添加到非空容器中时。然后,您复制现有容器并修改副本,并尝试将更新后的副本 CAS(比较交换,即)到共享的顶级中。compareAndSet()AtomicReference一个作家不能只是无条件地交换,因为它可能最终会得到一个非空的容器,无处放它。除非编写器可以坚持一批工作并旋转等待读取器清空队列...我在这里假设你的作家有一批工作可以同时排队;否则RCU对作家来说可能太贵了。很抱歉,如果我错过了您的问题中的一个细节,可以排除这一点。我不经常使用Java,所以我只是快速编写这个,以防万一它有帮助。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java