python 操作 PostgreSQL 数据库,线程并行修改 5w 条数据,性能优化

 获取新xls表中的所有数据并整理为列表形式返回

其实修改的代码量不大,但是要考虑保留之前我们用的函数和方法还要继续能使用。

excel2sql.py中:

在我们创建对象OpExcel时,为文件url和sheet名添加默认的值:


class OpExcel(object):

    def __init__(

        self, 

        url:"str类型的文件路径" = config.src_path+"\\data\\2019最新全国城市省市县区行政级别对照表(194).xls", 

        sheet:"excel中的表单名" = "全国城市省市县区域列表"):

当我们不给予参数时,我们打开的文件时原始文件。

当我们打开新的拥有地域名和所有经纬度的xls文件时像如下:


test = OpExcel(config.src_path+"\\data\\test.xls","全国城市省市县区域列表")

则我们可以查询到新的文件和数据

之前我们写过的函数init_SampleViaProvince_name,时通过传参而返回指定省的第1级别的地点和地域名(list)形式

我们修改为默认不输入传输则返回所有的数据:


# 通过高德地图API查询每个地域名的经纬度,并返回为list数据

    def init_SampleViaProvince_name(

        self, 

        Province_name:"省名" = None

        ) ->"insert的数据,列表形式[('地域名1','1','经纬度'),('地域名2','1','经纬度')]":

        geo_app = Geo_mapInterface(config.geo_key)

        all_data = [self.sh_data.row_values(i) for i in range(self.rows)]

        print(all_data)

        if Province_name:# 生成测试用数据

            cities_data=[ [ ["".join(i[0:3]),1,'test'], ["".join(i[1:3]),1,'test']][i[0]==i[1]] for i in all_data if i[0] == Province_name]

        else: # 生成实际数据

            # 生成第1级别地点和经纬度

            cities_data=[[["".join(i[0:3]),1,i[4]],["".join(i[1:3]),1,i[4]]][i[0]==i[1]] for i in all_data[1:]]

            # 继续添加第0级别的地点和经纬度:

            for i in all_data:

                cities_data.append(["".join(i[0:2-(i[0]==i[1])]),0,i[4]])

        # for i in cities_data:

        #     i.append(geo_app.get_coordinatesViaaddress("".join(i[0])))

        return cities_data

测试一下:


f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/process_data/excel2sql.py

-----

犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨克自治州', 0, 12345.0], ['新疆省伊犁哈萨

(展示部分数据)

---

成功。

新的函数已经定义好,那么我们可以在主函数中同样利用之前的方法将所有的数据传入到数据库中。

 优化代码,写入更便捷的打印日志系统

在config.py文件中写入:


# 构造打印日志格式,调用config.logger.info()即可

logging.basicConfig(stream=open(src_path + '/log/syserror.log', encoding="utf-8", mode="a"), level = logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

logger = logging.getLogger(__name__)

再需要打印日志的时候再来打印日志。

比如我们初始化数据库的链接的代码:

opsql.py


#!/usr/bin/python

# -*- coding: utf-8 -*-

#__author__: stray_camel

'''

定义对mysql数据库基本操作的封装

1.数据插入

2.表的清空

3.查询表的所有数据

'''

import logging,datetime,time,sys,os

import psycopg2

import asyncio,multiprocessing,threading

# 线程池

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED

absPath = os.path.abspath(__file__)   #返回代码段所在的位置,肯定是在某个.py文件中

temPath = os.path.dirname(absPath)    #往上返回一级目录,得到文件所在的路径

temPath = os.path.dirname(temPath)    #在往上返回一级,得到文件夹所在的路径

sys.path.append(temPath)

from public import config

class OperationDbInterface(object):

    #定义初始化连接数据库

    def __init__(self, 

    host_db : '数据库服务主机' = 'localhost', 

    user_db: '数据库用户名' = 'postgres', 

    passwd_db: '数据库密码' = '1026shenyang', 

    name_db: '数据库名称' = 'linezone', 

    port_db: '端口号,整型数字'=5432):

        # 创建任务池

        self.all_task = []

        # 创建默认返回result,默认为失败

        result={'code':'9999','message':'默认message','data':'默认data'}

        try:

            self.conn=psycopg2.connect(database=name_db, user=user_db, password=passwd_db, host=host_db, port=port_db)#创建数据库链接

            print("创建数据库成功|postgresql, %s,user:%s,host:%s,port:%s"%(name_db,user_db,host_db,port_db))

            self.cur=self.conn.cursor()

        except psycopg2.Error as e:

            self.conn = ""

            self.cur = ""

            print("创建数据库连接失败,退出运行|postgresql Error,请看系统日志!")

            config.logger.exception(e)

            sys.exit(0)

修改我们之前OP数据库的时候的清理某表数据和查询函数:


#定义对表的清空

def ini_table(self,

tablename:"表名")->"清空表数据结果":

    try:

        rows_affect = self.cur.execute("select count(*) from {}".format(tablename))

        test = self.cur.fetchone()  # 获取一条结果

        self.cur.execute("truncate table {}".format(tablename))

        self.conn.commit()

        result={'code':'0000','message':'执行清空表操作成功','data':test[0]}

        config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))

    except psycopg2.Error as e:

        self.conn.rollback()  # 执行回滚操作

        result={'code':'9999','message':'执行批量插入异常','data':[]}

        print ("数据库错误|insert_data : %s" % (e.args[0]))

        config.logger.exception(e)

    return result

    #定义对表的清空

    def ini_table(self,

    tablename:"表名")->"清空表数据结果":

        try:

            rows_affect = self.cur.execute("select count(*) from {}".format(tablename))

            test = self.cur.fetchone()  # 获取一条结果

            self.cur.execute("truncate table {}".format(tablename))

            self.conn.commit()

            result={'code':'0000','message':'执行清空表操作成功','data':test[0]}

            config.logger.info("清空{}表,操作数据{}条,操作:{}!".format(tablename,result['data'],result['message']))

        except psycopg2.Error as e:

            self.conn.rollback()  # 执行回滚操作

            result={'code':'9999','message':'执行批量插入异常','data':[]}

            print ("数据库错误|insert_data : %s" % (e.args[0]))

            config.logger.exception(e)

        return result

    #查询表的所有数据

    def select_all(self, 

    tablename:"表名")->"返回list,存放查询的结果":

        try:

            rows_affect = self.cur.execute("select * from {}".format(tablename))

            test = self.cur.fetchall()

            self.conn.commit()

            result={'code':'0000','message':'查询表成功','data':test}

            config.logger.info("查询{}表,共查询数据{}条,操作:{}!".format(tablename,len(result['data']),result['message']))

        except psycopg2.Error as e:

            self.conn.rollback()  # 执行回滚操作

            result={'code':'9999','message':'查询数据异常','data':[]}

            print ("数据库错误| select_all,请看系统日志!")

            config.logger.exception(e)

        return result

以及调用完毕后的链接断开代码:


    #数据库关闭

    def __del__(self):

        # 情况线程池的任务

        if self.all_task != []:

            wait(self.all_task, return_when=ALL_COMPLETED)

            print(len(self.all_task))

        if self.conn:

            self.conn.close()

            print("数据库断开链接...")

            config.logger.info("数据库断开链接...")

测试一下清空操作:


if __name__ == "__main__":

    op_postgre = OperationDbInterface()

    # 初始化样表

    result =op_postgre.ini_table("sample_data")

    if result['code']=='0000':

        print("操作数据{}条,操作:{}!".format(result['data'],result['message']))

    else:

        print(result)


(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/op_postgresql/opsql.py

创建数据库成功|postgresql, linezone,user:postgres,host:localhost,port:5432

操作数据3206条,操作:执行清空表操作成功!

(env) PS F:\workspace>

ok!nice

 同样采用线程并发池,来op数据库,修改之前的代码,优化😝

还是采用线程池的并发:


    #定义在城市样本表中插入数据操作

    def insert_sample_data(self, 

    condition : "insert语句" = "insert into sample_data(address,ad_type,coordinates) values (%s,%s,%s)", 

    params : "insert数据,列表形式[('地域名1','1','经纬度'),('地域名2','1','经纬度')]" = [('地域名1','1','经纬度'),('地域名2','1','经纬度')]

    ) -> "字典形式的批量插入数据结果" :

        def test(l):

            # 插入一条数据

            self.cur.execute(condition,l)

            self.conn.commit()

        executor = ThreadPoolExecutor(max_workers=99)

        def do_calculation(i):

            self.all_task.append(executor.submit(test,i))

        try:

            # 利用线程池--concurrent.futures模块来管理多线程:

            self.all_task.append(executor.submit(test,params))

            result={'code':'0000','message':'执行批量插入操作成功','data':len(params)}

            config.logger.info("在样本表sample_data中插入数据{}条,操作:{}!".format(result['data'],result['message']))

        except psycopg2.Error as e:

            self.conn.rollback()  # 执行回滚操作

            result={'code':'9999','message':'执行批量插入异常','data':[]}

            print ("数据库错误|insert_data : %s" % (e.args[0]))

            config.logger.exception(e)

        return result

好的我们先在main.py写个测试函数用测试数据看一下:


@config.logging_time

def test_2():

     # 定义excel中操作对象excel_data

    excel_data = excel2sql.OpExcel(config.src_path+"\\data\\"+config.new_file,"全国城市省市县区域列表")

    all_cities = excel_data.sh_data

    # 获取excel_data的数据总量

    print("数据总共条数,和列数:",all_cities.nrows,all_cities.ncols)

    # data = [all_cities.row_values(i) for i in range(15,22)]

    # 定义数据库操作对象

    op_postgre = opsql.OperationDbInterface()

    # 获取插入样表数据的sql语句,并插入数据,默认不输入省名就查询所有

    data_insert_sample = excel_data.init_SampleViaProvince_name()

    # print(data_insert_sample)

    test_data = [['新疆省省直辖行政单位', 0, 12345.0], ['新疆省省直辖行政单位', 0, 12345.0], ['新疆省省直辖行政单位', 0, 12345.0], ['新疆省省直辖行政单位', 0, 12345.0]]

    print("获取的数据量",len(data_insert_sample))

    result = op_postgre.insert_sample_data("insert into sample_data(address,ad_type,coordinates) values (%s,%s,%s)",test_data)

    # 初始化样表

    # result =op_postgre.ini_table("sample_data")

    if result['code']=='0000':

        print("操作数据{}条,操作:{}!".format(result['data'],result['message']))

    else:

        print(result)

运行成功!:


(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/main.py

test_2 is running

数据总共条数,和列数: 2867 5

创建数据库成功|postgresql, linezone,user:postgres,host:localhost,port:5432

获取的数据量 3206

操作数据4条,操作:执行批量插入操作成功!

test_2 time func takes 0:00:00.351036

1

数据库断开链接...

(env) PS F:\workspace>

速度有点离谱的快,我们看看数据是否真的存进去了?

nice!

 从数据库中调取地点数据,并1-1地来通过高德API来获取距离和路径,处理少量数据(0级城市)

因为我们需要比较地地点是:

- 0级别和0级别距离

- 1级别和1级别距离

那我我们将两种地点分类:

我们需要从城市的list中生成1-1关系,根据握手原则,这里我采用了无向图的满图计算节点的方法,并用递归的算法来实现:


    def get_disViaCoordinates(self,

    addList:"一个列表存放地址数据",

    ) ->  "{'origin':[],'destination':[],'distance':[],'route':[]}":

        def test(origin, destination):

            # API炸了,先用假数据吧

            # url2='https://restapi.amap.com/v3/direction/driving?origin='+origin+'&destination='+destination+'&extensions=all&output=json&key=3e2235273dd2c0ca2421071fbb96def4'

            # #编码

            # newUrl2 = parse.quote(url2, safe="/:=&?#+!$,;'@()*[]")

            # #发送请求

            # response2 = urllib.request.urlopen(newUrl2)

            # #接收数据

            # data2 = response2.read()

            # #解析json文件

            # jsonData2 = json.loads(data2)

            # #输出该json中所有road的值

            # # print(jsonData2)

            # road=jsonpath.jsonpath(jsonData2,'$..road')

            # #从json文件中提取距离

            # distance = jsonData2['route']['paths'][0]['distance']

            # 假数据

            distance = 999

            road = ['路径1', '路径2', '路径3']

            #字典dict_route中追加数据

            self.dict_route['origin'].append('origin')

            self.dict_route['destination'].append('destination')

            self.dict_route['distance'].append('distance')

            self.dict_route['road'].append('road')

        def base(a,b=0):

            for i in range(len(a)-1):

                # print(a[b][0],a[i+1][0])

                origin = a[b][0]

                destination = a[i+1][0]

                with ThreadPoolExecutor(max_workers=99) as executor:

                    # 利用线程池--concurrent.futures模块来管理多线程:

                    self.all_task.append(executor.submit(test,origin, destination))

                if i+1 == len(a)-1:

                    b += 1

                    a = a[b:]

                    if len(a) == 1:

                        break

                    base(a,b=0)

        base(addList)

        if self.all_task != []:

            wait(self.all_task, return_when=ALL_COMPLETED)

        return self.dict_route

编写测试函数从数据库中查询数据,根据握手原理 并查询一对一地点路径,向数据库注入0级别城市的距离等数据,0级别城市数据又5w条


@config.logging_time

def test_3(a):

     # generate_sampledata()

    op_postgre = opsql.OperationDbInterface()

    addList = op_postgre.select_all("sample_data")['data']

    test = geo_map.Geo_mapInterface()

    print("所有地点个数",len(addList))

    lsit_city = []

    # 第0级别的数据

    lsit_city.append([list(i) for i in addList if i[1] == 0])

    # 第1级别的数据

    lsit_city.append([list(i) for i in addList if i[1] == 1])

    print(len(lsit_city[1]))

    asd = lsit_city[a][:]

    all_test = test.get_disViaCoordinates(asd)

    # print(all_test[:10])

    num = len(all_test['origin'])

    all_data = []

    # print(len(all_data))

    all_data = [[str(all_test[i][x]) for i in all_test] for x in range(len(all_test['origin']))]

    result = op_postgre.insert_sample_route("insert into sample_route(origin,destination,distance,route) values (%s,%s,%s,%s)",all_data)

    # # # 初始化样表

    # result =op_postgre.ini_table("sample_route")

    if result['code']=='0000':

        print("操作数据{}条,操作:{}!".format(len(result['data']),result['message']))

    else:

        print(result)

当我们运行的时候可以明显的看到线程急剧的增加还是很有乐趣,哈哈

我们将查询的饿数据存到参数中,通过调试台可以看到 大约又57630条数据:

我们向数据库中插入城市0类型的数据:


(env) PS F:\workspace> & f:/workspace/env/Scripts/python.exe f:/workspace/城市距离爬取/main.py

test_3 is running

创建数据库成功|postgresql, linezone,user:postgres,host:localhost,port:5432

所有地点个数 3206

2866

操作数据57630条,操作:执行批量插入操作成功!

this function < test_3 > takes time: 0:00:14.047413

数据库断开链接...

(env) PS F:\workspace>

查看数据库:

这个数据量应该是和我们之前算的一样!

操作成功!

but!

1级别城市2800多个但是0级别的只有300多个。当握手原理去匹配数据之后,数量级肯定不是一个单位。

脑壳疼🙃

最后抛出一个讨论,我们利用公式(握手原理,满无向图的边为N(N-1)/2)算出,2800多条城市1-1关系为400w条,如何更好的插入数据库呢?

本作品采用《CC 协议》,转载必须注明作者和本文链接
文章!!首发于我的博客Stray_Camel(^U^)ノ~YO
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!