电脑有4个串口,但程序却只尝试打开其中2个,我写的是都要去尝试打开的

Python
源代码如下(因源代码是拷贝网上其他链接的,最下文还有源代码链接):
import serial
import serial.tools.list_ports
import threading
import binascii
import time
from datetime import datetime

default value

baunRate = 115200
is_read = False
is_write = False
write_buff = []
sys_buff = []
mSerial = None
callback = None
is_opened = 0
is_registed = 0
class SerialPort:
def init(self, port, buand):
self.port = serial.Serial(port, buand)
self.port.close()
if not self.port.isOpen():
self.port.open()

    # the index of data_bytes for read operation,私有属性

only used in read lines

self.read_ptr = 0
self.
read_head = 0

store all read bytes

used in read date, read lines self.__data_bytes = bytearray()

def port_open(self):
    if not self.port.isOpen():
        self.port.open()

def port_close(self):
    self.port.close()

def send(self):
    global is_write
    global write_buff

    while is_write:
        if len(write_buff):
            msg = write_buff.pop(0)
            msg = msg + "n"

cmd = msg.encode()
try:

                self.port.write(cmd)
            except:
                write_buff.clear()
                is_write = False

write_buff.clear()

def read_data(self):
    global is_read
    global is_opened
    byte_cnt = 0

while is_read:
try:
count = self.port.inWaiting()
if count > 0:
rec_str = self.port.read(count)
self.data_bytes = self.data_bytes + rec_str

                # print("receive:",rec_str.decode())

print(rec_str) byte_cnt += count

                if not is_opened:
                    is_opened = 1

print(“累计收到:”,byte_cnt)

time.sleep(0.5) self.read_lines()

        except:
            deinit()

# 将当前所有的数据都读出,读取位置不变,每次读取指针依次移动,不漏数据, 读取行为一直在进行

def read_lines(self):

    # reset

line_cnt = 0
data_len = len(self.__data_bytes)

    # print ("")

print (“begin: prt=:”, self.read_ptr, “ head =”, self.read_head,”current len =”,data_len) if self.__read_ptr >= data_len:

        return

get all lines in current data_bytes

while (self.read_ptr < data_len - 1):
if (self.
data_bytes[self.read_ptr + 1] == 0x0a and self.data_bytes[self.read_ptr] == 0x0d):
tmp = bytearray()
tmp = self.
data_bytes[self.read_head:self.read_ptr]

            try:
                line = tmp.decode()
            except:
                self.__read_head = self.__read_ptr + 2

self.read_ptr = self.read_head
continue
iprint(line)
line_cnt += 1
self.read_head = self.read_ptr + 2
self.read_ptr = self.read_head
else:
self.read_ptr = self.read_ptr + 1

def auto_open_serial():
global baunRate
global mSerial
global callback
global is_registed
global is_opened

# reset

deinit()

# 列出所有当前的com口

port_list = list(serial.tools.list_ports.comports())
port_list_name = []

# get all com

print(len(port_list))#这句是我加的
if len(port_list) <= 0:
iprint(“the serial port can’t find!”)
return False
else:
for itms in port_list:
port_list_name.append(itms.device)

# try open

print(port_list_name)
for i in port_list_name:
try:
mSerial = SerialPort(i, baunRate)
iprint(“try open %s” % i)
start_task()
send(“”)

        # return True

time.sleep(1)
if is_opened:
iprint(“connect %s successfully” % i)
return True
else:
deinit()
if i == port_list_name[len(port_list_name) - 1]:
iprint(“uart don’t open”)
break
continue except:
iprint(“ uart don’t open”)
deinit()
return False

def deinit():
global mSerial
global is_write
global is_read
global write_buff
global is_opened

if mSerial:
    mSerial.port_close()

is_opened = 0

is_read = False
is_write = False
write_buff = []

mSerial = None

time.sleep(1)

def init():
global mSerial
global callback
global is_registed
global is_opened
global is_read
global is_write

# retry

retry_time = 0
while not auto_open_serial():
if not is_opened:
iprint(“wait for uart connect, retry %s” % str(retry_time))
else:
return True
retry_time += 1
time.sleep(2)
if retry_time == 10:
iprint(“ open uart fail”)
return False

def send(msg):
global mSerial
global is_write
global write_buff
if is_write:
write_buff.append(msg)

def start_task():
global mSerial
global is_write
global is_read

if mSerial:
    is_write = True

t1 = threading.Thread(target=mSerial.send)
t1.daemon=False
t1.start()

    is_read = True

t2 = threading.Thread(target=mSerial.read_data)
t2.daemon=False
t2.start()

def iprint(msg):
global callback
global is_registed

msg = "[Uart] " + str(msg)
if is_registed:
    callback.append(msg)
else:
    print(msg)

def start_sys_cmd():
global is_registed
if is_registed:
t3 = threading.Thread(target=process_receive_sys_cmd)
t3.daemon=False
t3.start()

def process_receive_sys_cmd():
global sys_buff
global is_registed
global callback

# print("process_receive_sys_cmd")

while is_registed:

    # print ("wait,process_receive_sys_cmd")

if len(sys_buff):

        # print ("receive,process_receive_sys_cmd")

line = sys_buff.pop(0)
if “init” in line:
if is_opened and is_read and is_write:
iprint(“already open uart”)
break
iprint(“start init”)
init()
if is_opened:
break
iprint(“Eixt uart sys thread”)

def register_cback(list):
global callback
global is_registed

callback = list
is_registed = 1

def unregister_cback():
global callback
callback.clear()

if name == ‘main‘:

receive = []
register_cback(receive)
sys_buff.append("init")
start_sys_cmd()

def process_receive_msg():
    global receive
    while True:
        # print("wait")

if len(receive):

            # print("receive")

print(receive.pop(0))

t = threading.Thread(target=process_receive_msg)
t.daemon=False

t.start()

源代码链接如下:https://blog.csdn.net/cai472861/article/details/106366598?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-106366598-blog-113507728.pc_relevant_downloadblacklistv1&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-106366598-blog-113507728.pc_relevant_downloadblacklistv1&utm_relevant_index=2
在编写串口通信工具中,需要实现一个函数,自动找到对应com 口,并且连接该com口,保证后续通信正常
作为初始化过程的一部分。

思路
在win 设备管理器中,经常会出现多个com 口,但并不是每个com 口都是目标设备所链接的。
尝试打开每个com 口,输入enter 按键, 正确的com 口,会有ack log 返回,表明通信 正常

否则,没有任何log 返回,则判断为非目标设备所连接的com 口。
————————————————
版权声明:本文为CSDN博主「菜鸡学长」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:blog.csdn.net/cai472861/article/de...

讨论数量: 1
Jason990420

I don't have com port to do any test. Try following code to get what the exception information for 'COM9' or/and 'COM3'.

import serial

baudrate = 115200

for port in ('COM4', 'COM8', 'COM9', 'COM3'):
    s = serial.Serial(port=port, baudrate=baudrate)
    s.close()
    if not s.isOpen():
        s.open()
1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!