手记

【九月打卡】第3天 Python3入门机器学习

①课程介绍


课程名称:Python3入门机器学习 经典算法与应用 入行人工智能
课程章节:6-8,6-9
主讲老师:liuyubobobo

内容导读


  • 第一部分 实现小批量梯度下降法
  • 第二部分 梯度的调试
  • 第三部分 代码展示

②课程详细


第一部分 实现小批量梯度下降法

导入函数

import numpy as np
import matplotlib.pyplot as plt

创建多维度的数据(1000, 10)

np.random.seed(666)
X = np.random.random(size=(1000, 10))

true_theta = np.arange(1, 12, dtype=float)
true_theta

X_b = np.concatenate([np.ones((len(X), 1)),X], axis=1)
y = X_b.dot(true_theta) + np.random.normal(size=(1000))

对数据前两维度可视化

plt.scatter(X_b[:,2],y)
plt.show()


数据呈现随机正太分布
导入自己写的小批量梯度下降算法并运行

from nike.LinearRegression import LinearRegression
reg = LinearRegression()
%time reg.fit_mbgd(X, y)

CPU times: total: 500 ms
Wall time: 496 ms
array([ 1.06842484, 2.00102707, 2.97106331, 4.0678298 , 4.97266176,
6.01117703, 7.04290512, 8.0225941 , 9.03038604, 9.99826339,
10.99534304])
从theta的数据来看我自己编写的实现的还是很不错的

第二部分 梯度的调试

承接上文,使用上部分的,来进行梯度检测

def dJ_debug(theta, X_b, y, epsilon=0.01):
    res = np.empty(len(theta))
    for i in range(len(theta)):
        theta_1 = theta.copy()
        theta_1[i] += epsilon
        theta_2 = theta.copy()
        theta_2[i] -=epsilon
        res[i] = (J(theta_1, X_b, y) - J(theta_2, X_b, y))  /(epsilon * 2)
    return res

这个是梯度检测的代码封装,具体的思想是利用很近的上下两个点的导数来估算出中间节点的导数的值,不一定准确,但是近似,可以用于检测自己通过数学公式算出的梯度算法是否正确,然后记得关闭然后再进行正常运行,

第三部分 代码展示

小批量梯度下降代码展示

import numpy as np
from .metrics import r2_score


class LinearRegression:

    def __init__(self):
        """初始化多元线性回归模型"""
        #初始化截距coef_和系数interception_,和theta私有化参数
        self.coef_ =  None
        self.intercept_ = None
        self._theta = None
        self.history_j = []

    def fit_normal(self, X_train ,y_train):
        assert X_train.shape[0] ==y_train.shape[0],\
            "the size of X_train must be equal to the size of y_train"
        #ones(多少个,0or1行和列)
        X_b = np.concatenate(([np.ones((len(X_train), 1)), X_train]), axis=1)
        self._theta = np.linalg.pinv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)

        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self
    def fit_gd(self,X_train, y_train, eta=0.01, n_iters=1e4, epsilon=1e-8):

        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        # 计算J的值
        def J(theta, X_b, y):
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(X_b)
            except:
                return float('inf')

        # 计算J的导数
        def dJ(theta, X_b, y):
            reg = np.array(X_b.dot(theta) - y).dot(X_b)
            return reg * 2 / len(X_b)
            # return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(y)
        def gradient_descent(X_b, y,initial_theta, eta, n_iters=1e4, epsilon=1e-8):

            # 初始化,theta的值,运行次数的值,theta历史的数字
            theta = initial_theta
            i_iter = 0

            # 运行次数超过1万次就退出循环条件1
            while i_iter < n_iters:

                # 求导数
                gradient = dJ(theta, X_b, y)
                # 赋值历史theta,用于判断退出循环条件2
                last_theta = theta
                # 梯度下降,
                theta = theta - eta * gradient
                # 推出条件,J与上一个J的值差距小于1e-8
                if (abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon):
                    break
                # 用于记录运行次数
                i_iter += 1

            return theta

        # 合并,在X前插入全1向量
        X_b = np.concatenate([np.ones((len(X_train), 1)), X_train], axis=1)
        # 随机化系数
        initial_theta = np.array(np.random.randint(1,10, X_b.shape[1]))

        self._theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters, epsilon)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    def fit_sgd(self,X_train, y_train, n_iters=5, t0=5, t1=50):
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        assert n_iters >= 1,\
            'n_iters must big to 1'

        def dJ_sgd(theta, X_b_i, y_i):
            return X_b_i.dot(X_b_i.dot(theta) - y_i) * 2

        # eta不用传了,应为由tot1和循环次数决定
        def sgd(X_b, y, initial_theta, n_iters=5, t0=5, t1=50):
            def learning_rate(t):
                return t0 / (t + t1)
            theta = initial_theta
            m = len(X_b)

            for cur_iter in range(n_iters):
                indexs = np.random.permutation(m)
                X_b_new = X_b[indexs,:]
                y_new = y[indexs]
                for i in range(m):
                    grandient = dJ_sgd(theta, X_b_new[i], y_new[i])
                    theta = theta - learning_rate(cur_iter * m + i) * grandient
            return theta

        # 合并,在X前插入全1向量
        X_b = np.concatenate([np.ones((len(X_train), 1)), X_train], axis=1)
        # 随机化系数
        initial_theta = np.random.randn(X_b.shape[1])


        self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

    def fit_mbgd(self, X_train, y_train, n_iters=5, batch_size=10 , t0=5, t1=50):
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        assert n_iters >= 1, \
            'n_iters must big to 1'

        def J(theta, X_b, y):
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(X_b)
            except:
                return float('inf')

        def dJ_mbgd(theta, X_b_b, y_b):
            return X_b_b.T.dot(X_b_b.dot(theta) - y_b) * 2 / len(y_b)

        # eta不用传了,应为由tot1和循环次数决定
        def mbgd(X_b, y, initial_theta, n_iters=5, batch_size=10, t0=5, t1=50):
            def learning_rate(t):
                return t0 / (t + t1)

            theta = initial_theta
            m = len(X_b)
            for cur_iter in range(n_iters):
                indexs = np.random.permutation(m)
                X_b_new = X_b[indexs, :]
                y_new = y[indexs]

                b_iters = m // batch_size
                b_lost = m % batch_size

                for i in range(b_iters):

                    star_sub = i * batch_size
                    end_sub = (i + 1) * batch_size
                    #print(star_sub,':',end_sub)
                    grandient = dJ_mbgd(theta, X_b_new[star_sub:end_sub], y_new[star_sub:end_sub])
                    theta = theta - learning_rate(cur_iter * m + i) * grandient
                    #对剩下的余数进行操作
                    if i == b_iters - 1 and b_lost != 0:
                        star_sub = m - b_lost
                        end_sub = m
                        #print(star_sub, ':', end_sub)
                        grandient = dJ_mbgd(theta, X_b_new[star_sub:end_sub], y_new[star_sub:end_sub])
                        theta = theta - learning_rate(cur_iter * m + (star_sub+end_sub) // 2) * grandient

            return theta

        # 合并,在X前插入全1向量
        X_b = np.concatenate([np.ones((len(X_train), 1)), X_train], axis=1)
        # 随机化系数
        initial_theta = np.random.randn(X_b.shape[1])

        self._theta = mbgd(X_b, y_train, initial_theta, n_iters,batch_size, t0, t1)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]


    def predict(self,X_predict):
        assert self.intercept_ is not None and self.coef_ is not None,\
            'must fit before predict'
        assert X_predict.shape[1] == len(self.coef_),\
            'the feature number of X_predict must be equal to X_train'
        X_b = np.concatenate([np.ones((len(X_predict),1)),X_predict], axis=1)

        return X_b.dot(self._theta)

    def score(self, X_test, y_test):

        y_predict = self.predict(X_test)
        return r2_score(y_test, y_predict)

    def __repr__(self):
        return "LinearRegression()"


class LinearRegression2:

    def __init__(self):
        """初始化Linear Regression模型"""
        self.coef_ = None
        self.intercept_ = None
        self._theta = None

    def fit_normal(self, X_train, y_train):
        """根据训练数据集X_train, y_train训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        self._theta = np.linalg.inv(X_b.T.dot(X_b)).dot(X_b.T).dot(y_train)

        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    def fit_bgd(self, X_train, y_train, eta=0.01, n_iters=1e4):
        """根据训练数据集X_train, y_train, 使用梯度下降法训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"

        def J(theta, X_b, y):
            try:
                return np.sum((y - X_b.dot(theta)) ** 2) / len(y)
            except:
                return float('inf')

        def dJ(theta, X_b, y):
            return X_b.T.dot(X_b.dot(theta) - y) * 2. / len(y)

        def gradient_descent(X_b, y, initial_theta, eta, n_iters=1e4, epsilon=1e-8):

            theta = initial_theta
            cur_iter = 0

            while cur_iter < n_iters:
                gradient = dJ(theta, X_b, y)
                last_theta = theta
                theta = theta - eta * gradient
                if (abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon):
                    break

                cur_iter += 1

            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        initial_theta = np.zeros(X_b.shape[1])
        self._theta = gradient_descent(X_b, y_train, initial_theta, eta, n_iters)

        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    def fit_sgd(self, X_train, y_train, n_iters=50, t0=5, t1=50):
        """根据训练数据集X_train, y_train, 使用梯度下降法训练Linear Regression模型"""
        assert X_train.shape[0] == y_train.shape[0], \
            "the size of X_train must be equal to the size of y_train"
        assert n_iters >= 1

        def dJ_sgd(theta, X_b_i, y_i):
            return X_b_i * (X_b_i.dot(theta) - y_i) * 2.

        def sgd(X_b, y, initial_theta, n_iters=5, t0=5, t1=50):

            def learning_rate(t):
                return t0 / (t + t1)

            theta = initial_theta
            m = len(X_b)
            for i_iter in range(n_iters):
                indexes = np.random.permutation(m)
                X_b_new = X_b[indexes,:]
                y_new = y[indexes]
                for i in range(m):
                    gradient = dJ_sgd(theta, X_b_new[i], y_new[i])
                    theta = theta - learning_rate(i_iter * m + i) * gradient

            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        initial_theta = np.random.randn(X_b.shape[1])
        self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1)

        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

        return self

    def predict(self, X_predict):
        """给定待预测数据集X_predict,返回表示X_predict的结果向量"""
        assert self.intercept_ is not None and self.coef_ is not None, \
            "must fit before predict!"
        assert X_predict.shape[1] == len(self.coef_), \
            "the feature number of X_predict must be equal to X_train"

        X_b = np.hstack([np.ones((len(X_predict), 1)), X_predict])
        return X_b.dot(self._theta)

    def score(self, X_test, y_test):
        """根据测试数据集 X_test 和 y_test 确定当前模型的准确度"""

        y_predict = self.predict(X_test)
        return r2_score(y_test, y_predict)

    def __repr__(self):
        return "LinearRegression()"


③课程思考


  • 小批量梯度下降法是一种介于批量梯度下降法和随机梯度下降法之间的算法,这种二执中用的思想似乎别处也有,
  • 批量梯度下降法为m,随机梯度下降法为1,小批量梯度下降法就为10或者比1大的数,意思就是小批量梯度下降法做的事情就是每次取10个数据来进行计算导数也就是梯度下降的方向,在精度与时间之间又增加了一种新的选择。

④课程截图

1人推荐
随时随地看视频
慕课网APP