python 操作 Excel 文档 IO 频繁,多线程 / 线程池修改 2800 条数据

之前的任务中,我们创建了,日志系统,操作excel文档 和 postgresql数据库的接口、高德地图的接口。但是城市的数据有2000多条,我们每次单线程地去存取数据,太慢了。我们来试试集中并发执行地方式,并比较一下。测试用进程、协程、线程来分别测试。看看哪个效果好。

之前的任务中,我们创建了,日志系统,操作excel文档 和 postgresql数据库的接口、高德地图的接口。但是城市的数据有2000多条,我们每次单线程地去存取数据,太慢了。我们来试试集中并发执行地方式,并比较一下。测试用进程、协程、线程来分别测试。看看哪个效果好。

上一篇文章:https://boywithacoin.cn/article/pythondui-...

本地创建数据库,将excel数据存储到city表中,再取|湖北省|的所有地级市和县、县级市、区数据作为样表数据记录在样表中。

博客地址:https://boywithacoin.cn/
项目的完整地址在https://github.com/Freen247/python_get_cit...
文档都可以在里面查看,有兴趣的可以给我评论和star/issue哦?~ (ง •_•)ง

本地创建数据库,将excel数据存储到city表中,再取|湖北省|的所有地级市和县、县级市、区数据作为样表数据记录在样表中。准备工作创建好public/config.py扩展包,到时候,利用python的xlrd包,定义process_data包来存放操作excel数据,生成sql语句的类,
定义op_postgresql包来存放数据库的操作对象,定义各种方法
创建crwler包,来存放爬虫的操作对象 -> 发现对方网站调用的地图api -> 更改为调用德地图api的包-存放操作对象
创建log文件夹,存放数据库操作的日志
创建data文件夹,存放初始excel数据

之前的文章可以在博客中查看

线程和进程之间的区别:

参考文章:
11.4. threading — 管理单个进程里的并行操作

我们的excel数据集总量有2867个城市:

编写默认插入数据的函数

查看我们的源代码:

#!/usr/bin/python
# -*- coding: utf-8 -*-
#__author__: stray_camel

import xlrd,xlwt,sys,os,logging,datetime,time
import asyncio,multiprocessing,threading
# 线程池
from concurrent.futures import ThreadPoolExecutor
from xlutils.copy import copy
absPath = os.path.abspath(__file__)   #返回代码段所在的位置,肯定是在某个.py文件中
temPath = os.path.dirname(absPath)    #往上返回一级目录,得到文件所在的路径
temPath = os.path.dirname(temPath)    #在往上返回一级,得到文件夹所在的路径
sys.path.append(temPath)
from geo.geo_map import Geo_mapInterface
from public import config
#sys.path.insert(0, temPath)          #也可以使用这种方式,确定tmpPath为最高级搜索路径
class OpExcel(object):
    def __init__( self, 
        url:"str类型的文件路径" = config.src_path+"\\data\\2019最新全国城市省市县区行政级别对照表(194).xls", 
        sheet:"excel中的表单名" = "全国城市省市县区域列表"):
        # 传入文件名和表单名
        self.f_name = url
        self.sheet = sheet
        # 将excel中特定表单名数据存储起来
        self.excel = xlrd.open_workbook(self.f_name)
        self.sh_data = self.excel.sheet_by_name(sheet)
        self.rows = self.sh_data.nrows
        self.cols = self.sh_data.ncols
        # 返回的数据集
        self.result = {'code':'999','message':'默认通信','data':"默认数据"}

    # 通过高德地图API查询每个地域名的经纬度,并返回为list数据
    def init_SampleViaProvince_name(
        self, 
        Province_name:"省名" = None
        ) ->"insert的数据,列表形式[('地域名1','1','经纬度'),('地域名2','1','经纬度')]":
        geo_app = Geo_mapInterface(config.geo_key)
        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
        if Province_name:# 生成测试用数据
            cities_data=[ [ ["".join(i[0:3]),1,'test'], ["".join(i[1:3]),1,'test']][i[0]==i[1]] for i in all_data if i[0] == Province_name]
        else: # 生成实际数据
            # 生成第1级别地点和经纬度
            cities_data=[[["".join(i[0:3]),1,i[4]],["".join(i[1:3]),1,i[4]]][i[0]==i[1]] for i in all_data[1:]]
            # 继续添加第0级别的地点和经纬度:
            for i in all_data:
                if cities_data.count(["".join(i[0:2-(i[0]==i[1])]),0,i[4]]) ==0:
                    cities_data.append(["".join(i[0:2-(i[0]==i[1])]),0,i[4]])
        return cities_data

if __name__ == "__main__":
    pass

上一次我们写好了 通过高德地图API查询每个地域名的经纬度,并返回为list数据的函数。

插入函数我们常用的写法:

    # 在新的表test.xls中插入第一级第二级的经纬度(通过api)
    def insert_coordinates(self, 
        new_file:"新生成的文件名", 
    ):
        geo_app = Geo_mapInterface(config.geo_key)
        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
        test_data = [i for i in all_data ]
        new_excel = copy(self.excel)
        ws = new_excel.get_sheet(self.sheet)
        for i,l in enumerate(test_data[:]):
            # xls表首位是名称,所以去掉查询
            if i == 0:
                continue
            # 避免高德地图api使用多余的调用次数
            if l[3]:
                print("记录已经存在",i[3])
            elif l[3]!=" ":
                # 测试时我们用默认的参数,先不调用api
                cur_coordinates = 12345
                # cur_coordinates = geo_app.get_coordinatesViaaddress("".join(l[0:3]))
                # 添加地级市经纬度
                ws.write(i,3,cur_coordinates)
                # 添加县、县级市、区经纬度
                ws.write(i,4,cur_coordinates)
                # print(l,cur_coordinates)
        new_excel.save(config.src_path+'\\data\\'+new_file)

为了方便我们测试,看哪个方法用的时间比较多,我们在我们的config配置文件中写一个打印时间的装饰器:

public/config.py:

#!/usr/bin/python
# -*- coding: utf-8 -*-
#__author__: stray_camel

import os,sys,logging,datetime
#当前项目的所在目录的上级目录
src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 高德地图key
geo_key = '8fd4f42451e2ae4200c8a5efa8a523ac'

# 打印时间的装饰器
def logging_time(func):
    start = datetime.datetime.now()
    def wrapper(*args, **kwargs):
        print(func.__name__,'is running')
        res = func(*args, **kwargs)
        print("time func takes",datetime.datetime.now()-start)
        return res
    return wrapper

在函数中引用:

 @config.logging_time
    def insert_coordinates(self, new_file:"新生成的文件名", ):
    ···

ok,让我们来测试一下将2800多条数据存入到xls需要多久呢?🙃

if __name__ == "__main__":
    test = OpExcel()
    test.insert_coordinates("test.xls")
(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates is running
time func takes 0:00:00.260264

good,nice不错。其实后面我尝试用递归的方法来处理数据,但是迭代函数,数据量太大,最后导致溢栈。而且时间复杂度也高出了logn倍。
🙃不要尝试哦!

def test(data):
    print(len(data))
    for i,l in enumerate(data):
        # xls表首位是名称,所以去掉查询
        if i == 0:
            continue
        # 避免高德地图api使用多余的调用次数
        if l[3]:
            print("记录已经存在",i[3])
        elif l[3]!=" ":
            # 测试时我们用默认的参数,先不调用api
            cur_coordinates = 12345
            # cur_coordinates = geo_app.get_coordinatesViaaddress("".join(l[0:3]))
            # 添加地级市经纬度
            ws.write(i,3,cur_coordinates)
            # 添加县、县级市、区经纬度
            ws.write(i,4,cur_coordinates)
            # print(l,cur_coordinates)
    data = data[1:]
    if len(data) != 0:
        test(data)
test(all_data)

ok,将2800多条数据存入到xls需要0.2到0.3s。

其实我们都知道,读写文件,只是io操作而已,所以用多线程操作就完事了。但是多线程和线程的利用是不是有很大的区别呢,让我们来试试。

# 多线程——在新的表test.xls中插入第一级第二级的经纬度(通过api)
    @config.logging_time
    def insert_coordinates_threading(self, new_file:"新生成的文件名", ):
        geo_app = Geo_mapInterface(config.geo_key)
        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
        test_data = [i for i in all_data ]
        new_excel = copy(self.excel)
        ws = new_excel.get_sheet(self.sheet)

        def test(i):
            cur_coordinates = 12345
            ws.write(i,3,cur_coordinates)
            ws.write(i,4,cur_coordinates)
        for i,l in enumerate(test_data[:]):
            if l[3] and i!=0:
                print(test[0][3],"已经存在",i[3])
            elif l[3]!=" ":
                # cur_coordinates = geo_app.get_coordinatesViaaddress(i[1])
                # test(i)
                t = threading.Thread(target=test, args=(i,))
                t.start()
        new_excel.save(config.src_path+'\\data\\'+new_file)

运行结果:

(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_threading is running
time func takes 0:00:00.640247

WTF?难道用多线程还比单线程跑的快吗?。。我们尝试将时间量扩大,当我们每存入一个数据饿时候执行以下time.sleep(0.01)。:

比较一下两者的结果:

if __name__ == "__main__":
    test = OpExcel()
    test.insert_coordinates_threading("test.xls")
    test.insert_coordinates("test.xls")
(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_threading is running
time func takes 0:00:00.679151
insert_coordinates is running
time func takes 0:00:31.500298

好的好的 我就放心了!🙃

使用线程池,多线程操作

import asyncio,multiprocessing,threading
# 线程池
from concurrent.futures import ThreadPoolExecutor

线程池的代码编写:

    # 使用线程池的多线程——在新的表test.xls中插入第一级第二级的经纬度(通过api)
    @config.logging_time
    def insert_coordinates_ThreadPool(self, new_file:"新生成的文件名", ):
        geo_app = Geo_mapInterface(config.geo_key)
        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
        test_data = [i for i in all_data ]
        new_excel = copy(self.excel)
        ws = new_excel.get_sheet(self.sheet)
        def test(i,l,new_excel):
            cur_coordinates = 12345
            # cur_coordinates = geo_app.get_coordinatesViaaddress("".join(l[0:3]))
            # 添加地级市经纬度
            ws.write(i,3,cur_coordinates)
            # 添加县、县级市、区经纬度
            ws.write(i,4,cur_coordinates)
            time.sleep(0.01)

        executor = ThreadPoolExecutor(max_workers=20)
        for i,l in enumerate(test_data[:]):
            # xls表首位是名称,所以去掉查询
            if i == 0:
                continue
            if l[3]:
                print(test[0][3],"已经存在",i[3])
            elif l[3]!=" ":
                # 利用线程池--concurrent.futures模块来管理多线程:
                future = executor.submit(test,i,l,new_excel)
        new_excel.save(config.src_path+'\\data\\'+new_file)

注意哦,测试的代码每一条都用了sleep(0.01),运行时间:

(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_ThreadPool is running
time func takes 0:00:01.759619

还不错,但是我们打开文档时,还有很多经纬度数据没有存储进去呢?
OP!( ఠൠఠ )ノ(注意我们使用的20条线程)

好吧,原来是线程池里得还没有运行完,主线程就关闭了。也就是说当我们数据还没有存储完毕的时候就执行了new_excel.save(config.src_path+'\\data\\'+new_file)。我们是设置让线程池运行完毕再结束主线程:

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
···
task_pool = []
with ThreadPoolExecutor(max_workers=20) as executor:
    for i,l in enumerate(test_data[:]):
        # xls表首位是名称,所以去掉查询
        if i == 0:
            continue
        if l[3]:
            print(test[0][3],"已经存在",i[3])
        elif l[3]!=" ":
            # 利用线程池--concurrent.futures模块来管理多线程:
            task_pool.append(executor.submit(test,i,l,new_excel))
if task_pool != []:
    wait(task_pool, return_when=ALL_COMPLETED)
new_excel.save(config.src_path+'\\data\\'+new_file)

再次运行看看:

(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_ThreadPool is running
time func takes 0:00:01.797645

最后查看新建地文件、可以看到插入数据成功!

好的吧。那为什么使用线程池和不适用线程池的区别这么大呢?
问答:线程并行使用线程池并发操作 2800 多条线程,时间反而用时比不用线程...
之前我在这个论坛发表过问题。后面也找到了些许的答案

后面我又测试了一下,当我们设置线程池的大小为100的时候或者用BoundedSemaphore设置线程数量,两种情况下还是线程池相对的优化好些。。!个人觉得是当对线程进行操作时比如传参、控制量等,会涉及到更多的上下文操作。线程池的优化就很好的体现了。。在日常的开发中,我们还是用线程池比较好!

感觉应该是设置这个并行发的时候的问题、毕竟相当于一个容器装起来 容器建立、之类的操作都需要时间,但是并发池肯定更放便管理。应该就是这个理儿了 🙃。解答完毕! 估计就是这样子了,有兴趣的看这篇问答来研究一下呗。问答:如何合理地估算线程池大小?N 为 CPU 核数,2N+1/N+1 公式是怎么来的...

多进程地来处理2800条数据

预计是没有线程好的,虽然python有gil锁,但是对于频繁地IO操作还是处理地优化很大地。

尝试编写代码:

# 使用进程池——在新的表test.xls中插入第一级第二级的经纬度(通过api)
    @config.logging_time
    def insert_coordinates_ProcessPool(self, new_file:"新生成的文件名", ):
        geo_app = Geo_mapInterface(config.geo_key)
        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
        test_data = [i for i in all_data ]
        # print(test_data)
        new_excel = copy(self.excel)
        ws = new_excel.get_sheet(self.sheet)
        task = []

        def test(i,l,new_excel):
            cur_coordinates = 12345
            # cur_coordinates = geo_app.get_coordinatesViaaddress("".join(l[0:3]))
            # 添加地级市经纬度
            ws.write(i,3,cur_coordinates)
            # 添加县、县级市、区经纬度
            print(cur_coordinates)
            ws.write(i,4,cur_coordinates)
            time.sleep(0.01)
        pool = multiprocessing.Pool(os.cpu_count()*2)
        for i,l in enumerate(test_data[:]):
            # xls表首位是名称,所以去掉查询
            if i == 0:
                continue
            if l[3]:
                print(test[0][3],"已经存在",i[3])
            elif l[3]!=" ":
                pool.apply_async(func=test,args=(i,l,new_excel))
        pool.close()
        new_excel.save(config.src_path+'\\data\\test.xls')

我们访问test.xls是否修改成功、但是你会发现一个数据都没有。这是为什么吗呢?

文档:
https://docs.python.org/3/library/pickle.h...
https://docs.python.org/2/library/pickle.h...
因为默认在多个进程之间,只有以下数据可以共享:

  • None,True和False
  • 整数,长整数,浮点数,复数
  • 普通和Unicode字符串
  • 仅包含可腌制对象的元组,列表,集合和词典
  • 在模块顶层定义的功能
  • 在模块顶层定义的内置函数
  • 在模块顶层定义的类
  • 此类类的实例dict或其调用的结果getstate()是可腌制的( 有关详细信息,请参见“腌制协议”一节)

其次我们可以通过锁机制来共享内存中地数据。但是不适合我们这次地需求:

因为,我们需要并发地修改数据,但是pv操作是串行。
其次,通过lock和value函数,有特定地数据对象才能进行处理:
https://www.cnblogs.com/junyuhuang/p/55395...
https://www.cnblogs.com/junyuhuang/p/55395...

同样其他地方法:我们需要使用queue或者pipe来处理数据。
所以我们测试IO操作地时候,确实不适合利用多进程来处理。

虽然这个方法不奏效,但是我们要通过这次经验,了解IO操作和CPU操作地区别,和进程和线程地选择。!(☆▽☆)

尝试一下asyncio来通过协程来建立并发应用的功能(OP🙃并未优化时间)

asyncio 模块提供一套使用协程来建立并发应用的功能。threading 模块所实现的并发是多个线程,multiprocessing 则是多个系统进程,asyncio 是以单个进程,单个线程的方式所实现的,应用程序的协同程序会在适宜的时候显式得进行切换。大多数上下文的切换发生在程序被阻塞住等待读或写数据的时候,asyncio 还支持定时器任务,可以在未来时间自动执行设置好的代码,这样就可以让一个协程等待另一个协程完全完成后再执行,同时也可以处理系统信号,识别让应用程序做些其他事的事件。

参考官方代码:
https://pymotw.com/3/asyncio/tasks.html#cr...
https://www.liaoxuefeng.com/wiki/101695966...

具体操作:

    # 使用协程——在新的表test.xls中插入第一级第二级的经纬度(通过api)
@config.logging_time
def insert_coordinates_async(self, new_file:"新生成的文件名", ):
    geo_app = Geo_mapInterface(config.geo_key)
    all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
    test_data = [i for i in all_data ]
    # print(test_data)
    new_excel = copy(self.excel)
    ws = new_excel.get_sheet(self.sheet)
    task = []
    # 测试用代码
    async def asy_test(i,l,new_excel):
        cur_coordinates = 12345
        ws.write(i,3,cur_coordinates)
        ws.write(i,4,cur_coordinates)
        time.sleep(0.01)
        # print('Hello world! (%s)' % threading.currentThread())

    for i,l in enumerate(test_data[:]):
        # xls表首位是名称,所以去掉查询
        if i == 0:
            continue
        if l[3]:
            print(test[0][3],"已经存在",i[3])
        elif l[3]!=" ":

            # 利用协程并发处理代码,将多个任务加入到协程任务列表中
            task.append(asy_test(i,l,new_excel))
    # # 利用协程并发处理代码30.147071
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task))
    loop.close()
    new_excel.save(config.src_path+'\\data\\'+new_file)

运行结果:

(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_async is running
time func takes 0:00:30.779030

总体运行时间为0:00:30.779030,OP!🙃没有线程并行效率高,这是为什么呢?

问题先留着,还在处理中。暂时就先用多线程方法吧。

2019-12-29 21:57:40 星期日
问题发现了,在这个测试环境中之所以协程并行比线程并行慢很多是因为我手动的让每次test函数运行的时候都sleep了0.1秒。

回想一下协程和线程的关系:

协程和线程的共同目的之一是实现系统资源的上下文调用,不过它们的实现层级不同;

线程(Thraed)是比进程小一级的的运行单位,多线程实现系统资源上下文调用,是编程语言交付系统内核来进行的(可能是并发,也可能是伪并发),大部分的编程语言的多线程实现都是抢占式的,而对于这些线程的控制,编程语言无法直接控制,需要通过系统内核来进行,由系统内核决定最终的行为;

协程(Coroutine)是在语言层面实现“多线程”这个过程,一般在代码中以串行的方式表达并发逻辑,由于是在编程语言层面模拟这一过程,而非涉及到硬件层面,在编程语言层面可以完全控制这一过程,可以这么说,协程是软件层面模拟硬件层面的多线程;但不是说协程就一定是单线程,具体的实现要看具体编程语言的实现,kotlin的协程实现可能是单线程,也可能是多线程;

协程的使用场景

协程可以用于解决高负荷网络 IO、文件 IO、CPU/GPU 密集型任务等;

比如在IO线程高负载的场景下,CPU资源会被大量线程占用,这会极大浪费CPU资源,同时可能导致一些重要的线程被阻塞,代价是十分昂贵的,此时如果使用IO协程代替IO线程,可以大大减少线程数量,节省CPU资源,同时在协程挂起是几乎无代价的(不需要上下文切换或OS干预),同时编程语言对协程又有极大控制性;

总而言之、协程并发处理数据,并不是想线程并行一样来节约时间。

协程拥有自己的寄存器上下文和栈,调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

有了协程:
无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

最后还是决定用线程池,但是两种线程池的区别很大吗?

之前我们已经使用了ThreadPoolExecutor,这次让我们来尝试一下BoundedSemaphore:

级第二级的经纬度(通过api)
@config.logging_time
def insert_coordinates_BoundedSemaphore(self, new_file:"新生成的文件名", ):
    geo_app = Geo_mapInterface(config.geo_key)
    all_data = [self.sh_data.row_values(i) for i in range(self.rows)]
    test_data = [i for i in all_data ]
    # print(test_data)
    new_excel = copy(self.excel)
    ws = new_excel.get_sheet(self.sheet)
    task = []
    def test(i,l,new_excel):
        cur_coordinates = 12345
        # cur_coordinates = geo_app.get_coordinatesViaaddress("".join(l[0:3]))
        # 添加地级市经纬度
        ws.write(i,3,cur_coordinates)
        # 添加县、县级市、区经纬度
        ws.write(i,4,cur_coordinates)
        threadmax.release()

    threadmax = threading.BoundedSemaphore(20)
    for i,l in enumerate(test_data[:]):
        # xls表首位是名称,所以去掉查询
        if i == 0:
            continue
        if l[3]:
            print(test[0][3],"已经存在",i[3])
        elif l[3]!=" ":
            # 单进程,8核CPU,线程并行00.743780s,
            threadmax.acquire()
            t = threading.Thread(target=test, args=(i,l,new_excel),daemon=True)
            task.append(t)
            t.start()
            t.join()
            # 我们需要让线程运行阻塞到结束,因为高德API获取数据的速度不是很快~~🙃
    new_excel.save(config.src_path+'\\data\\'+new_file)  

运行结果(不调用api):

(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py
insert_coordinates_BoundedSemaphore is running
time func takes 0:00:00.675194
(env) PS F:\workspace>

和线程池threadingPool 01.797645比还是优化效果更好一些。

这里我们可以讨论一下threadingPool和BoundedSemaphore的区别~
其实我的想法和之前用threading的时候一样:
池化的过程,相当于制作了一个容器,而BoundedSemaphore只是给线程给了限制,我一下只能运行20条啊!,而不是池化了。池化的作用,当然是更方便管理,资源调用。目前我还没设计的很深。以后可能我经验更丰富后就会有更好的 解答。

最终,三个高德的api全部满了,明天再来实操的,哈哈~

文章编写的可鞥你不够严谨、如果有错误或者不严谨的地方,可以在博客或者评论下给我留言U•ェ•*U

本作品采用《CC 协议》,转载必须注明作者和本文链接
文章!!首发于我的博客Stray_Camel(^U^)ノ~YO
讨论数量: 1

分享的非常好,学习了。。。

4年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!