Advanced usage

Memory

All the user memory space (all the SRAM except the registers and the static variables) is used as heap memory (aka dynamic memory). The main stack disappears, giving way to dynamically allocated spaces that store the stack of each task.

Screenshot

The kernel uses the standard functions for allocating memory: malloc() / new and free() / delete.

warning Don't forget to protect these functions against interrupts when you call them.

The standard functions are not very fast, and memory fragmentation can arise if you allocate/deallocate objects many times. To solve these problems, the kernel provides memory pools. Memory pools are useful for allocating and deallocating many data instances quickly and frequently. This is often the case for the messages exchanged between tasks, a little less so for tasks.

In a memory pool, a fixed number of object instances is preallocated as blocks, and a flag identifies whether a block is used or free. One memory block can hold only one type of object. Each free block is linked to the next free block. This is a short explanation of the memory pool mechanism:

Screenshot
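The free-list mechanism described above can be sketched in plain C++. This is an illustration only, not the library's actual MemoryPool implementation; the `ToyPool` name and its interface are hypothetical:

```cpp
#include <cassert>
#include <cstddef>

// A toy fixed-size pool: every block is preallocated, and free blocks
// are chained together, so allocation and deallocation are O(1).
template <typename T, size_t N>
class ToyPool {
  union Block {                   // a block holds either an object...
    Block* next;                  // ...or, while free, a link to the next free block
    alignas(T) unsigned char storage[sizeof(T)];
  };
  Block blocks_[N];
  Block* freeHead_;               // head of the free-block list
public:
  ToyPool() : freeHead_(&blocks_[0]) {
    for (size_t i = 0; i + 1 < N; ++i)   // link each free block to the next one
      blocks_[i].next = &blocks_[i + 1];
    blocks_[N - 1].next = nullptr;
  }
  void* allocate() {              // pop the first free block
    if (!freeHead_) return nullptr;      // pool exhausted
    Block* b = freeHead_;
    freeHead_ = b->next;
    return b->storage;
  }
  void deallocate(void* p) {      // push the block back onto the free list
    Block* b = reinterpret_cast<Block*>(p);
    b->next = freeHead_;
    freeHead_ = b;
  }
};
```

In os48 this happens transparently: Scheduler::createTask() and the new operator on messages use the pools first and fall back to malloc() when the pool is exhausted, as described below.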

Default memory pools are already created (in the static memory area) for tasks, messages and work objects, but no block is reserved for tasks and work objects. Messages have 6 blocks reserved. If your application sends a lot of messages, creates and removes tasks many times, or uses the TaskWorkQueue very frequently, you can increase the number of reserved blocks in Advanced_parameters.h.

Screenshot

When you call Scheduler::createTask(), the kernel first tries to reserve a memory pool block. If no block is available, the kernel uses the standard malloc() function. When you instantiate a message or a work object with the new operator, the same procedure is followed.

You can also use the MemoryPool class for your own objects.

See also this link for more information.

How scheduling works

warning This is an important part of the documentation. The following explanations are aimed at advanced users.

Kernel interruption

Timer 0 of your Arduino board is configured to execute a small piece of code at a specific interval for all scheduling algorithms except the cooperative one. Each execution of this code corresponds to one kernel tick.

By default, a kernel clock tick occurs every 1024 µs.

A kernel tick does not necessarily result in a context switch. The scheduling algorithm assigns the chosen task a number of kernel ticks before the next context switch. The more kernel ticks a task is assigned, the more time it has to run its code. Generally, tasks with a high priority get more kernel ticks.

The following example shows how the kernel interruption interrupts the process of a task in order to resume the process of another task (assuming for this example that each task has 1 kernel tick assigned).

The red arrows correspond to the context switch subroutine.

Screenshot

Assume a task that has 3 kernel ticks assigned. At each kernel tick, this number is decremented by 1 until it reaches 0. At 0, a context switch occurs in order to select a new task to resume (and the kernel assigns the resumed task a new number of kernel ticks, and so on).
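The countdown just described can be modelled in a few lines. This is an illustration only; `TickModel` is a hypothetical name, not kernel code:

```cpp
#include <cassert>

// Toy model of the tick countdown: a context switch happens only when
// the running task's remaining ticks reach zero.
struct TickModel {
  int remainingTicks;
  int contextSwitches = 0;
  explicit TickModel(int slot) : remainingTicks(slot) {}

  // Called on every kernel tick (every ~1024 us by default).
  void onKernelTick(int nextSlot) {
    if (--remainingTicks == 0) {   // time slot fully consumed
      ++contextSwitches;           // switch to a newly chosen task...
      remainingTicks = nextSlot;   // ...which gets its own time slot
    }
  }
};
```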

The context

To resume the execution of a task at the exact point where it was interrupted, the kernel has to save the registers, the stack pointer, and so on. All these data, known as the context, are stored on the stack of the current task.

The kernel saves the context of the current task, calls the context switch subroutine and restores the context of the newly chosen task.

Screenshot

When the next task is chosen, the kernel does the exact opposite of the above process.

The context switch subroutine

A specific number of kernel ticks is attributed to a running task. This is the time slot. When the whole time slot is consumed (no kernel ticks remaining), the kernel calls the scheduling algorithm in order to choose a new task. The new time slot (the number of kernel ticks) depends on the task and on the chosen scheduling algorithm.

Also, at each kernel tick, the kernel looks for tasks that must be woken up. These tasks are added to the ready queue and the scheduling algorithm is called.

Before and after the call to the scheduling algorithm, the kernel also checks for a stack overflow. Before: to check whether the stack pointer is beyond the limits of the stack. After: to check whether some data have been overwritten on the stack of the newly chosen task.
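Overflow checks of the second kind are commonly implemented with a sentinel ("canary") value placed at the stack boundary; the sketch below illustrates that general idea only, under that assumption. The kernel's actual detection code may differ:

```cpp
#include <cassert>
#include <cstdint>

// A "canary" word is written at the far end of each task stack.
// If it no longer holds the magic value, something overflowed the stack.
constexpr uint16_t kCanary = 0xA5A5;

struct ToyTaskStack {
  uint16_t canary = kCanary;  // lives at the stack's boundary
  uint8_t data[64];           // the usable stack area (grows toward the canary)
};

bool stackOverflowed(const ToyTaskStack& s) {
  return s.canary != kCanary; // canary overwritten => overflow occurred
}
```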

This is the subroutine algorithm:

Screenshot

If the idle task is chosen, the time slot is always 1.

About tasks

You have two categories of tasks.

The scheduling algorithms choose a task among the alive ones.

The idle task is always alive. This is a special task chosen when no other task is available. It puts the microcontroller to sleep. A kernel interrupt (from timer 0) wakes the MCU up, and the context switch subroutine is called normally.

You can change the behaviour of the idle task by assigning a custom function with Scheduler::setIdleUserFnc().

The ready queue:

Screenshot

When you create a task (with autostart = true, default behaviour):

Screenshot

When you remove an alive task:

Screenshot

Manually yield

When you choose the cooperative scheduling algorithm, you can use the Scheduler::yieldTo(Task* task, bool delayed) function that lets you yield from the running task to another task of your choice. If you set the delayed argument to true, the task you pass as the 1st argument will be chosen the next time you call a blocking function of the kernel (e.g. Semaphore::acquire()).

warning Using this function outside the cooperative algorithm is strongly discouraged.

You also have the Scheduler::yield() function which, according to the chosen scheduling algorithm, yields the current task to another.

When using the cooperative scheduling policy, this function chooses a task with the same priority, or a lower one if none is available.

Scheduling algorithms

You can change the scheduling algorithm at any time with Scheduler::setSchedulingPolicy(). The kernel uses by default the Round Robin algorithm.

Cooperative algorithm

Use Scheduler::setSchedulingPolicy(SchPolicyCoop).

This is the only mode that lets you deliberately choose when and which task will be executed next.

The kernel never interrupts a task on its own. This is the best choice if you want full control.

The drawback is more complex, error-prone code.

Preemptive algorithm

Use Scheduler::setSchedulingPolicy(SchPolicyPreemptive).

This algorithm always chooses the task with the highest priority among all tasks. If multiple tasks share the highest priority, the chosen task is the next one after the previously executed task.

When a task with a higher priority than the current one is added to the ready queue (i.e. alive tasks, state StQueuing), a context switch is performed.

Lower priority tasks may not be executed even if they have been woken up after a Task::sleep() call. Lower priority tasks are executed only when all tasks with a higher priority are terminated, sleeping, aborted, suspended, and so on.

The advantage is a high responsiveness for tasks with a high priority.

The allocated time slot is constant and long (300 kernel ticks, ~307 ms) before a context switch. It only serves as a timeout.
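The selection rule described above (highest priority wins, ties resolved by taking the next task after the previous one) can be sketched as follows; `pickPreemptive` is a hypothetical helper for illustration, not part of the library:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Pick the next task index: the highest priority wins; among tasks
// sharing that priority, take the next one after `prev` in circular order.
size_t pickPreemptive(const std::vector<int>& priorities, size_t prev) {
  int best = priorities[0];
  for (int p : priorities)            // find the highest priority present
    if (p > best) best = p;
  size_t n = priorities.size();
  for (size_t k = 1; k <= n; ++k) {   // scan circularly, starting after prev
    size_t i = (prev + k) % n;
    if (priorities[i] == best) return i;
  }
  return prev;                        // unreachable for a non-empty queue
}
```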

Example: Screenshot

Round robin

Use Scheduler::setSchedulingPolicy(SchPolicyRoundRobin).

This algorithm is usually sufficient for many use cases; it performs reasonably well on most criteria.

The next task to execute is the next one of the previously executed task.

This algorithm allocates a CPU time slot proportional to the priority of the selected task:

In this way, a high priority task has more time to complete its work. Furthermore, all tasks are sure to be served, there is no starvation.
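Both rules, circular selection and a priority-proportional time slot, can be sketched in plain C++. The factor of 4 below is an arbitrary illustration value, not the library's actual constant:

```cpp
#include <cassert>
#include <cstddef>

// Round robin: the next task is simply the next one in circular order.
size_t nextRoundRobin(size_t prev, size_t taskCount) {
  return (prev + 1) % taskCount;
}

// The time slot (in kernel ticks) grows with the task's priority.
// The multiplier 4 is an assumed illustration value.
int timeSlotFor(int priority) {
  return priority * 4;
}
```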

Example step by step: Screenshot

Random priority

Use Scheduler::setSchedulingPolicy(SchPolicyRandomPriority).

This algorithm is moderately complex. It chooses a priority at random, with probability proportional to the priority value.

In the present case we have:

The algorithm selects the first task corresponding to the chosen priority. Then the task is moved to the tail of the queue to prevent it from being selected again in the near future (for this given priority).

This algorithm avoids starvation by serving all tasks. The CPU time slot allocated to the selected task is constant but shorter than for the preemptive algorithm (80 ms).
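The weighted draw, with probability proportional to the priority, can be sketched as follows (`pickByPriorityWeight` is a hypothetical helper, not library code). `r` is a random number drawn in [0, sum of priorities):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Each task owns an interval of length equal to its priority; the
// interval containing r determines the chosen task, so a task with
// priority 3 is three times more likely to be picked than one with 1.
size_t pickByPriorityWeight(const std::vector<int>& priorities, int r) {
  int acc = 0;
  for (size_t i = 0; i < priorities.size(); ++i) {
    acc += priorities[i];
    if (r < acc) return i;  // r falls inside this task's weight interval
  }
  return priorities.size() - 1;
}
```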

Example step by step: Screenshot

Intelligent

Use Scheduler::setSchedulingPolicy(SchPolicyIntelligent).

This algorithm is a little bit complex.

The chosen task depends on several criteria. A score is assigned to each task, and the task with the highest score is selected as the next task to resume. The score is calculated by adding:

All tasks are sure to be served, which avoids starvation.

When a task with a higher priority than the current one is added to the ready queue (i.e. alive tasks, state StQueuing), a context switch is performed.

The response time of the tasks that have been waiting the longest time is reduced.

The CPU time slot allocated for the task is equal to the highest score multiplied by 4.

Comparison of the different algorithms

| Algorithm       | Return speed | Starvation-free   | Predictable | Responsiveness of high priority tasks | Compatible with a hard real-time context | Responsiveness of woken-up tasks | CPU task time slot  |
|-----------------|--------------|-------------------|-------------|---------------------------------------|------------------------------------------|----------------------------------|---------------------|
| Cooperative     | High         | Dependent on user | Yes         | Dependent on user                     | Yes                                      | Immediate*                       | Dependent on user   |
| Preemptive      | High         | No                | Yes         | High                                  | Yes                                      | Depends**                        | Constant and long   |
| Round Robin     | High         | Yes               | Yes         | Low                                   | No                                       | Low                              | Depends on priority |
| Random priority | Medium       | Yes               | No          | Average                               | No                                       | Average                          | Constant            |
| Intelligent     | Low          | Yes               | Hardly      | High                                  | No                                       | High                             | Variable            |

* When using the Arduino delay() function. Otherwise, when using Task::sleep(), the kernel yields the current task to another and you have to check yourself when to resume the sleeping task.

** If the woken-up task has the highest priority among all tasks, the response time is immediate; otherwise the task will never be executed as long as it does not have the highest priority.

Interruptions management

warning This section is for informed users. In this tutorial, you have seen OS48_ATOMIC_BLOCK in the examples. By using it, you tell the compiler to forbid all interrupts in the section between braces. A lot of kernel functions need to be atomic and use OS48_ATOMIC_BLOCK internally. The kernel actually provides 3 kinds of non-interruptible block in Helpers.h:

Interrupts that could not execute during an atomic block are executed just after it.

The kernel tick disables all interrupts by default. Redefining OS48_KERNEL_SAFETY_BLOCK has no effect, because it works only in kernel functions and not in the kernel tick interrupt. Allowing interrupts inside the kernel tick can have bad impacts and cause inconsistencies. If you really know what you want and what you are doing, some extra functions are provided to enable interrupts as soon as possible, just after saving the context of the task (only 5 microseconds for UNO and MEGA). These functions are setKernelTickEnterFnc(void_fnc_t fnc); and setKernelTickExitFnc(void_fnc_t fnc);. You can enable interrupts inside them and, if you want, forbid the context switch by calling os48_disable_context_switch(). Don't forget to re-enable the context switch afterwards.

Concurrency

One of the most important aspects of multi-tasking programs is synchronization. This library provides several synchronization controls.

All synchronization objects in this library have two ways to release waiting tasks. By default the behaviour is FIFO, which means that the first waiting task is released first.

But in the constructor of these classes, you can specify SyncRMHighestPriority. Example: Sync sync(SyncRMHighestPriority);. SyncRMHighestPriority is the behaviour that releases high priority tasks first. Waiting tasks having the same priority are released following the FIFO behaviour.
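The two release policies amount to two selection rules over the waiting queue (index 0 arrived first). The helpers below are hypothetical illustrations, not library code:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// FIFO policy: the task that has been waiting the longest is released.
size_t pickFifo(const std::vector<int>& /*priorities*/) {
  return 0;  // index 0 arrived first
}

// SyncRMHighestPriority policy: the first waiter holding the maximum
// priority is released, so arrival order still breaks ties.
size_t pickHighestPriority(const std::vector<int>& priorities) {
  size_t best = 0;
  for (size_t i = 1; i < priorities.size(); ++i)
    if (priorities[i] > priorities[best])  // strict '>' keeps FIFO order on ties
      best = i;
  return best;
}
```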

Events

The most basic control is the event. An event allows you to stop the execution of some tasks at a specific point of your program and release one or all of the stopped tasks later. This mechanism is useful to notify tasks when data are available, for example.

The class representing this mechanism is Sync.

An event can be compared to an exam room. The candidates enter the room and wait (Sync::wait()). At the expected time, the candidates in the room begin to work (Sync::releaseAll()). Those who arrive late wait for the next session (Sync::wait() called after Sync::releaseAll()). The time is the trigger event.

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get(); 
Task* task1 = NULL; 
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func1, 60);
  task2 = scheduler->createTask(&func2, 60);
  task3 = scheduler->createTask(&func3, 60);

  scheduler->start();
}

Sync sync;
const char* hexData = "A223E4AB594C";

void func1()
{ 
  task()->sleep(3000);
  sync.releaseOne();
  task()->sleep(2000);
  sync.releaseOne();
}

void func2()
{
  task()->sleep(1000);     
  sync.wait();
  OS48_NO_CS_BLOCK
  {    
    Serial.print("Task2 can now process data: ");
    Serial.println(hexData);
  }
}

void func3()
{
  task()->sleep(2000); 
  sync.wait();
  OS48_NO_CS_BLOCK
  {    
    Serial.print("Task3 can now process data: ");
    Serial.println(hexData);
  }
}

void loop() {} 

warning Pay attention to the order of operations! If you call one of the release methods in a task and there is no task pending in the waiting queue, the next task calling Sync::wait() will wait.

Barriers

The class representing this mechanism is Barrier.

A task calling the wait method (Barrier::wait()) will wait until the number of pending tasks reaches the threshold set initially. Once the threshold is reached, all pending tasks are released. Subsequent calls to the wait method are ignored and return immediately.

A barrier is like a dining table. Those already seated wait. When all family members are seated (the threshold), they can start eating.

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get(); 
Task* task1 = NULL; 
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func1, 60);
  task2 = scheduler->createTask(&func2, 60);
  task3 = scheduler->createTask(&func3, 60);

  scheduler->start();
}

Barrier barrier(2); //threshold = 2 tasks

void func1()
{ 
  task()->sleep(3000); //simulate a process
  barrier.wait();
  OS48_ATOMIC_BLOCK
  {    
    Serial.println("Task1 released");
    Serial.println("Checkpoint reached");
  }
}

void func2()
{
  task()->sleep(1500); //simulate a process
  barrier.wait();
  OS48_NO_CS_BLOCK
  {    
    Serial.println("Task2 released");
    Serial.println("Checkpoint reached");
  }
}

void func3()
{
  task()->sleep(5000); //simulate a process
  barrier.wait();
  OS48_ATOMIC_BLOCK
  {    
    Serial.println("Task 3 did not have to wait.");
  }
}

void loop() {}

Semaphores

The class representing this mechanism is Semaphore.

A semaphore is a synchronization mechanism that controls access by multiple tasks to a common resource.

When you create a semaphore, you have to specify the maximum number of tasks that can acquire it. When the maximum number of Semaphore::acquire() calls is reached, the following tasks will wait for a call to Semaphore::release().

An elevator can be a representation of the usage of a semaphore. A maximum number of persons is allowed. When a person leaves the elevator, another person who is waiting can enter. According to the established rule, the person who can enter is:

Any task can release a semaphore, not necessarily the task that acquired it.

You can create a binary semaphore by allowing only one task at a time.

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();
Task* task1 = NULL;
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func, 60);
  task2 = scheduler->createTask(&func, 60);
  task3 = scheduler->createTask(&func, 60);

  scheduler->start();
}

Semaphore sem(2);

void func()
{
  for (;;)
  {    
    OS48_NO_CS_BLOCK
    {
      sem.acquire();
      Serial.print ("In ");
      Serial.println(task()->getId());      
    }

    task()->sleep(1000);


    OS48_NO_CS_BLOCK
    {    
      sem.release();
      Serial.print ("Out ");
      Serial.println(task()->getId());

      scheduler->yield(); //recommended so the loop doesn't spin too fast, letting other tasks acquire the semaphore
    }
  }
}

void loop() {}

warning It's optional, but you shouldn't omit scheduler->yield(); in some cases. In this example the task loops back immediately and tries to acquire the semaphore again. Without the yield call, some tasks may never acquire the semaphore in time.

Mutexes

The class representing this mechanism is Mutex.

A mutex is a higher-level synchronization mechanism. While a binary semaphore may be used as a mutex, a mutex targets a more specific use case and provides extra guarantees:

A mutex is like a checkout of a supermarket. Only one person can pass at a time. Only this person can release the checkout.

Priority inversion safety example:

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();
Task* task1 = NULL;
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func, 60, PrLow);
  task2 = scheduler->createTask(&func, 60);
  task3 = scheduler->createTask(&funcHPrior, 60);

  scheduler->setSchedulingPolicy(SchPolicyPreemptive);
  scheduler->start();
}

Mutex mutex;
Semaphore sem;

void func()
{
  //exceptionally, choose the task 1 to ensure the lowest priority task is the first to execute the next code

  //let this task sleep in order to let the scheduler choose the task 1 
  if (task()->getId() == 2)
    task()->sleep(200); 

  for (;;)
  {
    OS48_NO_CS_BLOCK
    {
      mutex.lock();
      //sem.acquire();

      Serial.print ("In ");
      Serial.println(task()->getId());
    }

    task()->sleep(2000);

    OS48_NO_CS_BLOCK
    {
      Serial.print ("Out ");
      Serial.println(task()->getId());

      mutex.unlock();
      //sem.release();   
    }
  }
}

void funcHPrior()
{
  task()->sleep(200); //let this task sleep in order to let the scheduler choose the task 1 

  for (;;)
  {
    OS48_NO_CS_BLOCK
    {
      Serial.println("HP");
    }

    uint32_t m = millis();
    while (millis() - m < 1000) {} //busy wait
  }
}

void loop() {}

The above example illustrates how priority inversion safety works. When task 2 tries to acquire the lock on the mutex, task 1, which has a lower priority, gets the same priority as task 2. In this way, the scheduler can't always choose task 3 (because task 2 is waiting, and task 1 would never be chosen if it kept its original priority). If you uncomment the lines corresponding to the semaphore and comment the lines corresponding to the mutex (in order to enable the semaphore instead of the mutex), you will notice that task 3 is always chosen and task 2 can no longer acquire the semaphore.

Reentrance example:

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();
Task* task1 = NULL;
Task* task2 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func, 100);
  task2 = scheduler->createTask(&func, 100);

  scheduler->start();
}

Mutex mutex;

void func()
{
  reentrantFunction(10);
}

void reentrantFunction(uint8_t counter)
{
  OS48_NO_CS_BLOCK
  {
    mutex.lock();

    Serial.print ("In ");
    Serial.println(task()->getId());
    Serial.print("Counter in : ");
    Serial.println(counter);
  }

  task()->sleep(300);

  if (counter > 0)
    reentrantFunction(counter - 1);

  task()->sleep(300);

  OS48_NO_CS_BLOCK
  {
    mutex.unlock();

    Serial.print ("Out ");
    Serial.println(task()->getId());
    Serial.print("Counter out : ");
    Serial.println(counter);
  }
}

void loop() {}

Deletion safety:

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();
Task* task1 = NULL;
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func, 100);
  task2 = scheduler->createTask(&func, 100, PrHigh);
  task3 = scheduler->createTask(&funcDelete, 100);

  scheduler->start();
}

Mutex mutex;

void func()
{
  OS48_NO_CS_BLOCK
  {
    mutex.lock();

    Serial.print ("In ");
    Serial.println(task()->getId());
  }

  task()->sleep(6000);

  OS48_NO_CS_BLOCK
  {
    mutex.unlock();

    Serial.print ("Out ");
    Serial.println(task()->getId());
  }
}

void funcDelete()
{
  task()->sleep(4000);

  OS48_NO_CS_BLOCK
  {
    Serial.println("task2 deleted");
    scheduler->deleteTask(task2);
  }
}

void loop() {}

Monitors

A monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true. Monitors also have a mechanism for signalling other threads that their condition has been met. A monitor consists of a mutex object and condition variables. In this library, a condition variable consists of a Sync object and a condition.

The library provides helpers to build a monitor in the file Monitor_helpers.h. Example:

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();
Task* task1 = NULL;
Task* task2 = NULL;
Task* task3 = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func, 100);
  task2 = scheduler->createTask(&func, 100, PrHigh);
  task3 = scheduler->createTask(&funcCondition, 100);

  scheduler->start();
}

volatile bool condition = false;
Mutex mutex;
Sync sync;

void func()
{
  OS48_NO_CS_BLOCK
  {
    Serial.print ("In ");
    Serial.println(task()->getId());
  }

  OS48_NO_CS_BLOCK
  {
    Serial.print(task()->getId());
    Serial.println(" is waiting until condition is true...");
  }

  OS48_MONITOR_ENTER(&mutex); //acquires the lock
  OS48_MONITOR_WAIT_UNTIL(condition, &mutex, &sync); //releases the lock and waits. When task is woken up, re-acquires the lock

  OS48_NO_CS_BLOCK
  {
    Serial.print(task()->getId());
    Serial.println(" does  something...");
  }

  task()->sleep(2000);

  OS48_MONITOR_EXIT(&mutex); //releases the lock

  OS48_NO_CS_BLOCK
  {
    Serial.print ("Out ");
    Serial.println(task()->getId());
  }
}

void funcCondition()
{
  task()->sleep(4000);

  OS48_NO_CS_BLOCK
  {
    Serial.println("task 3 sets condition to true");
    condition = true;
    sync.releaseAll(); //notifies the condition has changed
  }
}

void loop() {}

Messages

Tasks can send messages to other tasks, which allows communication between tasks. Each task has a message box (messages are internally stored in a queue) and you can pick up a specific category of message.

A message is composed of a code, in order to categorize the message, and, optionally, a content of any type.

You only need 4 functions: Scheduler::sendMessage(), Task::peekMessage(), Task::getNextMessage() and Task::waitNextMessage().

The body of a message is of type databag_t. See Helpers.h for the complete declaration of the type. This is a union type, which means that you can store any type of 32 bits or less (pointer + size of data, string, integers, float, ...). The Message class has some constructor helpers in order to hide the usage of databag_t, except for decoding the message body at reception.
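As a rough illustration, such a union could look like the sketch below. Only the bStr member name appears in the examples on this page; the other member names are hypothetical, and the real databag_t declared in Helpers.h may differ:

```cpp
#include <cassert>
#include <cstdint>

// Simplified illustration of a message-body union: all members share the
// same storage (4 bytes on AVR, where pointers are small enough to fit),
// so a body carries any one of these at a time. Member names other than
// bStr are hypothetical; see Helpers.h for the real databag_t.
union toy_databag_t {
  const char* bStr;   // a C string (as read with getBody().bStr below)
  int32_t     bInt32; // hypothetical: a 32-bit integer
  float       bFloat; // hypothetical: a float
  void*       bPtr;   // hypothetical: a raw pointer to larger data
};
```

Because the members overlap, the sender and receiver must agree (via the message code, typically) on which member is meaningful.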

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();

Task* task1 = NULL;
Task* task2 = NULL;
Task* task_print = NULL;
Task* task_print_error = NULL;

void setup() {
  Serial.begin(9600);
  task1 = scheduler->createTask(&func1, 60);
  task2 = scheduler->createTask(&func2, 60);
  task_print = scheduler->createTask(&funcPrint, 60);
  task_print_error = scheduler->createTask(&funcPrintErrors, 60);

  scheduler->start();
}

const byte code_print = 1;
const byte code_print_error = 2;

void func1()
{
  scheduler->sendMessage(task_print, new Message(code_print, "hello world!")); 

  task()->sleep(3000);

  scheduler->sendMessage(task_print_error, new Message(code_print_error, "fire in the hole!"));

  task()->sleep(3000);

  //this message won't be printed because task_print expects only messages with the code: code_print
  //and this is not the case here.
  scheduler->sendMessage(task_print, new Message(code_print_error, "this message will be ignored"));
}

void func2()
{
  scheduler->sendMessage(task_print_error, new Message(code_print_error, "this is not an exercice!"));

  task()->sleep(2000);

  scheduler->sendMessage(task_print, new Message(code_print, "hello world again!"));

  task()->sleep(2000);

  //this message won't be printed because task_print_error expects only messages with the code: code_print_error
  //and this is not the case here.
  scheduler->sendMessage(task_print_error, new Message(code_print, "this message will be ignored")); 
}

void funcPrint()
{
  for (;;)
  {
    Message* mess = task()->waitNextMessage(code_print);
    OS48_NO_CS_BLOCK
    {
      Serial.print("Message is: ");
      Serial.println(mess->getBody().bStr);
      delete mess;
    }   
  }
}

void funcPrintErrors()
{
  for (;;)
  {
    Message* mess = task()->waitNextMessage(code_print_error);
    OS48_NO_CS_BLOCK
    {
      Serial.print("ERROR message is: ");
      Serial.println(mess->getBody().bStr);
      delete mess;   
    }
  }
}

void loop() {}

The above example shows two senders and two receivers. Their work is to print messages.

To know whether you need to delete the message (i.e. whether the message has been instantiated in heap memory), you can use the Message::setFlags(uint8_t) function. This is a free parameter. For example, the least significant bit can be set to 1 to tell the receiver of the message to delete it. In the above example, all messages are created with the new operator, so we don't need flags.
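The least-significant-bit convention suggested above could be implemented like this. The helper names and the bit meaning are purely illustrative user conventions, not part of the library:

```cpp
#include <cassert>
#include <cstdint>

// User-defined convention: bit 0 of the message flags means
// "the receiver must delete this message after use".
constexpr uint8_t kFlagDeleteAfterUse = 0x01;

// Set the delete-after-use bit, leaving other flag bits untouched.
uint8_t markForDeletion(uint8_t flags) {
  return flags | kFlagDeleteAfterUse;
}

// Test the delete-after-use bit on a received message's flags.
bool mustDelete(uint8_t flags) {
  return (flags & kFlagDeleteAfterUse) != 0;
}
```

A sender would call msg->setFlags(markForDeletion(0)) before sending; the receiver would check mustDelete(...) before deciding whether to delete the message.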

The message box can store an unlimited number of messages. You can explicitly limit the emission of messages with a semaphore. Take a look at the following example, where a wrapper has been created to send messages.

#include <os48.h>

using namespace os48;

Scheduler* scheduler = Scheduler::get();

Task* tProducer1 = NULL;
Task* tProducer2 = NULL;
Task* tConsumer = NULL;

void setup() {
  Serial.begin(9600);
  tProducer1 = scheduler->createTask(&producer1, 60);
  tProducer2 = scheduler->createTask(&producer2, 60);
  tConsumer = scheduler->createTask(&consumer, 60);

  scheduler->start();
}

const byte code = 1;
Semaphore limiter(3); //message box is limited to 3 messages max.

void sendAMessageToConsumer(os48::Message* msg)
{
  limiter.acquire();
  scheduler->sendMessage(tConsumer, msg);    
}

void producer1()
{
  for(;;)
  {
     sendAMessageToConsumer(new Message(code, "flooding from producer 1!"));
  }  
}


void producer2()
{
  for(;;)
  {
     sendAMessageToConsumer(new Message(code, "flooding from producer 2!"));   
  }  
}

void consumer()
{
  for (;;)
  {
    Message* mess = task()->waitNextMessage(code);
    limiter.release();
    OS48_ATOMIC_BLOCK
    {
      Serial.print("Message is: ");
      Serial.println(mess->getBody().bStr);
      delete mess;
    } 
  }
}

void loop() {}

Limiting the number of messages for a task is good practice to avoid memory overflows if you plan to send messages to a task frequently.

Screenshot

You can abort Task::waitNextMessage() with Task::resume(). The function also has a timeout argument if you don't want to wait indefinitely.

To clear all messages of a task, you can call Task::getNextMessage() repeatedly with 0 as the message code. When the function returns NULL, the message list is empty.