手记

基于gevent和pymysql实现mysql读写的异步非堵塞方案


咱们经常使用的mysql库,MySQL-Python库是用C写的,很遗憾它是阻塞的,要实现异步的MySQL驱动必须用Python版本的MySQL驱动! 

现在社区里面有两个纯python实现的mysql驱动。一个是 myconnpy  另一个是PyMysql ~ 这两个mysql驱动文档相当的少呀,好在他们的用法和MySQldb相当的像,不然就要头疼的看代码了。。。 实现的方式是用socket来交互,不像mysqldb封装了libmysqlclient那样 !

myconnpy中国的tornado大牛推荐过,但是也评价过,貌似有些bug的样子。

我这边就用Mozilla公司 也在用的pymysql ~

咱们先创建数据库和数据表

201006358.jpg

安装python的mysql模块~

200829712.jpg

PySQL针对mysql操作demo还算简单的~

import pymysql

db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db = 'rui')

cursor = db.cursor()

sql='select count(*) from kkk'

data = cursor.execute(sql)

print cursor

cursor.close()

db.close()

其实咱们也可以模拟apache那样prefork模式,来派生任务执行对象。

prefork采用预派生子进程方式,用单独的子进程来处理 不同的请求,进程之间彼此独立。我这边测试是mysql堵塞方式,大家也可以利用这种方案模拟多个任务执行。有点类似多进程的样子,消耗比较大的~

#!/usr/bin/python

# -*- coding: utf-8 -*-

#xiaorui.cc

import MySQLdb, pymysql

import signal, os, sys

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

workers = {}

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

def run():

  con = MySQLdb.connect(host='localhost', db='rui', user='root', passwd='123123')

  signal.signal(signal.SIGTERM, lambda sig, status: sys.exit(0))

  cur = con.cursor()

  cur.execute("SELECT SLEEP(30)")

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

def killall(sig, status):

  for pid in workers.keys():

    os.kill(pid, signal.SIGTERM)

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

def waitall():

  for pid in workers.keys():

    try:

      os.waitpid(pid, 0)

    except:

      print "waitpid: interrupted exception"

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

def main():

  print os.getpid()

  signal.signal(signal.SIGTERM, killall)

  for i in range(3):

    pid = os.fork()

    if pid == 0:

      try:

        run()

      except:

        print "run: interrupted exception"

        sys.exit(0)

    else:

      workers[pid] = 1

  waitall()

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  

if __name__ == '__main__':

  main()

看到这三个sleep30都在跑吗? 看起来生效了,但三个进程还是消耗了130秒。

230057221.jpg

225729957.jpg

好了说正题,今天怎么主要说的就是gevent和python下的mysql驱动测试,分享个gevent和pymysql的在一起使用的实例demo ~

def goodquery(sql):

    db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')

    cursor = db.cursor()

    data = cursor.execute(sql)

    cursor.close()

    db.close()

    return cursor

sqla='select count(*) from kkk'

sqlb="select * from kkk where name like '%888888%'"

jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery, (sqlb))]

#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]

gevent.joinall(jobs, timeout=2)

what_you_want = [job.value for job in jobs]

print what_you_want

for i in what_you_want:

    for a in i:

        print a

221013629.jpg

哎,还是有点堵塞。。。 跑了6个耗时3s的sql,共用了18秒的时间。。。。

231823673.jpg

经过一上午的折腾,得知gevent的版本没有用对,只有gevent 1.0 才完美支持socket,然后需要在引入模块的后面,打上别的补丁!

143638790.png

gevent.monkey.patch_socket()

real    0m8.993s

user    0m0.071s

sys     0m0.016s

在这里我再测试下多线程的版本:

import pymysql

import threading

def goodquery(sql):

    db = pymysql.connect(host = 'localhost', passwd = '123123', user = 'root', db= 'rui')

    cursor = db.cursor()

    data = cursor.execute(sql)

    cursor.close()

    db.close()

    print cursor

    return cursor

sqla='select count(*) from kkk'

sqlb="select * from kkk where name like '%888888%'"

#jobs = [gevent.spawn(goodquery,(sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]

#jobs = [gevent.spawn(goodquery, (sqla)),gevent.spawn(goodquery,(sqlb)),gevent.spawn(goodquery,(sqlb))]

#gevent.joinall(jobs, timeout=30)

#what_you_want = [job.value for job in jobs]

threads=[]

for i in range(5):                                                                                                                

    threads.append( threading.Thread( target=goodquery,args=(sqlb,) ) )

for t in threads:

    t.start()

for t in threads:

    t.join()

034003590.jpg

他的测试结果要比gevent慢点 ~但也是并发的执行,可以在mysql进程里面看到执行的记录 ~

[root@101 ~]# time python t.py

real    0m11.122s

user    0m0.095s

sys     0m0.026s

总结下:

       gevent pymysql 或者是 threading pymysql 是靠谱的~  是可以解决大数据下的mysql读写堵塞的问题的~    

       但是和sohu、腾讯的朋友讨论了下我的这个方案,mysql堵塞是在与事务处理时发生的。 看来mysql的堵塞不是这么搞解决的,以后有环境后,会继续的追踪这事 ~

©著作权归作者所有:来自51CTO博客作者rfyiamcool的原创作品,谢绝转载,否则将追究法律责任

python geventpython pymysqlpython 堵塞python应用


0人推荐
随时随地看视频
慕课网APP