small_925_ant
2018-12-12 18:58:53浏览 4776
Disruptor 的核心是 RingBuffer:它是一个基于数组的环形结构,采用了内存预加载,并且能够在无锁的情况下实现高性能的并发操作。
感兴趣的可以研究源码。
起源
之前看到 DataX 的 MemoryChannel 是通过 ArrayBlockingQueue<Record> queue 来实现的,并利用了批量新增和批量读取的方式。(通过这种方式可以提升存取数据的效率,不需要分多次加锁和释放锁。)
queue.addAll(rs);
this.queue.drainTo(rs,bufferSize)
通过ReentrantLock来控制并发
private ReentrantLock lock;
private Condition notInsufficient, notEmpty;
这里想用 Disruptor 实现一个类似的功能。
代码
/**
 * Base class for one cell of a {@link Record}: a raw value paired with the
 * logical {@link Type} it should be interpreted as.
 *
 * <p>Accessors such as {@code getType()} and {@code getRawData()} are not
 * written out here; they are presumably generated by the {@code @Data}
 * annotation (Lombok) -- confirm against the project's imports.
 */
@Data
public abstract class Column {

    /** Logical value kinds a column can be tagged with. */
    public enum Type {
        BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
    }

    private Type type;
    private Object rawData;

    /**
     * Stores the given tag and value as-is; no validation or conversion is done.
     *
     * @param type    logical type tag for the value
     * @param rawData the untyped value held by this column
     */
    public Column(Type type, Object rawData) {
        this.rawData = rawData;
        this.type = type;
    }
}
/** A {@link Column} that is always tagged {@code Type.STRING}. */
public class StringColumn extends Column {

    /**
     * Wraps {@code rawData} in a STRING-tagged column. The value is stored
     * unchanged; it is not converted to a {@code String} here.
     *
     * @param rawData value to hold
     */
    public StringColumn(final Object rawData) {
        super(Type.STRING, rawData);
    }

    /** Creates a STRING-tagged column whose raw value is {@code null}. */
    public StringColumn() {
        this((String) null);
    }
}
/** A row made of {@link Column} values addressed by integer index. */
public interface Record {

    /** Returns the number of columns in this record. */
    int getColumnNumber();

    /**
     * Returns the column at index {@code i}.
     *
     * @param i column index
     */
    Column getColumn(int i);

    /**
     * Adds {@code column} to this record.
     *
     * @param column column to add
     */
    void addColumn(Column column);

    /**
     * Places {@code column} at index {@code i}.
     *
     * @param i      column index
     * @param column column to store at that index
     */
    void setColumn(int i, final Column column);

    /** Returns a textual form of this record. */
    String toString();
}
/**
 * {@link Record} backed by an {@link ArrayList} that grows on demand.
 *
 * <p>Writing past the current end with {@link #setColumn(int, Column)} pads
 * the gap with {@code null} columns, so {@link #getColumn(int)} may return
 * {@code null} for an in-range index as well as for an out-of-range one.
 */
public class DefaultRecord implements Record {

    /** Initial capacity of the backing list. */
    private static final int RECORD_AVERAGE_COLUMN_NUMBER = 16;

    private final List<Column> columns;

    /** Creates an empty record. */
    public DefaultRecord() {
        this.columns = new ArrayList<>(RECORD_AVERAGE_COLUMN_NUMBER);
    }

    /** Appends {@code column} after the current last column. */
    @Override
    public void addColumn(Column column) {
        columns.add(column);
    }

    /**
     * Stores {@code column} at index {@code i}, growing the record with
     * {@code null} padding when {@code i} is beyond the current end.
     *
     * @param i      zero-based column index
     * @param column column to store
     * @throws IllegalArgumentException if {@code i} is negative
     */
    @Override
    public void setColumn(int i, Column column) {
        if (i < 0) {
            // A bad argument is reported with the standard exception type; it is
            // still a RuntimeException, so existing catch blocks keep working.
            throw new IllegalArgumentException("不能给index小于0的column设置值");
        }
        if (i >= columns.size()) {
            expandCapacity(i + 1);
        }
        columns.set(i, column);
    }

    /**
     * Returns the column at index {@code i}, or {@code null} when {@code i}
     * is outside {@code [0, getColumnNumber())}.
     */
    @Override
    public Column getColumn(int i) {
        if (i < 0 || i >= columns.size()) {
            return null;
        }
        return columns.get(i);
    }

    /** Returns the current number of columns, padding included. */
    @Override
    public int getColumnNumber() {
        return columns.size();
    }

    /** Pads the list with {@code null} until it holds {@code totalSize} entries. */
    private void expandCapacity(int totalSize) {
        while (columns.size() < totalSize) {
            columns.add(null);
        }
    }
}
/**
 * Mutable holder for a single {@link Record}. Instances are produced by
 * {@code RecordEventFactory} and used as the event type of the disruptor.
 */
public class RecordEvent {

    /** The record currently carried by this event; {@code null} until set. */
    private Record record;

    /** Replaces the carried record. */
    public void setRecord(Record record) {
        this.record = record;
    }

    /** Returns the carried record. */
    public Record getRecord() {
        return this.record;
    }
}
/** {@link EventFactory} that supplies empty {@link RecordEvent} instances. */
public class RecordEventFactory implements EventFactory<RecordEvent> {

    /** Returns a new event with no record attached. */
    @Override
    public RecordEvent newInstance() {
        final RecordEvent blankEvent = new RecordEvent();
        return blankEvent;
    }
}
public class RecordEventExceptionHandler implements ExceptionHandler<Object> {
private final Disruptor<RecordEvent> disruptor;
public RecordEventExceptionHandler(Disruptor<RecordEvent> disruptor) {
this.disruptor = disruptor;
}
public void handleEventException(Throwable t, long sequence, Object event) {
disruptor.shutdown();
}
public void handleOnShutdownException(Throwable t) {
disruptor.shutdown();
}
public void handleOnStartException(Throwable t) {
disruptor.shutdown();
}
}
/**
 * Worker that reads every STRING column of each incoming record and counts
 * processed events across all workers. When the shared counter reaches
 * {@link #EXPECTED_EVENTS} the elapsed time since this worker was constructed
 * is printed.
 */
public class RecordWorkHandler implements WorkHandler<RecordEvent> {

    /** Number of events after which the elapsed time is reported. */
    private static final long EXPECTED_EVENTS = 100000000L;

    /** Events processed so far, shared by every handler instance. */
    private static final AtomicLong sums = new AtomicLong(0);

    private final String consumerId;
    private final long startTime;

    /**
     * @param consumerId label identifying this worker
     */
    public RecordWorkHandler(String consumerId) {
        this.consumerId = consumerId;
        this.startTime = System.nanoTime();
    }

    /**
     * Consumes one event: stringifies each STRING column, then bumps the
     * shared counter.
     *
     * @param event the event published to the ring buffer
     */
    @Override
    public void onEvent(RecordEvent event) throws Exception {
        Record record = event.getRecord();
        for (int i = 0; i < record.getColumnNumber(); i++) {
            Column column = record.getColumn(i);
            // Records may contain null padding columns, and a column may hold a
            // null raw value; neither should crash the worker.
            if (column == null || !Column.Type.STRING.equals(column.getType())) {
                continue;
            }
            Object rawData = column.getRawData();
            if (rawData != null) {
                // Result intentionally discarded: this only simulates reading the value.
                rawData.toString();
            }
        }
        // Compare the value returned by the increment itself. A separate get()
        // could observe another worker's increment, so the target value might
        // be seen by no worker or by more than one.
        if (sums.incrementAndGet() == EXPECTED_EVENTS) {
            long endTime = System.nanoTime();
            System.out.println("耗时:" + (endTime - startTime) / 1000000000 + " s");
        }
    }
}
/** A sink that accepts {@link Record}s one at a time or as an array. */
public interface Storage {

    /**
     * Stores a single record.
     *
     * @param record record to store
     */
    void put(Record record);

    /**
     * Stores every record of the given array.
     *
     * @param records records to store
     */
    void put(Record[] records);

    /** Reports whether this storage currently holds nothing. */
    boolean isEmpty();

    /** Returns this storage's size as defined by the implementation. */
    int size();

    /** Closes this storage. */
    void close();
}
/**
 * {@link Storage} that publishes records into a disruptor ring buffer, where
 * they are consumed by a pool of {@link RecordWorkHandler}s.
 */
public class DefaultStorage implements Storage {

    /** Copies the published record into the pre-allocated ring-buffer event. */
    private static final EventTranslatorOneArg<RecordEvent, Record> TRANSLATOR =
            new EventTranslatorOneArg<RecordEvent, Record>() {
                @Override
                public void translateTo(RecordEvent event, long sequence, Record record) {
                    event.setRecord(record);
                }
            };

    private final Disruptor<RecordEvent> disruptor;
    private final RingBuffer<RecordEvent> ringBuffer;

    /**
     * Installs the exception handler and the worker pool on {@code disruptor}
     * and starts it.
     *
     * @param disruptor a disruptor that has not been started yet
     * @param handlers  workers that will share the published events
     */
    public DefaultStorage(Disruptor<RecordEvent> disruptor, RecordWorkHandler[] handlers) {
        this.disruptor = disruptor;
        disruptor.setDefaultExceptionHandler(new RecordEventExceptionHandler(disruptor));
        disruptor.handleEventsWithWorkerPool(handlers);
        this.ringBuffer = disruptor.start();
    }

    /** Publishes one record to the ring buffer. */
    @Override
    public void put(Record record) {
        disruptor.publishEvent(TRANSLATOR, record);
    }

    /** Publishes each record of {@code records} in array order. */
    @Override
    public void put(Record[] records) {
        for (Record record : records) {
            put(record);
        }
    }

    /** Returns {@code true} when every ring-buffer slot is free. */
    @Override
    public boolean isEmpty() {
        return ringBuffer.remainingCapacity() == ringBuffer.getBufferSize();
    }

    /**
     * Returns the number of occupied ring-buffer slots, i.e. capacity minus
     * remaining capacity. This keeps {@code size() == 0} equivalent to
     * {@link #isEmpty()}; returning the fixed capacity would never be 0.
     */
    @Override
    public int size() {
        // The difference is bounded by the int buffer size, so the cast is safe.
        return (int) (ringBuffer.getBufferSize() - ringBuffer.remainingCapacity());
    }

    /** Shuts the underlying disruptor down. */
    @Override
    public void close() {
        disruptor.shutdown();
    }
}
/**
 * Demo entry point: wires a disruptor-backed {@link Storage} to a pool of
 * {@link RecordWorkHandler}s and pushes {@link #TOTAL_RECORDS} single-column
 * records through it from one producer thread.
 */
public class DisruptorMiddleWare {

    /** Number of records the producer publishes. */
    private static final int TOTAL_RECORDS = 100000000;

    public static void main(String[] args) {
        int producerCount = 1;
        int writerParallelism = 1;

        final RecordWorkHandler[] handlers = new RecordWorkHandler[writerParallelism];
        for (int i = 0; i < writerParallelism; i++) {
            handlers[i] = new RecordWorkHandler("consumer" + i);
        }

        int bufferSize = 4096;
        // A single producer can use the cheaper single-producer sequencer.
        ProducerType producerType =
                producerCount == 1 ? ProducerType.SINGLE : ProducerType.MULTI;

        Disruptor<RecordEvent> disruptor = new Disruptor<>(new RecordEventFactory(), bufferSize,
                Executors.defaultThreadFactory(), producerType,
                new YieldingWaitStrategy());
        Storage storage = new DefaultStorage(disruptor, handlers);

        new Thread(() -> {
            try {
                producerData(storage);
            } finally {
                // Always shut down, even if the producer fails. Otherwise the
                // non-daemon consumer threads would keep the JVM alive.
                disruptor.shutdown();
            }
        }).start();
    }

    /** Publishes {@link #TOTAL_RECORDS} records, each with one string column. */
    private static void producerData(Storage storage) {
        for (int i = 0; i < TOTAL_RECORDS; i++) {
            StringColumn stringColumn = new StringColumn("test" + i);
            Record record = new DefaultRecord();
            record.addColumn(stringColumn);
            storage.put(record);
        }
    }
}
如果有疑问、发现了 Bug,或者想了解其他方面的内容,欢迎留言;只要我了解,就会尽量分享。