Datax的整体框架我们已经大体了解。这次来分析一下reader到writer中间数据的传输层。
这次采取另外一种方式:我们把相关代码抽取出来,自己实现一个通道。
1-首先是定义一个接口代表传输的每一条数据
/**
 * One row of data handed from a reader to a writer through the transport layer.
 *
 * <p>A record is an indexed sequence of {@link Column} values and reports two
 * size figures that implementations keep track of.
 */
public interface Record {

    /** Appends {@code column} to this record. */
    void addColumn(Column column);

    /** Stores {@code column} at index {@code i}. */
    void setColumn(int i, final Column column);

    /** Returns the column stored at index {@code i}. */
    Column getColumn(int i);

    /** Returns a textual representation of this record. */
    String toString();

    /** Returns how many columns this record holds. */
    int getColumnNumber();

    /** Returns the byte size accounted for the record's column data. */
    int getByteSize();

    /** Returns the memory size accounted for this record. */
    int getMemorySize();
}
2-然后会定义一个抽象类 代表每一条数据中的每一列数据
/**
 * A single column value inside a record.
 *
 * <p>The class stores the raw value, its {@link Type} and a byte size supplied
 * by the subclass; subclasses provide the typed {@code asXxx()} conversions.
 */
public abstract class Column {

    /** The kinds of value a column can be tagged with. */
    public enum Type {
        BAD, NULL, INT, LONG, DOUBLE, STRING, BOOL, DATE, BYTES
    }

    private Object rawData;
    private Type type;
    private int byteSize;

    /**
     * Creates a column.
     *
     * @param object   the raw value to hold
     * @param type     the type tag of the value
     * @param byteSize the size to report from {@link #getByteSize()}
     */
    public Column(final Object object, final Type type, int byteSize) {
        this.byteSize = byteSize;
        this.type = type;
        this.rawData = object;
    }

    // ---- typed conversions, implemented per column kind ----

    public abstract Long asLong();

    public abstract Double asDouble();

    public abstract String asString();

    public abstract Date asDate();

    public abstract byte[] asBytes();

    public abstract Boolean asBoolean();

    public abstract BigDecimal asBigDecimal();

    public abstract BigInteger asBigInteger();

    // ---- accessors ----

    /** Returns the raw value held by this column. */
    public Object getRawData() {
        return rawData;
    }

    /** Returns the type tag of this column. */
    public Type getType() {
        return type;
    }

    /** Returns the size recorded for this column's value. */
    public int getByteSize() {
        return byteSize;
    }

    protected void setType(Type type) {
        this.type = type;
    }

    protected void setRawData(Object rawData) {
        this.rawData = rawData;
    }

    protected void setByteSize(int byteSize) {
        this.byteSize = byteSize;
    }

    /** Renders this column as a JSON string. */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
具体实现类如:
/**
 * A {@link Column} holding a {@code String} value.
 *
 * <p>The size reported to the base class is the string's {@code length()}
 * (its number of chars), or 0 for a {@code null} value. Only the string and
 * byte views are supported; every other conversion returns {@code null}.
 */
public class StringColumn extends Column {

    /** Creates a column holding a {@code null} string. */
    public StringColumn() {
        this((String) null);
    }

    /**
     * Creates a column holding {@code rawData}.
     *
     * @param rawData the string value, may be {@code null}
     */
    public StringColumn(final String rawData) {
        super(rawData, Column.Type.STRING, (null == rawData ? 0 : rawData.length()));
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public Long asLong() {
        return null;
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public Double asDouble() {
        return null;
    }

    /**
     * Returns the held string.
     *
     * @return the raw value, or {@code null} when the column holds no value
     */
    @Override
    public String asString() {
        // Casting null yields null, so no separate null check is required.
        return (String) this.getRawData();
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public Date asDate() {
        return null;
    }

    /**
     * Returns the held string encoded as UTF-8.
     *
     * @return the encoded bytes; an empty array when the column holds no value
     */
    @Override
    public byte[] asBytes() {
        final String value = asString();
        if (value == null) {
            return new byte[0];
        }
        // Previously an empty array was returned even for non-empty strings,
        // which silently dropped the column's data.
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public Boolean asBoolean() {
        return null;
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public BigDecimal asBigDecimal() {
        return null;
    }

    /** Not supported by this column; always returns {@code null}. */
    @Override
    public BigInteger asBigInteger() {
        return null;
    }
}
注意每个实现保存数据的同时,会计算数据的大小,后面限制时候使用
3-定义一个类实现Record,主要是对reader或者writer一行数据的增加和获取
/**
 * List-backed {@link Record} that keeps running totals of the byte size and
 * memory size of the columns it holds.
 */
public class DefaultRecord implements Record {

    /** Initial capacity of the backing column list. */
    private static final int RECORD_AVERAGE_COLUMN_NUMBER = 16;

    private List<Column> columns;

    private int byteSize;

    // Starts at the memory attributed to the record object itself.
    private int memorySize = ClassSize.DefaultRecordHead;

    public DefaultRecord() {
        this.columns = new ArrayList<Column>(RECORD_AVERAGE_COLUMN_NUMBER);
    }

    /** Appends {@code column} and adds its size to the totals. */
    @Override
    public void addColumn(Column column) {
        columns.add(column);
        incrByteSize(column);
    }

    /** Returns the column at {@code i}, or {@code null} when {@code i} is out of range. */
    @Override
    public Column getColumn(int i) {
        boolean inRange = 0 <= i && i < columns.size();
        return inRange ? columns.get(i) : null;
    }

    /**
     * Stores {@code column} at index {@code i}, padding the list with
     * {@code null} entries when {@code i} lies beyond the current end.
     *
     * @throws RuntimeException when {@code i} is negative
     */
    @Override
    public void setColumn(int i, final Column column) {
        if (i < 0) {
            throw new RuntimeException("不能给index小于0的column设置值");
        }
        if (columns.size() <= i) {
            expandCapacity(i + 1);
        }
        // List.set hands back the element it replaced, which is the one to un-account.
        Column replaced = this.columns.set(i, column);
        decrByteSize(replaced);
        incrByteSize(column);
    }

    /** Renders the column count and the columns as a JSON object. */
    @Override
    public String toString() {
        Map<String, Object> view = new HashMap<String, Object>();
        view.put("size", this.getColumnNumber());
        view.put("data", this.columns);
        return JSON.toJSONString(view);
    }

    @Override
    public int getColumnNumber() {
        return columns.size();
    }

    @Override
    public int getByteSize() {
        return this.byteSize;
    }

    @Override
    public int getMemorySize() {
        return this.memorySize;
    }

    /** Removes a column's contribution from both totals; ignores {@code null}. */
    private void decrByteSize(final Column column) {
        if (column != null) {
            int dataBytes = column.getByteSize();
            byteSize -= dataBytes;
            // Memory use is the column object header plus the data size.
            memorySize -= ClassSize.ColumnHead + dataBytes;
        }
    }

    /** Adds a column's contribution to both totals; ignores {@code null}. */
    private void incrByteSize(final Column column) {
        if (column != null) {
            int dataBytes = column.getByteSize();
            byteSize += dataBytes;
            // Memory use is the column object header plus the data size.
            memorySize += ClassSize.ColumnHead + dataBytes;
        }
    }

    /** Pads the list with {@code null} entries until it holds {@code totalSize} elements. */
    private void expandCapacity(int totalSize) {
        if (totalSize <= 0) {
            return;
        }
        for (int missing = totalSize - columns.size(); missing > 0; missing--) {
            this.columns.add(null);
        }
    }
}
中间会用到计算对象头大小的工具类,在datax源码中,此处希望有大神可以讲解一下对象头大小计算等知识!!!
4-接下来定义一个抽象类Channel,统计和限速都在这里
"capacity": 512, 队列大小
"byteCapacity": 67108864 队列中数据的总字节数限制(同时也是单条记录的大小上限)
/**
 * Base class of the channel through which records are pushed by one side and
 * pulled by the other.
 *
 * <p>It reads the capacity and speed settings from the configuration,
 * validates arguments of the public push/pull methods and delegates the actual
 * storage to the {@code doXxx} methods implemented by subclasses.
 */
public abstract class Channel {
    private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

    /**
     * Whether the speed settings still have to be logged. Guarded by
     * {@code Channel.class}: a primitive flag with a fixed lock object is used
     * because locking on a {@code Boolean} field that is reassigned inside the
     * block lets different threads lock different objects.
     */
    private static boolean isFirstPrint = true;

    protected int taskGroupId;
    protected int capacity;
    protected int byteCapacity;
    protected long byteSpeed; // bps: bytes/s
    protected long recordSpeed; // tps: records/s
    protected long flowControlInterval;
    protected volatile boolean isClosed = false;
    protected Configuration configuration = null;
    protected volatile long waitReaderTime = 0;
    protected volatile long waitWriterTime = 0;

    /**
     * Reads the channel settings from {@code configuration}.
     *
     * @param configuration source of the channel settings
     * @throws IllegalArgumentException when the configured capacity is not positive
     */
    public Channel(final Configuration configuration) {
        // Queue capacity in records; 2048 when not configured.
        int capacity = configuration.getInt(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY, 2048);
        long byteSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);
        long recordSpeed = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
        if (capacity <= 0) {
            throw new IllegalArgumentException(String.format(
                    "通道容量[%d]必须大于0.", capacity));
        }
        // Log the speed settings only for the first channel that is created.
        synchronized (Channel.class) {
            if (isFirstPrint) {
                Channel.LOG.info("Channel set byte_speed_limit to " + byteSpeed
                        + (byteSpeed <= 0 ? ", No bps activated." : "."));
                Channel.LOG.info("Channel set record_speed_limit to " + recordSpeed
                        + (recordSpeed <= 0 ? ", No tps activated." : "."));
                isFirstPrint = false;
            }
        }
        this.capacity = capacity;
        this.byteSpeed = byteSpeed;
        this.recordSpeed = recordSpeed;
        this.flowControlInterval = configuration.getLong(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);
        // Byte capacity of the channel; 8 MiB when not configured.
        this.byteCapacity = configuration.getInt(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
        this.configuration = configuration;
    }

    /** Pushes the terminate marker {@code r} into the channel. */
    public void pushTerminate(final TerminateRecord r) {
        Validate.notNull(r, "record不能为空.");
        this.doPush(r);
    }

    public void close() {
        this.isClosed = true;
    }

    public void open() {
        this.isClosed = false;
    }

    public boolean isClosed() {
        return isClosed;
    }

    public int getTaskGroupId() {
        return this.taskGroupId;
    }

    public int getCapacity() {
        return capacity;
    }

    public long getByteSpeed() {
        return byteSpeed;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    /** Pushes one non-null record into the channel. */
    public void push(final Record r) {
        Validate.notNull(r, "record不能为空.");
        this.doPush(r);
    }

    /** Pushes a non-null collection of non-null records into the channel. */
    public void pushAll(final Collection<Record> rs) {
        Validate.notNull(rs);
        Validate.noNullElements(rs);
        this.doPushAll(rs);
    }

    /** Pulls one record from the channel. */
    public Record pull() {
        return this.doPull();
    }

    /** Pulls records from the channel into the non-null collection {@code rs}. */
    public void pullAll(final Collection<Record> rs) {
        Validate.notNull(rs);
        this.doPullAll(rs);
    }

    protected abstract void doPush(Record r);

    protected abstract void doPushAll(Collection<Record> rs);

    protected abstract Record doPull();

    protected abstract void doPullAll(Collection<Record> rs);

    public abstract int size();

    public abstract boolean isEmpty();

    public abstract void clear();

    /** Sums {@link Record#getByteSize()} over {@code rs}. */
    private long getByteSize(final Collection<Record> rs) {
        long size = 0;
        for (final Record each : rs) {
            size += each.getByteSize();
        }
        return size;
    }
}
重要的是它的实现类,内存Channel的具体实现,底层其实是一个ArrayBlockingQueue
生产者消费者模型基于ReentrantLock,Condition!!!
/**
 * In-memory {@link Channel} backed by an {@link ArrayBlockingQueue}.
 *
 * <p>Single-record operations use the queue's own blocking {@code put}/{@code take}.
 * Batch operations run under {@link #lock} and wait on two conditions:
 * {@code notInsufficient} (room for a batch) and {@code notEmpty} (something to drain).
 * The running memory size of the queued records is tracked in {@code memoryBytes}.
 */
public class MemoryChannel extends Channel {
    private int bufferSize = 0;
    private AtomicInteger memoryBytes = new AtomicInteger(0);
    private ArrayBlockingQueue<Record> queue = null;
    private ReentrantLock lock;
    private Condition notInsufficient, notEmpty;

    public MemoryChannel(final Configuration configuration) {
        super(configuration);
        // The queue holds at most getCapacity() records.
        this.queue = new ArrayBlockingQueue<Record>(this.getCapacity());
        // NOTE(review): read without a default value — presumably the key must be
        // present in the configuration; confirm how getInt behaves when it is missing.
        this.bufferSize = configuration.getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
        // Lock and conditions used by the batch operations.
        lock = new ReentrantLock();
        notInsufficient = lock.newCondition();
        notEmpty = lock.newCondition();
    }

    @Override
    public void clear() {
        this.queue.clear();
    }

    /**
     * Puts one record into the queue, blocking while the queue is full. If the
     * thread is interrupted while waiting, the interrupt flag is restored and
     * the method returns without the record having been queued.
     */
    @Override
    protected void doPush(Record r) {
        try {
            long startTime = System.nanoTime();
            this.queue.put(r);
            waitWriterTime += System.nanoTime() - startTime;
            memoryBytes.addAndGet(r.getMemorySize());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Adds all records of {@code rs} to the queue as one batch, waiting until
     * both the byte capacity and the remaining queue capacity can take it.
     *
     * @throws RuntimeException when the thread is interrupted; the interrupt
     *         flag is restored and the {@link InterruptedException} is the cause
     */
    @Override
    protected void doPushAll(Collection<Record> rs) {
        long startTime = System.nanoTime();
        // Acquire before the try block so the finally clause never unlocks a lock
        // this thread failed to obtain.
        lockInterruptiblyOrFail();
        try {
            int bytes = getRecordBytes(rs);
            // The wait is timed and the condition re-checked in a loop: doPull()
            // removes records without signalling notInsufficient.
            while (memoryBytes.get() + bytes > this.byteCapacity
                    || rs.size() > this.queue.remainingCapacity()) {
                notInsufficient.await(200L, TimeUnit.MILLISECONDS);
            }
            this.queue.addAll(rs);
            waitWriterTime += System.nanoTime() - startTime;
            memoryBytes.addAndGet(bytes);
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Takes one record from the queue, blocking while the queue is empty.
     *
     * @throws IllegalStateException when the thread is interrupted while waiting
     */
    @Override
    protected Record doPull() {
        try {
            long startTime = System.nanoTime();
            Record r = this.queue.take();
            waitReaderTime += System.nanoTime() - startTime;
            memoryBytes.addAndGet(-r.getMemorySize());
            return r;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Clears {@code rs} and then drains up to {@code bufferSize} records into
     * it, waiting until at least one record is available.
     *
     * @throws RuntimeException when the thread is interrupted; the interrupt
     *         flag is restored and the {@link InterruptedException} is the cause
     */
    @Override
    protected void doPullAll(Collection<Record> rs) {
        assert rs != null;
        rs.clear();
        long startTime = System.nanoTime();
        // Acquire before the try block so the finally clause never unlocks a lock
        // this thread failed to obtain.
        lockInterruptiblyOrFail();
        try {
            // The wait is timed and the drain retried in a loop: doPush() adds
            // records without signalling notEmpty.
            while (this.queue.drainTo(rs, bufferSize) <= 0) {
                notEmpty.await(200L, TimeUnit.MILLISECONDS);
            }
            waitReaderTime += System.nanoTime() - startTime;
            int bytes = getRecordBytes(rs);
            memoryBytes.addAndGet(-bytes);
            notInsufficient.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /** Acquires {@link #lock}; on interruption restores the flag and throws unchecked. */
    private void lockInterruptiblyOrFail() {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Sums {@link Record#getMemorySize()} over {@code rs}. */
    private int getRecordBytes(Collection<Record> rs) {
        int bytes = 0;
        for (Record r : rs) {
            bytes += r.getMemorySize();
        }
        return bytes;
    }

    @Override
    public int size() {
        return this.queue.size();
    }

    @Override
    public boolean isEmpty() {
        return this.queue.isEmpty();
    }
}
5-最后会定义一个RecordExchanger用来具体的传输数据或者获取数据
/**
 * Moves records through a {@link Channel} in batches.
 *
 * <p>On the sending side records are collected in a local buffer and pushed to
 * the channel when the buffer is full; on the receiving side a batch is pulled
 * from the channel and handed out one record at a time.
 */
public class BufferedRecordExchanger {
    private final Channel channel;
    private final Configuration configuration;
    private final List<Record> buffer;
    private int bufferSize;
    protected final int byteCapacity;
    private final AtomicInteger memoryBytes = new AtomicInteger(0);
    private int bufferIndex = 0;
    private static Class<? extends Record> RECORD_CLASS;
    private volatile boolean shutdown = false;

    /**
     * Creates an exchanger on top of {@code channel}, reading the buffer size and
     * byte capacity from the channel's configuration.
     *
     * @throws RuntimeException when the record class cannot be loaded
     */
    public BufferedRecordExchanger(final Channel channel) {
        assert null != channel;
        assert null != channel.getConfiguration();
        this.channel = channel;
        this.configuration = channel.getConfiguration();
        this.bufferSize = configuration
                .getInt(CoreConstant.DATAX_CORE_TRANSPORT_EXCHANGER_BUFFERSIZE);
        this.buffer = new ArrayList<Record>(bufferSize);
        // Byte capacity; 8 MiB when not configured.
        this.byteCapacity = configuration.getInt(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CAPACITY_BYTE, 8 * 1024 * 1024);
        try {
            // asSubclass fails right here if the class is not a Record, instead of
            // deferring the failure to the first createRecord() call.
            BufferedRecordExchanger.RECORD_CLASS = Class
                    .forName("com.fayayo.core.transport.record.DefaultRecord")
                    .asSubclass(Record.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load the record class.", e);
        }
    }

    /**
     * Creates a new, empty record.
     *
     * @throws RuntimeException when the record class cannot be instantiated
     */
    public Record createRecord() {
        try {
            return BufferedRecordExchanger.RECORD_CLASS.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create a record instance.", e);
        }
    }

    /**
     * Buffers {@code record} for sending, flushing the pending batch first when
     * the buffer already holds {@code bufferSize} records or when adding this
     * record would exceed {@code byteCapacity}.
     *
     * @throws RuntimeException when the exchanger is shut down or the record's
     *         memory size alone exceeds {@code byteCapacity}
     */
    public void sendToWriter(Record record) {
        ensureRunning();
        Validate.notNull(record, "record不能为空.");
        if (record.getMemorySize() > this.byteCapacity) {
            throw new RuntimeException("单条记录超过大小限制,当前限制为:" + this.byteCapacity);
        }
        boolean isFull = (this.bufferIndex >= this.bufferSize
                || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
        if (isFull) {
            flush();
        }
        this.buffer.add(record);
        this.bufferIndex++;
        memoryBytes.addAndGet(record.getMemorySize());
    }

    /**
     * Pushes the buffered records into the channel and resets the buffer.
     *
     * @throws RuntimeException when the exchanger is shut down
     */
    public void flush() {
        ensureRunning();
        this.channel.pushAll(this.buffer);
        this.buffer.clear();
        this.bufferIndex = 0;
        this.memoryBytes.set(0);
    }

    /**
     * Returns the next received record, pulling a new batch from the channel
     * when the local buffer is used up.
     *
     * @return the next record, or {@code null} once the terminate marker is reached
     * @throws RuntimeException when the exchanger is shut down
     */
    public Record getFromReader() {
        ensureRunning();
        boolean isEmpty = (this.bufferIndex >= this.buffer.size());
        if (isEmpty) {
            // NOTE(review): `log` is not declared in this class as shown — presumably
            // supplied by something outside this excerpt; confirm.
            log.info("没有数据等待.....");
            receive();
        }
        Record record = this.buffer.get(this.bufferIndex++);
        if (record instanceof TerminateRecord) {
            // The terminate marker is the producer's end-of-data signal.
            record = null;
        }
        return record;
    }

    /** Marks the exchanger as shut down and clears the buffer and the channel. */
    public void shutdown() {
        shutdown = true;
        try {
            buffer.clear();
            channel.clear();
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    /** Pulls the next batch from the channel into the buffer. */
    private void receive() {
        this.channel.pullAll(this.buffer);
        this.bufferIndex = 0;
        this.bufferSize = this.buffer.size();
    }

    /**
     * Flushes the pending records and then pushes the terminate marker.
     *
     * @throws RuntimeException when the exchanger is shut down
     */
    public void terminate() {
        ensureRunning();
        flush();
        this.channel.pushTerminate(TerminateRecord.get());
    }

    /** Throws when {@link #shutdown()} has already been called. */
    private void ensureRunning() {
        if (shutdown) {
            throw new RuntimeException("BufferedRecordExchanger has been shut down.");
        }
    }
}
6- 生产者传输的前提要有个结束标志
/**
 * Marker record signalling that the producer has finished producing.
 *
 * <p>It is a singleton that holds no columns: mutators do nothing and every
 * size is reported as zero.
 */
public class TerminateRecord implements Record {

    private static final TerminateRecord INSTANCE = new TerminateRecord();

    private TerminateRecord() {
        // Singleton; use get().
    }

    /** Returns the shared marker instance. */
    public static TerminateRecord get() {
        return INSTANCE;
    }

    /** Does nothing; the marker carries no columns. */
    @Override
    public void addColumn(Column column) {
        // intentionally empty
    }

    /** Does nothing; the marker carries no columns. */
    @Override
    public void setColumn(int i, Column column) {
        // intentionally empty
    }

    /** Always returns {@code null}. */
    @Override
    public Column getColumn(int i) {
        return null;
    }

    /** Always returns 0. */
    @Override
    public int getColumnNumber() {
        return 0;
    }

    /** Always returns 0. */
    @Override
    public int getByteSize() {
        return 0;
    }

    /** Always returns 0. */
    @Override
    public int getMemorySize() {
        return 0;
    }
}
7- 测试
/**
 * Manual demo: one thread sends records through a {@link MemoryChannel}, and a
 * second thread, started one second later, prints them until the end marker.
 */
public class MainTest {

    public static void main(String[] args) {
        System.setProperty("elastic.home", "D:\\app\\workspace\\idea\\elastic-job\\target\\elastic\\elastic");

        // Default configuration merged with the parsed core configuration.
        Configuration allConfig = Configuration.newDefault()
                .merge(ConfigParser.parseCoreConfig(CoreConstant.ELASTIC_CONF_PATH), false);

        Channel channel = new MemoryChannel(allConfig);
        BufferedRecordExchanger exchanger = new BufferedRecordExchanger(channel);

        // One row of data, shared by both sends below.
        Record record = exchanger.createRecord();
        String str = "abcdefghigklmnopqrstuvwxyz";

        // Producer: fill the row, send it twice, then send the end marker.
        Thread producer = new Thread(() -> {
            record.setColumn(0, new StringColumn(str));
            exchanger.sendToWriter(record);
            exchanger.sendToWriter(record);
            exchanger.terminate();
        });
        producer.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Consumer: print every record until getFromReader() returns null.
        Thread consumer = new Thread(() -> {
            for (Record received = exchanger.getFromReader();
                    received != null;
                    received = exchanger.getFromReader()) {
                System.out.println(received.toString());
            }
            System.out.println("读取完毕");
        });
        consumer.start();
    }
}
测试的时候用到了datax中的json工具以及core.json,会用到里面的限制的参数:
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
}