Python线程池读写mysql

Posted by Mr.Be1ieVe on Sunday, March 21, 2021

基于mysql-connector-python,主要是因为一次函数调用有七个select语句,而一共有3w8次7个select,原先要跑两天才能跑完。先吐槽一波mysql connector的库文档,关闭连接啥的不和建立连接放一起,找了老久最后百度出来的。。

mysql连接池

建立连接池

from mysql.connector import pooling
mydb = pooling.MySQLConnectionPool(
            host="localhost",
            user="root",
            passwd="xxx",
            database="xxxxx",
            auth_plugin='mysql_native_password',
            pool_name='test',
            pool_size=32
        )

申请连接,获得游标,关闭游标和连接

thread_connection = mydb.get_connection()
mycursor = thread_connection.cursor()

mycursor.close()
thread_connection.close()

线程池

concurrent.futures — 启动并行任务 — Python 3.9.2 文档

from concurrent.futures.thread import ThreadPoolExecutor
pool = ThreadPoolExecutor(16)

一开始我用的submit(),这样会导致mysql连接池被耗尽并且还没有任何报错的情况(调试也不好发现问题在哪),翻看文档后用了map

map(func, *iterables, timeout=None, chunksize=1) 类似于 map(func, *iterables) 函数,除了以下两点:

  • iterables 是立即执行而不是延迟执行的;
  • func 是异步执行的,对 func 的多个调用可以并发执行。

如果 __next__() 已被调用且返回的结果在对 Executor.map() 的原始调用经过 timeout 秒后还不可用,则已返回的迭代器将引发 concurrent.futures.TimeoutErrortimeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None,则不限制等待时间。 如果 func 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。 使用 ProcessPoolExecutor 时,这个方法会将 iterables 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 chunksize 指定正整数设置。 对很长的迭代器来说,使用大的 chunksize 值比默认值 1 能显著地提高性能。 chunksizeThreadPoolExecutor 没有效果。

userlist = list(xxx)#*iterables的要
pool = ThreadPoolExecutor(16)
result = pool.map(select_function, userlist)
count = 0
for r in result:#线程正常完成会返回None,print(r)可以帮助调试
    count += 1
    print(str(count) + " in " + str(len(userlist)))

这里选择16,是因为填32会崩,索性直接砍半了。最后再展示一下mysql的情况

性能测试对比

库里一共有6w条记录,for循环的话要死人了,所以只测1000条时候的速度

date && /usr/bin/python3 ./test.py && date 
日  3 21 20:36:01 CST 2021
日  3 21 20:36:24 CST 2021

23秒,然后for循环

date && /usr/bin/python3 ./test.py && date 
日  3 21 20:43:57 CST 2021
日  3 21 20:45:13 CST 2021

1分16秒

暂时够用了👨‍💻

「真诚赞赏,手留余香」

Mr.Be1ieVe's Treasure

真诚赞赏,手留余香

使用微信扫描二维码完成支付