@Data
public class EventData {
private Long value;}
1.
2.
3.
4.
消费者
public class EventConsumer implements WorkHandler<EventData>{/** * 消费回调 * @param eventData * @throws Exception */
@Override
public void onEvent(EventData eventData) throws Exception {
Thread.sleep(5000);
System.out.println(Thread.currentThread()+", eventData:"+ eventData.getValue());}}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
生产者
public class EventProducer {
private final RingBuffer<EventData> ringBuffer;
public EventProducer(RingBuffer<EventData> ringBuffer){
this.ringBuffer= ringBuffer;}
public void sendData(Long v){// cas展位
long next = ringBuffer.next();
try {
EventData eventData = ringBuffer.get(next);
eventData.setValue(v);} finally {// 通知等待的消费者
System.out.println("EventProducer send success, sequence:"+next);
ringBuffer.publish(next);}}}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
测试类
public class DisruptorTest {
public static void main(String[] args){//2的n次方
int bufferSize =8;
Disruptor<EventData> disruptor = new Disruptor<EventData>(()-> new EventData(),// 事件工厂
bufferSize,// 环形数组大小
Executors.defaultThreadFactory(),// 线程池工厂
ProducerType.MULTI,// 支持多事件发布者
new BlockingWaitStrategy());// 等待策略
// 设置消费者
disruptor.handleEventsWithWorkerPool(
new EventConsumer(),
new EventConsumer(),
new EventConsumer(),
new EventConsumer());
disruptor.start();
RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
EventProducer eventProducer = new EventProducer(ringBuffer);long i =0;
for(;;){
i++;
eventProducer.sendData(i);
try {
Thread.sleep(1500);} catch (InterruptedException e){
e.printStackTrace();}}}}
// 消费者上一次消费的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);// 当前进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];
public long next(int n){
if (n <1){
throw new IllegalArgumentException("n must be > 0");}long current;long next;
do
{// 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值
current = cursor.get();// 要申请的序号空间:最大序列号
next = current + n;long wrapPoint = next - bufferSize;// 消费者最小序列号
long cachedGatingSequence = gatingSequenceCache.get();// 大于一圈 || 最小消费序列号>当前进度
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);// 说明大于1圈,并没有多余空间可以申请
if (wrapPoint > gatingSequence){
LockSupport.parkNanos(1);// TODO, should we spin based on the wait strategy?
continue;}// 更新最小值到Sequence的value中
gatingSequenceCache.set(gatingSequence);}// CAS成功后更新当前Sequence的value
else if (cursor.compareAndSet(current, next)){
break;}}
while (true);
return next;}