数据交换的一些思考
想做到一个通用的数据交换工具,类似DATAX,就会需要一个通用的通道可以进行数据的传输。
从源数据读取数据转换为指定的数据格式,传输到目标数据进行解析然后保存。
学习之路
之前在慕课网看到一篇文章https://www.imooc.com/article/74111 是关于向mysql数据库集群快速导入数据的一篇文章。文章里面详细介绍了通过Load Data方式进行导入。
之前了解过DATAX3.0,里面的关系型数据库现在都是通过jdbc的方式进行数据的交换。如果数据量较大,或者数据交换时间要求较高,可能会不能满足我们的需求,此时可以进行简单的改造。下面已Mysql为例子。
我们先看一下Load Data最基本的例子。
1-建一个测试表
CREATE TABLE `user_copy` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(50) NOT NULL, `telephone` varchar(30) DEFAULT NULL, `mail` varchar(50) DEFAULT NULL, `password` varchar(32) DEFAULT NULL, `remark` varchar(1000) DEFAULT NULL, `status` int(11) NOT NULL, `operator` varchar(50) NOT NULL, `operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `operate_ip` varchar(20) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `idx_username` (`username`) USING BTREE, UNIQUE KEY `idx_mail` (`mail`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=224959 DEFAULT CHARSET=utf8;
2-代码开发
public static void main(String[] args) throws SQLException {
Connection connection= ConnnectionManager.getConnection();
String path="D:/app/workspace/idea/demo/src/main/java/com/sn/demo/mysql/load/test.txt";
System.out.println(path);
String testSql = "LOAD DATA LOCAL INFILE '"+path+"' IGNORE INTO TABLE user_copy " +
"character set 'utf8' " +
"fields terminated by ',' " +
"optionally enclosed by '\"' lines terminated by '\\n' (username,telephone,mail)";
PreparedStatement pst=connection.prepareStatement(testSql);
pst.execute();
connection.close();
}3-数据写入到指定文件
接下来写入数据到指定的路径,然后运行就可以去数据库查看结果
4-思考我们怎么作为把它转换到数据交换任务中呢?
首先我们要先了解数据交换的基本原理,数据数据交换的核心代码。
已DATAX为例子。
首先我们会对中间传输层数据的格式进行定义。因为在数据交换中,数据都会按照定义的这种格式进行写入和写出。
5-数据格式定义
public class Column {
private String data;
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}public class DataLine {
private List<Column> columns;
public DataLine() {
this.columns = new ArrayList<>(16);
}
public void setColumn(Column column){
columns.add(column);
}
public Column getColumn(int i) {
if (i < 0 || i >= columns.size()) {
return null;
}
return columns.get(i);
}
public int getColumnNumber() {
return this.columns.size();
}
}DataLine代表每一条完整的数据,Column代表具体的每一列。
6-接下来我们进行修改load data的代码
我们不是通过简单的把数据保存到文件中执行,而是修改为从我们定义的数据格式中解析数据然后进行load.
public class LoadDataLine {
public static void main(String[] args) throws SQLException {
//获取一条数据
DataLine dataLine=getDatas();
//转换数据为inputStream
InputStream inputStream=new ConvertInputStream(dataLine);
//导入到Mysql
Connection connection= ConnnectionManager.getConnection();
String path="D:/app/workspace/idea/demo/src/main/java/com/sn/demo/mysql/load/test.txt";
//注意:数据的格式 每列是逗号分隔,每行是\n进行分隔
String testSql = "LOAD DATA LOCAL INFILE '"+path+"' IGNORE INTO TABLE user_copy " +
"character set 'utf8' " +
"fields terminated by ',' " +
"optionally enclosed by '\"' lines terminated by '\\n' (username,telephone,mail)";
System.out.println("sql===>"+testSql);
PreparedStatement statement=connection.prepareStatement(testSql);
if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(inputStream);
mysqlStatement.executeUpdate();
}else {
System.out.println("未知错误,请检查!!!!!!");
}
connection.close();
}
//自己产生数据
private static DataLine getDatas() {
//产生一行数据
DataLine dataLine=new DataLine();
Column column=new Column();
column.setData("test02");
Column column1=new Column();
column1.setData("18");
Column column2=new Column();
column2.setData("小河流水哗啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦");
dataLine.setColumn(column);
dataLine.setColumn(column1);
dataLine.setColumn(column2);
return dataLine;
}
}我们可以看到
mysqlStatement.setLocalInfileInputStream(inputStream);
这里需要一个inputStream,此时我们就需要把DataLine,转换为inputStream.
7-把DataLine,转换为inputStream
自定义转换类 需要和需求的格式保持一致 注意:数据的格式 每列是逗号分隔,每行是\n进行分隔
public class ConvertInputStream extends InputStream{
private String columnSplit=",";//列分隔
private String lineSpliter="\r\n";//行分隔
private DataLine dataLine;
private byte buffer[] = null;
private int preLen=0;//上次剩下的长度
private int preOff=0;//上次读取到的位置
public ConvertInputStream(DataLine dataLine) {
this.dataLine = dataLine;
}
//重写此方法 读取数据到b[]中
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int c = getLine(b,off,len);
dataLine=null;
if (c == -1) {
return -1;
}
return c;
}
//b 开始位置 写入长度
public int getLine(byte b[], int off, int len) throws UnsupportedEncodingException {
int ret=-1;
//查看上次是否已经读取完毕
if(this.preLen>0){
//说明有剩余,继续读取
int minLen = Math.min(this.preLen, len);//比较剩余和当前设定的数组大小
System.arraycopy(this.buffer, this.preOff, b, off, minLen);
this.preOff+=minLen;
this.preLen=this.preLen-minLen;//是否还有剩余
//返回长度
ret=minLen;
return ret;
}
if(dataLine==null){
System.out.println("end");
ret=-1;
return ret;
}
//获取数据
String lines=buildDataLine(dataLine).toString();
System.out.println("line====>"+lines+"<=====");
this.buffer = lines.toString().getBytes("UTF8");//获取到长度
int bufLen=b.length;// 需要读入的大小
int curLineLen=this.buffer.length;//当前数据的大小
//判断是否超过给定的b[]长度
if(bufLen<curLineLen){
System.arraycopy(this.buffer, 0, b, off, bufLen);
this.preLen=curLineLen-bufLen;//剩余多少没读取
this.preOff=bufLen;//这条数据读取到的位置
ret=bufLen;
return ret;
}
System.arraycopy(this.buffer, 0, b, off, curLineLen);
ret=curLineLen;
return ret;
}
@Override
public int read() throws IOException {
return 0;
}
//解析数据转换为需要的字符串
private StringBuilder buildDataLine(DataLine dataLine) {
StringBuilder stringBuilder=new StringBuilder();
//2
for (int i = 0; i < dataLine.getColumnNumber(); i++) {
Column column=dataLine.getColumn(i);
stringBuilder.append(column.getData());
if(i<(dataLine.getColumnNumber()-1)){
stringBuilder.append(columnSplit);
}
}
stringBuilder.append(lineSpliter);
return stringBuilder;
}
}可以看到我们需要自定义一个
ConvertInputStream
进行数据的转换。具体的方式可以参考代码,此处只是一个简单的demo,还没经过具体的测试。
但是整个思路就是这样。
通过上面的这些步骤,相信,如果你想简单改造你的数据交换,你一定可以做到了。