手记

ik分词器采用MySQL热更新

ik分词器采用MySQL热更新

​ 官方所给的IK分词器只支持远程文本文件热更新,不支持采用MySQL热更新,没关系,这难不倒伟大的博主,给哈哈哈。今天就来和大家讲一下如何采用MySQL做热更新IK分词器的词库。

一、建立数据库表

CREATE TABLE `es_extra_main`
(
    `id`          int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `word`        varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '词',
    `is_deleted`  tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已删除',
    `update_time` timestamp(6)                       NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP (6) COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


CREATE TABLE `es_extra_stopword`
(
    `id`          int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `word`        varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '词',
    `is_deleted`  tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已删除',
    `update_time` timestamp(6)                       NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP (6) COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

二、修改IK分词器插件源码

2. 1修改pom文件

<!--mysql驱动-->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.29</version>
</dependency>

2.2 新增DatabaseMonitor类

这里新增一个关于MySQL的类,源码中有关于远程文本文件的热更新源码,我们这边仿照源码来写一就可以啦。

package org.wltea.analyzer.dic;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.wltea.analyzer.help.ESPluginLoggerFactory;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

public class DatabaseMonitor implements Runnable {

    private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName());
    public static final String PATH_JDBC_PROPERTIES = "jdbc.properties";

    private static final String JDBC_URL = "jdbc.url";
    private static final String JDBC_USERNAME = "jdbc.username";
    private static final String JDBC_PASSWORD = "jdbc.password";
    private static final String JDBC_DRIVER = "jdbc.driver";
    private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql";
    private static final String SQL_UPDATE_STOPWORD = "jdbc.update.stopword.sql";

    /**
     * 更新间隔
     */
    public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";

    private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN));

    private static Timestamp lastUpdateTimeOfMainDic = null;

    private static Timestamp lastUpdateTimeOfStopword = null;

    public String getUrl() {
        return Dictionary.getSingleton().getProperty(JDBC_URL);
    }

    public String getUsername() {
        return Dictionary.getSingleton().getProperty(JDBC_USERNAME);
    }

    public String getPassword() {
        return Dictionary.getSingleton().getProperty(JDBC_PASSWORD);
    }

    public String getDriver() {
        return Dictionary.getSingleton().getProperty(JDBC_DRIVER);
    }

    public String getUpdateMainDicSql() {
        return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC);
    }

    public String getUpdateStopwordSql() {
        return Dictionary.getSingleton().getProperty(SQL_UPDATE_STOPWORD);
    }

    /**
     * 加载MySQL驱动
     */
    public DatabaseMonitor() {
        SpecialPermission.check();
        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
            try {
                Class.forName(getDriver());
            } catch (ClassNotFoundException e) {
                logger.error("mysql jdbc driver not found", e);
            }
            return null;
        });


    }

    @Override
    public void run() {
        SpecialPermission.check();
        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
            Connection conn = getConnection();

            // 更新主词典
            updateMainDic(conn);
            // 更新停用词
            updateStopword(conn);
            closeConnection(conn);

            return null;
        });

    }

    public Connection getConnection() {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword());
        } catch (SQLException e) {
            logger.error("failed to get connection", e);
        }
        return connection;
    }

    public void closeConnection(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error("failed to close Connection", e);
            }
        }
    }

    public void closeRsAndPs(ResultSet rs, PreparedStatement ps) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                logger.error("failed to close ResultSet", e);
            }
        }

        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                logger.error("failed to close PreparedStatement", e);
            }
        }

    }

    /**
     * 主词典
     */
    public synchronized void updateMainDic(Connection conn) {

        logger.info("start update main dic");
        int numberOfAddWords = 0;
        int numberOfDisableWords = 0;
        PreparedStatement ps = null;
        ResultSet rs = null;

        try {
            String sql = getUpdateMainDicSql();

            Timestamp param = lastUpdateTimeOfMainDic == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic;

            logger.info("param: " + param);

            ps = conn.prepareStatement(sql);
            ps.setTimestamp(1, param);

            rs = ps.executeQuery();

            while (rs.next()) {
                String word = rs.getString("word");
                word = word.trim();

                if (word.isEmpty()) {
                    continue;
                }

                lastUpdateTimeOfMainDic = rs.getTimestamp("update_time");

                if (rs.getBoolean("is_deleted")) {
                    logger.info("[main dic] disable word: {}", word);
                    // 删除
                    Dictionary.disableWord(word);
                    numberOfDisableWords++;
                } else {
                    logger.info("[main dic] add word: {}", word);
                    // 添加
                    Dictionary.addWord(word);
                    numberOfAddWords++;
                }
            }

            logger.info("end update main dic -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

        } catch (SQLException e) {
            logger.error("failed to update main_dic", e);
            // 关闭 ResultSet、PreparedStatement
            closeRsAndPs(rs, ps);
        }
    }

    /**
     * 停用词
     */
    public synchronized void updateStopword(Connection conn) {

        logger.info("start update stopword");

        int numberOfAddWords = 0;
        int numberOfDisableWords = 0;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            String sql = getUpdateStopwordSql();

            Timestamp param = lastUpdateTimeOfStopword == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfStopword;

            logger.info("param: " + param);

            ps = conn.prepareStatement(sql);
            ps.setTimestamp(1, param);

            rs = ps.executeQuery();

            while (rs.next()) {
                String word = rs.getString("word");
                word = word.trim();


                if (word.isEmpty()) {
                    continue;
                }

                lastUpdateTimeOfStopword = rs.getTimestamp("update_time");

                if (rs.getBoolean("is_deleted")) {
                    logger.info("[stopword] disable word: {}", word);

                    // 删除
                    Dictionary.disableStopword(word);
                    numberOfDisableWords++;
                } else {
                    logger.info("[stopword] add word: {}", word);
                    // 添加
                    Dictionary.addStopword(word);
                    numberOfAddWords++;
                }
            }

            logger.info("end update stopword -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

        } catch (SQLException e) {
            logger.error("failed to update main_dic", e);
        } finally {
            // 关闭 ResultSet、PreparedStatement
            closeRsAndPs(rs, ps);
        }
    }
}



2.3 修改代码

初始化方法中新增加载JDBC的方法和将getProperty改为public

并且在Dictionary类后面新增下面的方法

	/**
	 * 加载新词条
	 */
	public static void addWord(String word) {
		singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());
	}

	/**
	 * 移除(屏蔽)词条
	 */
	public static void disableWord(String word) {
		singleton._MainDict.disableSegment(word.trim().toLowerCase().toCharArray());
	}

	/**
	 * 加载新停用词
	 */
	public static void addStopword(String word) {
		singleton._StopWords.fillSegment(word.trim().toLowerCase().toCharArray());
	}

	/**
	 * 移除(屏蔽)停用词
	 */
	public static void disableStopword(String word) {
		singleton._StopWords.disableSegment(word.trim().toLowerCase().toCharArray());
	}

	/**
	 * 加载 jdbc.properties
	 */
	public void loadJdbcProperties() {
		Path file = PathUtils.get(getDictRoot(), DatabaseMonitor.PATH_JDBC_PROPERTIES);
		try {
			props.load(new FileInputStream(file.toFile()));
			logger.info("====================================properties====================================");
			for (Map.Entry<Object, Object> entry : props.entrySet()) {
				logger.info("{}: {}", entry.getKey(), entry.getValue());
			}
			logger.info("====================================properties====================================");
		} catch (IOException e) {
			logger.error("failed to read file: " + DatabaseMonitor.PATH_JDBC_PROPERTIES, e);
		}
	}



三、修改插件的权限

grant {
  // needed because of the hot reload functionality
  permission java.net.SocketPermission "*", "connect,resolve";
  permission java.lang.RuntimePermission "setContextClassLoader";
};

四、打包

4.1 加入依赖

将MySQL的jar包依赖加入进来,否则打包会缺少jar包保持错。

<include>mysql:mysql-connector-java</include>

4.2 package

打包成zip文件,然后加压成文件夹

五、安装

将解压的文件夹放到ES的plugins目录下,然后配置一下config目录下的数据库配置信息,最后再重启一下ES即可完成安装。

六、测试验证

在数据库表中中新增下面自己的想要的关键词,然后去Kibana中做测试验证,可以发现已经可以啦。

关键词

停止词

POST _analyze
{
  "text": ["俺是熊二呗"], 
  "analyzer": "ik_max_word"
}

运行结果

0人推荐
随时随地看视频
慕课网APP