Ecore_Thread - API overview

Working with threads is hard.

Ecore helps make it a bit easier, but as the example in ecore_thread_example.c shows, there's a lot to consider even when doing the simplest things.

We'll be going through this thorough example now, showing how the different aspects of Ecore_Thread are used, but users are encouraged to avoid threads unless they are really the only option, as they always add more complexity than the program usually requires.

Ecore Threads come in two flavors: short jobs and feedback jobs. Short jobs just run the given function and are more commonly used for small tasks where the main loop does not need to know how the work is going in between. The short job in our example is so short we had to artificially enlarge it with sleep(). Other than that, it also uses thread-local data to keep the data we are working with persistent across different jobs run by the same system thread. This data will be freed when no more jobs are pending and the thread is terminated. If the data doesn't exist in the thread's storage, we create it and save it there for future jobs to find. If creation fails, we cancel ourselves, so the main loop knows that we didn't just exit normally, meaning the job could not be done. The main part of the function checks in each iteration whether it was canceled by the main loop, and if it was, it stops processing and clears the data from the storage (we assume cancel means no one else will need this, but this is really application dependent).

/*
 * Free callback for the Thread_Data kept in a thread's local storage
 * (it is the callback passed to ecore_thread_local_data_add() in
 * _short_job()).
 *
 * data: the Thread_Data to destroy. Every string held in its list is
 *       printed and freed, then the container itself is freed.
 */
static void
_local_data_free(void *data)
{
   Thread_Data *thread_data = data;
   char *entry;

   /* Walk the list, releasing each stored string as it is handed to us. */
   EINA_LIST_FREE(thread_data->list, entry)
     {
        printf("Freeing string: %s\n", entry);
        free(entry);
     }

   free(thread_data);
}
/*
 * Short job: appends ten strings, one per second, to a list kept in the
 * running thread's local storage under the key "data".
 *
 * data: unused.
 * th:   handle of the Ecore_Thread executing this job; used for the local
 *       storage lookups and for cancellation checks.
 */
static void
_short_job(void *data EINA_UNUSED, Ecore_Thread *th)
{
   Thread_Data *td;
   int i;

   /* Reuse the storage a previous job may have left on this thread. */
   td = ecore_thread_local_data_find(th, "data");
   if (!td)
     {
        td = calloc(1, sizeof(Thread_Data));
        if (!td)
          {
             /* Cancel ourselves so the main loop is told the job did not
              * complete, instead of seeing a normal termination. */
             ecore_thread_cancel(th);
             return;
          }
        /* _local_data_free will release td when the storage goes away. */
        ecore_thread_local_data_add(th, "data", td, _local_data_free,
                                    EINA_FALSE);
     }

   for (i = 0; i < 10; i++)
     {
        char buf[200];

        /* Stop as soon as the main loop asked for cancellation and drop
         * the stored data: we assume nobody needs it after a cancel. */
        if (ecore_thread_check(th))
          {
             ecore_thread_local_data_del(th, "data");
             break;
          }

        snprintf(buf, sizeof(buf), "Thread %p: String number %d", th, i);
        td->list = eina_list_append(td->list, strdup(buf));
        sleep(1); /* artificially lengthen the job */
     }
}

Feedback jobs, on the other hand, run tasks that report back to the main loop: they may inform it of their progress, send partial data as it is processed, just ping to say they are still alive and processing, or do anything else that needs the thread to talk back to the main loop.

/*
 * Feedback job: claims one of the registered "data<N>" entries, lists the
 * directory named by its base path for up to two seconds, and sends the
 * collected names to the main loop as a single App_Msg.
 *
 * data: unused.
 * th:   handle of the Ecore_Thread executing this job.
 *
 * NOTE(review): the lookup assignment, the iterator declaration and loop
 * header, and everything from the directory loop's tail onwards were
 * missing from this excerpt and have been reconstructed; confirm against
 * ecore_thread_example.c.
 */
static void
_feedback_job(void *data EINA_UNUSED, Ecore_Thread *th)
{
   time_t t;
   int i, count;
   Feedback_Thread_Data *ftd = NULL;
   Eina_Iterator *it;
   App_Msg *msg;
   char *name;

   /* "count" holds the number of data<N> entries, stored as a pointer-sized
    * integer in the global data. */
   count = (int)(uintptr_t)ecore_thread_global_data_find("count");
   for (i = 0; i < count; i++)
     {
        char buf[32];

        snprintf(buf, sizeof(buf), "data%d", i);
        ftd = ecore_thread_global_data_find(buf);
        if (!ftd)
          continue;
        /* Only work on an entry no other job currently holds. */
        if (eina_lock_take_try(&ftd->mutex))
          break;
        else
          ftd = NULL;
     }
   if (!ftd)
     return;

   it = eina_file_ls(ftd->base);
   if (!it)
     goto the_end;

   msg = calloc(1, sizeof(App_Msg));
   if (!msg)
     {
        eina_iterator_free(it);
        goto the_end;
     }

   t = time(NULL);
   EINA_ITERATOR_FOREACH(it, name)
     {
        /* Give up after two seconds so the job stays short. */
        if (time(NULL) >= (t + 2))
          {
             eina_stringshare_del(name);
             break;
          }
        msg->list = eina_list_append(msg->list, strdup(name));
        /* Names handed out by the iterator are stringshares we must drop. */
        eina_stringshare_del(name);
     }

   eina_iterator_free(it);
   /* Ownership of msg passes to the main loop callback. */
   ecore_thread_feedback(th, msg);

the_end:
   /* This entry is consumed: unregister it and release all it owns. */
   ecore_thread_global_data_del(ftd->name);
   free(ftd->name);
   free(ftd->base);
   eina_lock_release(&ftd->mutex);
   eina_lock_free(&ftd->mutex);
   free(ftd);
   /* Run again to pick up any remaining entry. */
   ecore_thread_reschedule(th);
}

Finally, one more feedback job, but this one will be running outside of Ecore's pool, so we can use the pool for real work and keep this very light function unchecked. All it does is check if some condition is met and send a message to the main loop telling it it's time to close.

/*
 * Light feedback job meant to run outside of Ecore's thread pool: waits on
 * the application's condition and, once the number of received messages
 * reaches ad->max_msgs, sends an "all done" message to the main loop.
 *
 * data: the App_Data shared with the main loop.
 * th:   handle of the Ecore_Thread executing this job.
 */
static void
_out_of_pool_job(void *data, Ecore_Thread *th)
{
   App_Data *ad = data;
   App_Msg *msg;

   while (1)
     {
        int msgs;

        /* The condition's mutex must be held while waiting on it. */
        eina_lock_take(&ad->mutex);
        eina_condition_wait(&ad->condition);
        msgs = ad->msgs_received;
        eina_lock_release(&ad->mutex);
        if (msgs == ad->max_msgs)
          {
             msg = calloc(1, sizeof(App_Msg));
             msg->all_done = 1;
             /* Hand the message to the main loop, which frees it. */
             ecore_thread_feedback(th, msg);
             return;
          }
     }
}

Every now and then the program prints its status, counting threads running and pending jobs.

/*
 * Prints a snapshot of the thread pool: active and available threads plus
 * the pending short, feedback and total job counts.
 */
static void
_print_status(void)
{
   int active, pending_total, pending_feedback, pending_short, available;

   /* Every value printed below must be fetched first; reading them
    * uninitialized would be undefined behavior. */
   active = ecore_thread_active_get();
   pending_total = ecore_thread_pending_total_get();
   pending_feedback = ecore_thread_pending_feedback_get();
   pending_short = ecore_thread_pending_get();
   available = ecore_thread_available_get();

   printf("Status:\n\t* Active threads: %d\n"
          "\t* Available threads: %d\n"
          "\t* Pending short jobs: %d\n"
          "\t* Pending feedback jobs: %d\n"
          "\t* Pending total: %d\n", active, available, pending_short,
          pending_feedback, pending_total);
}

In our main loop, we'll be receiving messages from our feedback jobs using the same callback for both of them.

/*
 * Notification callback shared by the feedback jobs; it is the callback
 * passed to ecore_thread_feedback_run() for each of them.
 *
 * data:     the App_Data supplied when the job was started.
 * th:       the thread the message refers to (only printed here).
 * msg_data: the message, interpreted as an App_Msg.
 */
static void
_feedback_job_msg_cb(void *data, Ecore_Thread *th, void *msg_data)
{
App_Data *ad = data;
App_Msg *msg = msg_data;
char *str;

The light job running out of the pool will let us know when we can exit our program.

/* The out-of-pool job reports that enough messages were handled: leave
 * the main loop so the program can exit. The message is ours to free. */
if (msg->all_done)
  {
     ecore_main_loop_quit();
     free(msg);
     return;
  }

Next comes the handling of data sent from the actual worker threads, always remembering that the data belongs to us now, and not the thread, so it's our responsibility to free it.

_print_status();
if (!msg->list)
  printf("Received an empty list from thread %p\n", th);
else
  {
     int i = 0;

     /* eina_list_count() returns an unsigned int, hence %u. */
     printf("Received %u elements from threads %p (printing first 5):\n",
            eina_list_count(msg->list), th);
     /* Every string is freed here, whether or not it gets printed. */
     EINA_LIST_FREE(msg->list, str)
       {
          /* i starts at 0, so "< 5" prints exactly the first five. */
          if (i < 5)
            printf("\t%s\n", str);
          free(str);
          i++;
       }
  }

Last, the condition to exit is given by how many messages we want to handle, so we need to count them and inform the condition checking thread that the value changed.

/* Count this message while holding the lock, and signal the condition so
 * whoever waits on it can re-read the counter. */
eina_lock_take(&ad->mutex);
ad->msgs_received++;
eina_condition_signal(&ad->condition);
eina_lock_release(&ad->mutex);
/* The message is released by this callback once handled. */
free(msg);
}

When a thread finishes its job or gets canceled, the main loop is notified through the callbacks set when creating the task. In this case, we just print what happened and keep track of one of them, used to exemplify canceling. Here we are pretending one of our short jobs has a timeout, so if it doesn't finish before a timer is triggered, it will be canceled.

/*
 * End callback: reports that a thread terminated normally and, when it is
 * the one tracked in App_Data.thread_3, clears that reference.
 *
 * data: the App_Data supplied when the job was started.
 * th:   the thread that finished.
 */
static void
_thread_end_cb(void *data, Ecore_Thread *th)
{
   App_Data *app = data;

   printf("Normal termination for thread %p.\n", th);

   if (app->thread_3 != th)
     return;

   /* The tracked thread is gone; forget its handle. */
   app->thread_3 = NULL;
}
/*
 * Cancel callback: reports that a thread was cancelled and, when it is the
 * one tracked in App_Data.thread_3, clears that reference.
 *
 * data: the App_Data supplied when the job was started.
 * th:   the thread that was cancelled.
 */
static void
_thread_cancel_cb(void *data, Ecore_Thread *th)
{
   App_Data *app = data;

   printf("Thread %p got cancelled.\n", th);

   if (app->thread_3 != th)
     return;

   /* The tracked thread is gone; forget its handle. */
   app->thread_3 = NULL;
}
static Eina_Bool
_cancel_timer_cb(void *data)
{
App_Data *ad = data;
if (ad->thread_3 && !ecore_thread_check(ad->thread_3))
ecore_thread_cancel(ad->thread_3);
return EINA_FALSE;
}

The main function does some setup that includes reading parameters from the command line to change its behaviour and test different results. These are:

  • -t <some_num> maximum number of threads to run at the same time.
  • -p <some_path> adds some_path to the list used by the feedback jobs. This parameter can be used multiple times.
  • -m <some_num> the number of messages to process before the program is signalled to exit.

Skipping some bits, we init Ecore and our application data.

printf("Initial max threads: %d\n", i);
memset(&appdata, 0, sizeof(App_Data));
appdata.max_msgs = 1;

If any paths for the feedback jobs were given, we use them; otherwise we fall back to some defaults. In both cases we initialize the mutexes used by the threaded jobs.

/* Register one Feedback_Thread_Data per directory under the global keys
 * "data0", "data1", ... and store how many there are under "count", which
 * is what _feedback_job() reads to find them. */
if (!path_list)
  {
     Feedback_Thread_Data *ftd;

     /* No paths given: fall back to three default directories. */
     ecore_thread_global_data_add("count", (void *)3, NULL, EINA_FALSE);

     ftd = calloc(1, sizeof(Feedback_Thread_Data));
     ftd->name = strdup("data0");
#ifdef _WIN32
     ftd->base = strdup("c:/windows/System32");
#else
     ftd->base = strdup("/usr/bin");
#endif
     eina_lock_new(&ftd->mutex);
     ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);

     ftd = calloc(1, sizeof(Feedback_Thread_Data));
     ftd->name = strdup("data1");
#ifdef _WIN32
     ftd->base = strdup("c:/windows/Fonts");
#else
     ftd->base = strdup("/usr/lib");
#endif
     eina_lock_new(&ftd->mutex);
     ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);

     ftd = calloc(1, sizeof(Feedback_Thread_Data));
     ftd->name = strdup("data2");
#ifdef _WIN32
     ftd->base = strdup("c:/windows/Help");
#else
     ftd->base = strdup("/usr/lib");
#endif
     eina_lock_new(&ftd->mutex);
     ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
  }
else
  {
     Feedback_Thread_Data *ftd;
     char *str;

     /* Publish the number of user-supplied paths; it must be taken before
      * the list is consumed below. */
     ecore_thread_global_data_add("count",
                                  (void *)(uintptr_t)eina_list_count(path_list), NULL,
                                  EINA_FALSE);
     i = 0;
     EINA_LIST_FREE(path_list, str)
       {
          char buf[32];

          snprintf(buf, sizeof(buf), "data%d", i);
          ftd = calloc(1, sizeof(Feedback_Thread_Data));
          ftd->name = strdup(buf);
          ftd->base = strdup(str);
          eina_lock_new(&ftd->mutex);
          ecore_thread_global_data_add(ftd->name, ftd, NULL, EINA_TRUE);
          /* ftd keeps its own copy of the path, so the list's copy goes. */
          free(str);
          i++;
       }
  }

Initialize the mutex and the condition needed by the condition-checking thread:

/* Lock taken around the update of the received-messages counter. */
eina_lock_new(&appdata.mutex);
/* Condition tied to that lock; it is signalled whenever the counter changes. */
eina_condition_new(&appdata.condition, &appdata.mutex);

And start our tasks.

ecore_thread_feedback_run(_out_of_pool_job, _feedback_job_msg_cb, NULL,
NULL, &appdata, EINA_TRUE);
ecore_thread_run(_short_job, _thread_end_cb, _thread_cancel_cb, &appdata);
ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
_thread_end_cb, _thread_cancel_cb, &appdata,
appdata.thread_3 = ecore_thread_run(_short_job, _thread_end_cb,
_thread_cancel_cb, &appdata);
ecore_thread_feedback_run(_feedback_job, _feedback_job_msg_cb,
_thread_end_cb, _thread_cancel_cb, &appdata,

To finalize, set a timer to cancel one of the tasks if it doesn't end before the timeout, one more timer for status report and get into the main loop. Once we are out, destroy our mutexes and finish the program.

/* Timeout for the tracked short job, plus a periodic status report. */
ecore_timer_add(1.0, _cancel_timer_cb, &appdata);
ecore_timer_add(2.0, _status_timer_cb, NULL);

_print_status();

/* Run until the all-done message makes the message callback quit the loop. */
ecore_main_loop_begin();

/* Out of the main loop: release the synchronization primitives. */
eina_condition_free(&appdata.condition);
eina_lock_free(&appdata.mutex);

/* Balance the Ecore initialization done at startup. */
ecore_shutdown();

return 0;
}