Python 自动化拉取 MySQL 数据并建表装载到 Oracle

Python自动化拉取Mysql数据并装载到Oracle

MySQL 中的数据表分为普通表,以及按键值对 256 取模分表两种形式;目前尚未完整处理源表为分表的情况(tabType='submeter' 时只读取第一个分表 `<表名>_0` 的结构和数据)。处理流程是:先根据 MySQL 表结构在 Oracle 库上建表,再将 MySQL 中的数据直接插入到 Oracle 数据库中。
处理过程中,考虑到逐行提交的效率较低,默认采用每 1000 行提交一次的方式向 Oracle 插入数据。
目前暂不考虑数据的增量更新,每次运行都会删除并重建目标表,再全量装载数据;这主要是因为目前的数据环境较乱,这样可以较方便地将适用于 Oracle 平台的数据拉取到 Oracle 数据库,然后进行相关的计算。
调用代码时主要需要修改两处:一是源库(MySQL)和目标库(Oracle)的数据源配置信息;二是传给调度程序的参数,即源配置名、目标配置名、表名和表类型('single' 或 'submeter')。

1、目标Oracle数据库建好存储过程[p_dropTable_ifExists]供Python调用

-- Drop table p_table from the current schema (with PURGE, so it skips the
-- recycle bin) if it exists; do nothing otherwise.
create or replace procedure p_dropTable_ifExists(
    p_table in varchar2
) is
    v_count number(10);
begin
   -- Table names are stored upper-case in the dictionary for unquoted names.
   select count(*)
   into v_count
   from user_tables
   where table_name = upper(p_table);

   if v_count > 0 then
      -- simple_sql_name raises unless p_table is a plain SQL identifier, so
      -- the argument cannot smuggle extra SQL into the dynamic DDL.
      execute immediate 'drop table ' || dbms_assert.simple_sql_name(p_table) || ' purge';
   end if;
end p_dropTable_ifExists;

2、存储数据库配置信息及获取参数的代码

ParAndDbGet.py

# -*- coding=utf-8 -*-
import warnings
import datetime
import pymysql
import cx_Oracle
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

warnings.filterwarnings("ignore")

# Source MySQL connection settings.
# NOTE(review): credentials are hard-coded as defaults below. Every value can
# be overridden with an environment variable so real secrets need not live in
# source control; consider removing the literal defaults entirely.
srcMysql_dbConfig1 = {
    'host': os.environ.get('SRC_MYSQL_HOST', '62.234.125.246'),
    'user': os.environ.get('SRC_MYSQL_USER', 'root'),
    'passwd': os.environ.get('SRC_MYSQL_PASSWD', 'MyNewPass@123'),
    'port': int(os.environ.get('SRC_MYSQL_PORT', 3306)),
    'db': os.environ.get('SRC_MYSQL_DB', 'test'),
}

# Target Oracle connection settings; 'tnsname' is the service part of the
# host:port/service connect string built by getOracleDB.
targetOracle_dbConfig = {
    'host': os.environ.get('TGT_ORACLE_HOST', 'localhost'),
    'user': os.environ.get('TGT_ORACLE_USER', 'scott'),
    'passwd': os.environ.get('TGT_ORACLE_PASSWD', 'dh05138087'),
    'port': int(os.environ.get('TGT_ORACLE_PORT', 1521)),
    'tnsname': os.environ.get('TGT_ORACLE_TNSNAME', 'orcl'),
}

def getNowDay():
    """Return today's local date as a 'YYYY-MM-DD' string."""
    return '{:%Y-%m-%d}'.format(datetime.datetime.today())

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate to endDate, inclusive.

    Both arguments are '%Y-%m-%d' strings. The loop compares parsed dates,
    not strings, so non-zero-padded input such as '2020-2-1' still gives the
    correct range. Every returned element uses the zero-padded
    'YYYY-MM-DD' form.

    Returns an empty list when beginDate is after endDate.
    Raises ValueError if either string does not match '%Y-%m-%d'.
    """
    fmt = "%Y-%m-%d"
    current = datetime.datetime.strptime(beginDate, fmt).date()
    last = datetime.datetime.strptime(endDate, fmt).date()
    oneDay = datetime.timedelta(days=1)

    dates = []
    while current <= last:
        dates.append(current.strftime(fmt))
        current += oneDay
    return dates

def getMysqlDB(mysqlDbConfig):
    """Open a MySQL connection and switch to the configured database.

    Parameters
    ----------
    mysqlDbConfig : dict
        Keys 'host', 'user', 'passwd', 'port' and 'db'. A string containing a
        dict literal is also accepted; it is parsed with ast.literal_eval
        rather than eval, so no code can be executed.

    Returns
    -------
    tuple
        (conn, curr, db) on success: an autocommit connection, a cursor on
        it, and the database name. On a pymysql error the message is printed
        and (None, None, None) is returned, so callers can always unpack
        three values.

    Raises
    ------
    KeyError
        If a required config key is missing.
    """
    if isinstance(mysqlDbConfig, str):
        import ast
        mysqlDbConfig = ast.literal_eval(mysqlDbConfig)

    db = mysqlDbConfig['db']
    conn = None
    try:
        conn = pymysql.connect(host=mysqlDbConfig['host'],
                               user=mysqlDbConfig['user'],
                               passwd=mysqlDbConfig['passwd'],
                               port=mysqlDbConfig['port'])
        conn.autocommit(True)
        curr = conn.cursor()
        curr.execute("SET NAMES utf8")
        # Quote the identifier; embedded backticks are escaped by doubling.
        curr.execute("USE `%s`" % db.replace('`', '``'))
        return conn, curr, db
    except pymysql.Error as e:
        # pymysql errors usually carry (code, message), but not always.
        if len(e.args) >= 2:
            print("Mysql Error %s: %s" % (e.args[0], e.args[1]))
        else:
            print("Mysql Error: %s" % (e,))
        if conn is not None:
            conn.close()
        return None, None, None

def getOracleDB(oracleDbConfig):
    """Open and return a cx_Oracle connection.

    Parameters
    ----------
    oracleDbConfig : dict
        Keys 'user', 'passwd', 'host', 'port' and 'tnsname'. 'tnsname' becomes
        the service part of the 'host:port/tnsname' connect string. A string
        containing a dict literal is also accepted; it is parsed with
        ast.literal_eval rather than eval.

    Returns
    -------
    cx_Oracle.Connection

    Errors raised by cx_Oracle.connect propagate to the caller.
    """
    if isinstance(oracleDbConfig, str):
        import ast
        oracleDbConfig = ast.literal_eval(oracleDbConfig)

    dsn = '{host}:{port}/{tnsname}'.format(host=oracleDbConfig['host'],
                                           port=oracleDbConfig['port'],
                                           tnsname=oracleDbConfig['tnsname'])
    # Pass the credentials separately from the DSN so that a password
    # containing '/' or '@' cannot corrupt the connect string.
    return cx_Oracle.connect(oracleDbConfig['user'], oracleDbConfig['passwd'], dsn)

3、获取源表的元数据信息并构建目标表建表语句

getSrcMetadata.py

# -*- coding=utf-8 -*-
from ParAndDbGet import *
import re
warnings.filterwarnings("ignore")

def getSrcMyqslMetadataAndTagCreateTabScript(mysqlDbConfig, src_tabName, tabType):
    """Read a MySQL table's columns and build Oracle drop/create DDL for it.

    Parameters
    ----------
    mysqlDbConfig : dict
        Source MySQL configuration, passed to getMysqlDB.
    src_tabName : str
        Source table name. For tabType 'submeter' only the first shard,
        '<src_tabName>_0', is read; for any other tabType the name is used
        as given.
    tabType : str
        'single' or 'submeter'.

    Returns
    -------
    tuple (TabDropScript, TabCreateScript, colList)
        TabDropScript   -- "call p_dropTable_ifExists('<table>')".
        TabCreateScript -- "create table <table>(...)" with double-quoted,
                           upper-cased column names. MySQL types ending in
                           'int' map to INT, types containing 'date' to
                           VARCHAR2(19), CHAR(n)/VARCHAR(n) to VARCHAR2(4000),
                           and anything else to the upper-cased MySQL
                           column_type.
        colList         -- upper-cased column names in ordinal order.

    Raises
    ------
    RuntimeError
        If getMysqlDB could not open a connection.
    ValueError
        If information_schema lists no columns for the table (e.g. it does
        not exist in the configured database).
    """
    if tabType == 'submeter':
        src_tabName = src_tabName + "_0"

    dbHandle = getMysqlDB(mysqlDbConfig)
    if dbHandle[0] is None:
        raise RuntimeError("cannot connect to MySQL to read table %s" % src_tabName)
    conn, curr, db = dbHandle

    # Schema and table name are bound as parameters instead of being formatted
    # into the SQL. pymysql substitutes parameters with the % operator, so
    # the literal LIKE wildcards are written as %%.
    # NOTE(review): LIKE '%int' also matches spatial types such as POINT.
    sql_getTabStructure = (
        "select a2.column_name,"
        " case when a2.data_type like '%%int' then 'int'"
        " when a2.data_type like '%%date%%' then 'varchar2(19)'"
        " else a2.column_type end data_type"
        " from information_schema.TABLES a1"
        " left join information_schema.columns a2"
        " on a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a1.TABLE_NAME=a2.TABLE_NAME"
        " where a1.TABLE_SCHEMA=%s and a1.table_name=%s"
        " order by a2.ORDINAL_POSITION"
    )
    try:
        curr.execute(sql_getTabStructure, (db, src_tabName))
        srcTabCol_list = curr.fetchall()
    finally:
        conn.close()

    # The LEFT JOIN can yield a row with a NULL column_name; skip such rows.
    columns = [(name, colType) for name, colType in srcTabCol_list if name is not None]
    if not columns:
        raise ValueError("no columns found for MySQL table %s.%s" % (db, src_tabName))

    colList = [name.upper() for name, _ in columns]
    colDefs = ['"%s" %s' % (name.upper(), colType.upper()) for name, colType in columns]

    TabDropScript = "call p_dropTable_ifExists('{src_tabName}')".format(src_tabName=src_tabName)
    TabCreateScript = "create table {0}(\n{1})\n".format(src_tabName, ',\n'.join(colDefs))
    # Widen CHAR(n)/VARCHAR(n) to VARCHAR2(4000). Presumably this avoids
    # length overflow, since MySQL lengths count characters while Oracle
    # VARCHAR2 defaults to bytes (TODO confirm intent).
    TabCreateScript = re.sub(r' (?:VAR)?CHAR\([0-9]*\)', ' VARCHAR2(4000)', TabCreateScript)
    return TabDropScript, TabCreateScript, colList

4、目标库建表

OracleCreateTab.py

# -*- coding=utf-8 -*-
import cx_Oracle
from getSrcMetadata import *

warnings.filterwarnings("ignore")

def OracleCreateTab(mysqlDbConfig, oracleDbConfig, src_tabName, tabType):
    """(Re)create the Oracle table that mirrors a MySQL source table.

    Runs the generated "call p_dropTable_ifExists(...)" and then the
    generated CREATE TABLE statement on the target Oracle database. The
    arguments have the same meaning as for
    getSrcMyqslMetadataAndTagCreateTabScript. Returns None.
    """
    # Read the metadata once: each call opens and queries a MySQL connection.
    TabDropScript, TabCreateScript, _ = getSrcMyqslMetadataAndTagCreateTabScript(
        mysqlDbConfig, src_tabName, tabType)

    ora_conn = getOracleDB(oracleDbConfig=oracleDbConfig)
    try:
        ora_curr = ora_conn.cursor()
        ora_curr.execute(TabDropScript)
        ora_curr.execute(TabCreateScript)
        ora_curr.close()
    finally:
        # Oracle DDL commits implicitly, so closing without commit is safe.
        ora_conn.close()

5、数据从Mysql传送到Oracle

MysqlData2Oracle.py

# -*- coding=utf-8 -*-
from getSrcMetadata import *
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

warnings.filterwarnings("ignore")

def mysqlData2Oracle(mysqlDbConfig, oracleDbConfig, src_tabName, tabType, batchSize=1000):
    """Copy every row of a MySQL table into the Oracle table of the same name.

    The Oracle table must already exist (see OracleCreateTab). It is truncated
    first, then filled in batches of ``batchSize`` rows with one commit per
    batch. Values are sent as bind variables: None becomes NULL, and any other
    value is bound as str(value). Text therefore arrives unmodified, with no
    need to escape it into SQL literals.

    Parameters
    ----------
    mysqlDbConfig, oracleDbConfig : dict
        Source and target connection configs.
    src_tabName : str
        Table name. For tabType 'submeter' only '<src_tabName>_0' is copied.
    tabType : str
        'single' or 'submeter'.
    batchSize : int, optional
        Rows per INSERT batch and commit (default 1000).

    Raises
    ------
    ValueError
        If batchSize is less than 1.
    RuntimeError
        If the MySQL connection cannot be opened.
    """
    if batchSize < 1:
        raise ValueError("batchSize must be a positive integer")

    # One metadata lookup; it opens (and closes) its own MySQL connection.
    colList = getSrcMyqslMetadataAndTagCreateTabScript(
        mysqlDbConfig=mysqlDbConfig, src_tabName=src_tabName, tabType=tabType)[2]

    if tabType == 'submeter':
        src_tabName = src_tabName + "_0"
    target_tabName = src_tabName

    srcQuerySql = "select " + ",".join('`' + col + '`' for col in colList) + \
        " from `" + src_tabName + "`"
    insertIntoSql = "insert into {0}({1}) values ({2})".format(
        target_tabName,
        ",".join('"' + col + '"' for col in colList),
        ",".join(":" + str(pos) for pos in range(1, len(colList) + 1)))

    dbHandle = getMysqlDB(mysqlDbConfig=mysqlDbConfig)
    if dbHandle[0] is None:
        raise RuntimeError("cannot connect to MySQL to read table %s" % src_tabName)
    conn, curr, db = dbHandle

    ora_conn = None
    try:
        print(srcQuerySql)
        curr.execute(srcQuerySql)

        ora_conn = getOracleDB(oracleDbConfig=oracleDbConfig)
        ora_curr = ora_conn.cursor()
        ora_curr.execute('truncate table ' + target_tabName)
        ora_conn.commit()

        # Only non-empty batches are inserted, so row counts that are a
        # multiple of batchSize (including an empty table) are handled.
        while True:
            rows = curr.fetchmany(batchSize)
            if not rows:
                break
            # Every bind value is either None or str, so the bind types stay
            # consistent across rows and batches.
            ora_curr.executemany(
                insertIntoSql,
                [tuple(None if value is None else str(value) for value in row) for row in rows])
            ora_conn.commit()
    finally:
        if ora_conn is not None:
            ora_conn.close()
        conn.close()

6、调度的总控

Mysql2OracleCtl.py

# -*- coding=utf-8 -*-
from OracleCreateTab import *
from MysqlData2Oracle import *

warnings.filterwarnings("ignore")

def Mysql2OracleCtl(mysqlDbConfig, oracleDbConfig, src_tabName, tabType):
    """Migrate one MySQL table to Oracle.

    First rebuilds the Oracle target table from the MySQL structure, then
    loads the MySQL rows into it. Both steps receive the same four arguments.
    """
    for step in (OracleCreateTab, mysqlData2Oracle):
        step(mysqlDbConfig, oracleDbConfig, src_tabName, tabType)
# Test run: migrate a few sample tables. Guarded so that importing this
# module does not start a migration as a side effect.
if __name__ == "__main__":
    for tabName in ('student', 'course', 'sc', 'runoob_tbl'):
        Mysql2OracleCtl(mysqlDbConfig=srcMysql_dbConfig1, oracleDbConfig=targetOracle_dbConfig,
                        src_tabName=tabName, tabType='single')
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!