手记

mongodb拆库分表脚本


脚本功能:

      1. 将指定的报告文件按照指定的字段、切库切表策略切分

      2. 将切分后的文件并发导入到对应的Mongodb中

      3. 生成日志文件和done标识文件

使用手册:

     -h    打印帮助信息,并退出";

         -f     需要切分的数据文件";

         -g    清理昨日或历史全部数据: 1 昨日数据  2 历史全部数据";

         -k     拆分字段在文件中列数,从1开始";

         -o    需要切分的数据文件格式 tsv或csv ";

         -d    切分的库数目";

         -t     切分的表数目";

         -m   切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01";

         -c    切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102";

         -a    入库fieldFile";

         -p    配置文件",

使用步骤:

     1. 在配置文件中设置日志、切割后数据临时路径$LOG_HOME 和 $DATA_SPLIT_HOME目录,如果不存在,则手动创建;

         在配置文件中设置目标Mongodb参数信息,用来作为导入数据的目标库;

         在配置文件中设置Mongodb程序的主目录$MONGO;

     2. 按照具体的参数意义,仿照下面的格式执行脚本:

          举例:./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file

          -f 切分目标文件   -o 文件格式 tsv    -k 切割字段,第三个  -d 切割成3个库 -t 每个库3个表

          -m 导入的mongodb未拆分名称idea -c 导入的mongodb未拆分表名idea -p 环境配置文件 -a 导入目标表的fieldFile文件 -g 清理昨日数据

mongo-split-importer.sh执行脚本:

#!/bin/bash

SPLITFILE="" #目标切割文件

FILEFORMAT="" # 目标切割文件格式 , \t

FILEFORMATNAME="" #切割目标文件格式名称 csv tsv

SPLITKEY=1

SPLITDBNUM="" #目标切割库数目

SPLITTBNUM="" #目标切割表数目

IMPORTDBNAME="" # 目标入库未分割库名

IMPORTTBNAME="" #目标入库未切割表名

PROFILE="" #配置文件

FIELDFILE="" #入库fieldFile

CLEAN=0  #清理数据, 0:默认不清理, 1 : 清理昨日的数据    2: 清理所有以前的数据

SPILTTMPDIR="" #目标切割文件存放临时目录

FULLPATH=$(cd `dirname $0`;pwd -P)

SCRIPTFILE=`basename $0`

TOTLE_RECORD_NUM=0 #文件切割前的记录条目

SUBFILE_RECORD_NUM=0 #切割后所有文件汇总的记录条目

_mongo_count="-1"

#------------------------------------------------函数---------------------------------------------------------------

function usage(){

        echo "$SCRIPTFILE - 分库分表后将数据导数据到mongodb"

        echo "SYNOPSIS"

        echo "OPTIONS" 

        echo "  -h    打印帮助信息,并退出";

        echo "  -f     需要切分的数据文件";

        echo "  -g    是否清理历史数据,默认不清理   1:清理昨日数据  2:清理以前所有数据";

        echo "  -k     拆分字段在文件中列数,从1开始";

        echo "  -o    需要切分的数据文件格式 tsv或csv ";

        echo "  -d    切分的库数目";

        echo "  -t     切分的表数目";

        echo "  -m   切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_01";

        echo "  -c    切分后,需要入库的mongodb未拆分库名,比如拆分前cpc, 拆分后cpc_0102";

        echo "  -a    入库fieldFile";

        echo "  -p    配置文件,绝对或相对路径文件",

        exit

}

function setFileFormat(){

        FILEFORMATNAME=$1

        case $1

        in

                csv)  FILEFORMAT=",";;

                tsv)   FILEFORMAT="\t";;

                *) echo "unknow profile -o $1"; usage;;

        esac

}

while getopts ':hf:g:o:k:d:t:a:p:m:c:' OPTION

do

        case $OPTION

        in

                h) usage;;

                f) SPLITFILE=$OPTARG;;

                g)CLEAN=$OPTARG;;

                o) setFileFormat $OPTARG;;

                k) SPLITKEY=$OPTARG;;

                d) SPLITDBNUM=$OPTARG;;

                t) SPLITTBNUM=$OPTARG;;

                a) FIELDFILE=$OPTARG;;

                p) PROFILE=$OPTARG;;

                m) IMPORTDBNAME=$OPTARG;;

                c) IMPORTTBNAME=$OPTARG;;

                :) echo "选项 \"-$OPTARG\" 后面缺少对应值, 将使用默认值";;

                \?)echo " 错误的选项 -$OPTARG, 将退出"; usage;;

        esac

done

#记录日志信息

function logInfo(){

  echo "[`date +"%Y-%m-%d %H:%M:%S"`] $@ " | tee -a $LOGFILE

}

function checkError(){

        if [ $? -ne 0 ]; then

                echo "[`date +"%Y-%m-%d %H:%M:%S,%s"`][$SCRIPTFILE, $$] ERROR OCCURS! - $1" | tee -a $ERRORFILE

                exit 1;

        fi

}

function check_ready() {

        tmp_done_file=`printf "$reportDoneFile" "$TABLE" "$1"`

        while [ "$isok" = "false" ]; do

                rsync  --list-only ${tmp_done_file}

                if [ $? -eq 0 ]; then

                        isok="true";

                        break;

                fi

                if [ "$isok" = "false" ]; then

                        sleep 300

                fi

                time_now=`date  +%s`

                if [ `expr ${time_now} - ${time_start}` -ge $max_interval ]; then

                        return 255;

                fi

        done

        return 0;

}

#从数据库列表里选择主库

function selectMongoMaster(){

                tmp="TARGET_MONGO_HOST_LIST_0$1"

                TMP_HOST=${!tmp}

                echo $TMP_HOST

                #replica set

                for DUBHE_MONGO_HOST in $TMP_HOST; do

                        if [ $? -eq 0 ] ; then

                                break;

                        fi

                done

                # single server

                #for DUBHE_MONGO_HOST in $TMP_HOST; do

                        #TARGET_MONGO_HOST=$DUBHE_MONGO_HOST

                        #echo $TARGET_MONGO_HOST

                #done

}

#切割

function split() {

        logInfo "spilt data file"

        echo "split db num"$SPLITDBNUM

        echo "split tb num"$SPLITTBNUM

        echo "Start to split file: "$SPLITFILE

        awk '

        BEGIN {

                FS="'${FILEFORMAT}'";

        }

        ARGIND==1{

                        #分库分表

                DBN=$'${SPLITKEY}' % '${SPLITDBNUM}' + 1;

                TBN=int($'${SPLITKEY}' / '${SPLITDBNUM}')

                        TBN=TBN % '${SPLITTBNUM}' + 1;

                DBN="0"DBN;

                TBN="0"TBN;

                        print $0 > "'${SPILTTMPDIR}'""/""'${IMPORTTBNAME}'""_"DBN""TBN

        }

        END {

        }

        ' ${SPLITFILE};

        ls $SPILTTMPDIR

        echo "Split file successfully : "$SPLITFILE

}

#导入

function import() {

        #importData

        local iter=1;

    while [ $iter -le $SPLITDBNUM ]; do

                thread_import $iter &

                iter=`expr $iter + 1`

        done

        #wait for child-threads

        wait;

}

#导入子线程

function thread_import() {

        local num=1;

                                targetFileName=$IMPORTTBNAME"_0"$1"0"$num

                                targetFile=$SPILTTMPDIR/$IMPORTTBNAME"_0"$1"0"$num

                                targetDB=$IMPORTDBNAME"_0"$1

                                targetCollection=$IMPORTTBNAME"_0"$1"0"$num

                                if [ ! -f $targetFile ]; then

                                        logInfo "spilt file does not exits : " $targetFile

                                        num=`expr $num + 1`

                                        continue

                                fi

                                user="TARGET_MONGO_USER_0"$1

                                TMP_USER=${!user}

                                password="TARGET_MONGO_PWD_0"$1

                                TMP_PASSWORD=${!password}

                                 #选择master   

                                selectMongoMaster $1;

                                #clean dirty data

                            if [ $CLEAN -gt 0  ]; then

                                        logInfo "$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb"

                                        clean_dirty_data

                                        checkError "whether error occurs during cleaning dirty data from mongodb"

                                fi

                                #import data

                                import2mongo $1 $targetFile  $targetDB  $targetCollection

                #record done file

                statusfile="$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +"%Y-%m-%d"`"

                touch $statusfile

                num=`expr $num + 1`

        done

        logInfo "thread $1 ends"

}

#把指定的文件导到指定的库指定的表,并建立索引,mongodb自身会判断索引是否存在

#不存在的情况下才创建新索引

function import2mongo(){

    if [ "$FIELDFILE" != "" ]; then

                MONGO_FIELD_FILE=$FIELDFILE

        else

                MONGO_FIELD_FILE=$FULLPATH/../conf/${IMPORTTBNAME}-head-file

        fi

        DATAFILE=$2

    if [ ! -f $DATAFILE ]; then

        logInfo "mongodb [${DB}.${COLL}] imported 0 objects"

        return 0

    fi

    TMPLOGFILE=$INFO_LOG_HOME/$DB.$COLL.tmp.log

        tmp=$?

        if [ "$tmp" != "0" ]; then

                return $tmp

        fi

        #data check

        _mongo_count=`tail $TMPLOGFILE | grep imported`

        _mongo_count=`expr 0$_mongo_count + 0`

        #start to ensure index

        ensureIndex

        logInfo "mongodb [${DB}.${COLL}] imported $_mongo_count objects"

        return $tmp

}

function ensureIndex(){

}

#垃圾数据清理

function clean_dirty_data(){

        day=`date -d ${1:-' -1day'} +"%y%m%d"`

        if [ $CLEAN -eq 1  ]; then

                _mongo_condition="{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}"

        else

                _mongo_condition="{\"_id\":{\"\$lte\":\"${day}_9\"}}"

        fi

        logInfo "waiting for the clean task.."

        echo  $_mongo_condition

        tmp=$?

        if [ "$tmp" != "0" ]; then

                return $tmp

        fi

        sleep 5s

        logInfo "dirty data cleaned: "$targetDB  $targetCollection  $dirtyCount

        echo "dirty data cleaned: "$targetDB  $targetCollection  $dirtyCount

        return $tmp

}

#parameter check

function checkParams() {

        if [ 1 -ne $CLEAN -a 2 -ne $CLEAN ]; then

                logInfo "-g the parameter clean is not in [1, 2] : "$CLEAN

                return 1;

        fi

        if [  $FILEFORMAT != "," -a  $FILEFORMAT != "\t"  ]; then

                logInfo "-o the parameter file format  is not in [csv, tsv] : "$FILEFORMAT

                return 1;

        fi

        if [ $SPLITKEY -lt 1 ]; then

                        logInfo "-k split key must not be less  than 1 : "$SPLITKEY

                        return 1;

        fi

        if [ $SPLITDBNUM -lt 1 ]; then

                        logInfo "-d database number must not  be less  than 1 : "$SPLITDBNUM

                        return 1;

        fi

        if [ $SPLITTBNUM -lt 1 ]; then

                        logInfo "-t collection number must not  be less  than 1 : "$SPLITTBNUM

                        return 1;

        fi

        if [ ! -f  $FIELDFILE ];  then

                logInfo "-a field file is not a common file or not exits : "$FIELDFILE

                return 1;

        fi

        if [ "" = $IMPORTDBNAME ] ; then

                        logInfo "-m import  database name is empty  : "$IMPORTDBNAME

                        return 1;

        fi

        if [ "" = $IMPORTTBNAME ] ; then

                        logInfo "-m import  table name is empty  : "$IMPORTTBNAME

                        return 1;

        fi

}

#主函数

function main() {

                set +x

                echo "check split file and profile: " $SPLITFILE   $PROFILE

                if [ ! -f  $SPLITFILE ];  then

                        echo  "-f split file is not a common file or not exits : "$SPLITFILE

                        return 1;

                fi

                if [ ! -f  $PROFILE ];  then

                        echo  "-p profile file is not a common file or not exits : "$PROFILE

                        return 1;

                fi

                source $PROFILE

                qdate=`date +"%Y-%m-%d"`

                last_day=`date -d "-1day" +"%Y-%m-%d"`

                BASEFILENAME=$(basename $SPLITFILE)

                echo "base split file name is : "$BASEFILENAME

                if [ ! -d $LOG_HOME ] ; then

                        logInfo  " log home  is not a common directory or not exits : "$LOG_HOME

                        return 1;

                fi

                LOGFILE=$INFO_LOG_HOME/$BASEFILENAME.$qdate.log

                if [ -f $LOGFILE ]; then

                        mv $LOGFILE $LOGFILE.$last_day

                fi

                touch $LOGFILE

                ERRORFILE=$ERROR_LOG_HOME/$BASEFILENAME.error.log

                if [ -f $ERRORFILE ]; then

                        mv $ERRORFILE $ERRORFILE.$last_day

                fi

                touch $ERRORFILE

                #空行

                echo

                echo

                logInfo "start to check parameters!"

                checkParams

                checkError "whether error occurs during check parameters : $SPLITFILE"

                #空行

                echo

                echo

                logInfo "start to split file: "$SPLITFILE

                if [ ! -d $DATA_SPLIT_HOME ] ; then

                        logInfo  " data split home  is not a common directory or not exits : "$DATA_SPLIT_HOME

                        return 1;

                fi

                SPILTTMPDIR=$DATA_SPLIT_HOME/$BASEFILENAME

                echo "split temple directory : "$SPILTTMPDIR

                if [ -d ${SPILTTMPDIR} ]; then

                        rm -rf ${SPILTTMPDIR}

                fi

                mkdir -p ${SPILTTMPDIR}

                split

                checkError "whether error occurs during split data : $SPLITFILE"

                logInfo "split data completely : $SPLITFILE"

                statusfile=$STATUS_LOG_HOME/$BASEFILENAME".split.done."$qdate

                touch  ${statusfile}

                #空行

                echo

                echo

                logInfo "start to import split  file to mongodb"

                import

                logInfo "import data completely : $SPLITFILE"

                statusfile=$STATUS_LOG_HOME/$BASEFILENAME".import.done."$qdate

                touch  ${statusfile}

                #空行

                echo

                echo

                #remove temple directory

        #       if [ -d ${SPILTTMPDIR} ]; then

        #               rm -rf ${SPILTTMPDIR}

        #       fi

}

#-------------------------------------------------入口----------------------------------------------------------------

source /etc/profile

demeter_conf_cpc_qa.sh 脚本:

#!/bin/bash

source /etc/profile

#logger path

INFO_LOG_HOME="${LOG_HOME}/info"

STATUS_LOG_HOME="${LOG_HOME}/status"

if [ ! -d $ERROR_LOG_HOME ]; then

if [ ! -d $INFO_LOG_HOME ]; then

        mkdir -p $INFO_LOG_HOME

fi

if [ ! -d $STATUS_LOG_HOME ]; then

        mkdir -p $STATUS_LOG_HOME

fi

if [ ! -d $DATA_HOME ]; then

        mkdir -p $DATA_HOME

fi

#data path for source  and target data path

DATA_SPLIT_HOME=/data/demeter/sdata

#import target mongodbs

TARGET_MONGO_PORT_01=XXX

TARGET_MONGO_USER_01=XXX

TARGET_MONGO_PWD_01=XXX

TARGET_MONGO_HOST_LIST_01="test01.mongodb01:$TARGET_MONGO_PORT_01 test01.mongodb02:$TARGET_MONGO_PORT_01 test01.mongodb03:$

TARGET_MONGO_PORT_01"

TARGET_MONGO_PORT_02=XXX

TARGET_MONGO_USER_02=XXX

TARGET_MONGO_PWD_02=XXX

TARGET_MONGO_HOST_LIST_02="testt02.mongodb01:$TARGET_MONGO_PORT_02 test02.mongodb02:$TARGET_MONGO_PORT_02 test02.mongodb03:$

TARGET_MONGO_PORT_02"

TARGET_MONGO_PORT_03=XXX

TARGET_MONGO_USER_03=XXX

TARGET_MONGO_PWD_03=XXX

TARGET_MONGO_HOST_LIST_03="test03.mongodb01:$TARGET_MONGO_PORT_03 test03.mongodb02:$TARGET_MONGO_PORT_03 test03.mongodb03:$

TARGET_MONGO_PORT_03"

#mongodb utils

MONGO=/opt/mongodb

xuri-cpc-head-file

a

b

c

d

e

f

g

h

i

j

k

l

m

n

0

p

q

r

s

host:

XX.XX.XX.XX  test01.mongodb01

XX.XX.XX.XX  test01.mongodb02

XX.XX.XX.XX  testt01.mongodb03

XX.XX.XX.XX  test02.mongodb01

XX.XX.XX.XX  test02.mongodb02

XX.XX.XX.XX  test02.mongodb03

XX.XX.XX.XX  test03.mongodb01

XX.XX.XX.XX  test03.mongodb02

XX.XX.XX.XX  test03.mongodb03

©著作权归作者所有:来自51CTO博客作者安大叔的原创作品,如需转载,请注明出处,否则将追究法律责任

mongodb拆库分表数据库相关


0人推荐
随时随地看视频
慕课网APP