手记

disruptor实现生产者和消费者

disruptor 的核心是RingBuffer.环形结构,数组,采用了内存预加载,并且能够在无锁的情况下实现高性能的并发操作。
感兴趣的可以研究源码。

起源

之前看到DATAX的 MemoryChannel 是通过ArrayBlockingQueue<Record> queue来实现的,利用了批量新增和读取的方式。(通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。)
queue.addAll(rs);
this.queue.drainTo(rs,bufferSize)
通过ReentrantLock来控制并发
private ReentrantLock lock;
private Condition notInsufficient, notEmpty;
这里想用disruptor实现一个类似的功能

代码

  • 定义传输的数据结构
@Data
public abstract class Column {

    private Type type;//数据类型

    private Object rawData;//数据内容

    public Column(Type type, Object rawData) {
        this.type = type;
        this.rawData = rawData;
    }

    //数据类型
    public enum Type {
        BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
    }
}
public class StringColumn extends Column {
    public StringColumn() {
        this((String)null);
    }
    public StringColumn(final Object rawData) {
        super(Type.STRING, rawData);
    }
}
/**
 * @author dalizu on 2018/9/28.
 * @version v1.0
 * @desc 一条记录
 */
public interface Record {

     void addColumn(Column column);

     void setColumn(int i, final Column column);

     Column getColumn(int i);

     String toString();

     int getColumnNumber();

}

/**
 * @author dalizu on 2018/9/28.
 * @version v1.0
 * @desc 传输记录的方法
 */
public class DefaultRecord implements Record{

    private static final int RECORD_AVERGAE_COLUMN_NUMBER = 16;

    private List<Column>columns;//一条数据

    public DefaultRecord() {
        this.columns = new ArrayList<Column>(RECORD_AVERGAE_COLUMN_NUMBER);
    }

    @Override
    public void addColumn(Column column) {
        columns.add(column);
    }

    @Override
    public void setColumn(int i, Column column) {
        if (i < 0) {
            throw new RuntimeException("不能给index小于0的column设置值");
        }

        if (i >= columns.size()) {
            expandCapacity(i + 1);
        }

        this.columns.set(i, column);
    }

    @Override
    public Column getColumn(int i) {
        if (i < 0 || i >= columns.size()) {
            return null;
        }
        return columns.get(i);
    }

    @Override
    public int getColumnNumber() {
        return this.columns.size();
    }

    private void expandCapacity(int totalSize) {
        if (totalSize <= 0) {
            return;
        }

        int needToExpand = totalSize - columns.size();
        while (needToExpand-- > 0) {
            this.columns.add(null);
        }
    }
}
  • 实现disruptor相关代码
/**
 * @author dalizu on 2018/11/7.
 * @version v1.0
 * @desc 包装对象
 */
public class RecordEvent {

    private Record record;

    public Record getRecord() {
        return record;
    }

    public void setRecord(Record record) {
        this.record = record;
    }
}

public class RecordEventFactory implements EventFactory<RecordEvent> {

    @Override
    public RecordEvent newInstance() {
        return new RecordEvent();
    }
}

public class RecordEventExceptionHandler implements ExceptionHandler<Object> {

    private final Disruptor<RecordEvent> disruptor;

    public RecordEventExceptionHandler(Disruptor<RecordEvent> disruptor) {
        this.disruptor = disruptor;
    }

    public void handleEventException(Throwable t, long sequence, Object event) {
        disruptor.shutdown();
    }

    public void handleOnShutdownException(Throwable t) {
        disruptor.shutdown();
    }

    public void handleOnStartException(Throwable t) {
        disruptor.shutdown();
    }
}

/**
 * @author dalizu on 2018/11/7.
 * @version v1.0
 * @desc 消费事件处理
 */
public class RecordWorkHandler implements WorkHandler<RecordEvent>{

    private String consumerId;

    private long startTime;

    private static final AtomicLong sums = new AtomicLong(0);

    public RecordWorkHandler(String consumerId) {
        this.consumerId = consumerId;
        startTime=System.nanoTime();
    }
    @Override
    public void onEvent(RecordEvent event) throws Exception {

        //System.out.println("当前消费者: " + consumerId+":处理数据");
        Record record=event.getRecord();

        for (int i=0;i<record.getColumnNumber();i++){

            Column column=record.getColumn(i);

            if(Column.Type.STRING.equals(column.getType())){

                String value=column.getRawData().toString();
                //System.out.println("处理数据:"+value);
            }
        }

        sums.incrementAndGet();
        if (sums.get() == 100000000) {
            long endTime = System.nanoTime();
            System.out.println("耗时:"+(endTime-startTime)/1000000000+" s");
        }
    }
}
  • 定义内存传输接口
public interface Storage {

	public void put(Record record);

	public void put(Record[] records);

	public boolean isEmpty();

	public int size();

	public void close();
}
  • 通过disruptor实现内存传输接口
public class DefaultStorage implements Storage {

    private final Disruptor<RecordEvent> disruptor;
    private final RingBuffer<RecordEvent> ringBuffer;

    public DefaultStorage(Disruptor<RecordEvent> disruptor, RecordWorkHandler[] handlers) {
        this.disruptor = disruptor;
        disruptor.setDefaultExceptionHandler(new RecordEventExceptionHandler(disruptor));
        disruptor.handleEventsWithWorkerPool(handlers);
        ringBuffer = disruptor.start();
    }


    private static final EventTranslatorOneArg<RecordEvent, Record> TRANSLATOR = new EventTranslatorOneArg<RecordEvent, Record>() {

        @Override
        public void translateTo(RecordEvent event, long sequence, Record record) {
            event.setRecord(record);
        }
    };

    @Override
    public void put(Record record) {
        disruptor.publishEvent(TRANSLATOR, record);
    }

    @Override
    public void put(Record[] records) {
        for (Record record : records) {
            put(record);
        }
    }

    @Override
    public boolean isEmpty() {
        return ringBuffer.remainingCapacity() == ringBuffer.getBufferSize();
    }

    @Override
    public int size() {
        return ringBuffer.getBufferSize();
    }

    @Override
    public void close() {
        disruptor.shutdown();
    }
}

  • 创建disruptor 进行数据的生产
public class DisruptorMiddleWare {

    public static void main(String[] args) {

        int producerCount=1;

        int writerParallelism=1;//消费者的格式

        final RecordWorkHandler[] handlers = new RecordWorkHandler[writerParallelism];
        //配置disruptior的  消费者
        for (int i = 0; i < writerParallelism; i++) {
            handlers[i] = new RecordWorkHandler("consumer"+i);
        }

        int bufferSize = 4096;

        //构建disruptor
        ProducerType producerType;
        if (producerCount == 1) {
            producerType = ProducerType.SINGLE;
        } else {
            producerType = ProducerType.MULTI;
        }
        Disruptor<RecordEvent> disruptor = new Disruptor<>(new RecordEventFactory(), bufferSize,
                Executors.defaultThreadFactory(), producerType,
                new YieldingWaitStrategy());
        Storage storage = new DefaultStorage(disruptor, handlers);

        //生产数据
        new Thread(new Runnable() {

            public void run() {
                //提交到线程池,进行读取数据
                producerData(storage);

                disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
            }
        }).start();

    }

    private static void producerData(Storage storage) {
        for (int i=0;i<100000000;i++){
            StringColumn stringColumn=new StringColumn("test"+i);
            Record record=new DefaultRecord();
            record.addColumn(stringColumn);
            storage.put(record);
        }
    }
}

如果有疑问或者发现Bug,或想了解其他方面,可以留言,如果了解尽量分享。
4人推荐
随时随地看视频
慕课网APP