1:UidGenerator源码
https://github.com/baidu/uid-generator
2:CachedUidGenerator介绍
RingBuffer环形数组,数组每个元素成为一个slot。RingBuffer容量,默认为Snowflake算法中sequence最大值,且为2^N。可通过boostPower配置进行扩容,以提高RingBuffer 读写吞吐量。
Tail指针、Cursor指针用于环形数组上读写slot:
Tail指针 表示Producer生产的最大序号(此序号从0开始,持续递增)。Tail不能超过Cursor,即生产者不能覆盖未消费的slot。当Tail已赶上curosr,此时可通过rejectedPutBufferHandler指定PutRejectPolicy
Cursor指针 表示Consumer消费到的最小序号(序号序列与Producer序列相同)。Cursor不能超过Tail,即不能消费未生产的slot。当Cursor已赶上tail,此时可通过rejectedTakeBufferHandler指定TakeRejectPolicy
CachedUidGenerator采用了双RingBuffer,Uid-RingBuffer用于存储Uid、Flag-RingBuffer用于存储Uid状态(是否可填充、是否可消费)
由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 补齐方式。
RingBuffer填充时机 初始化预填充 RingBuffer初始化时,预先填充满整个RingBuffer.
即时填充 Take消费时,即时检查剩余可用slot量(tail - cursor),如小于设定阈值,则补全空闲slots。阈值可通过paddingFactor来进行配置,请参考Quick Start中CachedUidGenerator配置
周期填充 通过Schedule线程,定时补全空闲slots。可通过scheduleInterval配置,以应用定时填充功能,并指定Schedule时间间隔
3:CachedUidGenerator源码
主要部分CachedUidGenerator 它是继承DefaultUidGenerator里面的算法
3.1 重要几个参数
/** Tail: last position sequence to produce 尾部:要生成的最后一个位置序列 相当于每生产一个uid就+1 */
private final AtomicLong tail = new PaddedAtomicLong(START_POINT);
/** Cursor: current position sequence to consume 光标:要使用的当前位置序列 相当于每消耗一个uid就+1*/
private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
/** 存放uid的数组*/
private final long[] slots;
/** 存放对应uid使用的状态,0代表已使用,1代表未使用*/
private final PaddedAtomicLong[] flags
3.2 获取id和填充id的方法
//从获取uid的方法
public long take() {
// spin get next available cursor 旋转获取下一个可用光标---每次拿uid时,光标+1
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
// check for safety consideration, it never occurs 出于安全考虑,绝不会发生
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// trigger padding in an async-mode if reach the threshold 如果达到阈值,则以异步模式触发填充
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// cursor catch the tail, means that there is no more available UID to take 光标捕捉尾部,意味着没有更多可用的UID
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 1. check next slot flag is CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 2. get UID from next slot
// 3. set next slot flag as CAN_PUT_FLAG. 获取slots里面的值
long uid = slots[nextCursorIndex];
//更改状态,代表数据id已被获取掉
flags[nextCursorIndex].set(CAN_PUT_FLAG);
// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
// slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
return uid;
}
//填充方法
public void paddingBuffer() {
LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
// is still running
if (!running.compareAndSet(false, true)) {
LOGGER.info("Padding buffer is still running. {}", ringBuffer);
return;
}
// fill the rest slots until to catch the cursor
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
//获取该s内部全部的uid
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
for (Long uid : uidList) {
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
// not running now
running.compareAndSet(true, false);
LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}
//填充id到slots数组,填充flags状态变更
public synchronized boolean put(long uid) {
long currentTail = tail.get();
long currentCursor = cursor.get();
// tail catches the cursor, means that you can't put any cause of RingBuffer is full
long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
if (distance == bufferSize - 1) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 1. pre-check whether the flag is CAN_PUT_FLAG
int nextTailIndex = calSlotIndex(currentTail + 1);
if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 2. put UID in the next slot
// 3. update next slot' flag to CAN_TAKE_FLAG
// 4. publish tail with sequence increase by one 填充id,更改状态
slots[nextTailIndex] = uid;
flags[nextTailIndex].set(CAN_TAKE_FLAG);
tail.incrementAndGet();
// The atomicity of operations above, guarantees by 'synchronized'. In another word,
// the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
return true;
}
1:定义数组大小(默认是65536)slots,flags,项目启动时,初始化相关数据,uid填充进slots,状态1填充到flags,每生产一个uid,tail+1
2:消耗uid的方法,cursor 光标从0开始,每消耗一次uid,光标就+1,根据光标找到slots对应数组的位置,根据位置获取slots数组里面的uid,同时变更flags对应位置的状态0
3:当消耗到50%时, bufferPaddingExecutor.asyncPadding(); 异步触发填充方法,开始生产uid,tail+1,tail找到对应slots数组的位置,把生成的uid替换旧位置的值,然后变更flags状态为1.
4:先理解上一个DefaultUidGenerator,就容易理解这个,源码还是很清晰的。基于这种思维,也可以自己写一个类似分布式id