Author: R. Koucha
Last update: 23-Jul-2015

Page principale
Page précédente


Approche détaillée des futex








s

Avant propos
Introduction
1. Des sémaphores système V aux futex
1.1. Exemple de mise en oeuvre d'un sémaphore système V
1.2. Les problèmes de performance
1.3. Principe des futex
1.4. Le programme ipc revu avec un futex
2. L'appel système futex()
2.1. Le prototype
2.2. Invocation
2.3. Les paramètres
2.4. Création et initialisation
2.5. Destruction
3. Exemples d'applications
3.1. Implémentation d'un mutex
3.1.1. Exécution du programme d'exemple
3.1.2. Amélioration des performances
3.2. Synchronisation sur fin de thread
3.3. Implémentation d'une barrière
4. Etude détaillée
4.1. Gestion des futex dans le noyau
4.2. Nombre de threads à réveiller
4.3. Optimisation intra processus
4.4. Terminaison anticipée des threads
4.4.1. La structure de données de l'utilisateur
4.4.2. La tête de liste
4.4.3. Prise de contrôle sur le futex
4.4.4. Mise en attente sur le futex
4.4.5. Le réveil du thread en attente
4.4.6. Limitations
4.5. Héritage de priorité
4.5.1. Inversion de priorité
4.5.2. Solution
4.6. Multiplexage
4.7. Le paramètre de temporisation
4.7.1. Temporisation relative
4.7.2. Temporisation absolue
4.7.3. Temporisation et héritage de priorité
4.8. Réveil sur condition
4.9. Changement implicite de futex
4.10. Autres opérations
5. Conclusion
6. Ressources
A propos de l'auteur

Avant propos

Cet article a été publié dans glmf_logo numéros 173, 175, 176 et 178.

Introduction

A l'origine, à travers les sémaphores des outils de communication inter-processus (IPC) système V, Unix et par la suite Linux offraient des mécanismes de synchronisation assez lourds en termes de performances. Véritables goulots d'étranglement à mesure de la généralisation de Linux sur des serveurs très sollicités, des processus multi-thread, des machines SMP et des environnements pseudo temps-réel, les sémaphores système V cèdent peu à peu la place à la notion de futex issue des travaux de H. Franke, R. Russel et M. Kirkwood décrits dans [1]. Linux propose ce mécanisme à partir de sa version 2.5.7.

Si un futex, contraction de l'expression anglo-saxonne "Fast User Space Mutex", était à l'origine destiné à l'implémentation de mutex efficaces, ils se sont généralisés comme des bases pour de nombreux mécanismes de synchronisation évolués (sémaphores, variables conditionnelles...). Le principe consiste à minimiser autant que possible le nombre d'appels systèmes coûteux en termes de performance. A l'instigation des développeurs de la GLIBC, cette solution particulièrement adaptée au multi-threading, a subi de nombreuses améliorations pour devenir la pierre angulaire des fonctions de synchronisation des threads POSIX.

L'utilisation des futex n'est pas aisée au premier abord. Il existe certes un manuel en ligne (i.e. man futex) mais il est succinct et n'est pas à jour du fait des évolutions constantes. L'utilisation par le programmeur moyen y est même déconseillée : "As these semantics involve writing non-portable assembly instructions, this in turn probably means that most users will  in fact be library authors and not general application developers." (sic)

Cet article a pour but de faire la lumière sur les futex afin de comprendre leur efficacité, d'appréhender le code source du noyau Linux qui les implémente et des librairies open source qui les utilisent pour en faciliter l'utilisation par tout programmeur.

1. Des sémaphores système V aux futex

1.1. Exemple de mise en oeuvre d'un sémaphore système V

Pour utiliser un sémaphore, il est d'abord nécessaire d'obtenir un identifiant via l'appel système semget(). Puis, les opérations sont effectuées à l'aide de l'appel système semop() tandis que la destruction est réalisée par l'appel système semctl().
Le programme décrit dans le listing 1, consiste en deux threads. Le second met à jour la variable globale data avec le compteur de secondes traduit sous forme de chaîne de caractères tandis que le premier affiche la valeur courante du compteur sur demande de l'opérateur. Les deux flux d'exécution se disputent la même ressource data pour la modifier ou la lire. Le conflit d'accès est résolu par le sémaphore semid qui synchronise les deux threads en garantissant un accès exclusif à la variable dans les deux sections critiques où elle est modifiée ou lue (cf. les lignes de code rouge dans le listing 1). Ainsi le thread principal lira toujours une valeur cohérente de la chaîne de caractères.

[...]
#define NB_SECONDS 20

static char *str_second[NB_SECONDS] =
{
  "zero", "un", "deux", "trois", "quatre", "cinq",
  "six", "sept", "huit", "neuf", "dix",
  "onze", "douze", "treize", "quatorze", "quinze",
  "seize", "dix sept", "dix huit", "dix neuf"
};

char data[100];

static int semid;

struct sembuf sembuf_for_P =  {0, -1, SEM_UNDO};
struct sembuf sembuf_for_V =  {0, +1, SEM_UNDO};

#define P(id)  semop(id, &sembuf_for_P, 1)
#define V(id)  semop(id, &sembuf_for_V, 1)

static void *thd_main(void *p)
{
unsigned int i = 0;

  while(1)
  {
    // Mise a jour du compteur en ASCII
    P(semid);
    strcpy(data, str_second[i]);
    V(semid);

    // Attente d'une seconde
    sleep(1);

    // Incrementation du compteur dans la plage [0, NB_SECONDS[
    i = (i + 1) % NB_SECONDS;
  }

  return NULL;
}

int main(int ac, char *av[])
{
int            rc;
pthread_t      tid;
unsigned short val = 1;

  // Création du semaphore
  semid = semget(IPC_PRIVATE, 1, IPC_CREAT|0777);

  // Initialisation du semaphore a la valeur 1
  semctl(semid, 0, SETALL, &val);

  // Creation du thread
  rc = pthread_create(&tid, NULL, thd_main, NULL);

  // Interaction avec l'operateur
  while(fgetc(stdin) != EOF)
  {
    // Affichage de la valeur ASCII courante du compteur
    P(semid);
    printf("Compteur courant: %s\n", data);
    V(semid);
  }
}
Listing 1 : Le fichier source ipc.c

Après compilation et exécution du programme, il suffit de taper sur la touche ENTREE pour que la fonction fgetc(), dans le thread principal, retourne et permettent l'affichage de la valeur courante du compteur mis à jour en parallèle par le thread secondaire toutes les secondes :

$ gcc ipc.c -o ipc -lpthread
$ ./ipc

Compteur courant: dix

Compteur courant: treize
<CTRL D>
$

L'utilitaire strace (avec l'option -f pour espionner tous les threads du processus) donne une très bonne idée des appels au système Linux pendant l'exécution de ce programme :

$ strace -f ./ipc
[...]
[pid 13265] semop(16678923, 0x8049ab0, 1) = 0
[pid 13265] semop(16678923, 0x8049ab6, 1) = 0
[pid 13265] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 13265] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 13265] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 13265] nanosleep({1, 0}, {1, 0})   = 0
[...]
[pid 13264] <... read resumed> "\n", 4096) = 1
[pid 13264] semop(16678923, 0x8049ab0, 1) = 0
[pid 13264] write(1, "Compteur courant: six\n", 22Compteur courant: six
) = 22
[pid 13264] semop(16678923, 0x8049ab6, 1) = 0
[pid 13264] read(0,  <unfinished ...>
[pid 13265] <... nanosleep resumed> {1, 0}) = 0
[pid 13265] semop(16678923, 0x8049ab0, 1) = 0
[pid 13265] semop(16678923, 0x8049ab6, 1) = 0
[pid 13265] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 13265] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 13265] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 13265] nanosleep({1, 0}, {1, 0})   = 0
[...]

En rouge, le thread secondaire avec le pid 13265 est dans sa boucle infinie de mise à jour du compteur toutes les secondes : nous voyons les appels aux services semop() dans le noyau pour effectuer les opérations P() et V() sur le sémaphore afin de protéger la mise à jour de l'emplacement mémoire data. Ensuite le thread s'endort une seconde avec l'appel à la fonction sleep() de la librairie C qui aboutit à l'appel système nanosleep().
En bleu, le thread principal avec le pid 13264, est en attente sur l'entrée standard via la fonction fgetc() de la librairie C qui se traduit par un appel bloquant au service read() du noyau. Quand l'opérateur appuie sur la touche ENTREE, read() rend la main et le thread fait appel aux services semop() dans le noyau pour effectuer les opérations P() et V() sur le sémaphore afin de protéger la lecture de l'emplacement mémoire data et d'imprimer à l'écran son contenu via l'appel système write(), sous-jacent à la fonction printf() de la librairie C.

1.2. Les problèmes de performance

L'exemple précédent et notamment les informations affichées par l'outil strace, montrent que le thread secondaire a recours au noyau via l'appel semop() même s'il n'y a pas conflit d'accès sur data (tant que l'opérateur n'appuie pas sur la touche ENTREE, il n'y a pas conflit d'accès sur la ressource partagée). Ces appels systématiques au noyau par le thread secondaire sont consommateurs de temps CPU. Dans un contexte pseudo temps réel ou des applications nécessitant une grande vitesse d'exécution comme par exemple les outils de gestion de bases de données, ce comportement est rédhibitoire.

Pour remédier à ce problème de performance, il faut éviter de solliciter le noyau quand le sémaphore est libre. Le document [1] propose une solution à travers la notion de futex introduite à partir de la version 2.5.7 de Linux. Diverses corrections et améliorations ont été apportées dans les moutures suivantes du système d'exploitation.

1.3. Principe des futex

Pour l'accès à la ressource partagée (la variable data dans notre exemple), on différencie les situations non conflictuelles (i.e. aucune tâche n'a pris le contrôle dessus) de celles où il y a conflit d'accès (i.e. une tâche a le contrôle dessus). Dans le premier cas, les traitements sont réalisés en espace utilisateur. Le recours aux services du noyau se limite au second cas. C'est là que réside la grande différence entre les sémaphores système V et les futex en termes de performance : les premiers font systématiquement appel au noyau en cas de conflit ou non sur la ressource partagée tandis que les seconds ne sollicitent le noyau qu'en cas de conflit.

Plus précisément, un futex est une variable de type entier partagée par les threads ou les processus qui se synchronisent. Par conséquent, il peut être tout simplement défini globalement s'il n'est partagé que par les threads d'un même processus ou situé dans un segment de mémoire partagée s'il est partagé par des threads au sein de plusieurs processus.

La variable ainsi partagée contient un état modifié en espace utilisateur à l'aide d'opérations atomiques (cf. [3]). Quand l'état de la variable indique qu'il n'y a pas de conflit, aucun appel système n'est nécessaire. Par contre, s'il y a conflit, le noyau est sollicité à l'aide de l'appel système futex() pour mettre en sommeil la tâche appelante ou réveiller des tâches en attente. Ce mécanisme relativement simple est la base pour implémenter des services beaucoup plus évolués comme les mutex et les rwlocks entre autres. Par exemple, les fonctions POSIX de synchronisation des threads dans la GLIBC/NPTL (i.e. Native Posix Thread Library décrite dans [18]) sont basées sur les futex : pthread_mutex_lock(), pthread_cond_signal(), pthread_rwlock_rdlock()...

N'étant pas toujours disponibles dans les langages de programmation évolués comme le langage C, les opérations atomiques nécessitent le recours au langage assembleur. Cela pose des problèmes de portabilité d'une architecture à l'autre et c'est la raison pour laquelle le manuel en ligne indique que les futex sont plutôt réservés aux développeurs des librairies telles que la GLIBC qui cachent les spécificités des différentes plate-formes par des services standardisés de haut niveau. Mais certains compilateurs proposent des facilités pour éviter la programmation en assembleur. Par exemple, [2] décrit les "built-ins" disponibles pour certaines opérations atomiques avec le compilateur GCC.

1.4. Le programme ipc revu avec un futex

La figure 1 décrit le comportement du programme d'exemple ipc du listing 1 quand il est basé sur un futex au lieu d'un IPC. Les opérations atomiques sont mises en valeur avec la couleur rouge. La séquence schématisée se situe au moment où l'opérateur a saisi la touche ENTREE pour débloquer la fonction fgetc() dans le thread principal. Le futex est représenté par la variable globale futex_var de type entier. Dans la boucle du thread principal, l'opération P() est remplacée par l'opération atomique qui consiste à positionner la variable futex_var à 1 si sa valeur courante est 0. Pendant ce temps, le thread secondaire effectue la même opération mais ne change pas le contenu de la variable futex_var car elle a été modifiée par le thread principal et a donc la valeur 1 au lieu de 0. Il se met alors en sommeil en appelant le service futex() du noyau. Le thread principal continue dans sa lancée en affichant le contenu de la variable data puis effectue l'opération V() qui consiste à mettre la valeur 0 dans la variable futex_var de manière atomique. Puis il appelle le service futex() du noyau pour réveiller le thread secondaire. Ce dernier teste de nouveau le contenu de la variable futex_var et le met à 1 atomiquement afin de bloquer le thread principal si celui-ci veut consulter la variable data pendant sa mise à jour. Une fois la mise à jour terminée, le futex est de nouveau remis à 0 atomiquement et les éventuels threads en attente sont réveillés par l'appel futex() avant d'appeler de nouveau sleep()...

fs1
Figure 1 : Le programme ipc basé sur un futex

Dans cette séquence, l'opération P() ne nécessite pas d'appel système futex() quand il n'y a pas conflit. Ce qui est une différence majeure par rapport aux sémaphores système V où P() entraîne forcément un appel système qu'il y ait conflit ou non ! Par contre, l'opération V() se termine systématiquement par un appel à futex() pour réveiller les éventuels threads en attente. Dans la suite, il sera montré qu'il est possible d'être encore plus efficace sur l'opération V() en appelant futex() seulement quand un thread est en sommeil.

2. L'appel système futex()

2.1. Le prototype

La section 7 du manuel en ligne donne une description générale tandis que la section 2 décrit l'appel système futex() avec le prototype ci-après :

$ man futex
FUTEX(2)                   Linux Programmer's Manual                  FUTEX(2)

NAME
       futex - Fast Userspace Locking system call

SYNOPSIS
       #include <linux/futex.h>

       #include <sys/time.h>

       int futex(int *uaddr, int op, int val, const struct timespec *timeout, int *uaddr2, int val3);
[...]

Le paramètre op, dont les différentes valeurs possibles sont définies dans le fichier d'en-tête <linux/futex.h>, détermine le comportement de la fonction.

2.2. Invocation

Considérons le fichier futex.c dans le listing 2 qui utilise l'appel système comme préconisé dans le manuel.

#include <linux/futex.h>

int main(int ac, char *av[])
{
int rc;

 rc = futex((int *)0, 0, 0, (const struct timespec *)0, (int *)0, 0);

 return 0;
}
Listing 2 : Le fichier futex.c

La compilation se termine en erreur car la fonction futex() n'existe pas dans la librairie C :

$ gcc futex.c
/tmp/ccGwg1Zs.o: In function `main':
futex.c:(.text+0x41): undefined reference to `futex'
collect2: ld returned 1 exit status

Mais le service existe au sein de Linux car il est répertorié dans le fichier d'en-tête standard <sys/syscall.h> :

$ cat /usr/include/bits/syscall.h | grep -i futex
#define SYS_futex __NR_futex

Il faut passer le code SYS_futex à la fonction syscall() pour déclencher le service dans le noyau comme le montre le listing 3 de la version modifiée du fichier futex.c.

#include <sys/syscall.h>

#define futex(a, b, c, d, e, f) syscall(SYS_futex, a, b, c, d, e, f)

int main(int ac, char *av[])
{
int rc;

 rc = futex((int *)0, 0, 0, (const struct timespec *)0, (int *)0, 0);

 return 0;
}
Listing 3 : Version modifiée de futex.c

2.3. Les paramètres

Le paramètre uaddr est l'adresse de la variable du futex. Le paramètre op est le code de l'opération à réaliser sur le futex. Les valeurs de ce dernier, définies dans le fichier d'en-tête <linux/futex.h>, déterminent la signification des paramètres qui suivent :

[...]
#define FUTEX_WAIT              0
#define FUTEX_WAKE              1
#define FUTEX_FD                2
#define FUTEX_REQUEUE           3
#define FUTEX_CMP_REQUEUE       4
#define FUTEX_WAKE_OP           5
#define FUTEX_LOCK_PI           6
[...]

Par exemple :

2.4. Création et initialisation

S'il est partagé par plusieurs threads au sein d'un même processus, le futex est une simple variable globale ou un entier alloué dynamiquement par malloc().
S'il est partagé par des threads s'exécutant dans divers processus, l'entier sera localisé dans un segment de mémoire partagé créé via un IPC (i.e. appels système shmget() et shmat()) ou par appel au service POSIX shm_open() suivi de mmap()...
Nous verrons ces différentes méthodes de définition des futex à travers les programmes d'exemples de cet article.

2.5. Destruction

Détruire un futex signifie en premier lieu de n'avoir aucun thread en attente dessus afin que les structures de données qui lui sont associées dans le noyau soient désallouées. Ensuite, côté espace utilisateur, la destruction en elle même est fonction de la méthode utilisée pour l'allouer. Si c'est une simple variable globale, le fait de ne plus avoir de thread en attente dessus fait que le futex n'existe plus même si la variable est persistente pendant la durée de vie du programme. Si c'est une allocation dynamique via malloc(), un free() le désallouera. Si c'est un mmap(), on utilisera munmap()...

3. Exemples d'applications

3.1. Implémentation d'un mutex

Un mutex est un verrou à deux états : ouvert ou fermé. Il est typiquement utilisé pour protéger les portions de code qui nécessitent un accès exclusif à une ressource partagée entre threads concurrents. Ces portions de code sont appelées "sections critiques". En général, deux opérations sont définies : LOCK() et UNLOCK() pour respectivement verrouiller et déverrouiller la section critique. L'exemple du listing 1 est une application de ce principe : les sections critiques sont les portions de code encadrées par les opérations P() et V() (terminologie préférée à LOCK()/UNLOCK() quand il s'agit de sémaphores). Le listing 4 en présente une version non plus basée sur les sémaphores système V mais sur un mutex construit à l'aide d'un futex. Les parties de code source communes entre les listings 1 et 4 ont été volontairement élaguées.

[...]
int futex_var;

#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)
#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

static void LOCK(int *f)
{
int old;

  while (1)
  {
    old = __sync_val_compare_and_swap(f, 0, 1);

    if (0 == old)
    {
      // Le futex etait libre car il contenait la valeur 0
      // et maintenant il contient la valeur 1
      return;
    }
    else
    {
      // Le mutex contient la valeur 1 : l'autre thread l'a pris
      // ==> Attente tant que le futex a la valeur 1
      futex_wait(f, 1);
    }
  }
}

static void UNLOCK(int *f)
{
int old;

  old = __sync_fetch_and_sub(f, 1);

  futex_wakeup(f, 1);
}

static void *thd_main(void *p)
{
unsigned int i = 0;

  while(1)
  {
    // Mise a jour du compteur en ASCII
    LOCK(&futex_var);
    strcpy(data, str_second[i]);
    UNLOCK(&futex_var);

    // Attente d'une seconde
    sleep(1);

    // Incrementation du compteur dans la plage [0, NB_SECONDS[
    i = (i + 1) % NB_SECONDS;
  }

  return NULL;
}

int main(int ac, char *av[])
{
int            rc;
pthread_t      tid;
unsigned short val = 1;

  // Creation du thread
  rc = pthread_create(&tid, NULL, thd_main, NULL);

  // Interaction avec l'operateur
  while(fgetc(stdin) != EOF)
  {
    // Affichage de la valeur ASCII courante du compteur
    LOCK(&futex_var);
    printf("Compteur courant: %s\n", data);
    UNLOCK(&futex_var);
  }
}
Listing 4 : le fichier source mutex.c

Le futex est représenté par la variable globale futex_var.
Les opérations FUTEX_WAIT et FUTEX_WAKE sont respectivement définies sous la forme des macro instructions futex_wait() et futex_wakeup(). L'opération LOCK() est très intéressante. Elle doit effectuer une opération atomique en espace utilisateur et en fonction du résultat elle verrouille le mutex ou met le thread appelant en sommeil. L'opération en question est "compare and swap" (CAS) décrite dans [4] qui reçoit trois paramètres. Elle consiste à comparer le contenu du futex (premier paramètre) avec la valeur 0 (contenu du second paramètre). S'il y a égalité, alors le futex prend la valeur 1 (contenu du troisième paramètre). S'il n'y a pas égalité, le futex n'est pas modifié. Dans tous les cas, l'opération retourne la valeur du futex juste avant l'appel. Normalement, une telle opération nécessite d'être écrite en assembleur.
Par exemple, sur Intel, c'est l'instruction cmpxchg (cf. [5]) qu'il faut préfixer par l'instruction lock pour les architectures SMP afin de garantir l'usage exclusif de l'emplacement mémoire au processeur en cours d'exécution :

mov    0x8(%ebp),%eax        ; EAX = @f
mov    %eax,0xffffffe8(%ebp) ; old = EAX = @f
mov    $0x0,%ecx             ; ECX = 0
mov    $0x1,%edx             ; EDX = 1
mov    0xffffffe8(%ebp),%ebx ; EBX = old = @f
mov    %ecx,%eax             ; EAX = ECX = 0
lock cmpxchg %edx,(%ebx)     ; Si *EBX (f) == EAX (0) alors *EBX (f) = EDX (1)
                             ; Sinon EAX = *EBX (f)
mov    %eax,%edx             ; EDX = EAX
mov    %edx,0xfffffff8(%ebp) ; old = EDX = résultat de cmpxchg

Et voici l'équivalent sur une plate-forme PowerPC 32 bits qui se base sur la technique "load-link and store-conditional" (LL/SC) à l'aide des instructions lwarx et stwcx comme expliqué dans [6] :

stwu    r1,-48(r1)
mflr    r0
stw     r0,52(r1)
stw     r31,44(r1)
mr      r31,r1
stw     r3,24(r31)
lwz     r9,24(r31)
li      r11,1
sync   
lwarx   r0,0,r9
cmpwi   r0,0
bne-    100005bc <LOCK+0x40>
mr      r10,r11
stwcx.  r10,0,r9
bne-    100005a0 <LOCK+0x24>
isync
stw     r0,8(r31)
lwz     r0,8(r31)

Heureusement quand on utilise GCC, on peut compter sur les "built-ins" (cf. [2]) qui permettent d'avoir un code relativement portable d'une architecture à l'autre car on laisse au compilateur le soin de générer les instructions adaptées à la plate-forme utilisée. Ainsi nul besoin d'être un spécialiste de l'assembleur de toutes les machines auxquelles le projet est destiné : __sync_val_compare_and_swap() répond au besoin.

En résumé, le résultat de la "built-in" de GCC permet de savoir si le futex est à 0 ou à 1. Dans le premier cas, cela veut dire que le mutex est libre et qu'on peut le verrouiller (i.e. __sync_val_compare_and_swap() met le futex à la valeur 1). Dans le second cas, le mutex est verrouillé (i.e. le futex a déjà la valeur 1) et donc le thread appelant se met en sommeil via l'opération FUTEX_WAIT de l'appel système futex() jusqu'à ce que le mutex soit déverrouillé via la fonction UNLOCK().

L'opération UNLOCK() est beaucoup plus simple que LOCK() : le futex est repassé à la valeur 0 grâce à la "built-in" __sync_fetch_and_sub() de GCC qui le décrémente atomiquement. Son principe consiste à retourner ce qui est stocké à l'adresse passée en premier paramètre (opération fetch dans le nom) avant de lui soustraire la valeur passée en deuxième paramètre (opération sub dans le nom). Il faut voir cette opération comme la version atomique de la post-soustraction "(*f)--" du langage C. Puis les éventuels threads en attente sont réveillés via l'opération FUTEX_WAKE de l'appel système futex().

Le reste du programme reste identique à celui du listing 1 sauf qu'il n'y a plus d'IPC à initialiser car la variable globale futex_var s'est substituée et que les appels P() et V() sont respectivement remplacés par LOCK() et UNLOCK().

3.1.1. Exécution du programme d'exemple

Le fichier source du listing 5 peut être compilé comme suit sur plate-forme Intel 32 bits :

$ gcc mutex.c -o mutex -lpthread

Pour certaines architectures de PC, on notera le recours à l'option -march du compilateur afin qu'il ne génère pas du code i386 qui ne supporte pas les opérations générées par les "built-ins"__sync_val_compare_and_swap() et __sync_fetch_and_sub() :

$ gcc mutex.c -o mutex -lpthread
/tmp/ccOfVXw3.o: In function `LOCK':
mutex.c:(.text+0x1d): undefined reference to `__sync_val_compare_and_swap_4'
/tmp/ccOfVXw3.o: In function `UNLOCK':
mutex.c:(.text+0x6e): undefined reference to `__sync_fetch_and_sub_4'
collect2: ld returned 1 exit status
$ gcc mutex.c -o mutex -lpthread -march=i486

L'exécution du programme mutex avec l'outil strace met tout de suite en avant la grande différence avec le programme ipc basé sur les IPC : il y a moins d'appels système. Comme précédemment, les traces relatives au thread principal sont en bleu tandis que celles du thread secondaire sont en rouge.

$ strace -f ./mutex
[...]
[pid 15956] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 15956] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 15956] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 15956] nanosleep({1, 0}, {1, 0})   = 0
[pid 15956] futex(0x8049a20, FUTEX_WAKE, 1) = 0
[pid 15956] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 15956] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 15956] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 15956] nanosleep({1, 0}, <unfinished ...>
[pid 15955] <... read resumed> "\n", 4096) = 1
[pid 15955] fstat64(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 3), ...}) = 0
[pid 15955] mmap2(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0xb7f3d000
[pid 15955] write(1, "Compteur courant: six\n", 22Compteur courant: six
) = 22
[pid 15955] futex(0x8049a20, FUTEX_WAKE, 1) = 0
[pid 15955] read(0,  <unfinished ...>
[pid 15956] <... nanosleep resumed> {1, 0}) = 0
[pid 15956] futex(0x8049a20, FUTEX_WAKE, 1) = 0
[pid 15956] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 15956] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 15956] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 15956] nanosleep({1, 0}, {1, 0})   = 0
[...]

Sur l'opération LOCK(), le thread vérifie en espace utilisateur (donc sans appel système !) si le futex est nul et le met à 1 de manière atomique pour le verrouiller. Cette opération n'engendre un appel système futex() avec l'opération FUTEX_WAIT seulement si le futex vaut 1 (i.e. verrouillé par l'autre thread). Dans la fonction P() équivalente, le programme du listing 1 exécute systématiquement l'appel système semop(). Par contre, les deux threads appellent toujours le système avec l'opération FUTEX_WAKE via UNLOCK() une fois le futex remis atomiquement à 0 afin de réveiller l'éventuel autre thread en attente. On n'a donc économisé les appels système que sur l'opération de verrouillage. Nous allons voir comment on peut aussi optimiser le nombre d'appels au noyau lors du déverrouillage.

3.1.2. Amélioration des performances

Il est possible d'aller plus loin dans la minimisation des appels au système. En effet, le traçage du programme mutex montre qu'on a un appel systématique au service système FUTEX_WAKE lors de l'appel à la fonction UNLOCK() alors que la plupart du temps, il n'y a pas de thread en attente sur le futex. Dans [8], Ulrich Drepper propose une solution à ce problème en ajoutant un état supplémentaire au futex. En d'autres termes, il peut prendre les valeurs 0, 1 et 2 qui signifient respectivement :
L'appel au système pour exécuter l'opération FUTEX_WAKE ne se fera donc qu'à la condition où le futex est dans l'état 2 (i.e. des threads sont en attente). Les fonctions LOCK() et UNLOCK() sont revues comme indiqué dans le listing 5. Elles sont légèrement plus complexes que leur version d'origine dans le listing 4. Le reste du programme restant identique au listing 4, n'est pas reproduit.

[...]
static void LOCK(int *f)
{
int old;

  old = __sync_val_compare_and_swap(f, 0, 1);

  switch(old)
  {
    case 0:
    {
      // Le futex etait libre car il contenait la valeur 0
      // et maintenant il contient la valeur 1 car au moment
      // de l'opération atomique il n'y avait pas de thread
      // en attente
      return;
    }
    break;

    default:
    {
      // Le mutex contient la valeur 1 ou 2 : il est occupe car
      // un autre thread l'a pris

      // On positionne le futex a la valeur 2 pour indiquer qu'il
      // y a des threads en attente
      old = __sync_lock_test_and_set(f, 2);

      while (old != 0)
      {
        // Attente tant que le futex a la valeur 2
        futex_wait(f, 2);

        // Le futex a soit la valeur 0 (libre) ou 1 ou 2
        old = __sync_lock_test_and_set(f, 2);
      }
    }
    break;
  }
}

static void UNLOCK(int *f)
{
int old;

  old = __sync_fetch_and_sub(f, 1);

  switch(old)
  {
    case 1:
    {
      // Il n'y avait pas de thread en attente
      // ==> Personne a reveiller
    }
    break;

    case 2:
    {
      // Il y avait des threads en attente
      // Le futex a la valeur 1, on le force à 0
      old = __sync_lock_test_and_set(f, 0);

      // Reveil des threads en attente
      futex_wakeup(f, 1);
    }
    break;

    default:
    {
      // Impossible
    }
  }
}
[...]
Listing 5 : Amélioration de LOCK()/UNLOCK()

En cas de conflit, la fonction LOCK() force le futex à la valeur 2 pour indiquer qu'il y a un thread en attente. Pour cette opération qui doit être atomique, GCC propose la "built-in" __sync_lock_test_and_set(). Désassemblée sur plateforme Intel, cette "built-in" n'est autre que l'instruction xchg (le préfixe Intel lock n'est pas utile pour cette dernière car le manuel [5] indique que cette instruction effectue implicitement l'opération lock quand un des opérandes est une adresse en mémoire). Désassemblée sur architecture Intel 32 bits, l'instruction en langage C "old = __sync_lock_test_and_set(f, 2)" se traduit par :

mov    0x8(%ebp),%eax        ; EAX = f
mov    $0x2,%edx             ; EDX = 2
xchg   %edx,(%eax)           ; *f = 2 et EDX = ancienne valeur de *f
mov    %edx,0xfffffff8(%ebp) ; old = ancienne valeur de *f

La fonction UNLOCK() quant à elle, ne va faire l'appel système futex() qu'à la condition où le futex est dans l'état 2 indiquant un thread en attente. Dans ce cas, il faut bien-entendu au préalable forcer atomiquement le futex à la valeur 0 pour que le thread réveillé le voit libre.

Après compilation de cette nouvelle mouture du programme mutex, l'outil strace montre qu'il n'y a plus d'appel systématique à futex() dans la fonction UNLOCK() :

$ gcc mutex_1.c -o mutex_1 -lpthread
$ strace -f ./mutex_1
[...]
[pid 24759] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 24759] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 24759] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 24759] nanosleep({1, 0}, {1, 0})   = 0
[pid 24759] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 24759] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 24759] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 24759] nanosleep({1, 0},
 <unfinished ...>
[pid 24758] <... read resumed> "\n", 4096) = 1
[pid 24758] fstat64(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(136, 3), ...}) = 0
[pid 24758] mmap2(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0xb7f39000
[pid 24758] write(1, "Compteur courant: trois\n", 24Compteur courant: trois
) = 24
[pid 24758] read(0,  <unfinished ...>
[pid 24759] <... nanosleep resumed> {1, 0}) = 0
[pid 24759] rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
[pid 24759] rt_sigaction(SIGCHLD, NULL, {SIG_DFL, [], 0}, 8) = 0
[pid 24759] rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
[pid 24759] nanosleep({1, 0}, {1, 0})   = 0
[...]

Pour le lecteur qui voudrait aller plus loin sur le sujet de l'optimisation, [15] propose un algorithme qui serait encore plus efficace en ajoutant un quatrième état aux trois précédemment définis pour le futex :

3.2. Synchronisation sur fin de thread

Ici nous voyons un cas de synchronisation entre un service du noyau de création d'un thread et l'espace utilisateur. Le programme clone du listing 6 est un exemple de création d'un thread. Le thread créé par le service pthread_create() affiche le message "Bonjour de la part du thread secondaire" avant de se terminer et le thread principal après avoir attendu la fin du thread secondaire via le service pthread_join() affiche le message "Fin du programme".

[...]
#define STR_BONJOUR "Bonjour de la part du thread secondaire\n"
#define STR_FIN "Fin du programme\n"

void *thd_main(void *p)
{
  write(1, STR_BONJOUR, sizeof(STR_BONJOUR) - 1);

  return NULL;
} // thd_main

int main(int ac, char *av[])
{
pthread_t tid;

  // Creation du thread
  pthread_create(&tid, NULL, thd_main, NULL);

  // Attente de la fin du thread
  pthread_join(tid, NULL);

  write(1, STR_FIN, sizeof(STR_FIN) - 1);

  return 0;
} // main
Listing 6 : Synchronisation sur fin de thread

L'intérêt de cet exemple réside dans la manière dont les threads se synchronisent de sorte à ne pas terminer le thread principal donc le processus dans son ensemble, avant que le thread secondaire n'ait eu le temps de s'exprimer :

$ gcc clone.c -o clone
$ ./clone
Bonjour de la part du thread secondaire
Fin du programme

En utilisant l'utilitaire strace, on voit ce qui est réalisé par le système. Tout d'abord, pthread_create() alloue la pile, le TCB (de l'anglo-saxon Task Control Block ou descripteur de tâche en français) et la page de garde (ou zone rouge) de protection contre les débordements de la pile. Pour cela, l'appel système getrlimit() permet de connaître la taille par défaut de la pile (8192 x 1024 soit 8 MB). Ensuite mmap() permet d'allouer l'espace en mémoire virtuelle (à l'adresse 0x7fc556389000) pour contenir à la fois la zone rouge, la pile et le TCB d'une taille totale de 8392704 octets. Puis mprotect() sert à créer la page de garde en interdisant la lecture, l'écriture et l'exécution (i.e. PROT_NONE) sur  les 4096 premiers octets de cet espace (vu que la pile croît des adresses hautes vers les adresses basses) :

$ strace -f ./clone
[...]
getrlimit(RLIMIT_STACK, {rlim_cur=8192*1024, rlim_max=RLIM_INFINITY}) = 0
mmap(NULL, 8392704, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7fc556389000
brk(0)                                  = 0x23aa000
brk(0x23cb000)                          = 0x23cb000
mprotect(0x7fc556389000, 4096, PROT_NONE) = 0
[...]

L'espace mémoire étant alloué et initialisé, pthread_create() démarre le thread à l'aide de l'appel système clone(). Le thread ainsi créé a l'identifiant 7057 :

[...]
clone(Process 7057 attached
child_stack=0x7fc556b88fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc556b899d0, tls=0x7fc556b89700, child_tidptr=0x7fc556b899d0) = 7057
[...]

La figure 2 décrit l'espace mémoire du thread secondaire au moment de son exécution :

fs1
Figure 2 : Représentation mémoire du thread secondaire

Dans le thread principal, pthread_join() effectue l'opération FUTEX_WAIT sur le futex à l'adresse 0x7fc556b899d0 dans le TCB afin d'attendre la fin du thread secondaire tant que la variable du futex est égale à l'identifiant du thread secondaire (i.e. 7057) :

[...]
[pid  7056] futex(0x7fc556b899d0, FUTEX_WAIT, 7057, NULL <unfinished ...>
[...]

La synchronisation entre les deux threads est réalisée par le truchement des drapeaux CLONE_PARENT_SETTID et CLONE_CHILD_CLEARTID de l'appel système clone(). Il est ainsi demandé au noyau de stocker le numéro du thread créé à l'adresse parent_tidptr (i.e. 0x7fc556b899d0 qui est donc l'adresse du futex sur lequel le thread principal attend) puis de stocker 0 dans le futex child_tidptr (i.e. la même adresse 0x7fc556b899d0 que parent_tidptr)  et d'y réaliser l'opération FUTEX_WAKE lorsque le thread est terminé. Cela a donc pour conséquence de réveiller le thread principal et ainsi la pile du thread terminé peut être désallouée ou réutilisée pour un nouveau thread.

3.3. Implémentation d'une barrière

Une barrière au sens POSIX du terme (cf. [9]) est un endroit dans le programme où N threads décident de suspendre leur exécution avant de la reprendre dès qu'ils sont tous arrêtés. Cela permet par exemple de s'assurer que des threads aient exécuté un ensemble d'actions avant de reprendre leur exécution. Le listing 7 est une application où tous les threads du programme stockent leur identifiant au sens noyau Linux du terme (i.e. résultat de l'appel système gettid()) dans les entrées de tab[] avant de se mettre en attente sur la barrière jusqu'à ce que tous les threads aient renseigné la table. Une fois les entrées actualisées, tous les threads sont réveillés dont le thread principal qui affiche le contenu de tab[] à l'écran. Sans la barrière, on n'aurait aucunes garanties sur le niveau de remplissage de la table au moment de l'affichage car le thread principal peut prendre la main à tout moment et en particulier avant que certains threads secondaires n'aient eu le temps de mettre à jour le tableau !

[...]
int futex_var;

int nb_barrier;

pid_t *tab;

#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)

#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

#define gettid() syscall(__NR_gettid)

void barrier_wait(void)
{
int old;

  old = __sync_fetch_and_sub(&futex_var, 1);

  if (1 == old)
  {
    futex_wakeup(&futex_var, nb_barrier - 1);
    return;
  }

  // On se met en attente tant que le futex ne vaut pas 0
  old = old - 1;
  do
  {
    futex_wait(&futex_var, old);

    old = __sync_fetch_and_sub(&futex_var, 0);
    if (0 == old)
    {
      break;
    }
  } while(1);
}

void *thd_main(void *p)
{
int *idx = (int *)p;

  tab[*idx] = gettid();

  barrier_wait();

  return NULL;
}

int main(int ac, char *av[])
{
pthread_t *tid;
int        i;
int       *idx;

  nb_barrier = 4;
  futex_var = nb_barrier;

  tab = (pid_t *)malloc(nb_barrier * sizeof(pid_t));
  idx = (int *)malloc(nb_barrier * sizeof(int));
  tid = (pthread_t *)malloc(nb_barrier * sizeof(pthread_t));

  tab[0] = gettid();

  for (i = 1; i < nb_barrier; i ++)
  {
    idx[i] = i;
    pthread_create(&(tid[i]), NULL, thd_main, (void *)&(idx[i]));
  }

  barrier_wait();

  free(idx);

  for (i = 0; i < nb_barrier; i ++)
  {
    printf("tab[%d] = %d\n", i, tab[i]);
  }

  free(tab);

  for (i = 1; i < nb_barrier; i ++)
  {
    pthread_join(tid[i], NULL);
  }

  free(tid);

  return 0;
}
Listing 7 : Implémentation d'une barrière

La compilation et l'exécution de ce programme donne le résultat suivant :

$ gcc barrier.c -o barrier -lpthread
$ ./barrier
tab[0] = 3656
tab[1] = 3657
tab[2] = 3658
tab[3] = 3659

Pour réaliser une barrière à partir d'un futex, il suffit de l'initialiser avec le nombre de threads à mettre en attente. Puis chaque thread effectue une décrémentation atomique de ce dernier et se suspend tant que le futex n'a pas atteint la valeur 0. Le thread pour lequel la décrémentation donne 0, réveille les autres. Encore une fois, lorsqu'on utilise GCC, nul besoin de produire du code assembleur pour la décrémentation atomique : on utilise la built-in __sync_fetch_and_sub() vue plus haut au § 3.1. Dans le programme, la fonction barrier_wait() utilise cette facilité.
La variable globale futex_var est préalablement initialisée avec le nombre de threads à synchroniser en début de fonction main(). Quand la valeur 1 est retournée, cela signifie que la nouvelle valeur du futex est à 0 et par conséquent on peut réveiller les (nb_barrier - 1) threads en attente via l'opération FUTEX_WAKE de futex(). Tant que __sync_fetch_and_sub() retourne une valeur supérieure à 1, cela veut dire que le futex n'est pas encore à 0 et qu'il faut se mettre en attente en invoquant l'appel système futex() avec l'opération FUTEX_WAIT. L'attente est dans une boucle afin de gérer les cas de croisements où N threads décrémentent le futex juste après qu'un thread l'ait décrémenté mais avant qu'il ne se soit mis en attente. La valeur "old - 1" sur laquelle ce dernier thread se met en attente n'est plus "old - 1" mais "old - 1 - N" et par conséquent, l'opération FUTEX_WAIT retourne immédiatement le compte-rendu -1 avec l'erreur EAGAIN. La boucle récupère donc la valeur courante du futex (astuce de la soustraction atomique avec la valeur 0) et ne remet en attente le thread courant sur cette valeur qu'à la condition où elle diffère de 0. L'outil strace permet d'observer ce fonctionnement :

$ strace -f ./barrier
[...]
[pid  3656] futex(0x8049950, FUTEX_WAIT, 3, NULL <unfinished ...>
[...]
[pid  3657] gettid( <unfinished ...>
[pid  3658] gettid( <unfinished ...>
[pid  3659] gettid( <unfinished ...>
[pid  3657] <... gettid resumed> )      = 3657
[pid  3658] <... gettid resumed> )      = 3658
[pid  3659] <... gettid resumed> )      = 3659
[pid  3657] futex(0x8049950, FUTEX_WAIT, 2, NULL <unfinished ...>
[pid  3658] futex(0x8049950, FUTEX_WAIT, 1, NULL <unfinished ...>
[pid  3659] futex(0x8049950, FUTEX_WAKE, 3 <unfinished ...>
[pid  3657] <... futex resumed> )       = -1 EAGAIN (Resource temporarily unavailable)
[pid  3658] <... futex resumed> )       = -1 EAGAIN (Resource temporarily unavailable)
[pid  3659] <... futex resumed> )       = 1
[pid  3656] <... futex resumed> )       = 0
[...]
[pid  3656] write(1, "tab[0] = 3656\n", 14tab[0] = 3656) = 14
[pid  3656] write(1, "tab[1] = 3657\n", 14tab[1] = 3657) = 14
[pid  3656] write(1, "tab[2] = 3658\n", 14tab[2] = 3658) = 14
[pid  3656] write(1, "tab[3] = 3659\n", 14tab[3] = 3659) = 14
[...]

Dans cette exécution, le premier thread à appeler futex() avec l'opération FUTEX_WAIT est le thread principal d'identifiant 3656. La valeur du futex est "4 - 1" soit 3. Ensuite les threads secondaires d'identifiants 3657, 3658 et 3659 prennent la main pour appeler gettid() puis futex(). Le futex prend successivement les valeurs 2, puis 1 comme on peut le voir en troisième paramètre de futex(). Lors de ce dernier appel par les threads 3657 et 3658, la valeur du futex change du fait de la décrémentation concurrente et par conséquent, l'appel système retourne -1 avec le code d'erreur EAGAIN. C'est le thread 3659 qui décrémente le futex pour la dernière fois et réveille tous les autres avec l'opération FUTEX_WAKE de l'appel système futex(). Le troisième paramètre est 3 car sur les 4 threads qui se synchronisent, seuls 3 sont en attente vu que le dernier est celui qui réveille les autres.

Dans la GLIBC/NPTL, les barrières sont gérées par les services tels que pthread_barrier_init() ou pthread_barrier_wait(). Les développeurs ont fait le choix d'une implémentation légèrement plus lourde en utilisant un mutex basé sur un futex (comme étudié précédemment) pour protéger le compteur de threads en attente. De plus, la fonction pthread_barrier_wait() retourne 0 pour tous les threads éveillés et PTHREAD_BARRIER_SERIAL_THREAD pour le thread qui provoque le réveil des autres (i.e. le dernier à atteindre la barrière !). Dans notre exemple, il suffit de changer la fonction barrier_wait() comme indiqué en rouge dans le listing 8 pour avoir un comportement analogue : le thread qui appelle futex() avec l'opération FUTEX_WAKE est celui qui reçoit le compte-rendu BARRIER_SERIAL_THREAD.

[...]
#define BARRIER_SERIAL_THREAD 1

int barrier_wait(void)
{
int old;

  old = __sync_fetch_and_sub(&futex_var, 1);

  if (1 == old)
  {
    futex_wakeup(&futex_var, nb_barrier - 1);
    return BARRIER_SERIAL_THREAD;
  }

  // On se met en attente tant que le futex
  // ne vaut pas 0
  old = old - 1;
  do
  {
    futex_wait(&futex_var, old);

    old = __sync_fetch_and_sub(&futex_var, 0);
    if (0 == old)
    {
      break;
    }
  } while(1);

  return 0;
}
[...]
Listing 8 : Retour particulier pour le thread qui provoque le réveil

4. Etude détaillée

Ce paragraphe aborde la face cachée du concept de futex dans la mesure où il donne une vue détaillée des traitements réalisés au sein du noyau de Linux. Cette connaissance permet d'utiliser les futex de manière efficace et robuste.

4.1. Gestion des futex dans le noyau

La gestion des futex par le noyau Linux est configurable par le drapeau CONFIG_FUTEX (fichier init/Kconfig). Les traitements sont essentiellement effectués dans le fichier source kernel/futex.c. La structure de donnée principale décrite en figure 3, est une table de hachage (cf. [7]) nommée futex_queues. Chaque entrée est une structure futex_hash_bucket.

2
Figure 3 : Les futex dans le noyau Linux

Le futex dont l'adresse virtuelle est fournie par l'espace utilisateur via l'appel système futex() est converti, par la fonction get_futex_key(), en une clé unique représentée par l'union futex_key définie dans le fichier include/linux/futex.h :

union futex_key {
struct {
unsigned long pgoff;
struct inode *inode;
int offset;
} shared;
struct {
unsigned long address;
struct mm_struct *mm;
int offset;
} private;
struct {
unsigned long word;
void *ptr;
int offset;
} both;
};

Passée à la fonction de hachage hash_futex(), la clé est transformée en index dans la table futex_queues. A chaque entrée de la table de hachage est associée une liste de priorité (plist définie dans le fichier d'en-tête include/linux/plist.h). C'est en fait une liste de listes qui classe ses éléments futex_q par valeur croissante du champ prio (flèches rouges sur la figure 3) et classe les éléments dont le champ prio est identique en "First In, First Out" (FIFO représentée par les flèches vertes sur la figure 3). Chaque futex_q est associé via le champ task, à un thread en attente sur un futex identifié par sa clé dans le champ key et est alloué dans la pile du thread (i.e. variable locale de l'appel système futex()).
Il convient de préciser que du point de vue de l'utilisateur, plus la valeur numérique de la priorité d'un thread est grande, plus il est prioritaire. Il suffit de se reporter au manuel en ligne de l'appel système sched_setscheduler(). Cela est perturbant car en interne, le noyau Linux  et notamment l'ordonnanceur, utilisent la logique inverse. Linux supporte les threads temps réel (politiques d'ordonnancement SCHED_RR et SCHED_FIFO) et les threads temps partagé (politique d'ordonnancement SCHED_OTHER entre autres). Les appels système sched_get_priority_min() et sched_get_priority_max() retournent respectivement les valeurs minimales et maximales pour les priorités en fonction du type d'ordonnancement comme indiqué dans le tableau de la figure 4.

s5
Figure 4 : Plages de priorités par politique d'ordonnancement

Les threads temps réel sont toujours plus prioritaires que les threads temps partagé vu que ces derniers ont la priorité 0 alors que les premiers ont une priorité minimale de 1. Pour garantir un temps de réponse optimal, Linux tient compte de la priorité des threads lors de la mise en file d'attente sur les futex. Par contre, la politique d'ordonnancement n'est pas prise en ligne de compte. Comme les files d'attente sont des plist ordonnées par ordre croissant des champs prio, nous sommes dans la logique inverse des significations des priorités pour les threads (du point de vue de l'utilisateur tout au moins !). C'est la raison pour laquelle le champ prio qui sert à ordonner les éléments futex_q, est dérivé de la priorité des threads qui leur sont associés par la formule suivante :

 prio = MIN(normal_prio, MAX_RT_PRIO)

avec :
  • normal_prio égal :
    • "MAX_RT_PRIO - 1 - priorité" pour les threads temps réel
    • "100 à 139" pour les threads temps partagé
  • MAX_RT_PRIO égal à 100

Pour les threads temps réel, la formule utilise les priorités passées par l'utilisateur qui vont de 1 à 99 avec l'appel système sched_setscheduler(). Comme ces valeurs sont inférieures à MAX_RT_PRIO, il en résulte que le champ prio aura respectivement les valeurs de 98 à 0.
Pour les threads temps partagé, la formule utilise la priorité interne au noyau qui va de 100 à 139 en fonction du niceness du thread modifiable par l'appel système nice().  Comme ces valeurs sont supérieures à MAX_RT_PRIO, il en résulte que le champ prio est toujours égal à 100.

Par conséquent, les threads temps réel, sortent toujours de la file en premier par rapport aux threads temps partagé et dans l'ordre du plus prioritaire au moins prioritaire. De plus, quand il y a égalité de priorité, l'algorithme des plist range les futex_q de manière FIFO. Ce qui implique que les threads temps partagé, tous rangés avec le champ prio égal à MAX_RT_PRIO (100) et les threads temps réel de priorités identiques sont sortis de la liste dans l'ordre du plus anciennement au plus récemment mis en attente.

Pour étayer le propos, le programme futex_prio du listing 9 crée tout d'abord, pour un futex, un emplacement de la taille d'un entier à l'adresse futex_var dans un segment de mémoire partagée via l'appel système mmap(). Ensuite il crée des processus dont les priorités sont passées en argument. Quand l'argument vaut 0, c'est un processus temps partagé de priorité par défaut qui est créé. Lorsque le paramètre est supérieur à 0 et précédé de la lettre "f" ou "r", c'est un processus de priorité temps réel avec les politiques d'ordonnancement respectives SCHED_FIFO ou SCHED_RR qui est créé. Les processus sont mis en attente, dans l'ordre de leur création, tant que le futex vaut la valeur 0. Après avoir passé le futex à la valeur 1, le thread principal réveille un à un les processus endormis en passant 1 en troisième paramètre de l'opération FUTEX_WAKE de l'appel système futex()

[...]
#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)

#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

int *futex_var;


int main(int ac, char *av[])
{
pid_t               pid;
unsigned int        i;
int                 prio;
int                 policy;
struct sched_param  param;
const char         *policy_str;

  // Segment de memoire partagee pour le futex
  futex_var = (int *)mmap(0, sizeof(int),
                          PROT_READ | PROT_WRITE,
                          MAP_SHARED | MAP_ANONYMOUS,
                          -1, 0);

  *futex_var = 0;

  for (i = 1; i < ac; i ++)
  {
    switch(av[i][0])
    {
      case 'r':
      {
        policy = SCHED_RR;
        prio = atoi(&(av[i][1]));
        policy_str = ", RR";
      }
      break;
      case 'f':
      {
        policy = SCHED_FIFO;
        prio = atoi(&(av[i][1]));
        policy_str = ", FIFO";
      }
      break;
      default :
      {
        prio = 0;
        policy_str = "";
      }
    }

    pid = fork();

    // Processus fils
    if (0 == pid)
    {
      if (prio)
      {
        param.sched_priority = prio;
        sched_setscheduler(0, policy, &param);
      }

      futex_wait(futex_var, 0);

      printf("Fils#%d reveille\n", getpid());

      exit(0);
    }

    printf("Fils#%d, priorite %d%s\n", pid, prio, policy_str);
  } // End for

  // On s'assure que tous les fils sont bloques sur le futex
  sleep(1);

  printf("Changement d'etat du futex\n");

  *futex_var = 1;

  for (i = 1; i < ac; i ++)
  {
    futex_wakeup(futex_var, 1);

    // On laisse le temps au fils d'afficher son message
    sleep(1);
  } // End for

  return 0;
} // main
Listing 9 : Réveil de threads en fonction de leur priorité

L'exécution du programme requiert les droits du super utilisateur (e.g. commande sudo) car par défaut, Linux n'autorise pas la création de threads temps réel par les utilisateurs non privilégiés.

Dans ce premier exemple d'exécution, quatre threads temps réel sont créés. On constate bien que peu importe la politique d'ordonnancement, les threads sont réveillés dans l'ordre croissant des priorités :

$ gcc futex_prio.c -o futex_prio
$ sudo ./futex_prio f1 f99 r4 r8
Fils#3643, priorite 1, FIFO
Fils#3644, priorite 99, FIFO
Fils#3645, priorite 4, RR
Fils#3646, priorite 8, RR
Changement d'etat du futex
Fils#3644 reveille
Fils#3646 reveille
Fils#3645 reveille
Fils#3643 reveille

Dans ce second exemple, seuls des threads temps partagé sont créés. On constate bien le réveil dans l'ordre FIFO :

$ sudo ./futex_prio 0 0 0 0
Fils#3658, priorite 0
Fils#3659, priorite 0
Fils#3660, priorite 0
Fils#3661, priorite 0
Changement d'etat du futex
Fils#3658 reveille
Fils#3659 reveille
Fils#3660 reveille
Fils#3661 reveille

Dans ce dernier exemple qui est le cas illustré en figure 3, des threads temps réél et temps partagé sont créés. On constate bien le réveil des premiers avant les seconds et l'ordre FIFO pour les threads de mêmes priorités :

$ sudo ./futex_prio r1 f1 0 0 r4 0 r4 0 f67
Fils#3756, priorite 1, RR
Fils#3757, priorite 1, FIFO
Fils#3758, priorite 0
Fils#3759, priorite 0
Fils#3760, priorite 4, RR
Fils#3761, priorite 0
Fils#3762, priorite 4, RR
Fils#3763, priorite 0
Fils#3764, priorite 67, FIFO
Changement d'etat du futex
Fils#3764 reveille
Fils#3760 reveille
Fils#3762 reveille
Fils#3756 reveille
Fils#3757 reveille
Fils#3758 reveille
Fils#3759 reveille
Fils#3761 reveille
Fils#3763 reveille

On notera que l'ordre de réveil des threads établi au moment de la mise en attente sur le futex ne varie pas en cas de changement de priorité comme le met en exergue le programme du listing 10 qui est une variante du listing 9. Une fois les processus fils créés et mis en attente sur un premier futex, le processus père inverse leur priorité avec l'opération "100 - prio", puis les réveille. Ensuite, les processus fils se remettent en attente sur un deuxième futex avant d'être de nouveau réveillés par le processus père.

[...]
int main(int ac, char *av[])
{
pid_t               pid[300];
unsigned int        i;
int                 prio;
int                 policy;
struct sched_param  param;
const char         *policy_str;

  // Segment de memoire partagee pour le futex
  futex_var = (int *)mmap(0, 2 * sizeof(int),
                          PROT_READ | PROT_WRITE,
                          MAP_SHARED | MAP_ANONYMOUS,
                          -1, 0);

  futex_var[0] = futex_var[1] = 0;

  for (i = 1; i < ac; i ++)
  {
[...]
    pid[i] = fork();

    // Processus fils
    if (0 == pid[i])
    {
      if (prio)
      {
        param.sched_priority = prio;
        sched_setscheduler(0, policy, &param);
      }

      // Attente sur le futex 0
      futex_wait(&(futex_var[0]), 0);

      sched_getparam(pid[i], &param);
      printf("Fils#%d reveille avec la priorite %d%s\n", getpid(), param.sched_priority, policy_str);

      // Attente sur le futex 1
      futex_wait(&(futex_var[1]), 0);

      sched_getparam(pid[i], &param);
      printf("Fils#%d reveille avec la priorite %d%s\n", getpid(), param.sched_priority, policy_str);

      exit(0);
    }

    printf("Fils#%d, priorite %d%s\n", pid[i], prio, policy_str);
  } // End for

  // On s'assure que tous les fils sont bloques sur le futex
  sleep(1);

  // Inversement des priorités de tous les processus en attente
  for (i = 1; i < ac; i ++)
  {
    switch(av[i][0])
    {
      case 'r':
      {
        policy = SCHED_RR;
        prio = atoi(&(av[i][1]));
        policy_str = ", RR";
      }
      break;
      case 'f':
      {
        policy = SCHED_FIFO;
        prio = atoi(&(av[i][1]));
        policy_str = ", FIFO";
      }
      break;
      default :
      {
        prio = 0;
        policy_str = "";
      }
    }

    if (prio)
    {
      printf("Changement de la priotite du Fils#%d de %d%s a %d%s\n",
             pid[i], prio, policy_str, 100 - prio, policy_str);

      param.sched_priority = 100 - prio;
      sched_setscheduler(pid[i], policy, &param);
      }
    }

  } // End for

  printf("Changement d'etat du futex 0\n");

  futex_var[0] = 1;

  for (i = 1; i < ac; i ++)
  {
    futex_wakeup(&(futex_var[0]), 1);

    // On laisse le temps au fils d'afficher son message
    sleep(1);
  } // End for


  printf("Changement d'etat du futex 1\n");

  futex_var[1] = 1;

  for (i = 1; i < ac; i ++)
  {
    futex_wakeup(&(futex_var[1]), 1);

    // On laisse le temps au fils d'afficher son message
    sleep(1);
  } // End for

  return 0;
} // main
Listing 10 : Changement de priorité pendant l'attente sur le futex

L'exécution montre qu'après attente sur le premier futex, l'ordre de réveil ne change pas malgré l'inversement des priorité des processus fils. Les nouvelles priorités ne prennent effet qu'au moment de la mise en attente sur le deuxième futex :

$ gcc futex_prio_1.c -o futex_prio_1
$ sudo ./futex_prio_1 f1 f99 r4 r8
Fils#10728, priorite 1, FIFO
Fils#10729, priorite 99, FIFO
Fils#10730, priorite 4, RR
Fils#10731, priorite 8, RR
Changement de la priotite du Fils#10728 de 1, FIFO a 99, FIFO
Changement de la priotite du Fils#10729 de 99, FIFO a 1, FIFO
Changement de la priotite du Fils#10730 de 4, RR a 96, RR
Changement de la priotite du Fils#10731 de 8, RR a 92, RR
Changement d'etat du futex 0
Fils#10729 reveille avec la priorite 1, FIFO
Fils#10731 reveille avec la priorite 92, RR
Fils#10730 reveille avec la priorite 96, RR
Fils#10728 reveille avec la priorite 99, FIFO
Changement d'etat du futex 1
Fils#10728 reveille avec la priorite 99, FIFO
Fils#10730 reveille avec la priorite 96, RR
Fils#10731 reveille avec la priorite 92, RR
Fils#10729 reveille avec la priorite 1, FIFO

4.2. Nombre de threads à réveiller

Le troisième paramètre de l'opération FUTEX_WAKE est le nombre de threads à réveiller. Quand on doit gérer plus de deux threads en concurrence, on peut s'interroger sur la valeur à utiliser.

Dans certains cas particuliers comme les barrières vues au §3.3, on connaît le nombre de threads en attente et la sémantique du service implique qu'il faut réveiller tous les threads en attente. Dans ce cas, le troisième paramètre sera au moins égal à ce nombre.

Cependant, d'autres services comme les mutex peuvent avoir un nombre indéterminé de threads en attente. On peut vouloir réveiller plusieurs threads voire tous les threads suspendus pour laisser l'ordonnanceur de Linux choisir le thread qui prendra effectivement le CPU. Dans [8], Ulrich Drepper déconseille de faire toute supposition sur tel ou tel comportement du noyau car c'est sujet à modifications. Il propose d'utiliser soit la valeur 1, soit la valeur INT_MAX définie dans le fichier d'en-tête <limits.h>, mais pas de valeurs intermédiaires. Toutefois, [8] n'indique pas quelle valeur choisir de préférence entre 1 ou INT_MAX. Avec la connaissance de la gestion des files d'attente sur les futex côté noyau telle que présentée au § 4.1, il peut être suggéré les choix suivants :
  1. Si le futex ne concerne que des threads temps réel avec la politique d'ordonnancement SCHED_FIFO, il est conseillé de spécifier la valeur 1 en troisième paramètre car c'est le thread de priorité maximum et qui est en attente depuis le plus longtemps (en cas d'égalité de priorité) qui sera réveillé. Il en est de même pour les threads temps réel avec la politique d'ordonnancement SCHED_RR car s'ils sont en attente sur le futex, cela veut dire qu'ils n'ont pas consommé leur quantum de temps CPU et qu'il peuvent donc reprendre leur exécution.
  2. Si le futex ne concerne que des threads temps partagé, tous les threads sont ordonnés en FIFO sous la même priorité MAX_RT_PRIO. Si on précise la valeur 1 alors on favorise le thread qui est en attente depuis le plus longtemps sans se soucier des priorités réelles des threads. La valeur INT_MAX laisse, une fois les threads réveillés, l'opportunité à l'ordonnanceur de Linux de choisir le thread non seulement en fonction du temps d'attente sur le futex mais aussi en fonction de sa priorité réelle qui est recalculée régulièrement en fonction de la consommation CPU antérieure.
  3. Si le futex concerne à la fois des threads temps réel et temps partagé alors il semble que la valeur INT_MAX soit la plus appropriée car à tout moment on sait que si au moins un thread temps réel est en attente sur le futex alors il sera réveillé en premier et s'il n'y a pas de threads temps réel alors le cas 2 s'applique.
Bien-entendu, ces règles doivent être pondérées par le temps CPU consommé par un thread pour reprendre le contrôle sur la ressource gérée par le futex et par le nombre potentiel de threads en attente. Sous peine d'être confronté au fameux problème de "thundering herd" évoqué dans [10], il est souvent préférable de ne réveiller qu'un thread à la fois.

4.3. Optimisation intra processus

Lorsque les threads en compétition sont dans un seul et même processus Linux, il est possible de rendre l'appel système futex() plus efficace en utilisant les versions "PRIVATE" de certains opérateurs définis dans le fichier d'en-tête <linux/futex.h> :

[...]
#define FUTEX_WAIT_PRIVATE      (FUTEX_WAIT | FUTEX_PRIVATE_FLAG)
#define FUTEX_WAKE_PRIVATE      (FUTEX_WAKE | FUTEX_PRIVATE_FLAG)
[...]

Dans le cas où le futex ne concerne que les threads d'un même processus, les traitements internes au noyau pour identifier la variable (association d'une clé d'identification effectuée par la fonction get_futex_key() dans le fichier kernel/futex.c dont un extrait est donné dans le listing 11) sont moins lourds car il n'est pas besoin d'identifier la page mémoire ou l'inode associé à l'objet dans lequel se trouve le futex et il n'est même pas besoin d'incrémenter le compteur de référence associé. L'adresse virtuelle suffit à identifier une variable au sein d'un même processus vu que tous ses threads partagent son espace d'adressage. On parle alors de futex privés par opposition aux futex partagés. Dans le listing 11, les traitements associés aux clés des futex privés est en rouge. Le reste du code, beaucoup plus long et complexe, concerne les clés des futex partagés.

static int get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw)
{
unsigned long address = (unsigned long)uaddr;
struct mm_struct *mm = current->mm;
struct page *page, *page_head;
int err, ro = 0;

/*
* The futex address must be "naturally" aligned.
*/
key->both.offset = address % PAGE_SIZE;
if (unlikely((address % sizeof(u32)) != 0))
return -EINVAL;
address -= key->both.offset;

/*
* PROCESS_PRIVATE futexes are fast.
* As the mm cannot disappear under us and the 'key' only needs
* virtual address, we dont even have to find the underlying vma.
* Note : We do have to check 'uaddr' is a valid user address,
* but access_ok() should be faster than find_vma()
*/
if (!fshared) {
if (unlikely(!access_ok(VERIFY_WRITE, uaddr, sizeof(u32))))
return -EFAULT;
key->private.mm = mm;
key->private.address = address;
get_futex_key_refs(key);
return 0;
}

again:
err = get_user_pages_fast(address, 1, 1, &page);
[...]
if (err == -EFAULT && rw == VERIFY_READ) {
err = get_user_pages_fast(address, 1, 0, &page);
ro = 1;
}
if (err < 0)
return err;
else
err = 0;
[...]
page_head = compound_head(page);
if (page != page_head) {
get_page(page_head);
put_page(page);
}
[...]
lock_page(page_head);
[...]
if (!page_head->mapping) {
int shmem_swizzled = PageSwapCache(page_head);
unlock_page(page_head);
put_page(page_head);
if (shmem_swizzled)
goto again;
return -EFAULT;
}
[...]
if (PageAnon(page_head)) {
[...]
if (ro) {
err = -EFAULT;
goto out;
}

key->both.offset |= FUT_OFF_MMSHARED; /* ref taken on mm */
key->private.mm = mm;
key->private.address = address;
} else {
key->both.offset |= FUT_OFF_INODE; /* inode-based key */
key->shared.inode = page_head->mapping->host;
key->shared.pgoff = page_head->index;
}

get_futex_key_refs(key);

out:
unlock_page(page_head);
put_page(page_head);
return err;
}
Listing 11 : la fonction get_futex_key() dans kernel/futex.c

A titre d'exemple, dans la GLIBC/NPTL, ce sont les versions "PRIVATE" des opérations qui sont utilisées par défaut pour les futex se cachant derrière les mutex ou les barrières. Pour une utilisation par des threads appartenant à différents processus, les services pthread_mutexattr_setpshared() et pthread_barrierattr_setpshared() permettent de positionner l'attribut PTHREAD_PROCESS_SHARED afin de passer par les versions non "PRIVATE" des opérations sur les futex.

4.4. Terminaison anticipée des threads

Lorsque des threads sont en attente sur un futex parce que un ou plusieurs autres threads ont pris la main dessus, il peut arriver la situation où les threads détenant le mutex se terminent sans réveiller les threads en attente. Par exemple, une erreur de programmation qui aboutirait à la terminaison d'un thread qui détient un mutex. C'est un cas d'étreinte fatale (cf. [16]) pour les threads en attente sur le futex concerné car il ne seront plus réveillés !

A titre d'exemple, le programme futex_robust du listing 12 crée deux processus fils : le premier se met en attente sur un futex tant que ce dernier est à 0, le second le met à 1 mais ne réveille pas les threads en attente dessus avant de se terminer. Le thread principal attend la fin du second processus puis du premier via l'appel système waitpid().

[...]
#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)

#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

int *futex_var;

int main(int ac, char *av[])
{
pid_t pid1, pid2;

  // Segment de memoire partagee pour le futex
  futex_var = (int *)mmap(0, sizeof(int),
                          PROT_READ | PROT_WRITE,
                          MAP_SHARED | MAP_ANONYMOUS,
                          -1, 0);

  *futex_var = 0;

  pid1 = fork();

  // Le premier processus fils se met en attente sur le futex
  if (0 == pid1)
  {
    futex_wait(futex_var, 0);

    printf("Fils#%d reveille\n", getpid());

    exit(0);
  }

  // On s'assure que le fils est bloque sur le futex
  sleep(1);

  pid2 = fork();

  // Le second processus fils passe le futex à la valeur 1
  // mais ne reveille pas le premier fils avant de se terminer !
  if (0 == pid2)
  {
    *futex_var = 1;

    printf("Changement d'etat du futex : 0x%x\n", *futex_var);

    exit(0);
  }

  printf("Attente du second fils#%d\n", pid2);
  waitpid(pid2, NULL, 0);

  printf("Second fils#%d termine\n", pid2);

  printf("Attente du premier fils#%d\n", pid1);
  waitpid(pid1, NULL, 0);

  printf("Premier fils#%d termine\n", pid1);

  return 0;
} // main
Listing 12 : Terminaison de thread sans réveil des threads en attente

A l'exécution, le programme ne se termine pas car le processus principal reste en attente infinie sur la fin du premier fils qui lui même est en attente infinie sur le futex non libéré par le second fils :

$ gcc futex_robust.c -o futex_robust
$ ./futex_robust
Changement d'etat du futex : 0x1
Attente du second fils#13042
Second fils#13042 termine
Attente du premier fils#13041

Pour pallier le problème, Linux propose la notion de listes robustes (i.e. "robust lists") décrite succinctement dans la documentation fournie avec ses sources : Documentation/robust-futexes.txt. Le principe consiste à passer au noyau une liste de futex occupés par les threads en espace utilisateur. Au moment de la terminaison des threads, le noyau parcourt cette liste pour identifier les futex occupés sur lesquels au moins un thread est en attente afin de déclencher l'opération FUTEX_WAKE pour un seul des threads en attente (i.e. comme lorsque l'on passe la valeur 1 en troisième paramètre de l'appel système). Conformément, à la description donnée au § 4.1, c'est le thread le plus prioritaire ou le plus anciennement mis en attente en cas d'égalité de priorités, qui est réveillé. La mise en oeuvre n'est pas triviale au premier abord. Une ABI ("Application Binary Interface") est définie dans la documentation des sources : Documentation/robust-futex-ABI.txt.

Pour aider à la compréhension, le listing 13 est une nouvelle version du programme précédent : les lignes de code différentes sont en rouge.

[...]
#define set_robust_list(l, s) \
           syscall(SYS_set_robust_list, l, s)

struct udata_t
{
  struct robust_list link;
  int                futex_var;
} *udata;

struct robust_list_head head;

int main(int ac, char *av[])
{
pid_t pid1, pid2;

  head.list.next = (struct robust_list *)&(head.list);
  head.futex_offset = offsetof(struct udata_t, futex_var);
  head.list_op_pending = 0;
  set_robust_list(&head, sizeof(struct robust_list_head));

  // Segment de memoire partagee pour le futex
  udata = (struct udata_t *)mmap(0, sizeof(struct udata_t),
                          PROT_READ | PROT_WRITE,
                          MAP_SHARED | MAP_ANONYMOUS,
                          -1, 0);

  udata->futex_var = 0;

  pid1 = fork();

  // Le premier processus fils se met en attente sur le futex
  if (0 == pid1)
  {
    set_robust_list(&head, sizeof(struct robust_list_head));

    udata->futex_var |= FUTEX_WAITERS;

    futex_wait(&(udata->futex_var), udata->futex_var);

    printf("Fils#%d reveille : futex_var = 0x%x\n", getpid(), udata->futex_var);

    exit(0);
  }

  // On s'assure que le fils est bloque sur le futex
  sleep(1);

  pid2 = fork();

  // Le second processus fils verrouille le futex avec son pid
  // mais ne reveille pas le premier fils avant de se terminer !
  if (0 == pid2)
  {
    set_robust_list(&head, sizeof(struct robust_list_head));

    head.list_op_pending = &(udata->link);

    udata->futex_var |= getpid();

    printf("Changement d'etat du futex : 0x%x\n", udata->futex_var);

    udata->link.next = head.list.next;
    head.list.next = &(udata->link);
    head.list_op_pending = (struct robust_list *)0;

    exit(0);
  }

  printf("Attente du second fils (pid#%d)\n", pid2);
  waitpid(pid2, NULL, 0);

  printf("Second fils#%d termine\n", pid2);

  printf("Attente du premier fils#%d\n", pid1);
  waitpid(pid1, NULL, 0);

  printf("Premier fils#%d termine\n", pid1);

  return 0;
} // main
Listing 13 : Application des listes robustes à la terminaison de thread

La figure 5 qui schématise l'organisation des données en mémoire durant l'exécution du programme, sert de support pour la description des modifications.

fd
Figure 5 : Organisation des données en mémoire

4.4.1. La structure de données de l'utilisateur

L'application doit prévoir une structure de données qui embarque le futex et les pointeurs pour la liste chaînée. D'où la structure udata_t qui contient respectivement les champs futex_var et link. Le type struct robust_list est défini dans le fichier d'en-tête <linux/futex.h> :

struct robust_list {
        struct robust_list *next;
};

Afin d'ajouter ou supprimer efficacement en O(1) n'importe quel élément de la liste,  il est conseillé d'utiliser un double chaînage sinon on devra parcourir tout ou partie de la liste pour retrouver les éléments. Ici pour simplifier le propos nous nous contentons d'une liste simplement chaînée car c'est le minimum requis par le noyau.

4.4.2. La tête de liste

L'application doit définir et initialiser une tête de liste dont le type est défini dans le fichier d'en-tête <linux/futex.h> :

struct robust_list_head {
        /*
         * The head of the list. Points back to itself if empty:
         */
        struct robust_list list;

        /*
         * This relative offset is set by user-space, it gives the kernel
         * the relative position of the futex field to examine. This way
         * we keep userspace flexible, to freely shape its data-structure,
         * without hardcoding any particular offset into the kernel:
         */
        long futex_offset;

        /*
         * The death of the thread may race with userspace setting
         * up a lock's links. So to handle this race, userspace first
         * sets this field to the address of the to-be-taken lock,
         * then does the lock acquire, and then adds itself to the
         * list, and then clears this field. Hence the kernel will
         * always have full knowledge of all locks that the thread
         * _might_ have taken. We check the owner TID in any case,
         * so only truly owned locks will be handled.
         */
        struct robust_list *list_op_pending;
};

D'où la définition de la variable globale head avec le type robust_list_head et son initialisation en début de fonction main().
Le champ list est une liste circulaire qui pointe sur la structure robust_list des enregistrements qui contiennent le futex. Le champ futex_offset est la valeur à ajouter à l'adresse de la structure robust_list dans l'enregistrement pour retrouver l'adresse du futex. La fonction de service offsetof() est préconisée pour calculer un offset de champ dans une structure donnée. Le champ list_op_pending est l'adresse de la structure robust_list de l'enregistrement qu'on s'apprête à ajouter à la liste juste avant de prendre la main sur le futex. On verra par la suite son utilité. On notera que la liste circulaire est vide lorsque le champ list pointe sur lui-même.

Il faut ensuite enregistrer la tête de liste au sein du noyau via l'appel système set_robust_list(). Cela a pour conséquence de mémoriser l'adresse de la tête de liste dans le descripteur du thread au sein du noyau. Chacun des threads doit effectuer cette opération et au plus une tête de liste peut être enregistrée par thread. D'où l'invocation de set_robust_list() au début de chaque processus (le principal et les deux secondaires).

4.4.3. Prise de contrôle sur le futex

Dans le listing 13, c'est le second fils qui prend le contrôle sur le futex (e.g. opération de verrouillage si c'est un mutex). Cela consiste tout d'abord à assigner le champ list_op_pending de la tête de liste avec l'adresse de l'enregistrement associé au futex. Ensuite il faut renseigner les 29 bits de poids faible du futex avec l'identifiant du thread qui prend le contrôle. Ici comme nous avons à faire à trois processus monothread, l'identifiant de process est aussi l'identifiant du thread (à ne pas confondre avec l'identifiant retourné par le service POSIX pthread_self() en espace utilisateur !). D'où l'appel à getpid() (Cela devrait se faire par opération atomique mais ce n'est pas justifié dans cet exemple simple car on sait que le thread concurrent n'accède pas à la variable à ce moment). Enfin, l'enregistrement du futex est inséré dans la liste circulaire puis le champ list_op_pending est remis à 0.

La prise de contrôle sur le futex (affectation de l'identifiant de thread) et la mise en liste circulaire de l'enregistrement associé n'est pas une opération atomique. S'il arrivait que le thread se termine entre les deux actions (suite à la réception d'un signal de terminaison par exemple), le noyau n'aurait pas le moyen de savoir que le futex est verrouillé et donc ne réveillerait pas les autres threads en attente. Sur terminaison de thread, une valeur non nulle pour le champ list_op_pending permet au noyau de savoir qu'une opération de verrouillage était en cours et qu'il faut donc procéder au réveil d'éventuels threads en attente.

4.4.4. Mise en attente sur le futex

Dans le listing 13, c'est le premier fils qui se met en attente. Cela consiste simplement à positionner le bit FUTEX_WAITERS dans la variable du futex. Il faudrait prévoir une opération atomique pour cela mais ce n'est pas utile dans cet exemple car à ce moment, le thread concurrent n'accède plus au futex. Ensuite, le thread se met en attente sur le futex via l'appel système futex().

4.4.5. Le réveil du thread en attente

Dans le cas normal, pour libérer le futex, le thread qui a la main dessus doit assigner le champ list_op_pending de la tête de liste avec l'adresse de l'enregistrement associé au futex, enlever l'enregistrement de la liste circulaire, remettre les 29 bits de poids faible à 0 (i.e. remise à 0 de l'identifiant de thread) et, à l'aide de l'opération FUTEX_WAKE de l'appel système futex(), réveiller les threads en attente sur le futex si le bit FUTEX_WAITERS est positionné. Cela doit se faire de manière atomique en général. Enfin, il faut remettre le champ list_op_pending à zéro.

Dans notre exemple simple, nous simulons un arrêt prématuré du thread qui a la main sur la ressource : le second processus se termine sans appeler l'appel système futex() pour réveiller le premier qui est en attente. Le noyau se charge de remédier au problème en parcourant la liste robuste. L'enregistrement correspondant au futex s'y trouvant, il positionne le bit FUTEX_OWNER_DIED en plus du bit FUTEX_WAITERS, remet à 0 les 29 premiers bits (i.e. l'identifiant de thread) et réveille le thread en attente.

Avec ce nouveau comportement, le programme se termine bien à l'exécution :

$ gcc futex_robust_1.c -o futex_robust_1
Changement d'etat du futex : 0x800038fe
Attente du second fils (pid#14590)
Second fils#14590 termine
Attente du premier fils#14589
Fils#14589 reveille : futex_var = 0xc0000000
Premier fils#14589 termine
$

4.4.6. Limitations

Le protocole imposé par l'ABI restreint l'utilisation de la variable du futex vu que les 29 bits de poids faible sont destinés à stocker l'identifiant de thread, le bit 29 est réservé pour de futures améliorations, le bit 30 est réservé pour FUTEX_OWNER_DIED et le bit 31 pour FUTEX_WAITERS (Cf. figure 6).

fr
Figure 6 : Format du futex en mode robuste

Des mécanismes d'optimisation du nombre d'appels système futex() tels que vu au § 3.1.2 sont rendus plus difficiles à mettre en place car ils nécessitent de pouvoir stocker différentes valeurs dans la variable. Ce qui est offert ici est un fonctionnement basique du futex : libre (ou déverrouillé) quand les 29 bits de poids faible sont à 0, occupé (ou verrouillé) quand les 29 bits de poids faible sont différents de 0 (i.e. un identifiant de thread y est stocké) avec toutefois la possibilité de voir si des threads sont en attente ou non via le bit FUTEX_WAITERS.

Il a été vu qu'au plus une liste robuste pouvait être enregistrée par thread. Comme ce mécanisme est utilisé par l'extension non POSIX pthread_mutexattr_setrobust_np() avec le drapeau PTHREAD_MUTEX_ROBUST_NP pour rendre un mutex robuste, il en résulte que ces services sont prohibés dans les applications liées à la librairie GLIBC/NPTL. Avec cette dernière, le service pthread_mutex_lock() retourne le compte rendu d'erreur EOWNERDEAD tout en ayant verrouillé le mutex quand il est avéré qu'un thread s'est terminé sans libérer le mutex (i.e. en interne, le bit FUTEX_OWNER_DIED est positionné dans la variable du futex associé). Un exemple est donné dans le listing 14.

[...]
pthread_mutex_t mutex;

void *thread(void *p)
{
int rc = pthread_mutex_lock(&mutex);

  printf("Thread#%x, rc = %d\n", pthread_self(), rc);

  if (EOWNERDEAD == rc)
  {
    printf("Le proprietaire precedent du mutex est termine sans deverrouillage\n");

    pthread_mutex_unlock(&mutex);
  }

  return NULL;
}

int main(int ac, char *av[])
{
pthread_t           tid1, tid2;
pthread_mutexattr_t attr;

  // Initialisation du mutex
  memset(&attr, 0, sizeof(attr));
  pthread_mutexattr_setrobust_np(&attr, PTHREAD_MUTEX_ROBUST_NP);
  pthread_mutex_init(&mutex, &attr);

  pthread_create(&tid1, NULL, thread, NULL);
  pthread_create(&tid2, NULL, thread, NULL);

  printf("Attente du 1er thread#%x\n", tid1);
  pthread_join(tid1, NULL);

  printf("Attente du 2nd thread#%x\n", tid2);
  pthread_join(tid2, NULL);

  return 0;
} // main
Listing 14 : Mutex robuste avec la GLIBC/NPTL

Le programme crée un mutex robuste et deux threads. Celui qui prend la main en premier, verrouille le mutex et se termine sans le déverrouiller. Par conséquent, le second reçoit le code d'erreur EOWNERDEAD (de valeur 130) sur le verrouillage :

$ gcc mutex_robust.c -o mutex_robust -lpthread
$ ./mutex_robust
Attente du 1er thread#b7fb7b90
Thread#b7fb7b90, rc = 0
Attente du 2nd thread#b75b6b90
Thread#b75b6b90, rc = 130
Le proprietaire precedent du mutex est termine sans deverrouillage

Utilisée avec ou sans la GLIBC/NPTL, cette fonctionnalité semble surtout destinée à repérer la terminaison anormale d'un thread pendant l'exécution de sa section critique. Le thread réveillé par le noyau aura la plupart du temps la plus grande difficulté à savoir si les données modifiées durant la section critique inachevée sont valides ou non.

4.5. Héritage de priorité

Aux § 4.1 et 4.2, il a été vu que le réveil des threads se faisait en respectant la priorité afin de s'assurer que les threads temps réel soient réveillés dans l'ordre décroissant des priorités. Cependant, il faut aussi envisager les situations où des threads se mettent en attente sur un futex parce qu'un autre thread moins prioritaire a la main dessus.

4.5.1. Inversion de priorité

Considérons le cas d'un mutex basé sur un futex où un thread#1 de priorité donnée a pris la main sur le futex et qu'un thread#3 plus prioritaire se retrouve en attente sur ce même futex. Si un thread#2 de priorité intermédiaire entre les deux précédents vient à se réveiller, il y a de fortes chances que l'ordonnanceur de Linux préempte le thread#1 pour donner le CPU au thread#2. Cela revient à dire que le thread#3 normalement plus prioritaire que les deux autres se retrouve privé de CPU à cause du thread#1 qui détient le futex et du thread#2 qui a pris la main car il n'est pas en attente sur le futex. Ce phénomène s'appelle l'inversion de priorité (cf. [11]). Il est rédhibitoire dans un environnement temps réel car il nuit au déterminisme du système. Le listing 15 illustre ce cas de figure.

[...]
#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)
#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

int futex_var;

unsigned int nb_loops;

int policy = SCHED_FIFO;

pthread_barrier_t barrier;

static void nop(void)
{
  // Sur Intel
  asm("rep; nop;");
  //asm("nop;");
}
[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 1;
  pthread_setschedparam(pthread_self(), policy, &param);

  pthread_barrier_wait(&barrier);

  for (i = 0; i < nb_loops; i ++)
  {
    nop();
  }

  futex_var = 1;
  futex_wakeup(&futex_var, 1);

  printf("Thread_1 termine\n");

  return NULL;
}

void *thread_2(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 50;
  pthread_setschedparam(pthread_self(), policy, &param);

  pthread_barrier_wait(&barrier);

  for (i = 0; i < nb_loops; i ++)
  {
    nop();
  }

  printf("Thread_2 termine\n");

  return NULL;
}

void *thread_3(void *p)
{
struct timeval     t0, t1, t;
unsigned int       latence;
struct sched_param param;
cpu_set_t          cpu;

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 97;
  pthread_setschedparam(pthread_self(), policy, &param);

  pthread_barrier_wait(&barrier);

  gettimeofday(&t0, NULL);

  futex_wait(&futex_var, 0);

  gettimeofday(&t1, NULL);

  timersub(&t1, &t0, &t);

  latence = t.tv_sec * 1000 + t.tv_usec / 1000000;

  printf("Latence : %u ms\n", latence);

  printf("Thread_3 termine\n");

  return NULL;
}

int main(int ac, char *av[])
{
pthread_t          tid1, tid2, tid3;
struct sched_param param;
cpu_set_t          cpu;

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 99; 
  sched_setscheduler(0, policy, &param);

  // Calibration
  nb_loops = calibration(atoi(av[1]));

  printf("Nombre d'iterations pour %u ms : %u\n", atoi(av[1]), nb_loops);

  pthread_barrier_init(&barrier, NULL, 3);

  pthread_create(&tid1, NULL, thread_1, NULL);
  pthread_create(&tid2, NULL, thread_2, NULL);
  pthread_create(&tid3, NULL, thread_3, NULL);

  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);
  pthread_join(tid3, NULL);

  return 0;
}
Listing 15 : Inversion de priorité

Le programme reçoit en paramètre un nombre de millisecondes qu'il convertit en nombre d'itérations pour occuper le CPU pendant cette durée (variable globale nb_loops mises à jour par une boucle de calibration). Le thread#1 de priorité 1, qui a la main sur le futex, effectue une attente active en effectuant toutes ces itérations. Le thread#3 se met en attente jusqu'au passage du futex à la valeur 1. Le thread#2 se met aussi en attente active pendant le nombre d'itérations demandées. Comme le thread#1 a une priorité égale à 1 inférieure à la priorité du thread#2 qui est égale à 50 et que le thread#3 plus prioritaire que les deux premiers est en attente sur le futex, le thread#2 prend très rapidement la main pour exécuter sa boucle active. Quand il se termine, l'ordonnanceur de Linux donne la main au thread#1 pour qu'il termine sa boucle active avant de passer le futex à la valeur 1 pour réveiller le thread#3. Ce dernier affiche son temps de latence (durée d'attente sur le futex) qui est logiquement le résultat de la durée des boucles actives des thread#1 et thread#2. Dans l'exemple d'exécution qui suit, le programme recevant 10000 millisecondes en paramètre, fait attendre le thread#3 environ 19000 millisecondes (cumul des durées des boucles actives de 10000 millisecondes exécutées dans les threads#2 et thread#1). Sur certains systèmes, cet exemple peut nécessiter les droits du super utilisateur (e.g. commande sudo) pour fonctionner car il utilise des threads temps réel :

$ gcc futex_pi.c -o futex_pi -lpthread
$ sudo ./futex_pi 10000
Nombre d'iterations pour 10000 ms : 505283000
Thread_2 termine
Latence : 19000 ms
Thread_3 termine
Thread_1 termine

Nous sommes donc bien confronté à un phénomène d'inversion de priorité : le thread#3 a du non seulement attendre la fin du thread#1 mais aussi la fin du thread#2 alors qu'il est plus prioritaire que ce dernier (priorité 97 contre 50) !

4.5.2. Solution

Il aurait fallu que le thread#1 ait une priorité au moins supérieure à celle du thread#2 afin d'éviter le phénomène d'inversion de priorité. Le listing 16 est une version du listing 15 avec le thread#1 plus prioritaire que le thread#2 (la seule ligne qui diffère est en rouge).

[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 51;
  pthread_setschedparam(pthread_self(), policy, &param);
[...]
Listing 16 : Sans inversion de priorité

La priorité 51 pour le thread#1 fait qu'il garde la main par rapport au thread#2 de priorité 50. Le premier exécute donc sa boucle active de 10000 millisecondes avant de passer le futex à 1 et de rendre le CPU au thread#3 qui s'exécute avant le thread#2 car il est plus prioritaire. Le temps de latence affiché n'est donc plus que d'environ 9000 millisecondes (temps d'exécution de la boucle active dans le thread#1) :

$ gcc futex_pi_1.c -o futex_pi_1 -lpthread
$ sudo ./futex_pi_1 10000
Nombre d'iterations pour 10000 ms : 505267000
Latence : 9000 ms
Thread_3 termine
Thread_1 termine
Thread_2 termine

Nous touchons du doigt une solution couramment adoptée pour réduire le temps de latence des threads : l'héritage de priorité (cf. [12]). Cela consiste à augmenter provisoirement la priorité du thread qui détient le futex au même niveau que celle du thread de priorité la plus importante en attente sur ce même futex. Ce changement dure le temps où le thread détient le futex. Dès qu'il le libère, il retrouve sa priorité d'origine. Ainsi, dans notre exemple, le thread#1 hériterait de la priorité du thread#3 qui étant plus importante que celle du thread#2, empêcherait ce dernier de préempter le thread#1.

Comme décrit dans les documents accompagnant ses sources (Documentation/pi-futex.txt et Documentation/futex-requeue-pi.txt), le noyau Linux offre l'héritage de priorité à travers les opérations suffixées par "PI" (i.e. Priority Inheritance) dans le fichier d'en-tête <linux/futex.h> :

[...]
#define FUTEX_LOCK_PI           6
#define FUTEX_UNLOCK_PI         7
#define FUTEX_TRYLOCK_PI        8
[...]

Le principe est similaire aux listes robustes pour ce qui est de la gestion du contenu de la variable du futex (cf. figure 6).

En espace utilisateur, le verrouillage du futex se fait généralement à l'aide d'une opération atomique telle que __sync_val_compare_and_swap() vue au § 3.1. Deux cas de figure peuvent se présenter :
Le déverrouillage du futex (qui ne peut se faire que par le thread qui l'a verrouillé !) est aussi réalisé à l'aide de l'opération atomique afin de le mettre à 0. Il peut aussi se présenter deux cas de figure :
Le listing 17 est une réécriture du programme décrit dans le listing 15 avec l'utilisation des opérations suffixées par PI. La fonction main(), étant complètement identique, n'est pas reproduite ici.

[...]
#define futex_lock_pi(addr) \
  syscall(SYS_futex, addr, FUTEX_LOCK_PI, 0, 0)

#define futex_unlock_pi(addr) \
  syscall(SYS_futex, addr, FUTEX_UNLOCK_PI, 0, 0)

#define gettid() syscall(__NR_gettid)

int futex_var;

unsigned int nb_loops;

int policy = SCHED_FIFO;

pthread_barrier_t barrier;

static void nop(void)
{
  //asm("rep; nop;");
  asm("nop;");
}
[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 old;
int                 mytid = gettid();

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 1;
  pthread_setschedparam(pthread_self(), policy, &param);

  futex_var = mytid;

  pthread_barrier_wait(&barrier);

  for (i = 0; i < nb_loops; i ++)
  {
    nop();
  }

  old = __sync_val_compare_and_swap(&futex_var, mytid, 0);

  if (mytid == old)
  {
    // Rien à faire car il n'y a pas de threads en attente sur le futex
    printf("Pas de thread en attente sur le futex\n");
  }
  else
  {
    // Il y a un thread en attente sur le futex (futex_var = FUTEX_WAITERS | mytid)
    futex_unlock_pi(&futex_var);
  }

  printf("0x%x : Thread_1 termine\n", mytid);

  return NULL;
}

void *thread_2(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 mytid = gettid();

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 50;
  pthread_setschedparam(pthread_self(), policy, &param);

  pthread_barrier_wait(&barrier);

  for (i = 0; i < nb_loops; i ++)
  {
    nop();
  }

  printf("0x%x : Thread_2 termine\n", mytid);

  return NULL;
}

void *thread_3(void *p)
{
struct timeval     t0, t1, t;
unsigned int       latence;
struct sched_param param;
cpu_set_t          cpu;
int                old;
int                mytid = gettid();

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 97;
  pthread_setschedparam(pthread_self(), policy, &param);

  pthread_barrier_wait(&barrier);

  gettimeofday(&t0, NULL);

  old = __sync_val_compare_and_swap(&futex_var, 0, mytid);

  printf("0x%x : La valeur du futex avant LOCK est : 0x%x\n", mytid, futex_var);

  switch(old)
  {
    case 0 : // Le futex etait libre
    {
      // Desormais le thread courant detient le futex
    }
    break;

    default : // Le futex n'est pas libre
    {
      futex_lock_pi(&futex_var);
    }
  }

  printf("0x%x : La valeur du futex après LOCK est : 0x%x\n", mytid, futex_var);

  futex_unlock_pi(&futex_var);

  printf("0x%x : La valeur du futex après UNLOCK est : 0x%x\n", mytid, futex_var);

  gettimeofday(&t1, NULL);

  timersub(&t1, &t0, &t);

  latence = t.tv_sec * 1000 + t.tv_usec / 1000000;

  printf("0x%x : Latence : %u ms\n", mytid, latence);

  printf("0x%x : Thread_3 termine\n", mytid);

  return NULL;
}
[...]
Listing 17 : Utilisation des opérations PI

Malgré la valeur 1 pour la priorité du thread#1, l'exécution donne le même résultat que le programme du listing 16 où la priorité avait été modifiée avec la valeur 51 pour ne pas préempter le thread#1 par le thread#2. En d'autres termes, le phénomène d'inversion de priorité a été évité car la latence du thread#3 est aux alentours de 9000 millisecondes au lieu de 19000.

$ gcc futex_pi_2.c -o futex_pi_2 -lpthread
$ sudo ./futex_pi_2 10000
0x7b8 : Nombre d'iterations pour 10000 ms : 505298000
0x7bd : La valeur du futex avant LOCK est : 0x7bb
0x7bd : La valeur du futex après LOCK est : 0x800007bd
0x7bd : La valeur du futex après UNLOCK est : 0x0
0x7bd : Latence : 9000 ms
0x7bd : Thread_3 termine
0x7bc : Thread_2 termine
0x7bb : Thread_1 termine

On pourra remarquer que l'héritage de priorité concerne non seulement les threads temps réel mais aussi les threads temps partagé. Le listing 18 est une modification du listing 17 avec le thread#1 défini avec des paramètres d'ordonnancement temps partagé.

[...]
void *thread_1(void *p)
{
unsigned int        i;
struct sched_param  param;
cpu_set_t           cpu;
int                 old;
int                 mytid = gettid();

  CPU_ZERO(&cpu);
  CPU_SET(0, &cpu);
  pthread_setaffinity_np(pthread_self(), sizeof(cpu), &cpu);

  param.sched_priority = 0;
  pthread_setschedparam(pthread_self(), SCHED_OTHER, &param);
[...]
Listing 18 : Utilisation des opérations PI avec un thread temps partagé
 
L'exécution donne exactement le même résultat que précédemment car le thread#1 temps partagé est promu avec la priorité temps réel du thread#3 dont la latence reste aux alentours de 9000 millisecondes.

$ gcc futex_pi_3.c -o futex_pi_3 -lpthread
$ sudo ./futex_pi_3 10000
0x817 : Nombre d'iterations pour 10000 ms : 505303000
0x81c : La valeur du futex avant LOCK est : 0x81a
0x81c : La valeur du futex après LOCK est : 0x8000081c
0x81c : La valeur du futex après UNLOCK est : 0x0
0x81c : Latence : 9000 ms
0x81c : Thread_3 termine
0x81a : Thread_1 termine
0x81b : Thread_2 termine

Dans la GLIBC/NPTL, c'est l'attribut PTHREAD_PRIO_INHERIT du service pthread_mutexattr_setprotocol() qui active l'héritage de priorité sur un mutex.

4.6. Multiplexage

L'opération FUTEX_WAIT_BITSET permet de constituer des files d'attente de threads sur un futex. L'identificateur de file est un ensemble de bits passé en sixième paramètre de l'appel système futex() :

int futex(int *uaddr1, FUTEX_WAIT_BITSET, int nb_wake1, const struct timespec *timeout, NULL, int bitset)

Toute combinaison de 32 bits est autorisée sauf la valeur 0 sous peine de recevoir le compte rendu d'erreur EINVAL. Au sein du noyau Linux, ce paramètre est placé dans le champ bitset de la structure futex_q présentée en figure 3. En d'autres termes, lorsqu'un thread se met en attente sur un futex, la structure futex_q qui lui est attachée dans le kernel est rangée dans la table de hachage futex_queues comme pour tout autre appel au service futex() avec un code FUTEX_WAIT mais le champ bitset va permettre de discriminer les threads à réveiller dans l'ordre du plus prioritaire au moins prioritaire lors de l'invocation de l'appel système avec le code FUTEX_WAKE_BITSET. Le réveil d'un thread a lieu si au moins un des bits à 1 de son ensemble de bits correspond à un des bits à 1 de l'ensemble passé à l'opération FUTEX_WAKE_BITSET (i.e. c'est le résultat d'une opération logique AND et non pas un test d'égalité). On déduit qu'il peut y avoir au maximum 32 files d'attente par futex car c'est le nombre maximum de sous ensembles disjoints sur 32 bits (c'est-à-dire tous les sous-ensembles avec des bits à 1 non communs aux autres donc tous les sous-ensembles disjoints avec un seul bit à 1).

En réalité la notion de file d'attente est inhérente à toute opération sur les futex vu que le champ bitset est dans la structure futex_q commune à tous les futex. Cependant ce champ est positionné par défaut à 0xFFFFFFFF pour toutes les opérations autres que FUTEX_WAIT_BITSET de sorte à ce que les threads en attente soient réveillés par toute opération de réveil. D'où la constante suivante définie dans le fichier d'en-tête <linux/futex.h> :

/*
 * bitset with all bits set for the FUTEX_xxx_BITSET OPs to request a
 * match of any bit.
 */
#define FUTEX_BITSET_MATCH_ANY    0xffffffff

FUTEX_BITSET_MATCH_ANY utilisée avec l'opération FUTEX_WAIT_BITSET permet au thread appelant d'être réveillé non seulement par tout appel à futex() avec l'opération FUTEX_WAKE_BITSET mais aussi avec n'importe quelle autre opération de réveil telle que FUTEX_WAKE par exemple. Utilisé avec l'opération FUTEX_WAKE_BITSET, il permet de réveiller tous les threads en attente avec un ensemble de bits quelconque. FUTEX_BITSET_MATCH_ANY est utilisée par défaut pour le champ bitset dans le noyau Linux grâce à la structure suivante d'initialisation d'une structure futex_q nouvellement allouée (cf. fichier .../kernel/futex.c) :

static const struct futex_q futex_q_init = {
    /* list gets initialized in queue_me()*/
    .key = FUTEX_KEY_INIT,
    .bitset = FUTEX_BITSET_MATCH_ANY
};

Le listing 19 donne un exemple d'application à travers l'implémentation de la notion de rwlock (cf. [13]). C'est un mutex particulier qui autorise tout thread à accéder à une ressource partagée en lecture de manière concurrente mais n'autorise qu'un thread à la fois pour l'accès à cette même ressource en écriture.

[...]
#define futex_wait_bitset(addr, val, bitset)   \
  syscall(SYS_futex, addr, FUTEX_WAIT_BITSET, val, 0, 0, bitset)
#define futex_wakeup_bitset(addr, nb, bitset)  \
  syscall(SYS_futex, addr, FUTEX_WAKE_BITSET, nb, 0, 0, bitset)

struct
{
  int          lock;
  int          wait;
  unsigned int readers;
  unsigned int waiting_readers;
  unsigned int writer;
  unsigned int waiting_writers;
} rwlock;

int nb_thread;

[...]

void RWLOCK_READ(void)
{
  LOCK(&(rwlock.lock));

  while(1)
  {
    // S'il n'y a pas d'ecrivain en cours et pas d'ecrivain en attente
    // ==> Le lecteur prend le rwlock
    if (0 == rwlock.writer && 0 == rwlock.waiting_writers)
    {
      rwlock.wait = 1;
      rwlock.readers ++;

      UNLOCK(&(rwlock.lock));

      break;
    }
    else // Il y a un ecrivain en cours ou en attente
    {
      // Un lecteur supplementaire en attente
      rwlock.waiting_readers ++;

      UNLOCK(&(rwlock.lock));

      // Mise en attente de liberation par l'ecrivain
      futex_wait_bitset(&(rwlock.wait), 1, 0x01);

      LOCK(&(rwlock.lock));

      // Un lecteur en attente de moins
      rwlock.waiting_readers --;
    }
  }
}

void RWLOCK_WRITE(void)
{
  LOCK(&(rwlock.lock));

  while(1)
  {
    // S'il n'y a pas d'ecrivain et de lecteurs en cours
    // ==> L'ecrivain prend le rwlock
    if (0 == rwlock.writer && 0 == rwlock.readers)
    {
      rwlock.wait = 1;
      rwlock.writer = 1;

      UNLOCK(&(rwlock.lock));

      break;
    }
    else // Il y a un ecrivain ou des lecteurs en cours
    {
      // Un ecrivain supplementaire en attente
      rwlock.waiting_writers ++;

      UNLOCK(&(rwlock.lock));

      // Mise en attente de liberation par les lecteurs ou l'ecrivain
      futex_wait_bitset(&(rwlock.wait), 1, 0x02);

      LOCK(&(rwlock.lock));

      // Un ecrivain de moins en attente
      rwlock.waiting_writers --;
    }
  }
}

void RWUNLOCK(void)

  LOCK(&(rwlock.lock));

  // Si c'est l'ecrivain qui deverrouille
  if (rwlock.writer)
  {
    // Il n'y a plus d'ecrivain en cours
    assert(1 == rwlock.writer);
    rwlock.writer = 0;
  }
  else // C'est un lecteur qui deverrouille
  {
    // Decrementation du nombre de lecteurs
    assert(rwlock.readers > 0);
    rwlock.readers --;

    // S'il reste des lecteurs
    if (0 != rwlock.readers)
    {
      // Les lecteurs gardent le rwlock
      UNLOCK(&(rwlock.lock));

      return;
    }
  }

  // Il n'y ni ecrivain, ni lecteur en cours
  // ==> On reveille les eventuels ecrivains ou lecteurs
  // en attente en donnant priorite aux ecrivains
  rwlock.wait = 0;

  // S'il y a au moins un ecrivain en attente, on le reveille
  if (rwlock.waiting_writers)
  {
    UNLOCK(&(rwlock.lock));

    futex_wakeup_bitset(&(rwlock.wait), 1, 0x02);
  }
  else if (rwlock.waiting_readers)
  {
    // On reveille les lecteurs en attente
    UNLOCK(&(rwlock.lock));

    futex_wakeup_bitset(&(rwlock.wait), INT_MAX, 0x01);
  }
  else
  {
    UNLOCK(&(rwlock.lock));
  }
}

static void *thd_main(void *p)
{
int *idx = (int *)p;

  printf("Thread %d demarre...\n", *idx);

  while (1)
  {
    RWLOCK_READ();

    if (*idx >= nb_thread)
    {
      RWUNLOCK();
      break;
    }

    RWUNLOCK();
  }

  printf("Thread %d se termine...\n", *idx);

  return NULL;
}

int main(int ac, char *av[])
{
int       nb;
pthread_t tid[255];
int       param[255];
int       i;
int       nb_thread_old;

  for (i = 0; i < 255; i ++)
  {
    param[i] = i;
  }

  while(1)
  {
    printf("Nombre de threads : ");
    if (EOF == fscanf(stdin, "%d", &nb))
    {
      break;
    }

    if (nb > 255)
    {
      fprintf(stderr, "La valeur max est 255\n");
      continue;
    }

    RWLOCK_WRITE();

    nb_thread_old = nb_thread;
    nb_thread = nb;

    RWUNLOCK();

    if (nb < nb_thread_old)
    {
      // Attente de la fin des threads en trop
      for (i = nb; i < nb_thread_old; i ++)
      {
        pthread_join(tid[i], NULL);
      }
    }
    else if (nb > nb_thread_old)
    {
      // Creation des threads supplementaires
      for (i = nb_thread_old; i < nb; i ++)
      {
        pthread_create(&(tid[i]), NULL, thd_main, &(param[i]));
      }
    }
  }

  return 0;
}
Listing 19 : Futex avec ensemble de bits

Le programme consiste en un thread principal (fonction main()) qui reçoit de l'opérateur le nombre de threads secondaires (fonction thd_main()) qui doivent s'exécuter.  Chaque thread secondaire reçoit en paramètre un index unique afin que à tout moment, seuls les threads d'indices zéro au nombre de threads secondaires moins un tournent. Si la valeur saisie est supérieure au nombre de threads secondaires courants, les threads additionnels sont créés. Dans le cas contraire, les threads superflus sont arrêtés. Voici un exemple de compilation et d'exécution du programme où on crée cinq threads secondaires, puis on demande à n'en avoir que deux puis zéro :

$ gcc rwlock.c -o rwlock -lpthread
$ ./rwlock
Nombre de threads : 5
Thread 0 demarre...
Thread 1 demarre...
Nombre de threads : Thread 2 demarre...
Thread 4 demarre...
Thread 3 demarre...
2
Thread 3 se termine...
Thread 4 se termine...
Thread 2 se termine...
Nombre de threads : 0
Thread 0 se termine...
Thread 1 se termine...
Nombre de threads :
<CTRL D>
$

Le nombre courant de threads secondaires est stocké dans la variable globale nb_thread par le thread principal qui y accède donc en écriture. Les threads secondaires effectuent une boucle pour accéder à cette variable en lecture afin de s'arrêter dès que leur indice est supérieur ou égal à nb_thread. Sur certaines architectures, la mise à jour ou la consultation d'un entier n'est pas une opération atomique. Il faut par conséquent protéger la ressource partagée nb_thread de sorte à s'assurer que les threads secondaires lisent toujours une valeur cohérente avant et après la mise à jour faite par le thread principal. D'où l'utilité d'un rwlock représenté par la variable globale rwlock : structure composée d'un mutex (les fonctions LOCK() et UNLOCK() associées ne sont pas reproduites ici car ce sont les mêmes que celles du listing 5) nommé lock qui protège l'accès aux autres champs qui sont un futex nommé wait et des compteurs de nombre de lecteurs et écrivains en attente sur le rwlock ou ayant verrouillé le rwlock. Trois opérations sont définies pour le rwlock :
Deux files d'attente sont définies sur le futex wait : l'ensemble de bits 0x01 est la file d'attente pour les threads en lecture tandis que la valeur 0x02 est pour les threads en écriture.
Le verrouillage en lecture ne peut se faire qu'à la condition où un écrivain n'a pas verrouillé le rwlock (i.e. champ writer égal à 0) et s'il n'y a pas d'écrivains en attente sur le rwlock (i.e. champ waiting_writers égal à 0). Cette dernière conditions prévient les phénomènes de famine (cf. [17]) qui pourraient survenir au niveau des écrivains si le nombre de lecteurs est important : les écrivains pourraient ne pas avoir accès à la ressource assez rapidement. La priorité d'accès est donc donnée aux écrivains car il semble logique de modifier le plus rapidement possible la ressource partagée de sorte à donner aux lecteurs une valeur la plus à jour possible au plus vite. Si les conditions ne sont pas vérifiées, l'appelant se met en attente en invoquant l'appel système futex() avec le code opération FUTEX_WAIT_BITSET et l'ensemble de bits 0x01 sur le futex wait.
Le verrouillage en écriture ne peut se faire que s'il n'y a pas un écrivain (i.e. champ writer égal à 0) et des lecteurs (i.e. champ readers égal à 0) qui ont déjà verrouillé le rwlock. Ici on ne se préoccupe pas du nombre de lecteurs en attente (champ waiting_readers) car les écrivains sont prioritaires pour verrouiller le rwlock. Si, les conditions ne sont pas vérifiées, l'appelant se met en attente en invoquant l'appel système futex() avec le code opération FUTEX_WAIT_BITSET et l'ensemble de bits 0x02 sur le futex wait.
Le déverrouillage est une fonction commune aux lecteurs et écrivains car le champ writer permet de différencier l'appel par un écrivain (valeur égale à 1) ou un lecteur (valeur égale à 0). Pour un écrivain, le déverrouillage consiste à remettre le champ writer à 0. Pour un lecteur, le champ readers est décrémenté et le déverrouillage n'est effectif qu'à partir du moment où ce champ atteint la valeur 0 (pour signifier qu'il n'y a plus de lecteurs). Le déverrouillage se conclut par le réveil d'un seul écrivain s'il y en a au moins un qui est en attente (i.e. valeur du champ waiting_writers supérieur à 0) ou de tous les lecteurs s'il y en a en attente (i.e. valeur du champ waiting_readers supérieur à 0). Dans le premier cas, futex() est appelé avec l'opération FUTEX_WAKE_BITSET, le nombre de threads à 1 et l'ensemble de bits 0x02 (pour ne sélectionner que les écrivains en attente sur le futex wait dans la table de hachage du noyau). Dans le second cas, le nombre de threads est positionné à INT_MAX (pour réveiller tous les lecteurs en attente) et l'ensemble de bits 0x01 (pour ne sélectionner que les lecteurs en attente sur le futex wait dans la table de hachage du noyau).

Si cette notion de files d'attente permet de faciliter l'implémentation de fonctions de synchronisation évoluées comme les rwlock, [14] souligne toutefois la limite des files d'attente multiples sur un même futex du point de vue de la performance. En effet, la conséquence est l'allongement des listes chaînées correspondantes dans la table de hachage futex_queues du kernel (cf. figure 3). Le parcours de liste est plus long pour ne sélectionner que les threads correspondant au bitset passé à l'opération FUTEX_WAKE_BITSET. Il est généralement préférable d'utiliser plusieurs futex avec une file d'attente (fonctionnement par défaut) qu'un seul futex avec plusieurs files d'attente. C'est sans doute la raison pour laquelle la GLIBC/NPTL n'utilise pas ces opérations pour les services relatifs aux rwlocks (i.e. pthread_rwlock_...()).

Par la suite il sera vu que ces opérations ont tout de même un intérêt primordiale au niveau du paramètre de temporisation qui offre une certaine flexibilité par rapport aux autres opérations vues jusqu'ici.

4.7. Le paramètre de temporisation

4.7.1. Temporisation relative

Quand il n'est pas NULL, le quatrième paramètre de l'appel système futex() spécifie la valeur d'une temporisation pour les opérations comme FUTEX_WAIT ou FUTEX_LOCK_PI qui impliquent une notion d'attente. Voici un exemple de prototype :

int futex(int *uaddr1, FUTEX_WAIT, int nb_wake1, const struct timespec *timeout)

Le type de ce paramètre est l'adresse d'une structure timespec qui décrit la durée maximum en secondes et nanosecondes pendant laquelle l'appelant sera bloqué sur le futex :

struct timespec {
        time_t  tv_sec;         /* seconds */
        long    tv_nsec;        /* nanoseconds */
}

Dans le noyau, une temporisation de haute résolution est associée au thread appelant lorsqu'il est mis en attente sur le futex. A l'échéance, l'appel système retourne le code d'erreur -1 avec la variable errno positionnée avec la valeur ETIMEDOUT. Le domaine d'application de cette fonctionnalité concerne généralement la mise en attente sur un mutex avec un délai maximum. Le programme du listing 20 propose une autre application par la réécriture simple d'un service tel que l'appel système nanosleep() qui suspend le thread appelant pendant une durée donnée.

[...]
#define futex_wait_to(addr, val, to) \
           syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, val, to)

void mysleep(struct timespec *timeout)
{
int rc;
int futex_var = 0;

  rc = futex_wait_to(&futex_var, 0, timeout);
  printf("rc=%d, errno='%m' (%d)\n", rc, errno);
}

int main(int ac, char *av[])
{
struct timespec timeout;

  timeout.tv_sec = atoi(av[1]);
  timeout.tv_nsec = 0;
  mysleep(&timeout);
} // main
Listing 20 : Futex avec temporisation

La macro futex_wait() utilisée jusque là dans les exemples est renommée en futex_wait_to() avec un paramètre supplémentaire spécifiant le temps maximum de l'attente. La fonction mysleep() suspend le thread appelant pendant la durée spécifiée en paramètre. Pour cela l'opération FUTEX_WAIT_PRIVATE est appelée avec la variable locale futex_var à 0 et le délai d'attente maximum passé en paramètre. Comme aucune opération FUTEX_WAKE_PRIVATE n'est effectuée, la temporisation arrive à son terme :

$ gcc futex_timeout.c -o futex_timeout
$ ./futex_timeout 2
rc=-1, errno='Connection timed out' (110)

On en déduit que par défaut, les temporisations sont de type CLOCK_MONOTONIC : durée relative par rapport au moment du lancement de l'opération.
Pour implémenter les services POSIX tels que pthread_mutex_timedlock() ou pthread_cond_timedwait(), ce n'est pas très pratique car la norme impose d'exprimer la durée d'attente en heure absolue c'est-à-dire l'heure que l'attente ne doit pas dépasser. Si les primitives de la GLIBC/NPTL qui offre ces services standards utilisaient les temporisations par défaut des opérations sur les futex, on aurait un nombre d'appel système rédhibitoire pour les performances. En effet, l'utilisateur du service devrait appeler un service tel que gettimeofday() pour avoir l'heure courante afin de lui ajouter l'intervalle de temps à attendre avant de passer cette heure absolue à la librairie standard. Cette dernière devrait quant à elle effectuer l'opération inverse en appelant de nouveau gettimeofday() pour soustraire la valeur retournée au paramètre de temporisation pour obtenir la durée d'attente relative par rapport à l'heure courante avant de la soumettre à l'appel système futex(). Ce n'est pas efficace !

Dans ce contexte, l'opération FUTEX_WAIT_BITSET est une aubaine pour les services POSIX de la GLIBC/NPTL car elle donne en plus la possibilité de temporisations de type CLOCK_REALTIME qui sont exprimées en heure absolue.

4.7.2. Temporisation absolue

Le fichier d'en-tête <linux/futex.h> définit la macro FUTEX_CLOCK_REALTIME afin de positionner le bit 9 à 1 dans la commande via un "ou" logique (e.g. FUTEX_WAIT_BITSET | FUTEX_CLOCK_REALTIME). C'est donc un moyen de réduire le nombre des appels système car la librairie C standard de GNU n'a pas besoin de faire une conversion en intervalle de temps relatif.  Considérons par exemple le listing 21 qui crée un thread qui se met en attente 10 secondes en appelant pthread_mutex_timedlock().

[...]
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *thd_main(void *p)
{
struct timespec timeout;
int             rc;
struct timeval  tv;

  gettimeofday(&tv, NULL);
  tv.tv_sec += 10;
  TIMEVAL_TO_TIMESPEC(&tv, &timeout);

  printf("Echeance absolue : { %ld, %ld }\n", timeout.tv_sec, timeout.tv_nsec);

  rc = pthread_mutex_timedlock(&mutex, &timeout);
  printf("rc = %d\n", rc); 

  return NULL;
}

int main(int ac, char *av[])
{
int       rc;
pthread_t tid;

  pthread_mutex_lock(&mutex);

  pthread_create(&tid, NULL, thd_main, NULL);

  pthread_join(tid, NULL);

  return 0;
} // main
Listing 21 : Thread en attente

Le thread affiche l'heure absolue de l'échéance de l'attente décrite dans une structure timespec avant de la passer au service POSIX. L'outil strace montre que c'est bien l'opération FUTEX_WAIT_BITSET_PRIVATE | FUTEX_CLOCK_REALTIME qui est utilisée en interne par la librairie GNU. De plus, le paramètre de temporisation passé par l'utilisateur (en bleu) est directement utilisé par le service en paramètre de l'appel système futex() (en rouge) sans aucun calcul ou conversion supplémentaire :

$ gcc timed_wait.c -o timed_wait -lpthread
$ strace -f ./timed_wait
[...]
[pid  5212] write(1, "Echeance absolue : { 1373120661,"..., 45Echeance absolue : { 1373120661, 419074000 }
) = 45
[pid  5212] futex(0x6010a0, FUTEX_WAIT_BITSET_PRIVATE|FUTEX_CLOCK_REALTIME, 2, {1373120661, 419074000}, ffffffff) = -1 ETIMEDOUT (Connection timed out)
[pid  5212] write(1, "rc = 110\n", 9rc = 110
[...]

On notera que le dernier paramètre passé à l'appel système futex() est 0xFFFFFFFF c'est-à-dire FUTEX_BITSET_MATCH_ANY vu précédemment afin de réveiller le thread appelant avec toute opération de réveil (telle que FUTEX_WAKE).

Lorsque la temporisation est exprimée en heure absolue (CLOCK_REALTIME), il est utile de préciser qu'elle est impactée par les éventuels changements d'heure du système. En d'autres termes, l'échéance de la temporisation peut être avancée ou retardée en fonction de l'évolution de l'heure. Ce n'est en revanche pas le cas pour les intervalles de temps relatifs (CLOCK_MONOTONIC).

Toute tentative d'utilisation du bit FUTEX_CLOCK_REALTIME avec une opération autre que FUTEX_WAIT_BITSET et FUTEX_WAIT_REQUEUE_PI, se conclut par un retour erreur ENOSYS pour indiquer que ce n'est pas supporté.

4.7.3. Temporisation et héritage de priorité

Concernant l'héritage de priorité, Linux supporte le réajustement de la priorité d'un thread ayant verrouillé un futex lorsqu'un thread de plus haute priorité en attente temporisée sur ce même futex voit sa temporisation échoir. Contrairement aux autres opérations, la temporisation passée en paramètre pour FUTEX_LOCK_PI, est exprimée en heure absolue. Le listing 22 donne un exemple d'utilisation.

[...]
#define futex_lock_pi_to(addr, to) \
  syscall(SYS_futex, addr, FUTEX_LOCK_PI, 0, to)

#define futex_unlock_pi(addr) \
  syscall(SYS_futex, addr, FUTEX_UNLOCK_PI, 0, 0)

#define futex_wait_to(addr, to) \
           syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, 0, to)

#define gettid() syscall(__NR_gettid)

int futex_var;

void *thd_main(void *p)
{
struct sched_param param;
int                old;
int                mytid = gettid();
struct timespec    timeout;
int                rc;
struct timeval     t;
unsigned int       seconds = *((unsigned int *)p);

  gettimeofday(&t, NULL);
  fprintf(stderr, "0x%x : Date courante %lu s / %lu us\n", mytid, t.tv_sec, t.tv_usec);

  old = __sync_val_compare_and_swap(&futex_var, 0, mytid);

  fprintf(stderr, "0x%x : La valeur du futex@%p avant LOCK est : 0x%x\n", mytid, &futex_var, old);

  switch(old)
  {
    case 0 : // Le futex etait libre
    {
      // Desormais le thread courant detient le futex
      assert(0);
    }
    break;

    default : // Le futex n'est pas libre
    {
      timeout.tv_sec = (t.tv_sec += seconds);
      timeout.tv_nsec = t.tv_usec * 1000;
      fprintf(stderr, "0x%x : timeout a la date %lu s / %lu us\n", mytid, t.tv_sec, t.tv_usec);
      rc = futex_lock_pi_to(&futex_var, &timeout);
    }
  }

  gettimeofday(&t, NULL);

  fprintf(stderr, "0x%x : La valeur du futex@%p apres LOCK est : 0x%x\n"
                  "       Le code de retour de l'appel futex() est : %d (errno = %d)\n"
                  "       Date courante %lu s / %lu us\n"
          ,
      mytid, &futex_var, futex_var, rc, errno, t.tv_sec, t.tv_usec);

  fprintf(stderr, "0x%x : Thread termine\n", mytid);

  return NULL;
}

int main(int ac, char *av[])
{
pthread_t          tid;
int                mytid = gettid();
unsigned int       duree;

  duree = (unsigned int)atoi(av[1]);

  // Le thread courant verrouille le futex
  futex_var = gettid();

  pthread_create(&tid, NULL, thd_main, (void *)&duree);

  pthread_join(tid, NULL);

  return 0;
}

Listing 22 : Thread en attente sur un futex avec héritage de priorité

Le programme consiste en un thread principal qui verrouille un futex avec héritage de priorité : il y stocke sont numéro de tâche vu du noyau Linux (appel système gettid()) avant de créer un thread qui va se mettre en attente sur le futex verrouillé pendant le nombre de secondes passé en paramètre. L'attente s'effectue avec l'opération FUTEX_LOCK_PI et la valeur absolue de la date d'échéance de la temporisation c'est-à-dire la date courante plus le nombre de secondes passé en paramètre. La compilation et l'exécution pour une attente de 5 secondes donne :

$ gcc futex_timeout_pi.c -o futex_timeout_pi -lpthread
$ ./futex_timeout_pi 5
0xf95 : Date courante 1373367174 s / 456851 us
0xf95 : La valeur du futex@0x601084 avant LOCK est : 0xf94
0xf95 : timeout a la date 1373367179 s / 456851 us
0xf95 : La valeur du futex@0x601084 apres LOCK est : 0x80000f94
       Le code de retour de l'appel futex() est : -1 (errno = 110)
       Date courante 1373367179 s / 456871 us
0xf95 : Thread termine

Sur échéance de temporisation, l'appel système futex() retourne -1 avec errno positionné à ETIMEDOUT.

On notera de plus que le mécanisme de protection contre l'inversion de priorité s'applique toujours : tout thread en attente sur un futex avec héritage de priorité et dont la temporisation tombe à échéance provoque le réajustement de la priorité du thread qui détient le futex avec la valeur de la priorité du thread de plus haute priorité encore en attente sur le futex. S'il n'y a plus de threads en attente, alors le thread qui détient le futex retrouve sa priorité d'origine.

4.8. Réveil sur condition

L'opération de réveil des futex FUTEX_WAKE_OP est certainement l'une des plus complexe à comprendre. Elle interprète les paramètres de l'appel système futex() comme suit :

int futex(int *uaddr1, FUTEX_WAKE_OP, int nb_wake1, int nb_wake2, int *uaddr2, int operation)

Cela permet de réveiller au maximum les nb_wake1 threads plus prioritaires en attente sur le futex uaddr1. De plus, ne seront réveillés au maximum les nb_wake2 threads plus prioritaires en attente sur le futex uaddr2 que si le résultat de l'opération spécifiée dans le dernier paramètre sur la valeur contenue à l'adresse uaddr2 est vrai. Le retour de l'appel système est le nombre de threads réveillés qui étaient en attente sur le futex uaddr1 cumulé au nombre de threads réveillés qui étaient en attente sur le futex uaddr2. L'opération est décrite à l'aide de la macro FUTEX_OP() définie dans <linux/futex.h> :

[...]
#define FUTEX_OP_SET        0    /* *(int *)UADDR2 = OPARG; */
#define FUTEX_OP_ADD        1    /* *(int *)UADDR2 += OPARG; */
#define FUTEX_OP_OR         2    /* *(int *)UADDR2 |= OPARG; */
#define FUTEX_OP_ANDN       3    /* *(int *)UADDR2 &= ~OPARG; */
#define FUTEX_OP_XOR        4    /* *(int *)UADDR2 ^= OPARG; */

#define FUTEX_OP_OPARG_SHIFT    8    /* Use (1 << OPARG) instead of OPARG.  */

#define FUTEX_OP_CMP_EQ        0    /* if (oldval == CMPARG) wake */
#define FUTEX_OP_CMP_NE        1    /* if (oldval != CMPARG) wake */
#define FUTEX_OP_CMP_LT        2    /* if (oldval < CMPARG) wake */
#define FUTEX_OP_CMP_LE        3    /* if (oldval <= CMPARG) wake */
#define FUTEX_OP_CMP_GT        4    /* if (oldval > CMPARG) wake */
#define FUTEX_OP_CMP_GE        5    /* if (oldval >= CMPARG) wake */

/* FUTEX_WAKE_OP will perform atomically
   int oldval = *(int *)UADDR2;
   *(int *)UADDR2 = oldval OP OPARG;
   if (oldval CMP CMPARG)
     wake UADDR2;  */

#define FUTEX_OP(op, oparg, cmp, cmparg) \
  (((op & 0xf) << 28) | ((cmp & 0xf) << 24) \
   | ((oparg & 0xfff) << 12) | (cmparg & 0xfff))
[...]

Le fichier d'en-tête détaille l'opération en commentaire (en rouge) mais il est incomplet car il ne mentionne pas le réveil systématique effectué sur le futex uaddr1. Les actions sont correctement décrites dans [8] par Ulrich Drepper :

int oldval = *(int *)uaddr2;
*(int *)uaddr2 = oldval OP OPARG;
  wake uaddr1
if (oldval CMP CMPARG)
  wake uaddr2;

La première sous-opération (i.e. OP) est atomique de sorte à ce que le contenu de uaddr2 est toujours cohérent lors des lectures tant côté kernel, que utilisateur. A titre d'exemple, considérons le morceau de code source suivant :

[...]
#define futex_wakeup_op(addr1, nb_wake1, addr2, nb_wake2, op)  \
  syscall(SYS_futex, addr1, FUTEX_WAKE_OP, nb_wake1, nb_wake2, addr2, op)
[...]
int futex1;
int futex2;
[...]
rc = futex_wakeup_op(&futex1, 1, &futex2, INT_MAX, FUTEX_OP(FUTEX_OP_ADD, -1, FUTEX_OP_CMP_EQ, 0))
[...]

Le dernier paramètre passé à l'appel système demande de décrémenter la variable futex2 de manière atomique (résultat de l'opération FUTEX_OP_ADD avec le paramètre -1) puis de tester si la valeur avant l'opération est égale à 0 (résultat de l'opération FUTEX_OP_CMP_EQ avec le paramètre 0). En sortie de cet appel système, le thread le plus prioritaire en attente sur futex1 sera réveillé (deuxième paramètre), tous les threads (INT_MAX en quatrième paramètre) en attente sur futex2 ne seront réveillés qu'à la condition où futex2 était égal à 0 avant l'appel (i.e. avant l'opération de décrémentation demandée) et le code retour stocké dans rc est le nombre de threads effectivement réveillés sur les deux futex.

4.9. Changement implicite de futex

L'opération FUTEX_CMP_REQUEUE interprète les paramètres de l'appel système futex() comme suit :

int futex(int *uaddr1, FUTEX_CMP_REQUEUE, int nb_wake, int nb_move, int *uaddr2, int val1)

Si le futex uaddr1 contient la valeur val1 alors nb_wake threads en attente sur uaddr1 sont réveillés et s'il y a des threads supplémentaires en attente alors nb_move de ces derniers sont retirés de la file d'attente de uaddr1 pour être mis dans la celle du futex uaddr2. Que ce soient les threads réveillés ou déplacés, il s'agit toujours des plus prioritaires. La valeur de retour de l'appel système est le nombre de threads réveillés plus le nombre de threads déplacés. De plus, rien n'indiquera au réveil des threads déplacés qu'ils ont changé de file d'attente de futex : il s'agit donc bien d'un déplacement implicite de la file d'attente d'un futex vers une autre.

Le programme du listing 23 démontre le fonctionnement de ce service. Le thread principal (thread#0) crée quatre threads secondaires temps réel (thread#1 à thread#4) de priorités décroissantes. Le thread#0 s'endort 1 seconde pour laisser le temps aux autres de se mettre en attente sur futex1. Au réveil, le thread#0 exécute l'appel système futex() avec l'opération FUTEX_CMP_REQUEUE pour ne réveiller qu'un thread (i.e. paramètre nb_wake égal à 1) et de faire migrer deux threads vers futex2 (i.e. nb_move égal à 2). Cela a pour conséquence de :
Le compte-rendu de l'appel système est donc égal à 3 (i.e. 1 thread réveillé plus 2 threads migrés). Ensuite le thread#0 réveille le thread#2 et le thread#3 puis le thread#4 en appelant futex() avec l'opération FUTEX_WAKE respectivement sur futex2 et futex1 avec le nombre de threads à réveiller positionné à INT_MAX. Les comptes-rendus de ces deux derniers appels système sont respectivement 2 et 1 pour dénoter le nombre de threads réveillés à chaque fois.

[...]
int nb_thread = 5;

#define futex_wait(addr, val) \
           syscall(SYS_futex, addr, FUTEX_WAIT, val, NULL)

#define futex_wakeup(addr, nb) \
           syscall(SYS_futex, addr, FUTEX_WAKE, nb)

int futex1;
int futex2;

void *thd_main(void *p)
{
int                *idx = (int *)p;
struct sched_param  param;

  param.sched_priority = nb_thread - *idx;
  pthread_setschedparam(pthread_self(), SCHED_RR, &param);

  printf("Thread#%d attend sur le futex1 (prio = %d)\n", *idx, param.sched_priority);

  futex_wait(&futex1, 0);

  printf("Thread#%d reveille\n", *idx);

  return NULL;
}

#define futex_cmp_requeue(addr1, nb_wake, nb_move, addr2, val1) \
    syscall(SYS_futex, addr1, FUTEX_CMP_REQUEUE, nb_wake, nb_move, addr2, val1)

int main(int ac, char *av[])
{
pthread_t *tid;
int        i;
int       *idx;
int        rc;

  printf("Thread#%d demarre\n", 0);

  idx = (int *)malloc(nb_thread * sizeof(int));
  tid = (pthread_t *)malloc(nb_thread * sizeof(pthread_t));

  tid[0] = pthread_self();

  for (i = 1; i < nb_thread; i ++)
  {
    idx[i] = i;
    pthread_create(&(tid[i]), NULL, thd_main, (void *)&(idx[i]));
  }

  // Attente pour que les threads soient sur le futex1
  mysleep(1);

  // Reveil de thread#1
  // Transfert de thread#2 et thread#3 sur futex2
  // Thread#4 reste en attente sur futex1
  rc = futex_cmp_requeue(&futex1, 1, 2, &futex2, 0);
  printf("Thread#%d : rc = %d\n", 0, rc);

  // Attente de la terminaison du thread#1
  pthread_join(tid[1], NULL);

  // Reveil des thread#2 et thread#3 en attente sur le futex2
  rc = futex_wakeup(&futex2, INT_MAX);
  printf("Thread#%d : rc = %d\n", 0, rc);

  // Attente de la terminaison du thread#2 et thread#3
  pthread_join(tid[2], NULL);
  pthread_join(tid[3], NULL);

  // Reveil du thread#4 en attente sur le futex1
  rc = futex_wakeup(&futex1, INT_MAX);
  printf("Thread#%d : rc = %d\n", 0, rc);

  // Attente de la terminaison du thread#4
  pthread_join(tid[4], NULL);

  free(idx);
  free(tid);
 
  return 0;
}
Listing 23 : Application de FUTEX_CMP_REQUEUE

La compilation et l'exécution donne ceci (on notera le besoin des droits du super utilisateur pour créer des threads temps réel sur certains systèmes) :

$ gcc requeue.c -o requeue -lpthread
$ sudo ./requeue
Thread#0 demarre
Thread#1 attend sur le futex1 (prio = 4)
Thread#2 attend sur le futex1 (prio = 3)
Thread#3 attend sur le futex1 (prio = 2)
Thread#4 attend sur le futex1 (prio = 1)
Thread#0 : rc = 3
Thread#1 reveille
Thread#0 : rc = 2
Thread#2 reveille
Thread#3 reveille
Thread#0 : rc = 1
Thread#4 reveille

L'intérêt de cet appel système réside dans le fait que des threads en attente sur un futex peuvent migrer vers un autre sans être réveillés. C'est un moyen d'éviter le phénomène de "thundering herd" (cf. [10]) où plusieurs threads se réveilleraient en même temps pour se remettre aussitôt en attente sur un autre futex provoquant ainsi de la consommation CPU par la multiplication des appels système et des rechargements de cache que cela induit (surtout en environnement SMP). La motivation première de ce service est d'optimiser les services POSIX relatifs aux variables conditionnelles dans la GLIBC/NPTL et notamment pthread_cond_broadcast(). En effet, une variable conditionnelle est associée à un mutex utilisateur et utilise un mutex interne pour protéger l'accès à ses données privées. De telle sorte que lorsqu'un thread se met en attente sur une variable conditionnelle, il doit d'abord verrouiller le mutex utilisateur (via pthread_mutex_lock()) avant d'appeler le service pthread_cond_wait(). Les traitements de ce dernier peuvent être schématisés par le pseudo code suivant :

LOCK(mutex interne)
UNLOCK(mutex utilisateur)
[...]
do
[...]
futex_val = futex
UNLOCK(mutex interne)
futex_wait(futex, futex_val)
LOCK(mutex interne)
[...]
while (...)
[...]
UNLOCK(mutex interne)
LOCK(mutex utilisateur)

Le thread appelant est mis en attente sur un futex (ligne en rouge) pour être réveillé lors de l'appel à la fonction pthread_cond_signal() ou pthread_cond_broadcast() par un autre thread. Une fois réveillé le thread verrouille de nouveau puis déverrouille le mutex interne avant de terminer avec le verrouillage du mutex utilisateur à la fin de pthread_cond_wait(). Comme la fonction pthread_cond_broadcast() réveille systématiquement tous les threads en attente sur une variable conditionnelle. Si elle procédait par appel à futex() avec l'opération FUTEX_WAKE, tous les threads en attente se réveilleraient en même temps pour effectuer les verrouillages tel un bruyant troupeau (traduction de l'expression anglo-saxonne "thundering herd"). Cela induirait de la consommation de temps CPU car un seul d'entre eux obtiendrait le verrou alors que les autres se remettraient en attente sur le futex du mutex interne puis sur celui du mutex utilisateur. D'où l'intérêt de l'opération FUTEX_CMP_REQUEUE dans la fonction pthread_cond_broadcast() afin de ne réveiller qu'un thread en attente et de faire migrer les autres sur la file d'attente du futex associé au mutex utilisateur sans les réveiller. Voici le pseudo code pour ce service :

LOCK(mutex interne)
[...]
if (threads à réveiller)
[...]
UNLOCK(mutex interne)
futex_cmp_requeue(futex, 1, INT_MAX, mutex utilisateur, futex_val)
[...]

Les threads ainsi migrés seront réveillés lorsque le mutex utilisateur sera relâché par le thread ayant été réveillé par pthread_cond_broadcast() et non pas directement après l'appel à cette dernière.

L'opération FUTEX_REQUEUE est la version simplifiée de FUTEX_CMP_REQUEUE : elle réveille nb_move threads et transfert nb_wake threads de la file d'attente du futex à l'adresse uaddr1 vers celle du futex à l'adresse uaddr2.  Le prototype est le suivant :

int futex(int *uaddr1, FUTEX_REQUEUE, int nb_wake, int nb_move, int *uaddr2)

D'après [8], l'opération FUTEX_CMP_REQUEUE rend obsolète FUTEX_REQUEUE qui présenterait un problème de cas de croisement pour l'implémentation des variables conditionnelles POSIX vues plus haut. Cependant, [15] semble contredire cela en proposant un nouvel algorithme pour les variables conditionnelles basées sur cette dernière opération et qui serait plus efficace que l'implémentation actuelle à l'aide de FUTEX_CMP_REQUEUE. L'opération FUTEX_REQUEUE resterait donc d'actualité.

Pour supporter l'héritage de priorité, les opérations FUTEX_WAIT_REQUEUE_PI et FUTEX_CMP_REQUEUE_PI sont disponibles. Ces deux opérations agissent conjointement et sont respectivement utilisées à la place des opérations FUTEX_WAIT et FUTEX_CMP_REQUEUE.

Le prototype de l'appel système pour l'opération FUTEX_CMP_REQUEUE_PI est similaire à celui de l'opération FUTEX_CMP_REQUEUE :

int futex(int *uaddr1, FUTEX_CMP_REQUEUE_PI, int nb_wake, int nb_move, int *uaddr2, int val1)

Le prototype de l'appel système pour l'opération FUTEX_WAIT_REQUEUE_PI a une forme inédite :

int futex(int *uaddr1, FUTEX_WAIT_REQUEUE_PI, int val, const struct timespec *timeout, int *uaddr2)

L'appelant se met en attente sur le futex à l'adresse uaddr1 si la valeur de ce dernier est égale à au paramètre val. Si ce n'est pas le cas, le code d'erreur EAGAIN (synonyme de EWOULDBLOCK) est retourné. Au retour de cette fonction, l'appelant a la main sur le futex à l'adresse uaddr2.
Les futex uaddr1 et uaddr2 doivent être différents sous peine de retour erreur avec le code EINVAL. Le paramètre timeout s'il n'est pas NULL, définit le temps d'attente. Il peut être exprimé en heure absolue si le bit FUTEX_CLOCK_REALTIME a été positionné au niveau de la commande (e.g. FUTEX_WAIT_REQUEUE_PI | FUTEX_CLOCK_REALTIME) sinon c'est un intervalle de temps relatif au moment de l'appel système.

4.10. Autres opérations

Pour compléter l'étude des futex, il convient de citer cette dernière opération disponibles sur les futex : FUTEX_FD. Cette opération est destinée à associer un descripteur de fichier à un futex de sorte à bénéficier d'opérations telles que select() ou poll(). Le prototype est :

int futex(int *uaddr1, FUTEX_FD, int val)

Cependant, d'après [8], cette opération est inutilisable car présente un certain nombre d'inconvénients et le noyau Linux ne la supporte plus dans ses dernières moutures. Toute tentative d'utilisation se traduira par un retour erreur avec le code ENOSYS.

5. Conclusion

Cet article a levé une grande partie du voile sur la notion de futex. Il permet de combler les lacunes voire corriger les erreurs du manuel en ligne disponible sur le sujet.

La fonction première des futex est de constituer un outil de base pour implémenter des services de synchronisation inter-thread (au sein d'un ou plusieurs processus) en évitant au maximum la multiplication des appels au système qui sont coûteux en termes de performance. Le principe consiste à effectuer l'opération en espace utilisateur et ne recourir au noyau qu'en cas de contention pour mettre en sommeil le thread appelant.

L'utilisation des futex s'est surtout démocratisée chez les développeurs de librairies en recherche permanente de performances et d'adaptation de l'environnement Linux aux contraintes temps réel et aux standards comme POSIX. La GLIBC/NPTL en est l'un des principaux bénéficiaires.

L'étude fut prétexte à effectuer un tour d'horizon sur des notions importantes dans le domaine de la synchronisation inter-thread et du déterminisme des systèmes d'exploitation : les opérations atomiques, les sections critiques, l'ordonnancement des threads, l'inversion de priorité, l'héritage de priorité, la réduction du temps de latence... Fort de ces connaissances, le lecteur programmeur pourra utiliser les fonctions de la GLIBC/NPTL avec plus de recul voire même écrire des services de synchronisation adaptés à ses propres besoins. Cet article devrait aussi constituer un bon support de cours et travaux pratiques pour l'enseignement des méthodes de synchronisation.

Cependant, il ne faut pas oublier que les opérations en espace utilisateur nécessitent l'usage d'opérations atomiques qui ne sont souvent disponibles qu'en assembleur. Se posent alors des problèmes de portabilité du programme qui peuvent être circonscrits dans le cas de l'utilisation de compilateurs comme GCC qui proposent ces opérations sous forme d'extensions du langage. D'ailleurs les exemples de cet article ont été compilés et exécutés sans modifications sur architecture Intel (32 et 64 bits) et PowerPc (32 bits) grâce aux facilités de GCC.

6. Ressources

  1. "Fuss, Futexes and Furwocks: Fast Userlevel Locking in Linux" par Hubertus Franke et Rusty Russel
    http://www.kernel.org/doc/ols/2002/ols2002-pages-479-495.pdf
  2. "Built-in functions for atomic memory access" extrait du manuel du compilateur GCC
    http://gcc.gnu.org/onlinedocs/gcc-4.1.1/gcc/Atomic-Builtins.html
  3. Les opérations atomiques
    http://fr.wikipedia.org/wiki/Atomicité_(informatique)
  4. L'opération atomique Compare And Swap
    http://en.wikipedia.org/wiki/Compare-and-swap
  5. Intel 64 and IA-32 Architectures - Software Developer’s Manual - Volume 2 (2A, 2B & 2C): Instruction Set Reference, A-Z
  6. Load-link/store-conditional
    http://en.wikipedia.org/wiki/Load-link/store-conditional
  7. Principe de la table de hachage
    http://fr.wikipedia.org/wiki/Table_de_hachage
  8. "Futex are tricky" par Ulrich Drepper
    http://people.redhat.com/drepper/futex.pdf
  9. Barrière de synchronisation
    http://fr.wikipedia.org/wiki/Barri%C3%A8re_de_synchronisation
  10. Thundering herd problem
    http://en.wikipedia.org/wiki/Thundering_herd_problem
  11. L'inversion de priorité
    http://en.wikipedia.org/wiki/Priority_inversion
  12. L'héritage de priorité
    http://en.wikipedia.org/wiki/Priority_inheritance
  13. La notion de rwlock
    http://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
  14. "Futex Cheat Sheet"
    http://locklessinc.com/articles/futex_cheat_sheet/
  15. Mutexes and Condition Variables using Futexes
    http://locklessinc.com/articles/mutex_cv_futex/
  16. Notion d'étreinte fatale
    http://fr.wikipedia.org/wiki/Interblocage
  17. Notion de famine
    https://fr.wikipedia.org/wiki/Famine_%28informatique%29
  18. Native Posix Thread Library
    http://en.wikipedia.org/wiki/Native_POSIX_Thread_Library

A propos de l'auteur

L'auteur est un ingénieur en informatique travaillant en France. Il peut être contacté ici ou vous pouvez consulter son site WEB.



Page principale
  Page précédente