背景
开发过程中需要记录机器人串口发送的数据,就想到之前体验感很好的loguru
,就想着继续用它做一个:
- 可以定期、定量、动作触发打包压缩,
- 日志等级分类
- 配置简单
- 占用资源低
的机器人本体日志记录的系统,同时也要有很好的功能扩展性,后期扩展功能方便等
loguru配置
loguru是最早开发AMR调度系统发现的,比起去大量自定义logging
,实在是太方便了,同时也是异步的,不会阻塞当前逻辑。
可以按照官方文档配置:
logger.add(
logPath,
format="{time:YYYY-MM-DD at HH:mm:ss:SSS} | {message}",
rotation=logger_rotation,
retention=logRetention,
compression=logCompression,
encoding=logEncoding,
enqueue=True,
)
这里为了是配置文件外置成config.ini
,使用了configparser
库,这样可以在不修改代码的情况下,通过修改配置文件来改变功能,这里不再赘述。
rotation函数
着重要说明的是rotation函数的配置,如果使用默认的rotation=00:00
或者rotation=500 MB
只能单一的实现每天或定量打包压缩日志,如果想要两种条件都匹配以及自定义一些条件,比如启动时打包便于本体开发人员截断之前日志等等,就需要使用自定义函数来实现,参考Log rotate by time and size #241:
class Rotator:
def __init__(self, size, at):
self._size = size
now = datetime.now()
today_at_time = now.replace(hour=at.hour, minute=at.minute, second=at.second)
if now >= today_at_time:
# the current time is already past the target time so it would rotate already
# add one day to prevent an immediate rotation
self._next_rotate = today_at_time + timedelta(days=1)
else:
self._next_rotate = today_at_time
def should_rotate(self, message, file):
file.seek(0, 2)
if file.tell() + len(message) > self._size:
return True
if message.record["time"].timestamp() > self._next_rotate.timestamp():
self._next_rotate += timedelta(days=1)
return True
return False
要实现每次启动程序,我的做法是定义一个全局变量和seek()
值来判断是否需要return True
,如果有更优美的实现请告诉我。这里也考虑过读取日志文件的创建时间来做判断,使用get_crtimes(os.path.abspath(file.name))
在Linux系统下获得的时间为文件最后修改的时间,而不是文件创建的时间。
使用serial_asyncio
对于这种IO密集的逻辑和仅仅只是日志记录,最好的选择还是异步架构,Python的asyncio
,读取串口数据的https://pyserial-asyncio.readthedocs.io/en/latest/,程序伪代码如下:
async def handle_serial_data(reader, writer):
while True:
try:
ByteData = await reader.readline()
if ByteData and len(ByteData) > 3: # filter of 3 strings data
StrData = ByteData.decode(
"utf-8", errors='ignore') # fix error
StrData.strip().split("\n")
logger.info(StrData)
else:
pass
except Exception as e:
# exception handle
async def main():
try:
loop = asyncio.get_event_loop()
reader, writer = await serial_asyncio.open_serial_connection(
loop=loop, url=serialPort, baudrate=serialBaudrate
)
except Exception:
# exception handle
return
try:
await handle_serial_data(reader, writer)
finally:
writer.close()
await writer.wait_closed()
if __name__ == '__main__':
try:
asyncio.run(main())
except Exception as e:
logger.exception("main error!", type(e).__name__)
至此基本功能就实现了,结束了吗?
没~
因为忽视了程序实际运行的环境是树莓派而不是以往的服务器,而树莓派的eMMC寿命有限,所以日志内容分级处理和加入buffer,降低IO压力,以及使用Log2RAM,这里不谈。
在Ubuntu上使用Docker搭建ARM开发环境
由于日常开发的系统是Ubuntu 22.04,而树莓派则是ARM环境,一般来说,把Python当脚本跑的话没有区别的,但要是想用Pyinstaller打包的话,则要考虑运行环境了,为了后续开发需要,就使用Docker在Ubuntu上构建一个ARM环境。
docker buildx
docker buildx
的Docker CLI插件,它允许你在不同的架构上构建Docker镜像。首先,安装并启用buildx
插件:
# 安装docker buildx插件
docker buildx install
# 创建一个名为multiarch的构建实例,支持多种架构
docker buildx create --name multiarch
# 使用刚创建的multiarch实例
docker buildx use multiarch
安装QEMU模拟器:
sudo apt-get update && sudo apt-get install -y qemu-user-static
devcontainer.json
{
"name": "ARM Python Development Environment",
"dockerFile": "Dockerfile",
"settings": {
"terminal.integrated.shell.linux": "/bin/bash"
},
"extensions": [
"ms-python.python"
],
"remoteUser": "root"
}
Dockerfile
# 使用 ARM 架构的 Python 基础镜像
FROM arm32v7/python:3.9-slim
# 更新软件包列表并安装一些基本的工具
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
git \
libssl-dev \
zlib1g-dev \
libbz2-dev \
libreadline-dev \
libsqlite3-dev \
wget \
llvm \
libncursesw5-dev \
xz-utils \
tk-dev \
libxml2-dev \
libxmlsec1-dev \
libffi-dev \
liblzma-dev \
&& rm -rf /var/lib/apt/lists/*
# 在此处添加其他 Docker 指令
构建完成后,查看系统环境:
dpkg --print-architecture
发表回复