RingBuffer环形数组,数组每个元素成为一个slot。RingBuffer容量,默认为Snowflake算法中sequence最大值,且为2^N。可通过boostPower配置进行扩容,以提高RingBuffer 读写吞吐量。


Tail指针 表示Producer生产的最大序号(此序号从0开始,持续递增)。Tail不能超过Cursor,即生产者不能覆盖未消费的slot。当Tail已赶上curosr,此时可通过rejectedPutBufferHandler指定PutRejectPolicy

Cursor指针 表示Consumer消费到的最小序号(序号序列与Producer序列相同)。Cursor不能超过Tail,即不能消费未生产的slot。当Cursor已赶上tail,此时可通过rejectedTakeBufferHandler指定TakeRejectPolicy


由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 补齐方式。

RingBuffer填充时机 初始化预填充 RingBuffer初始化时,预先填充满整个RingBuffer.

即时填充 Take消费时,即时检查剩余可用slot量(tail - cursor),如小于设定阈值,则补全空闲slots。阈值可通过paddingFactor来进行配置,请参考Quick Start中CachedUidGenerator配置

周期填充 通过Schedule线程,定时补全空闲slots。可通过scheduleInterval配置,以应用定时填充功能,并指定Schedule时间间隔


主要部分CachedUidGenerator 它是继承DefaultUidGenerator里面的算法

3.1 重要几个参数

/** Tail: last position sequence to produce 尾部:要生成的最后一个位置序列 相当于每生产一个uid就+1 */ private final AtomicLong tail = new PaddedAtomicLong(START_POINT); /** Cursor: current position sequence to consume 光标:要使用的当前位置序列 相当于每消耗一个uid就+1*/ private final AtomicLong cursor = new PaddedAtomicLong(START_POINT); /** 存放uid的数组*/ private final long[] slots; /** 存放对应uid使用的状态,0代表已使用,1代表未使用*/ private final PaddedAtomicLong[] flags

3.2 获取id和填充id的方法

//从获取uid的方法 public long take() { // spin get next available cursor 旋转获取下一个可用光标---每次拿uid时,光标+1 long currentCursor = cursor.get(); long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1); // check for safety consideration, it never occurs 出于安全考虑,绝不会发生 Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back"); // trigger padding in an async-mode if reach the threshold 如果达到阈值,则以异步模式触发填充 long currentTail = tail.get(); if (currentTail - nextCursor < paddingThreshold) { LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail, nextCursor, currentTail - nextCursor); bufferPaddingExecutor.asyncPadding(); } // cursor catch the tail, means that there is no more available UID to take 光标捕捉尾部,意味着没有更多可用的UID if (nextCursor == currentCursor) { rejectedTakeHandler.rejectTakeBuffer(this); } // 1. check next slot flag is CAN_TAKE_FLAG int nextCursorIndex = calSlotIndex(nextCursor); Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status"); // 2. get UID from next slot // 3. set next slot flag as CAN_PUT_FLAG. 获取slots里面的值 long uid = slots[nextCursorIndex]; //更改状态,代表数据id已被获取掉 flags[nextCursorIndex].set(CAN_PUT_FLAG); // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring return uid; } //填充方法 public void paddingBuffer() { LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); // is still running if (!running.compareAndSet(false, true)) { LOGGER.info("Padding buffer is still running. {}", ringBuffer); return; } // fill the rest slots until to catch the cursor boolean isFullRingBuffer = false; while (!isFullRingBuffer) { //获取该s内部全部的uid List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); for (Long uid : uidList) { isFullRingBuffer = !ringBuffer.put(uid); if (isFullRingBuffer) { break; } } } // not running now running.compareAndSet(true, false); LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer); } //填充id到slots数组,填充flags状态变更 public synchronized boolean put(long uid) { long currentTail = tail.get(); long currentCursor = cursor.get(); // tail catches the cursor, means that you can't put any cause of RingBuffer is full long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor); if (distance == bufferSize - 1) { rejectedPutHandler.rejectPutBuffer(this, uid); return false; } // 1. pre-check whether the flag is CAN_PUT_FLAG int nextTailIndex = calSlotIndex(currentTail + 1); if (flags[nextTailIndex].get() != CAN_PUT_FLAG) { rejectedPutHandler.rejectPutBuffer(this, uid); return false; } // 2. put UID in the next slot // 3. update next slot' flag to CAN_TAKE_FLAG // 4. publish tail with sequence increase by one 填充id,更改状态 slots[nextTailIndex] = uid; flags[nextTailIndex].set(CAN_TAKE_FLAG); tail.incrementAndGet(); // The atomicity of operations above, guarantees by 'synchronized'. In another word, // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet()) return true; }


2:消耗uid的方法,cursor 光标从0开始,每消耗一次uid,光标就+1,根据光标找到slots对应数组的位置,根据位置获取slots数组里面的uid,同时变更flags对应位置的状态0

3:当消耗到50%时, bufferPaddingExecutor.asyncPadding(); 异步触发填充方法,开始生产uid,tail+1,tail找到对应slots数组的位置,把生成的uid替换旧位置的值,然后变更flags状态为1.

