PX4 Scheduling on Linux

PX4, the well-known flight control stack, can actually run directly on Linux, so an external Pixhawk board is not strictly required.

But Linux is not a real-time operating system, so how exactly does PX4 run on top of it? This post explores that question.

First, locate the initialization function in main.cpp:

void init_once()
{
    _shell_task_id = pthread_self();

    work_queues_init();     // high/low-priority work queues (hpwork/lpwork)
    hrt_work_queue_init();  // high-resolution-timer work queue

    px4_platform_init();    // hrt, params, WorkQueueManager, uORB, logging
}

Apart from the first statement, everything else looks potentially scheduling-related, so let's go through them one by one.

Work Queues Init

void work_queues_init(void)
{
    px4_sem_init(&_work_lock[HPWORK], 0, 1);
    px4_sem_init(&_work_lock[LPWORK], 0, 1);
#ifdef CONFIG_SCHED_USRWORK
    px4_sem_init(&_work_lock[USRWORK], 0, 1);
#endif

    // Create high priority worker thread
    g_work[HPWORK].pid = px4_task_spawn_cmd("hpwork",
                        SCHED_DEFAULT,
                        SCHED_PRIORITY_MAX - 1,
                        2000,
                        work_hpthread,
                        (char *const *)NULL);

    // Create low priority worker thread
    g_work[LPWORK].pid = px4_task_spawn_cmd("lpwork",
                        SCHED_DEFAULT,
                        SCHED_PRIORITY_MIN,
                        2000,
                        work_lpthread,
                        (char *const *)NULL);

}

Here, px4_task_spawn_cmd creates a new schedulable task. SCHED_DEFAULT is a macro defined as SCHED_FIFO; anyone familiar with Linux will recognize this as a real-time scheduling policy in which a higher-priority thread preempts lower-priority ones, and as long as the higher-priority thread does not voluntarily yield the CPU, lower-priority threads cannot run at all.
px4_task_spawn_cmd is declared as:

px4_task_t px4_task_spawn_cmd(const char *name, int scheduler, int priority, int stack_size, px4_main_t entry,
                  char *const argv[])

Internally it is implemented on top of the operating system's pthread API.
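As a rough illustration (a minimal sketch under my own assumptions, not the actual PX4 source), a task-spawn wrapper of this kind can be built on pthread_create with an explicit SCHED_FIFO policy and priority; note that on Linux this requires root or CAP_SYS_NICE:

#include <pthread.h>
#include <sched.h>
#include <cstdio>

static void *trampoline(void *)
{
    // the real wrapper unpacks the entry function and its argv here
    std::puts("task running");
    return nullptr;
}

int main()
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 64 * 1024);

    // request SCHED_FIFO explicitly instead of inheriting the caller's policy
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
    pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

    sched_param param{};
    param.sched_priority = 10;
    pthread_attr_setschedparam(&attr, &param);

    pthread_t thread;
    if (pthread_create(&thread, &attr, trampoline, nullptr) != 0) {
        std::perror("pthread_create (needs root/CAP_SYS_NICE for SCHED_FIFO)");
    } else {
        pthread_join(thread, nullptr);
    }

    pthread_attr_destroy(&attr);
    return 0;
}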

From this we can see that two work queues are created here: a high-priority queue and a low-priority queue.

Next, let's look inside these two queue-management threads to see what they do:

int work_hpthread(int argc, char *argv[])
{
    /* Loop forever */

    for (;;) {
        /* First, perform garbage collection.  This cleans-up memory de-allocations
         * that were queued because they could not be freed in that execution
         * context (for example, if the memory was freed from an interrupt handler).
         * NOTE: If the work thread is disabled, this clean-up is performed by
         * the IDLE thread (at a very, very low priority).
         */

#ifndef CONFIG_SCHED_LPWORK
        sched_garbagecollection();
#endif

        /* Then process queued work.  We need to keep interrupts disabled while
         * we process items in the work list.
         */

        work_process(&g_work[HPWORK], HPWORK);
    }

    return PX4_OK; /* To keep some compilers happy */
}

The comments explain it well. First comes garbage collection: memory that could not be freed in certain execution contexts gets released here. However, sched_garbagecollection does not appear to be actually used, since the CONFIG_SCHED_LPWORK macro is defined, the call is commented out in work_lpthread, and I could not even find a definition or declaration of this function, so let's set it aside. After that the actual work_process begins; let's step into it (for convenience, my annotations are written directly as comments in the code below):

static void work_process(struct wqueue_s *wqueue, int lock_id)
{
    volatile struct work_s *work;
    worker_t  worker;
    void *arg;
    uint64_t elapsed;
    uint32_t remaining;
    uint32_t next;

    /* Then process queued work.  We need to keep interrupts disabled while
     * we process items in the work list.
     */

    next  = CONFIG_SCHED_WORKPERIOD;

    work_lock(lock_id);

    work  = (struct work_s *)wqueue->q.head;

    while (work) {
        /* Is this work ready?  It is ready if there is no delay or if
         * the delay has elapsed. qtime is the time that the work was added
         * to the work queue.  It will always be greater than or equal to
         * zero.  Therefore a delay of zero will always execute immediately.
         */

        elapsed = USEC2TICK(clock_systimer() - work->qtime);
        // time elapsed since this work item was queued (qtime), in ticks

        //printf("work_process: in ticks elapsed=%lu delay=%u\n", elapsed, work->delay);
        if (elapsed >= work->delay) {
            /* Remove the ready-to-execute work from the list */

            (void)dq_rem((struct dq_entry_s *)work, &wqueue->q);

            /* Extract the work description from the entry (in case the work
             * instance is re-used after it has been de-queued).
             */

            worker = work->worker;
            arg    = work->arg;

            /* Mark the work as no longer being queued */
            // Take out the worker and its argument, then set worker to NULL (marking it as dispatched)
            work->worker = NULL;

            /* Do the work.  Re-enable interrupts while the work is being
             * performed... we don't have any idea how long that will take!
             */

             // The code comment says interrupts are re-enabled, but here this only posts a semaphore; presumably the "interrupt" path synchronizes on the same semaphore
            work_unlock(lock_id);

            if (!worker) {
                PX4_WARN("MESSED UP: worker = 0\n");

            } else {
                worker(arg);
                // actually run this work item
            }

            /* Now, unfortunately, since we re-enabled interrupts we don't
             * know the state of the work list and we will have to start
             * back at the head of the list.
             */
            // As noted above: since a work item was just executed, restart the traversal from the head of the list (work near the head may have become ready in the meantime and should run first)

            work_lock(lock_id);
            work  = (struct work_s *)wqueue->q.head;

        } else {

            // The whole else branch does the following:
            // this work item is not ready yet, so compute how long until it becomes ready; if that is less than next, store it in next.
            // After a full pass, next is the wait time of the soonest-ready work item, and we simply usleep for that long.

            /* This one is not ready.. will it be ready before the next
             * scheduled wakeup interval?
             */

            /* Here: elapsed < work->delay */
            remaining = USEC_PER_TICK * (work->delay - elapsed);

            if (remaining < next) {
                /* Yes.. Then schedule to wake up when the work is ready */

                next = remaining;
            }

            /* Then try the next in the list. */

            work = (struct work_s *)work->dq.flink;
        }
    }

    /* Wait awhile to check the work list.  We will wait here until either
     * the time elapses or until we are awakened by a signal.
     */
    work_unlock(lock_id);

    px4_usleep(next);
}

At this point the scheduling logic is fairly clear: there are two work queues, each scheduled by Linux via SCHED_FIFO, and each performing its own scheduling internally.
The analysis above covered work_hpthread, but work_lpthread and work_hrtthread follow almost the same flow, so I won't repeat it here.

To sum up this part: three kinds of work queues are initialized, namely hpthread (high priority), lpthread (low priority), and hrtthread (a work queue driven by the high-resolution timer).
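For reference, this is roughly how a job is handed to these queues (a hedged sketch using the NuttX-style work_queue() API that work_process() consumes; the exact signature can vary between versions):

static struct work_s g_my_work;

static void my_worker(void *arg)
{
    // runs on the hpwork thread once the delay has elapsed;
    // re-queue here with work_queue() to make the job periodic
}

static void start_my_work(void)
{
    // queue my_worker onto the high-priority queue, delayed by ~100 ms
    work_queue(HPWORK, &g_my_work, my_worker, NULL, USEC2TICK(100000));
}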

Platform Init

Next comes some platform-level initialization:

int px4_platform_init(void)
{
    hrt_init();

    param_init();

    px4::WorkQueueManagerStart();

    uorb_start();

    px4_log_initialize();

    return PX4_OK;
}

Here we mainly care about WorkQueueManagerStart, which spawns a dedicated manager task whose body is WorkQueueManagerRun:

static int
WorkQueueManagerRun(int, char **)
{
    _wq_manager_wqs_list = new BlockingList<WorkQueue *>();
    _wq_manager_create_queue = new BlockingQueue<const wq_config_t *, 1>();

    while (!_wq_manager_should_exit.load()) {
        // create new work queues as needed
        const wq_config_t *wq = _wq_manager_create_queue->pop();

        if (wq != nullptr) {
            // create new work queue
            // If a work queue is waiting to be created, the creation flow starts below.
            // _wq_manager_create_queue starts out empty; elements are only added once other modules begin running.

            pthread_attr_t attr;
            int ret_attr_init = pthread_attr_init(&attr);

            // .. attr and priority setup omitted

            // create thread
            pthread_t thread;
            int ret_create = pthread_create(&thread, &attr, WorkQueueRunner, (void *)wq);

            // ...
            // destroy thread attributes
            int ret_destroy = pthread_attr_destroy(&attr);

            if (ret_destroy != 0) {
                PX4_ERR("failed to destroy thread attributes for %s (%i)", wq->name, ret_create);
            }
        }
    }

    return 0;
}

So when does anything get added to _wq_manager_create_queue? The entry point is WorkQueueFindOrCreate, which pushes a new element onto _wq_manager_create_queue:

WorkQueue *
WorkQueueFindOrCreate(const wq_config_t &new_wq)
{
    if (_wq_manager_create_queue == nullptr) {
        PX4_ERR("not running");
        return nullptr;
    }

    // search list for existing work queue
    WorkQueue *wq = FindWorkQueueByName(new_wq.name);

    // create work queue if it doesn't already exist
    if (wq == nullptr) {
        // add WQ config to list
        //  main thread wakes up, creates the thread
        _wq_manager_create_queue->push(&new_wq);

        // we wait until new wq is created, then return
        uint64_t t = 0;

        while (wq == nullptr && t < 10_s) {
            // Wait up to 10 seconds, checking every 1 ms
            t += 1_ms;
            px4_usleep(1_ms);

            wq = FindWorkQueueByName(new_wq.name);
        }

        if (wq == nullptr) {
            PX4_ERR("failed to create %s", new_wq.name);
        }
    }

    return wq;
}

WorkQueueFindOrCreate is in turn called from WorkItem::Init:

bool WorkItem::Init(const wq_config_t &config)
{
    // clear any existing first
    Deinit();

    px4::WorkQueue *wq = WorkQueueFindOrCreate(config);

    if ((wq != nullptr) && wq->Attach(this)) {
        _wq = wq;
        _time_first_run = 0;
        return true;
    }

    PX4_ERR("%s not available", config.name);
    return false;
}

So what is a WorkItem? A quick example is the fixed-wing position control module:

FixedwingPositionControl::FixedwingPositionControl(bool vtol) :
    ModuleParams(nullptr),
    WorkItem(MODULE_NAME, px4::wq_configurations::nav_and_controllers),
    _attitude_sp_pub(vtol ? ORB_ID(fw_virtual_attitude_setpoint) : ORB_ID(vehicle_attitude_setpoint)),
    _loop_perf(perf_alloc(PC_ELAPSED, MODULE_NAME": cycle")),
    _launchDetector(this),
    _runway_takeoff(this)

In other words, when certain PX4 component modules are initialized, they create a WorkItem, and the work queue associated with that WorkItem is added to the list of work queues awaiting creation; the WorkQueueManager then actually creates the thread.
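For illustration, a minimal WorkItem-based module might look like the sketch below (the class name is hypothetical; the pattern follows modules like FixedwingPositionControl):

class MinimalItem : public px4::WorkItem
{
public:
    MinimalItem() : WorkItem(MODULE_NAME, px4::wq_configurations::lp_default) {}

    void start() { ScheduleNow(); } // enqueue this item once on its work queue

private:
    void Run() override
    {
        // one unit of work; call ScheduleNow() again here to keep running
    }
};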

Note, however, that what gets created here are threads, i.e. they sit at the same level as the hpthread and lpthread threads discussed at the beginning. To see which operations are scheduled directly as threads, check wq_configurations. Most entries there are interfaces such as I2C and SPI (altitude and velocity control also get their own work queue). Presumably each interface is one work queue; for example, if multiple devices hang off one I2C bus, each device's operations become one work item of that queue. Each individual work item is then scheduled the way we saw in hpthread at the start.

Summary

To summarize: on Linux (or rather, on POSIX), PX4 schedules work mainly through a combination of thread scheduling and each work queue's internal scheduling.

Depending on the kind of operation to be performed, there are multiple work queues, and each work queue is managed by one thread; in other words, each work queue is itself scheduled by Linux as a thread. Different work queues have different priorities, and these priorities map onto the priorities of Linux's FIFO scheduling.
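As an illustrative sketch (not the verbatim PX4 code) of how a relative_priority field, which is relative to the maximum, can be mapped onto an absolute SCHED_FIFO priority:

#include <sched.h>
#include <cstdint>

static int absolute_priority(int8_t relative_priority)
{
    const int max_prio = sched_get_priority_max(SCHED_FIFO); // 99 on Linux
    const int min_prio = sched_get_priority_min(SCHED_FIFO); // 1 on Linux

    int prio = max_prio + relative_priority; // e.g. rate_ctrl (0) maps to the max
    return prio < min_prio ? min_prio : prio;
}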

A work queue contains multiple work items, and the queue also dispatches them in FIFO order; but unlike Linux's SCHED_FIFO, this FIFO involves no preemption (because the individual work items no longer carry priorities of their own).

Does this approach have drawbacks? I think so. The priority of every thread (i.e. every work queue) has to be chosen carefully: if a high-priority work queue contains too much work, it will obviously never release the CPU to the lower-priority queues. So on an underpowered processor, the real-time behavior of the high-priority threads can be guaranteed, but that of the low-priority threads may not be. (Then again, does the latency of low-priority threads matter? If it did, why weren't they given high priority?) On the other hand, other RTOSes behave the same way: under preemptive scheduling, while a high-priority thread runs, lower-priority threads likewise cannot preempt it. Seen this way, this scheme is actually not much worse than an RTOS in terms of scheduling. Of course, the Linux kernel is comparatively heavy, so compared to an RTOS there is still some extra kernel scheduling overhead.

Moreover, to run PX4 on Linux you should enable kernel preemption (for soft real-time) or apply the real-time patch (for hard real-time). Although the PX4 documentation does not emphasize this, the OS image it provides for running on the Raspberry Pi already comes with the patch applied.
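To verify that a process can actually obtain SCHED_FIFO on a given kernel (a quick standalone check, not part of PX4):

#include <sched.h>
#include <cstdio>

int main()
{
    sched_param sp{};
    sp.sched_priority = 50;

    if (sched_setscheduler(0, SCHED_FIFO, &sp) != 0) {
        std::perror("sched_setscheduler (needs root/CAP_SYS_NICE)");
        return 1;
    }

    std::printf("policy=%d (SCHED_FIFO=%d)\n", sched_getscheduler(0), SCHED_FIFO);
    return 0;
}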

Reference: function declarations and variable types

static BlockingQueue<const wq_config_t *, 1> *_wq_manager_create_queue{nullptr};
static BlockingList<WorkQueue *> *_wq_manager_wqs_list{nullptr};    // work queues currently running; a queue that no longer needs to run is removed from this list
static void work_process(struct wqueue_s *wqueue, int lock_id);  // the actual work-queue processing function
struct wqueue_s g_work[NWORKERS];
px4_sem_t _work_lock[NWORKERS];
struct wqueue_s g_hrt_work;
struct wq_config_t {
    const char *name;
    uint16_t stacksize;
    int8_t relative_priority; // relative to max
};
namespace wq_configurations
{
static constexpr wq_config_t rate_ctrl{"wq:rate_ctrl", 1952, 0}; // PX4 inner loop highest priority
static constexpr wq_config_t ctrl_alloc{"wq:ctrl_alloc", 9500, 0}; // PX4 control allocation, same priority as rate_ctrl

static constexpr wq_config_t SPI0{"wq:SPI0", 2336, -1};
static constexpr wq_config_t SPI1{"wq:SPI1", 2336, -2};
static constexpr wq_config_t SPI2{"wq:SPI2", 2336, -3};
static constexpr wq_config_t SPI3{"wq:SPI3", 2336, -4};
static constexpr wq_config_t SPI4{"wq:SPI4", 2336, -5};
static constexpr wq_config_t SPI5{"wq:SPI5", 2336, -6};
static constexpr wq_config_t SPI6{"wq:SPI6", 2336, -7};

static constexpr wq_config_t I2C0{"wq:I2C0", 2336, -8};
static constexpr wq_config_t I2C1{"wq:I2C1", 2336, -9};
static constexpr wq_config_t I2C2{"wq:I2C2", 2336, -10};
static constexpr wq_config_t I2C3{"wq:I2C3", 2336, -11};
static constexpr wq_config_t I2C4{"wq:I2C4", 2336, -12};

// PX4 att/pos controllers, highest priority after sensors.
static constexpr wq_config_t nav_and_controllers{"wq:nav_and_controllers", 2240, -13};

static constexpr wq_config_t INS0{"wq:INS0", 6000, -14};
static constexpr wq_config_t INS1{"wq:INS1", 6000, -15};
static constexpr wq_config_t INS2{"wq:INS2", 6000, -16};
static constexpr wq_config_t INS3{"wq:INS3", 6000, -17};

static constexpr wq_config_t hp_default{"wq:hp_default", 1900, -18};

static constexpr wq_config_t uavcan{"wq:uavcan", 3624, -19};

static constexpr wq_config_t UART0{"wq:UART0", 1632, -21};
static constexpr wq_config_t UART1{"wq:UART1", 1632, -22};
static constexpr wq_config_t UART2{"wq:UART2", 1632, -23};
static constexpr wq_config_t UART3{"wq:UART3", 1632, -24};
static constexpr wq_config_t UART4{"wq:UART4", 1632, -25};
static constexpr wq_config_t UART5{"wq:UART5", 1632, -26};
static constexpr wq_config_t UART6{"wq:UART6", 1632, -27};
static constexpr wq_config_t UART7{"wq:UART7", 1632, -28};
static constexpr wq_config_t UART8{"wq:UART8", 1632, -29};
static constexpr wq_config_t UART_UNKNOWN{"wq:UART_UNKNOWN", 1632, -30};

static constexpr wq_config_t lp_default{"wq:lp_default", 1920, -50};

static constexpr wq_config_t test1{"wq:test1", 2000, 0};
static constexpr wq_config_t test2{"wq:test2", 2000, 0};

} // namespace wq_configurations

int pthread_create(
                 pthread_t *restrict tidp,   // receives the ID of the newly created thread
                 const pthread_attr_t *restrict attr,  // thread attributes; NULL for defaults
                 void *(*start_rtn)(void *), // the new thread starts executing at start_rtn
                 void *restrict arg // NULL by default; to pass arguments, pack them into a struct and pass its address as arg
                  );

Update 2022-04-10

My earlier picture of PX4's scheduling was not fully worked out and contained a few misunderstandings, though it was basically correct:

Each work_queue corresponds to one thread. Three work queues are initialized globally: hp, lp, and hrt, where hrt exists to support ScheduledWorkItem.

In addition there is a WorkQueueManager; this thread also runs at the highest priority. Since work queues like wq:INS0 and wq:SPI0 are only created at runtime, the WorkQueueManager is responsible for managing them.
These work queues are created via:

int ret_create = pthread_create(&thread, &attr, WorkQueueRunner, (void *)wq);

where wq is one of the wq_config_t configurations, such as the SPI0 entry (wq:SPI0) listed above.

WorkQueueRunner then invokes the Run function of the concrete WorkQueue:

static void *
WorkQueueRunner(void *context)
{
    wq_config_t *config = static_cast<wq_config_t *>(context);
    WorkQueue wq(*config);

    // add to work queue list
    _wq_manager_wqs_list->add(&wq);

    wq.Run();

    // remove from work queue list
    _wq_manager_wqs_list->remove(&wq);

    return nullptr;
}

Why wrap things in WorkQueueRunner instead of calling wq.Run directly? Mainly so that the remove step can run after wq.Run returns: if wq.Run itself were the thread entry, the thread would terminate silently when Run finished, with no chance to remove the queue from the list.

The basic unit within a work queue is the WorkItem.

Its basic run loop looks like this:

void WorkQueue::Run()
{
    while (!should_exit()) {
        // loop as the wait may be interrupted by a signal
        do {} while (px4_sem_wait(&_process_lock) != 0);

        work_lock();

        // process queued work
        while (!_q.empty()) {
            WorkItem *work = _q.pop();

            work_unlock(); // unlock work queue to run (item may requeue itself)
            work->RunPreamble();
            work->Run();
            // Note: after Run() we cannot access work anymore, as it might have been deleted
            work_lock(); // re-lock
        }

#if defined(ENABLE_LOCKSTEP_SCHEDULER)

        if (_q.empty()) {
            px4_lockstep_unregister_component(_lockstep_component);
            _lockstep_component = -1;
        }

#endif // ENABLE_LOCKSTEP_SCHEDULER

        work_unlock();
    }

    PX4_DEBUG("%s: exiting", _config.name);
}

When the queue is empty, the loop blocks on _process_lock, since that semaphore is initialized to 0. Once an item is added via Add, the semaphore is posted and execution proceeds:

void WorkQueue::Add(WorkItem *item)
{
    work_lock();

    _q.push(item);
    work_unlock();

    SignalWorkerThread();
}

void WorkQueue::SignalWorkerThread()
{
    int sem_val;

    if (px4_sem_getvalue(&_process_lock, &sem_val) == 0 && sem_val <= 0) {
        px4_sem_post(&_process_lock);
    }
}

There is also a special kind of WorkItem called ScheduledWorkItem, which additionally implements:

bool    Scheduled ()

void    ScheduleDelayed (uint32_t delay_us)

void    ScheduleOnInterval (uint32_t interval_us, uint32_t delay_us=0)

void    ScheduleAt (hrt_abstime time_us)

void    ScheduleClear ()

The base WorkItem offers only one scheduling function, ScheduleNow, which immediately re-queues the item to run once more. The additional scheduling functions of ScheduledWorkItem work together with the hrt. The basic idea is that inside the hrt work queue, an hrt_tim_isr runs (simulated here, of course; on NuttX it may be a hardware interrupt) that continuously checks whether any item's deadline has arrived; when it has, ScheduleNow is called to push the item onto its WorkQueue for processing.
