"""
datetime: 2025/04/21 11:26
author: nzb

定时器频率控制 (Timer-based loop frequency control)
"""

import time
import random
import threading

import linuxfd
import select


class Timer(object):
    '''Periodic timer for accurate loop rate control.

    The timer is a kernel timerfd (created through ``linuxfd``) that is
    watched with ``select.epoll``. It does not use Python's built-in thread
    timing control or management. Only use this class on Linux platforms.

    The instance owns two file descriptors (the timerfd and the epoll
    object). Release them with ``close()`` or by using the instance as a
    context manager::

        with Timer(0.1) as timer:
            while running:
                do_work()
                timer.sleep()
    '''

    def __init__(self, interval: float) -> None:
        '''Arm a periodic timer that expires every ``interval`` seconds.

        Args:
            interval: period in seconds; must be greater than zero.

        Raises:
            ValueError: if ``interval`` is not a positive number.
        '''
        # Validate before touching the kernel. Per timerfd_settime(2) a zero
        # initial value disarms the timer, which would make sleep() block
        # forever. `not interval > 0` also rejects NaN.
        if not interval > 0:
            raise ValueError(
                f"interval must be a positive number of seconds, got {interval!r}"
            )
        self.__epl, self.__tfd = self.__create_timerfd(interval)

    def __enter__(self):
        '''Return the timer itself so it can be used in a ``with`` block.
        '''
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        '''Release the timer's file descriptors when the ``with`` block ends.
        '''
        self.close()

    @staticmethod
    def __create_timerfd(interval: float):
        '''Produces a timerfd file descriptor from the kernel

        The timerfd is armed with ``interval`` as both the first expiration
        and the repeat period, then registered for readability on a new
        epoll object.

        Returns:
            An ``(epoll, timerfd)`` tuple.
        '''
        # NOTE(review): rtc=True presumably selects CLOCK_REALTIME - confirm
        # against the linuxfd documentation.
        tfd = linuxfd.timerfd(rtc=True, nonBlocking=True)
        epl = None
        try:
            tfd.settime(interval, interval)
            epl = select.epoll()
            epl.register(tfd.fileno(), select.EPOLLIN)
        except Exception:
            # Do not leak descriptors when setup fails half-way.
            if epl is not None:
                epl.close()
            tfd.close()
            raise
        return epl, tfd

    def sleep(self) -> None:
        '''Blocks the thread holding this func until the next time point

        If one or more periods have already elapsed since the previous call,
        the timerfd is already readable and this returns immediately.
        '''
        timer_fileno = self.__tfd.fileno()
        # A timeout of -1 waits indefinitely for the timerfd to become readable.
        for fd, event in self.__epl.poll(-1):
            if fd == timer_fileno and event & select.EPOLLIN:
                # Reading consumes the pending expirations so that the next
                # poll() blocks until the following period.
                self.__tfd.read()

    def close(self) -> None:
        '''Release the epoll object and the timerfd.

        Safe to call more than once. ``sleep()`` must not be called after
        the timer has been closed.
        '''
        if self.__epl.closed:
            return
        self.__epl.close()
        self.__tfd.close()


# Wall-clock second currently being counted by time_sleep (None before its
# first iteration).
time_sec = None
# One entry per time_sleep iteration observed during ``time_sec``.
data = []


def time_sleep():
    '''Run a loop paced with ``time.sleep`` and print its per-second rate.

    Each iteration is recorded in the module-level ``data`` list. When the
    wall-clock second changes, the number of iterations counted for the
    previous second is printed and counting starts over. The function never
    returns.
    '''
    global time_sec, data
    period = 0.1
    while True:
        current_sec = int(time.time())
        if time_sec is not None and current_sec != time_sec:
            # A new second has started: report the finished one and reset.
            print(f"time_sleep count: {len(data)}")
            data = []
        time_sec = current_sec
        data.append(1)

        # Simulate the time consumed by the loop's real work.
        busy_time = random.random() / 10
        time.sleep(busy_time)
        # Improved pacing: sleep only for what is left of the period.
        # (The naive variant, a fixed time.sleep(1/10) here, adds the work
        # time on top of every period.)
        idle_time = period - busy_time
        if idle_time > 0:
            time.sleep(idle_time)


# Wall-clock second currently being counted by timer_sleep (None before its
# first iteration).
time_sec2 = None
# One entry per timer_sleep iteration observed during ``time_sec2``.
data2 = []


def timer_sleep():
    '''Run a loop paced by ``Timer`` and print its per-second rate.

    Each iteration is recorded in the module-level ``data2`` list. When the
    wall-clock second changes, the number of iterations counted for the
    previous second is printed and counting starts over. The function never
    returns.
    '''
    global time_sec2, data2
    timer = Timer(1/10)

    while True:
        current_sec = int(time.time())
        if time_sec2 is not None and current_sec != time_sec2:
            # A new second has started: report the finished one and reset.
            print(f"timer_sleep count: {len(data2)}")
            data2 = []
        time_sec2 = current_sec
        data2.append(1)

        # Simulate the time consumed by the loop's real work.
        time.sleep(random.random() / 10)
        # Block until the timer's next periodic expiration.
        timer.sleep()


if __name__ == '__main__':
    # Run both pacing strategies side by side, each in its own thread, so
    # their printed per-second counts can be compared.
    for worker in (timer_sleep, time_sleep):
        threading.Thread(target=worker).start()

"""
timer_sleep count: 1
time_sleep count: 1
timer_sleep count: 10
time_sleep count: 6
timer_sleep count: 10
time_sleep count: 6
timer_sleep count: 10
time_sleep count: 7
timer_sleep count: 10
time_sleep count: 6
timer_sleep count: 10
time_sleep count: 8

# 改进版 (improved version: time_sleep sleeps only for the remainder of each 0.1 s period)
timer_sleep count: 5
time_sleep count: 5
timer_sleep count: 10
time_sleep count: 10
timer_sleep count: 10
time_sleep count: 10
timer_sleep count: 10
"""

results matching ""

    No results matching ""