物联网操作系统学习笔记——消息队列2

tech2023-10-19  121

消息队列实现原理

消息队列控制块 pcHead和pcTail:队列的头和尾指针。 pcWriteTo和pcReadFrom:队列要入队和出队指向的地址,便于操作 xTasksWaitingToSend、xTasksWaitingToReceive:和任务相关的链表, uxMessagesWaiting:队列里还没 处理消息的个数 cRxLock\cTxLock:接受锁和发送锁,任务和中断为了防止冲突而定义.

消息队列创建

//队列实现,实际是xQueueGenericCreate函数传参传入一个队列类型queueQUEUE_TYPE //队列的创建类型 #define queueQUEUE_TYPE_BASE ( ( uint8_t ) 0U ) //基本类型 #define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U ) //集合类型 #define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) //互斥信号量 #define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) //计数信号量 #define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) //二制信号量 #define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U ) //递归信号量 #if( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) #define xQueueCreate( uxQueueLength, uxItemSize ) xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) ) #endif /* 参数: uxItemSize uxQueueLength ucQueueType 返回值 QueueHandle_t 队列的句柄,其实是对于列控制块地址 */ QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType ) { Queue_t *pxNewQueue; size_t xQueueSizeInBytes; uint8_t *pucQueueStorage; //队列内存空间为空 if( uxItemSize == ( UBaseType_t ) 0 ) { /* 队列字节大小赋值为0 */ xQueueSizeInBytes = ( size_t ) 0; } else { /* 队列字节大小 赋值为 长度*每个队列项的大小 */ xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */ } //申请内存空间 = 消息队列控制块大小+消息队列空间大小 pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); if( pxNewQueue != NULL ) { /* 找到消息队列操作空间的首地址 */ pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t ); //初始化一个新的消息队列 prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue ); } return pxNewQueue; } /* 参数: pucQueueStorage:队列操作空间的首地址 ucQueueType:队列类型 pxNewQueue :队列句柄 */ static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue ) { /* Remove compiler warnings about unused parameters should configUSE_TRACE_FACILITY not be set to 1. */ ( void ) ucQueueType; if( uxItemSize == ( UBaseType_t ) 0 ) { /* 把队列控制块首地址赋值到队列头指针上 ???? 这是互斥信号使用,后面再分析 */ pxNewQueue->pcHead = ( int8_t * ) pxNewQueue; } else { /* 把队列空间首地址赋值给队列头指针上 */ pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage; } /* 长度 单元大小 */ pxNewQueue->uxLength = uxQueueLength; pxNewQueue->uxItemSize = uxItemSize; //队列重置函数 ( void ) xQueueGenericReset( pxNewQueue, pdTRUE ); } /* 参数: xQueue 队列句柄 xNewQueue 操作队列的状态是什么,新建pdTURE还是已经创建好了 返回值: */ BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue ) { Queue_t * const pxQueue = ( Queue_t * ) xQueue; //进入临界段,这个时候操作队列控制块,不允许打断 taskENTER_CRITICAL(); { /* 1、头地址赋值 2、等待处理的消息个数为0 3、写入指针赋值为队列头指针 4、读出指针写入最后一个可用消息 5、赋值队列为解锁状态 */ pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize ); pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U; pxQueue->pcWriteTo = pxQueue->pcHead; pxQueue->u.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize ); pxQueue->cRxLock = queueUNLOCKED; pxQueue->cTxLock = queueUNLOCKED; //判断是否为新建队列 if( xNewQueue == pdFALSE ) { /* 判断发送等待列表里面是否有任务 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { //移除事件列表中的任务 if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { //进行上下文切换 queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } } //新建队列,直接初始化,发送和接受 列表项 else { /* Ensure the event queues start in the correct state. */ vListInitialise( &( pxQueue->xTasksWaitingToSend ) ); vListInitialise( &( pxQueue->xTasksWaitingToReceive ) ); } } taskEXIT_CRITICAL(); /* A value is returned for calling semantic consistency with previous versions. */ return pdPASS; } /* */

消息队列删除

//消息队列删除————主要功能就是释放消息队列的内存空间 void vQueueDelete( QueueHandle_t xQueue ) { Queue_t * const pxQueue = ( Queue_t * ) xQueue; configASSERT( pxQueue ); traceQUEUE_DELETE( pxQueue ); #if ( configQUEUE_REGISTRY_SIZE > 0 ) { vQueueUnregisterQueue( pxQueue ); } #endif #if( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) ) { /* 其实就是释放消息队列的内存空间 */ vPortFree( pxQueue ); } #elif( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) ) { /* The queue could have been allocated statically or dynamically, so check before attempting to free the memory. */ if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE ) { vPortFree( pxQueue ); } else { mtCOVERAGE_TEST_MARKER(); } } #else { /* The queue must have been statically allocated, so is not going to be deleted. Avoid compiler warnings about the unused parameter. */ ( void ) pxQueue; } #endif /* configSUPPORT_DYNAMIC_ALLOCATION */ } /*-----------------------------------------------------------*/

消息队列在任务中发送 1、 因为消息队列是一个共享资源,任何一个任务都可以进行接收和发送,所以第一步就是进入临界段。 2、(1)退出临界段:解除共享资源部分,其他内核可以调度,中断可以响应了。 (2)挂起调度器:内核无法响应,不让其他任务打断接下来的任务。 (3)锁定队列:在查找列表的同时不希望被打断,否则任务优先级会出现混乱。 (4)将任务插入到等待发送列表中,由运行态变为阻塞态

实现代码分析 /* 最终调用发送消息的接口是xQueueGenericSend 由于信号量都是由消息队列实现的,这个时候操作系统定义了消息队列的类型 类型: #define queueSEND_TO_BACK ( ( BaseType_t ) 0 ) 1、从队尾加入 #define queueSEND_TO_FRONT ( ( BaseType_t ) 1 ) 2、从队头加入 #define queueOVERWRITE ( ( BaseType_t ) 2 ) 3、覆盖入队 */ #define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), ( xTicksToWait ), queueSEND_TO_BACK ) /* 参数: xQueue pvItemToQueue xTicksToWait queueSEND_TO_BACK 返回值 BaseType_t //溢出或队满 */ BaseType_t xQueueGenericSend( QueueHandle_t xQueue, const void * const pvItemToQueue, TickType_t xTicksToWait, const BaseType_t xCopyPosition ) { BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired; TimeOut_t xTimeOut; Queue_t * const pxQueue = ( Queue_t * ) xQueue; /* 使用for死循环的目的,为了快速的处理消息拷贝,消息处理功能 */ for( ;; ) { //进入了临界段 taskENTER_CRITICAL(); { /* 1、判断消息队列是否满了 2、是否允许覆盖入队 这两个条件任何一个成立,都执行入队操作 */ if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) ) { //拷贝数据到队列操作空间内 traceQUEUE_SEND( pxQueue ); xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition ); #if ( configUSE_QUEUE_SETS == 1 ) { #else /* configUSE_QUEUE_SETS */ { /* 判断等待接收的列表是否为空 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { //移除列表,改版等待任务状态为就绪态 if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) { /* 进行上下文切换 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else if( xYieldRequired != pdFALSE ) { /* 再次进行上下文切换 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } #endif /* configUSE_QUEUE_SETS */ //退出临界段,返回成功 taskEXIT_CRITICAL(); return pdPASS; } else { //不允许入队,先判断是否需要阻塞 if( xTicksToWait == ( TickType_t ) 0 ) { /*不需要阻塞,退出临界段,之后返回队满*/ taskEXIT_CRITICAL(); /* Return to the original privilege level before exiting the function. */ traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; } //超时结构体是否操作过 else if( xEntryTimeSet == pdFALSE ) { /* 超时结构题初始化 */ vTaskSetTimeOutState( &xTimeOut ); xEntryTimeSet = pdTRUE; } else { /* Entry time was already set. */ mtCOVERAGE_TEST_MARKER(); } } } //退出临界段 taskEXIT_CRITICAL(); //后面的代码都是允许阻塞处理 /* 1、挂起了调度器————————不让其他任务打断 2、队列上锁----------不让中断打断(因为之前已经退出临界段了) */ vTaskSuspendAll(); prvLockQueue( pxQueue ); /* 判断阻塞时间是否超时了 */ if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) { //如果没有超时,判断队列是否满 if( prvIsQueueFull( pxQueue ) != pdFALSE ) { //如果队满,把当前任务,添加到等待发送的事件列表中,内部还把任务添加到延时列表中去 traceBLOCKING_ON_QUEUE_SEND( pxQueue ); vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait ); /* 解锁 */ prvUnlockQueue( pxQueue ); /* 恢复调度器 */ if( xTaskResumeAll() == pdFALSE ) { //进行上下文切换 portYIELD_WITHIN_API(); } } else { //队列未满,解锁,恢复调度器,重新进行入队操作 /* Try again. */ prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); } } //已经超时了,解锁,开启调度器 返回队满 else { /* The timeout has expired. */ prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; } } }

消息队列在中断中发送

/* 最终调用发送消息的接口是xQueueGenericSendFromISR 由于信号量都是由消息队列实现的,这个时候操作系统定义了消息队列的类型 类型: #define queueSEND_TO_BACK ( ( BaseType_t ) 0 ) 1、从队尾加入 #define queueSEND_TO_FRONT ( ( BaseType_t ) 1 ) 2、从队头加入 #define queueOVERWRITE ( ( BaseType_t ) 2 ) 3、覆盖入队 */ #define xQueueSendFromISR( xQueue, pvItemToQueue, pxHigherPriorityTaskWoken ) xQueueGenericSendFromISR( ( xQueue ), ( pvItemToQueue ), ( pxHigherPriorityTaskWoken ), queueSEND_TO_BACK ) /* 参数: xQueue pvItemToQueue NULL queueSEND_TO_BACK 返回值 BaseType_t */ BaseType_t xQueueGenericSendFromISR( QueueHandle_t xQueue, const void * const pvItemToQueue, BaseType_t * const pxHigherPriorityTaskWoken, const BaseType_t xCopyPosition ) { BaseType_t xReturn; UBaseType_t uxSavedInterruptStatus; Queue_t * const pxQueue = ( Queue_t * ) xQueue; /* 带返回值的关闭中断,需要保存上次关闭中断的状态值,恢复时候,写入 */ uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR(); { //判断是否队满和覆盖入队 if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) ) { //获取了队列发送锁的状态值 const int8_t cTxLock = pxQueue->cTxLock; //拷贝数据到队列操作空间内 ( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition ); /* 判断队列是否上锁 */ /* #define queueUNLOCKED ( ( int8_t ) -1 ) 解锁状态 #define queueLOCKED_UNMODIFIED ( ( int8_t ) 0 ) 上锁状态初值 */ if( cTxLock == queueUNLOCKED ) { #if ( configUSE_QUEUE_SETS == 1 ) #else /* configUSE_QUEUE_SETS */ { //恢复等待消息任务,中断内没有上下文切换 if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) { /* The task waiting has a higher priority so record that a context switch is required. */ if( pxHigherPriorityTaskWoken != NULL ) { *pxHigherPriorityTaskWoken = pdTRUE; } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } } #endif /* configUSE_QUEUE_SETS */ } else //队列已经上锁 { /* 发送锁+1 */ pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 ); } //返回成功 xReturn = pdPASS; } else { //返回队满 traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue ); xReturn = errQUEUE_FULL; } } //开启中断,保存上次状态值 portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus ); return xReturn; } /* 队列上锁 把发送和接受锁都赋值为上锁的初始值 */ #define prvLockQueue( pxQueue ) \ taskENTER_CRITICAL(); \ { \ if( ( pxQueue )->cRxLock == queueUNLOCKED ) \ { \ ( pxQueue )->cRxLock = queueLOCKED_UNMODIFIED; \ } \ if( ( pxQueue )->cTxLock == queueUNLOCKED ) \ { \ ( pxQueue )->cTxLock = queueLOCKED_UNMODIFIED; \ } \ } \ taskEXIT_CRITICAL() /* 队列解锁 参数:消息队列句柄 */ static void prvUnlockQueue( Queue_t * const pxQueue ) { /* THIS FUNCTION MUST BE CALLED WITH THE SCHEDULER SUSPENDED. */ /* 进入临界段 */ taskENTER_CRITICAL(); { //获取发送锁的状态值 int8_t cTxLock = pxQueue->cTxLock; /* 遍历,直到解锁为止 */ while( cTxLock > queueLOCKED_UNMODIFIED ) { /* Data was posted while the queue was locked. Are any tasks blocked waiting for data to become available? */ #if ( configUSE_QUEUE_SETS == 1 ) { #else /* configUSE_QUEUE_SETS */ { /* 解除等待消息任务,进行上下文切换 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) { /* The task waiting has a higher priority so record that a context switch is required. */ vTaskMissedYield(); } else { mtCOVERAGE_TEST_MARKER(); } } else { break; } } #endif /* configUSE_QUEUE_SETS */ //队列发送锁减一 --cTxLock; } //最后,解除发送锁 pxQueue->cTxLock = queueUNLOCKED; } taskEXIT_CRITICAL(); /* 解除接受锁,与上面操作一致 */ taskENTER_CRITICAL(); { int8_t cRxLock = pxQueue->cRxLock; while( cRxLock > queueLOCKED_UNMODIFIED ) { if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { vTaskMissedYield(); } else { mtCOVERAGE_TEST_MARKER(); } --cRxLock; } else { break; } } pxQueue->cRxLock = queueUNLOCKED; } taskEXIT_CRITICAL(); } /*-----------------------------------------------------------*/

消息队列在任务中接收

BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking ) { //重点分析这个,其他流程和发送流程差不多 //判断是否删除消息已经接收到的消息空间 if( xJustPeeking == pdFALSE ) { traceQUEUE_RECEIVE( pxQueue ); //更新消息等待的记录值,让它减一 pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1; } else { //不删除,就重新赋值位读取消息之前的地址,到出队指针 pxQueue->u.pcReadFrom = pcOriginalReadPosition; } taskEXIT_CRITICAL(); return pdPASS; }

消息队列在中断中接收

BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue, void * const pvBuffer, BaseType_t * const pxHigherPriorityTaskWoken ) { BaseType_t xReturn; UBaseType_t uxSavedInterruptStatus; Queue_t * const pxQueue = ( Queue_t * ) xQueue; configASSERT( pxQueue ); configASSERT( !( ( pvBuffer == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) ); /* RTOS ports that support interrupt nesting have the concept of a maximum system call (or maximum API call) interrupt priority. Interrupts that are above the maximum system call priority are kept permanently enabled, even when the RTOS kernel is in a critical section, but cannot make any calls to FreeRTOS API functions. If configASSERT() is defined in FreeRTOSConfig.h then portASSERT_IF_INTERRUPT_PRIORITY_INVALID() will result in an assertion failure if a FreeRTOS API function is called from an interrupt that has been assigned a priority above the configured maximum system call priority. Only FreeRTOS functions that end in FromISR can be called from interrupts that have been assigned a priority at or (logically) below the maximum system call interrupt priority. FreeRTOS maintains a separate interrupt safe API to ensure interrupt entry is as fast and as simple as possible. More information (albeit Cortex-M specific) is provided on the following link: http://www.freertos.org/RTOS-Cortex-M3-M4.html */ portASSERT_IF_INTERRUPT_PRIORITY_INVALID(); uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR(); { const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting; /*判断等待处理消息是否大于0 */ if( uxMessagesWaiting > ( UBaseType_t ) 0 ) { const int8_t cRxLock = pxQueue->cRxLock; traceQUEUE_RECEIVE_FROM_ISR( pxQueue ); //将数据从队列中拷贝 prvCopyDataFromQueue( pxQueue, pvBuffer ); //更新消息等待的记录值,让它减一 pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1; /* 判断队列是否上锁 */ if( cRxLock == queueUNLOCKED ) { //如果没有上锁,判断等待发送列表是否为空 if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { /* 等待任务的优先级高,因此强制进行上下文切换 */ if( pxHigherPriorityTaskWoken != NULL ) { *pxHigherPriorityTaskWoken = pdTRUE; } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER();//返回成功 } } else { mtCOVERAGE_TEST_MARKER(); } } else { /* 接受锁加一 */ pxQueue->cRxLock = ( int8_t ) ( cRxLock + 1 ); } xReturn = pdPASS; } else { xReturn = pdFAIL; traceQUEUE_RECEIVE_FROM_ISR_FAILED( pxQueue ); } } portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus ); return xReturn; }
最新回复(0)