Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

TP 4 : Fondamentaux du Calcul Parallèle et Distribué

Objectifs

Comprendre les paradigmes du calcul parallèle

  • Exécution séquentielle vs parallèle

  • Concurrence vs parallélisme

  • Loi d’Amdahl et calculs d’accélération

Maîtriser la programmation fonctionnelle en Python

  • filter(), map(), reduce() avec expressions lambda

  • Générateurs et itérateurs pour un traitement efficace en mémoire

Implémenter le traitement parallèle

  • Module multiprocessing (Process, Pool, Queue, Pipe)

  • concurrent.futures (ThreadPoolExecutor, ProcessPoolExecutor)

  • Mémoire partagée et primitives de synchronisation

Mesure et optimisation des performances

  • Benchmarking du code parallèle

  • Identification des goulots d’étranglement

  • Stratégies de découpage (chunking)

Prérequis

  • Achèvement des TP 0-3

  • Compréhension des fonctions et structures de données Python

  • Connaissances de base de l’architecture informatique (cœurs CPU)

Aperçu des exercices

ExerciceDifficultéSujets
Exercice 1Programmation fonctionnelle : filter(), map(), reduce()
Exercice 2Expressions lambda et fonctions d’ordre supérieur
Exercice 3★★Générateurs et itérateurs pour les grandes données
Exercice 4★★Introduction au multiprocessing
Exercice 5★★Communication inter-processus : Queues et Pipes
Exercice 6★★★concurrent.futures et modèles asynchrones
Exercice 7★★★Benchmarking et optimisation des performances

Exercice 1 [★] - Programmation fonctionnelle : filter(), map(), reduce()

Dans cet exercice, nous explorons les concepts de programmation fonctionnelle qui forment la base du traitement de données parallèle et distribué. Ces modèles sont utilisés intensivement dans des frameworks comme Apache Spark.

1. Introduction à la programmation fonctionnelle

La programmation fonctionnelle met l’accent sur :

  • Fonctions pures : Fonctions qui produisent toujours le même résultat pour la même entrée et n’ont pas d’effets de bord

  • Immutabilité : Les données ne sont pas modifiées après leur création

  • Fonctions de première classe : Les fonctions peuvent être passées en arguments et retournées par d’autres fonctions

Ces principes sont essentiels pour le traitement parallèle car ils éliminent l’état partagé et rendent sûre l’exécution concurrente des opérations.

2. La fonction filter()

La fonction filter(function, iterable) retourne un itérateur contenant les éléments de l’itérable pour lesquels la fonction retourne True.

?filter
# Exemple basique de filter : filtrer les nombres pairs
num = [i for i in range(1, 20)]
print("Liste originale :", num)

def est_pair(item):
    return item % 2 == 0

filtre = list(filter(est_pair, num))
print("Nombres pairs :", filtre)
# Filter avec None comme fonction - supprime les valeurs fausses (falsy)
mixte = [0, 1, "", "bonjour", None, 42, [], [1, 2]]
print("Original :", mixte)
print("Filtré (uniquement truthy) :", list(filter(None, mixte)))
# Filtrer les nombres impairs
def est_impair(item):
    return item % 2 != 0

num = [i for i in range(1, 20)]
filtre = list(filter(est_impair, num))
print("Nombres impairs :", filtre)

Question 1.1 : Écrivez une fonction est_premier(n) qui retourne True si n est un nombre premier. Ensuite, utilisez filter() pour extraire tous les nombres premiers d’une liste de 1000 entiers aléatoires entre 1 et 1000.

import random

# Générer 1000 entiers aléatoires
nombres_aleatoires = [random.randint(1, 1000) for _ in range(1000)]

# TODO: Implémenter la fonction est_premier et filtrer les nombres premiers

Question 1.2 : Utilisez filter() pour extraire les nombres divisibles à la fois par 3 et par 5 de la même liste.

# TODO: Filtrer les nombres divisibles par 3 et 5

Filtrage avec des structures imbriquées

La fonction filter() peut être appliquée à des structures de données complexes comme des listes de dictionnaires.

employes = [
    {"nom": "Alice", "age": 28, "departement": "RH", "salaire": 55000},
    {"nom": "Bob", "age": 35, "departement": "Ingénierie", "salaire": 85000},
    {"nom": "Charlie", "age": 22, "departement": "Marketing", "salaire": 45000},
    {"nom": "David", "age": 45, "departement": "Ingénierie", "salaire": 95000},
    {"nom": "Eve", "age": 31, "departement": "RH", "salaire": 60000},
    {"nom": "Frank", "age": 29, "departement": "Ingénierie", "salaire": 78000},
    {"nom": "Grace", "age": 38, "departement": "Marketing", "salaire": 72000},
    {"nom": "Henry", "age": 42, "departement": "Ingénierie", "salaire": 92000}
]

Question 1.3 : En utilisant filter(), complétez les tâches suivantes :

  1. Créer une liste des employés qui travaillent dans le département “Ingénierie”

  2. Trouver les employés dont l’âge est entre 25 et 40 ans (inclus)

  3. Trouver les employés avec un salaire supérieur à 70000

  4. Trouver les employés dont le nom commence par une voyelle

# TODO: Implémenter les filtres

Filtrage avancé de chaînes de caractères

phrases = [
    "La science des données transforme les industries dans le monde entier.",
    "Python est génial.",
    "Les algorithmes d'apprentissage automatique nécessitent de grandes quantités de données.",
    "Les technologies Big Data permettent le traitement de pétaoctets.",
    "L'IA évolue.",
    "Le calcul distribué permet la mise à l'échelle horizontale sur les clusters.",
    "Le cloud fournit des ressources informatiques élastiques à la demande.",
    "Simple fonctionne."
]

Question 1.4 : En utilisant filter() :

  1. Sélectionner les phrases avec plus de 6 mots

  2. Sélectionner les phrases contenant le mot “données” (insensible à la casse)

  3. Sélectionner les phrases où la longueur moyenne des mots est supérieure à 5 caractères

# TODO: Implémenter les filtres

3. La fonction map()

La fonction map(function, iterable, ...) applique une fonction à chaque élément d’un itérable et retourne un itérateur des résultats.

?map
# Exemple basique de map : mettre au carré chaque nombre
def carre(x):
    return x * x

num = [i for i in range(1, 11)]
carres = list(map(carre, num))
print("Original :", num)
print("Carrés :", carres)
# Map avec plusieurs itérables
def multiplier(x, y):
    return x * y

liste1 = [1, 2, 3, 4, 5]
liste2 = [10, 20, 30, 40, 50]

produits = list(map(multiplier, liste1, liste2))
print("Produits :", produits)
# Map avec trois itérables
def somme_ponderee(a, b, c):
    return a + 2*b + 3*c

x = [1, 2, 3]
y = [4, 5, 6]
z = [7, 8, 9]

resultat = list(map(somme_ponderee, x, y, z))
print("Sommes pondérées :", resultat)

Question 1.5 : Étant donné une liste de 100 000 chemins de fichiers, utilisez map() pour :

  1. Extraire les extensions de fichiers (ex : .txt, .csv)

  2. Extraire les noms de fichiers sans extensions

  3. Calculer les profondeurs de chemin (nombre de répertoires)

import os
import random

# Générer des chemins de fichiers exemples
extensions = ['.txt', '.csv', '.json', '.xml', '.py', '.md']
repertoires = ['data', 'src', 'docs', 'config', 'tests', 'output']

chemins_fichiers = [
    os.path.join(
        random.choice(repertoires),
        random.choice(repertoires),
        f"fichier_{i}{random.choice(extensions)}"
    )
    for i in range(100000)
]

print("Exemples de chemins :", chemins_fichiers[:5])

# TODO: Utiliser map() pour extraire les extensions, noms et calculer les profondeurs

Question 1.6 : Utilisez map() pour normaliser un ensemble de 50 000 chaînes de texte :

  • Convertir en minuscules

  • Supprimer les espaces en début et fin

  • Supprimer la ponctuation

import string

# Générer des données textuelles exemples
mots_exemples = ["Bonjour", "Monde", "Données", "Science", "Python", "Analyse"]
signes_ponctuation = list("!.,;:?")

donnees_texte = [
    f"  {random.choice(mots_exemples)}{random.choice(signes_ponctuation)}  ".upper()
    if random.random() > 0.5 else
    f"{random.choice(mots_exemples)} {random.choice(mots_exemples)}{random.choice(signes_ponctuation)}"
    for _ in range(50000)
]

print("Données exemples :", donnees_texte[:5])

# TODO: Utiliser map() pour normaliser les données textuelles

4. La fonction reduce()

La fonction reduce(function, iterable) applique une fonction de deux arguments de manière cumulative aux éléments d’un itérable, de gauche à droite, le réduisant à une seule valeur.

from functools import reduce

?reduce
# Somme de nombres avec reduce
from functools import reduce

def addition(x, y):
    return x + y

num = [i for i in range(1, 11)]
total = reduce(addition, num)
print(f"Somme de {num} = {total}")
# Produit de nombres
def multiplier(x, y):
    return x * y

num = [1, 2, 3, 4, 5]
produit = reduce(multiplier, num)
print(f"Produit de {num} = {produit}")
# Trouver le maximum avec reduce
def max_de_deux(x, y):
    return x if x > y else y

num = [45, 12, 89, 34, 67, 23, 90, 11]
maximum = reduce(max_de_deux, num)
print(f"Maximum de {num} = {maximum}")

Question 1.7 : Utilisez reduce() pour :

  1. Trouver la chaîne la plus longue dans une liste de 10 000 chaînes

  2. Aplatir une liste imbriquée (3 niveaux de profondeur)

  3. Concaténer une liste de chaînes avec un séparateur

# Générer des données exemples
mots = ["algorithme", "données", "traitement", "distribué", "calcul", 
        "parallèle", "optimisation", "scalabilité", "performance"]

liste_chaines = [random.choice(mots) * random.randint(1, 5) for _ in range(10000)]

liste_imbriquee = [
    [[1, 2], [3, 4]],
    [[5, 6], [7, 8]],
    [[9, 10], [11, 12]]
]

# TODO: Implémenter les opérations reduce

Opérations matricielles avec map() et reduce()

matrices = [
    [[1, 2], [3, 4]],
    [[5, 6], [7, 8]],
    [[9, 10], [11, 12]],
    [[13, 14], [15, 16]]
]

Question 1.8 : En utilisant map() et reduce() :

  1. Calculer la somme de toutes les matrices

  2. Calculer le produit élément par élément de deux matrices

  3. Filtrer les matrices où tous les éléments sont supérieurs à 5

# TODO: Implémenter les opérations matricielles

Transformation et agrégation de données

produits = [
    {"nom": "Ordinateur portable", "prix": 1200, "quantite": 3, "categorie": "Électronique"},
    {"nom": "Smartphone", "prix": 800, "quantite": 5, "categorie": "Électronique"},
    {"nom": "Tablette", "prix": 300, "quantite": 10, "categorie": "Électronique"},
    {"nom": "Bureau", "prix": 250, "quantite": 8, "categorie": "Mobilier"},
    {"nom": "Chaise", "prix": 150, "quantite": 20, "categorie": "Mobilier"},
    {"nom": "Moniteur", "prix": 350, "quantite": 7, "categorie": "Électronique"},
    {"nom": "Clavier", "prix": 75, "quantite": 25, "categorie": "Électronique"},
    {"nom": "Bibliothèque", "prix": 180, "quantite": 5, "categorie": "Mobilier"}
]

Question 1.9 : En utilisant map(), filter() et reduce() :

  1. Calculer la valeur totale de l’inventaire (prix * quantité pour tous les produits)

  2. Trouver les produits avec un prix supérieur à 200

  3. Appliquer une remise de 15% à tous les produits Électronique et retourner la liste mise à jour

  4. Calculer la valeur totale par catégorie

# TODO: Implémenter les opérations

Exercice 2 [★] - Expressions Lambda et Fonctions d’Ordre Supérieur

Les expressions lambda sont des fonctions anonymes qui peuvent être définies en ligne. Elles sont particulièrement utiles avec filter(), map() et reduce().

1. Bases des expressions lambda

# Lambda avec filter
num = list(range(1, 21))
pairs = list(filter(lambda x: x % 2 == 0, num))
print("Nombres pairs :", pairs)
# Lambda avec map
carres = list(map(lambda x: x ** 2, num))
print("Carrés :", carres)
# Lambda avec plusieurs arguments
liste1 = [1, 2, 3, 4, 5]
liste2 = [10, 20, 30, 40, 50]
produits = list(map(lambda x, y: x * y, liste1, liste2))
print("Produits :", produits)
# Lambda avec reduce
from functools import reduce

total = reduce(lambda x, y: x + y, num)
print(f"Somme : {total}")

produit = reduce(lambda x, y: x * y, [1, 2, 3, 4, 5])
print(f"Produit : {produit}")

2. Fonctions d’ordre supérieur et fermetures (closures)

# Une fermeture - fonction qui mémorise les valeurs de la portée englobante
def creer_multiplicateur(n):
    def multiplicateur(x):
        return x * n
    return multiplicateur

double = creer_multiplicateur(2)
triple = creer_multiplicateur(3)

print("Double de 5 :", double(5))
print("Triple de 5 :", triple(5))

# Utilisation avec map
nombres = [1, 2, 3, 4, 5]
print("Doublés :", list(map(double, nombres)))
# Utilisation de functools.partial pour l'application partielle de fonctions
from functools import partial

def puissance(base, exposant):
    return base ** exposant

carre = partial(puissance, exposant=2)
cube = partial(puissance, exposant=3)

print("Carré de 5 :", carre(5))
print("Cube de 5 :", cube(5))

Question 2.1 : Créez une fonction fabrique de filtres qui génère des prédicats de filtre :

  1. creer_filtre_plage(val_min, val_max) - retourne une fonction qui vérifie si une valeur est dans la plage

  2. creer_filtre_seuil(seuil, comparaison) - retourne une fonction pour les comparaisons de seuil

  3. Utilisez-les avec filter() sur un jeu de données

# TODO: Implémenter les fonctions fabriques de filtres

3. Composition de fonctions

# Composer des fonctions : f(g(x))
def composer(f, g):
    return lambda x: f(g(x))

def ajouter_un(x):
    return x + 1

def carre(x):
    return x * x

# (x + 1)^2
ajouter_puis_carre = composer(carre, ajouter_un)
print("(3 + 1)^2 =", ajouter_puis_carre(3))

# x^2 + 1
carre_puis_ajouter = composer(ajouter_un, carre)
print("3^2 + 1 =", carre_puis_ajouter(3))

Question 2.2 : Implémentez une fonction pipeline() qui compose plusieurs fonctions et utilisez-la pour créer un pipeline de nettoyage de données :

  1. Supprimer les espaces

  2. Convertir en minuscules

  3. Supprimer les caractères spéciaux

  4. Remplacer les espaces multiples par un seul espace

# TODO: Implémenter la fonction pipeline et le pipeline de nettoyage de données

Analyse de texte avec expressions lambda

phrases = [
    "L'analyse de Big Data transforme les données brutes en insights actionnables.",
    "Le cloud computing permet une infrastructure évolutive.",
    "Les modèles d'apprentissage automatique nécessitent un entraînement sur de grands ensembles de données.",
    "Les pipelines de données automatisent le flux d'information.",
    "Les systèmes distribués offrent la tolérance aux pannes et la haute disponibilité.",
    "Le traitement en temps réel gère efficacement les flux de données.",
    "La gouvernance des données assure la qualité et la conformité.",
    "Les API permettent une intégration transparente entre les services."
]

Question 2.3 : En utilisant map(), filter(), reduce() avec des expressions lambda :

  1. Compter le nombre total de mots dans toutes les phrases

  2. Trouver la phrase avec le plus de mots

  3. Extraire tous les mots uniques de toutes les phrases

  4. Calculer la longueur moyenne des phrases (en mots)

# TODO: Implémenter l'analyse de texte avec lambdas

Traitement de données financières

transactions = [
    {"date": "2025-01-10", "type": "revenu", "montant": 5000, "categorie": "salaire"},
    {"date": "2025-01-11", "type": "depense", "montant": 150, "categorie": "services"},
    {"date": "2025-01-12", "type": "depense", "montant": 80, "categorie": "alimentation"},
    {"date": "2025-01-13", "type": "revenu", "montant": 200, "categorie": "freelance"},
    {"date": "2025-01-14", "type": "depense", "montant": 500, "categorie": "loyer"},
    {"date": "2025-01-15", "type": "depense", "montant": 60, "categorie": "transport"},
    {"date": "2025-01-16", "type": "revenu", "montant": 150, "categorie": "freelance"},
    {"date": "2025-01-17", "type": "depense", "montant": 200, "categorie": "alimentation"},
    {"date": "2025-01-18", "type": "depense", "montant": 100, "categorie": "loisirs"}
]

Question 2.4 : En utilisant des expressions lambda :

  1. Calculer le solde net (total revenus - total dépenses)

  2. Trouver toutes les dépenses supérieures à 100

  3. Grouper les transactions par catégorie et calculer les totaux

  4. Trouver la plus grande dépense

# TODO: Implémenter le traitement des données financières

Exercice 3 [★★] - Générateurs et Itérateurs pour les Grandes Données

Lors du traitement de jeux de données massifs, charger tout en mémoire est souvent impossible. Les générateurs fournissent un moyen efficace en mémoire de traiter les données de manière paresseuse.

1. Bases des générateurs

# Compréhension de liste vs expression génératrice
import sys

# Liste - stocke toutes les valeurs en mémoire
comp_liste = [x ** 2 for x in range(1000000)]
print(f"Taille de la liste : {sys.getsizeof(comp_liste):,} octets")

# Générateur - calcule les valeurs à la demande
exp_gen = (x ** 2 for x in range(1000000))
print(f"Taille du générateur : {sys.getsizeof(exp_gen):,} octets")
# Fonction génératrice utilisant yield
def compter_jusqua(n):
    """Générateur qui yield les nombres de 1 à n"""
    i = 1
    while i <= n:
        yield i
        i += 1

# Utilisation du générateur
compteur = compter_jusqua(5)
print("Type :", type(compteur))
print("Valeurs :", list(compteur))
# Les générateurs ne peuvent être itérés qu'une seule fois
gen = (x for x in range(5))
print("Première itération :", list(gen))
print("Deuxième itération :", list(gen))  # Vide !

Question 3.1 : Créez une fonction génératrice lire_gros_fichier(chemin, taille_chunk) qui :

  1. Lit un fichier par morceaux de taille_chunk lignes

  2. Yield chaque morceau sous forme de liste de lignes

  3. Ne charge jamais le fichier entier en mémoire

Testez avec un fichier contenant des millions de lignes.

import os

fichier_test = "gros_fichier_test.txt"
with open(fichier_test, 'w') as f:
    for i in range(100000):
        f.write(f"Ligne {i}: Ceci est une donnée de test pour l'exercice générateur\n")

print(f"Fichier créé avec taille : {os.path.getsize(fichier_test):,} octets")

# TODO: Implémenter le générateur lire_gros_fichier

2. Pipelines de générateurs

# Chaîner des générateurs crée un pipeline
def lire_lignes(nom_fichier):
    """Générateur qui yield les lignes d'un fichier"""
    with open(nom_fichier, 'r') as f:
        for ligne in f:
            yield ligne.strip()

def filtrer_non_vides(lignes):
    """Générateur qui yield les lignes non vides"""
    for ligne in lignes:
        if ligne:
            yield ligne

def extraire_numeros(lignes):
    """Générateur qui extrait le numéro de ligne"""
    for ligne in lignes:
        parties = ligne.split(':')
        if len(parties) >= 1:
            partie_num = parties[0].replace('Ligne ', '')
            if partie_num.isdigit():
                yield int(partie_num)

# Pipeline: lire -> filtrer -> extraire -> sommer
pipeline = extraire_numeros(filtrer_non_vides(lire_lignes(fichier_test)))

# Traiter les 100 premiers nombres
from itertools import islice
premiers_100 = list(islice(pipeline, 100))
print(f"10 premiers numéros de ligne : {premiers_100[:10]}")
print(f"Somme des 100 premiers : {sum(premiers_100)}")

Question 3.2 : Créez un pipeline de générateurs pour l’analyse de fichiers log :

  1. lire_logs(chemin) - yield les entrées de log

  2. analyser_logs(logs) - analyse chaque log en dictionnaire avec timestamp, niveau, message

  3. filtrer_erreurs(logs) - yield uniquement les logs de niveau ERROR

  4. extraire_messages(logs) - yield uniquement le champ message

Utilisez le pipeline pour traiter un gros fichier log sans le charger entièrement.

# Créer un fichier log exemple
import random
from datetime import datetime, timedelta

fichier_log = "logs_exemples.txt"
niveaux = ["INFO", "DEBUG", "WARNING", "ERROR", "INFO", "INFO", "DEBUG"]
messages = [
    "Connexion établie",
    "Traitement de la requête",
    "Requête base de données exécutée",
    "Échec de connexion au serveur",
    "Timeout survenu",
    "Cache miss",
    "Utilisateur authentifié",
    "Entrée invalide reçue"
]

temps_base = datetime.now()
with open(fichier_log, 'w') as f:
    for i in range(50000):
        timestamp = (temps_base + timedelta(seconds=i)).strftime("%Y-%m-%d %H:%M:%S")
        niveau = random.choice(niveaux)
        message = random.choice(messages)
        f.write(f"{timestamp} [{niveau}] {message}\n")

print(f"Fichier log créé avec {50000} entrées")

# TODO: Implémenter le pipeline de traitement des logs

3. Le module itertools

import itertools

# chain - combiner plusieurs itérables
liste1 = [1, 2, 3]
liste2 = [4, 5, 6]
liste3 = [7, 8, 9]
combine = list(itertools.chain(liste1, liste2, liste3))
print("Chaîné :", combine)
# islice - découper un itérateur
infini = itertools.count(start=0, step=2)  # 0, 2, 4, 6, ...
premiers_10_pairs = list(itertools.islice(infini, 10))
print("10 premiers nombres pairs :", premiers_10_pairs)
# groupby - grouper les éléments consécutifs
donnees = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
for cle, groupe in itertools.groupby(donnees, key=lambda x: x[0]):
    print(f"Clé : {cle}, Groupe : {list(groupe)}")
# product - produit cartésien
couleurs = ['rouge', 'bleu']
tailles = ['S', 'M', 'L']
combinaisons = list(itertools.product(couleurs, tailles))
print("Combinaisons produit :", combinaisons)

Question 3.3 : Utilisez itertools pour :

  1. Générer toutes les paires de deux listes sans créer de listes intermédiaires

  2. Créer des lots de 100 éléments à partir d’un grand itérateur

  3. Implémenter une fenêtre glissante de taille 3 sur un itérateur

  4. Trouver les 1000 premiers nombres divisibles par 7 et 11

# TODO: Implémenter les exercices itertools
# Nettoyage des fichiers de test
import os
for f in [fichier_test, fichier_log]:
    if os.path.exists(f):
        os.remove(f)
        print(f"Supprimé {f}")

Exercice 4 [★★] - Introduction au Multiprocessing

Le Global Interpreter Lock (GIL) de Python empêche l’exécution parallèle réelle des threads. Pour les tâches intensives en CPU, nous utilisons le module multiprocessing pour atteindre le parallélisme via des processus séparés.

1. Comprendre le parallélisme

import multiprocessing as mp

# Obtenir le nombre de CPU
nb_cpu = mp.cpu_count()
print(f"Nombre de cœurs CPU : {nb_cpu}")
# Une fonction intensive en CPU
def est_premier(n):
    """Vérifie si n est premier (intensif en CPU pour les grands n)"""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True

def compter_premiers_dans_plage(debut, fin):
    """Compte les nombres premiers dans une plage"""
    return sum(1 for n in range(debut, fin) if est_premier(n))
import time

# Exécution séquentielle
temps_debut = time.perf_counter()
resultat_sequentiel = compter_premiers_dans_plage(2, 100000)
temps_sequentiel = time.perf_counter() - temps_debut

print(f"Séquentiel : Trouvé {resultat_sequentiel} nombres premiers en {temps_sequentiel:.2f} secondes")

2. Utilisation de multiprocessing.Pool

def wrapper_verifier_premier(n): 
    """Wrapper qui retourne un tuple (n, est_premier)"""
    return (n, est_premier(n)) 

# Exécution parallèle avec Pool 
nombres = list(range(2, 100)) 
temps_debut = time.perf_counter() 
with mp.Pool(processes=nb_cpu) as pool: 
    resultats = pool.map(wrapper_verifier_premier, nombres) 

resultat_parallele = sum(1 for _, est_p in resultats if est_p) 
temps_parallele = time.perf_counter() - temps_debut 

print(f"Parallèle ({nb_cpu} cœurs) : Trouvé {resultat_parallele} premiers en {temps_parallele:.2f} secondes") 
print(f"Accélération : {temps_sequentiel / temps_parallele:.2f}x")

3. Comparaison des méthodes Pool

def carre_lent(x):
    """Une fonction lente pour la démonstration"""
    time.sleep(0.01)  # Simuler du travail
    return x * x

nombres = list(range(100))

# pool.map - bloque jusqu'à ce que tous les résultats soient prêts, ordonné
debut = time.perf_counter()
with mp.Pool(4) as pool:
    resultats_map = pool.map(carre_lent, nombres)
print(f"pool.map : {time.perf_counter() - debut:.2f}s, 5 premiers : {resultats_map[:5]}")
# pool.imap - retourne un itérateur, ordonné, peut commencer le traitement avant que tout soit fait
debut = time.perf_counter()
with mp.Pool(4) as pool:
    resultats_imap = list(pool.imap(carre_lent, nombres))
print(f"pool.imap : {time.perf_counter() - debut:.2f}s, 5 premiers : {resultats_imap[:5]}")
# pool.imap_unordered - retourne un itérateur, non ordonné (plus rapide pour les charges inégales)
debut = time.perf_counter()
with mp.Pool(4) as pool:
    resultats_non_ordonnes = list(pool.imap_unordered(carre_lent, nombres))
print(f"pool.imap_unordered : {time.perf_counter() - debut:.2f}s")
print(f"Les résultats ne sont pas ordonnés : {resultats_non_ordonnes[:5]}")

Question 4.1 : Comparez les performances de pool.map, pool.imap et pool.imap_unordered sur une charge de travail où différents éléments prennent des temps différents. Quand chaque méthode est-elle la plus appropriée ?

# TODO: Comparer les méthodes pool avec une charge de travail inégale

4. Stratégies de découpage (Chunking)

def travail_simple(x):
    return x * x

donnees = list(range(100000))

# Tester différentes tailles de chunks
tailles_chunk = [1, 10, 100, 1000, 10000]

for taille_chunk in tailles_chunk:
    debut = time.perf_counter()
    with mp.Pool(4) as pool:
        resultats = pool.map(travail_simple, donnees, chunksize=taille_chunk)
    temps_ecoule = time.perf_counter() - debut
    print(f"Taille chunk {taille_chunk:5d} : {temps_ecoule:.4f}s")

Question 4.2 : Expérimentez avec différentes tailles de chunks pour une tâche intensive en CPU. Trouvez la taille de chunk optimale et expliquez pourquoi elle est la plus performante.

# TODO: Trouver la taille de chunk optimale

5. Application pratique : Traitement de fichiers en parallèle

import requests
import json
import os

def telecharger_entite_wikidata(id_entite):
    """Télécharge une entité Wikidata et retourne ses données"""
    url = f"https://www.wikidata.org/wiki/Special:EntityData/{id_entite}.json"
    try:
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return {"id": id_entite, "statut": "succes", "taille": len(response.content)}
        else:
            return {"id": id_entite, "statut": "erreur", "code": response.status_code}
    except Exception as e:
        return {"id": id_entite, "statut": "erreur", "message": str(e)}

# Télécharger des entités en parallèle
entites = [f"Q{i}" for i in range(1, 21)]  # Q1 à Q20

print("Téléchargement des entités Wikidata...")
debut = time.perf_counter()
with mp.Pool(4) as pool:
    resultats = pool.map(telecharger_entite_wikidata, entites)
temps_ecoule = time.perf_counter() - debut

print(f"\nTéléchargé {len(resultats)} entités en {temps_ecoule:.2f}s")
for r in resultats[:5]:
    print(f"  {r['id']}: {r['statut']}")

Question 4.3 : Écrivez un programme qui :

  1. Télécharge 50 pages Wikipedia en parallèle

  2. Compte le nombre de liens sur chaque page

  3. Retourne la page avec le plus de liens

Implémentez avec une gestion d’erreurs appropriée et une limitation de débit.

# TODO: Implémenter le web scraping parallèle

Question 4.4 : Créez un pipeline de traitement d’images parallèle qui :

  1. Lit des images depuis un répertoire

  2. Redimensionne chaque image en 256x256

  3. Convertit en niveaux de gris

  4. Sauvegarde les images traitées

Comparez les temps d’exécution séquentielle vs parallèle.

# TODO: Implémenter le traitement d'images parallèle

Exercice 5 [★★] - Communication Inter-processus : Queues et Pipes

Quand les processus ont besoin de partager des données, nous utilisons des mécanismes de communication inter-processus (IPC) comme les Queues et les Pipes.

1. multiprocessing.Queue

from multiprocessing import Process, Queue
import time

def producteur(queue, n_elements):
    """Produit des éléments et les met dans la queue"""
    for i in range(n_elements):
        element = f"element_{i}"
        queue.put(element)
        print(f"Produit : {element}")
        time.sleep(0.1)
    queue.put(None)  # Sentinelle pour signaler la fin

def consommateur(queue):
    """Consomme les éléments de la queue"""
    while True:
        element = queue.get()
        if element is None:
            break
        print(f"Consommé : {element}")

# Note: Ce pattern fonctionne mieux en tant que script, pas dans Jupyter
# Dans Jupyter, nous simulerons avec des threads pour la démonstration
# Démonstration simplifiée de Queue avec des threads (fonctionne dans Jupyter)
from queue import Queue
from threading import Thread

def producteur_thread(queue, n_elements):
    for i in range(n_elements):
        element = f"element_{i}"
        queue.put(element)
    queue.put(None)

def consommateur_thread(queue, resultats):
    while True:
        element = queue.get()
        if element is None:
            break
        resultats.append(f"traite_{element}")

q = Queue()
resultats = []

prod = Thread(target=producteur_thread, args=(q, 10))
cons = Thread(target=consommateur_thread, args=(q, resultats))

prod.start()
cons.start()
prod.join()
cons.join()

print("Résultats :", resultats)

2. Pattern Producteur-Consommateur

from queue import Queue
from threading import Thread
import time

def producteur_donnees(queue, source_donnees, n_workers):
    """Produit des éléments de données depuis une source"""
    for element in source_donnees:
        queue.put(element)
    # Envoyer le signal de terminaison pour chaque worker
    for _ in range(n_workers):
        queue.put(None)

def processeur_donnees(queue, resultats, id_worker):
    """Traite les éléments de données de la queue"""
    resultats_locaux = []
    while True:
        element = queue.get()
        if element is None:
            break
        # Simuler le traitement
        traite = element ** 2
        resultats_locaux.append(traite)
    resultats[id_worker] = resultats_locaux

# Créer la queue de travail et le stockage des résultats
queue_travail = Queue()
n_workers = 4
resultats = {}
donnees = list(range(100))

# Démarrer les workers
workers = []
for i in range(n_workers):
    w = Thread(target=processeur_donnees, args=(queue_travail, resultats, i))
    w.start()
    workers.append(w)

# Démarrer le producteur
thread_producteur = Thread(target=producteur_donnees, args=(queue_travail, donnees, n_workers))
thread_producteur.start()

# Attendre la fin
thread_producteur.join()
for w in workers:
    w.join()

# Combiner les résultats
tous_resultats = []
for resultats_worker in resultats.values():
    tous_resultats.extend(resultats_worker)

print(f"Traité {len(tous_resultats)} éléments")
print(f"Résultats exemples : {sorted(tous_resultats)[:10]}")

Question 5.1 : Implémentez un système producteur-consommateur qui :

  1. A un producteur générant des nombres aléatoires

  2. A 4 workers traitant les nombres (vérifier si premier)

  3. A un agrégateur collectant et résumant les résultats

  4. Utilise des queues bornées pour éviter les problèmes de mémoire

# TODO: Implémenter producteur-consommateur avec agrégateur

3. Mémoire partagée

from multiprocessing import Value, Array, Lock

# Valeur partagée (valeur unique)
compteur_partage = Value('i', 0)  # 'i' = entier

# Tableau partagé
tableau_partage = Array('d', [0.0] * 10)  # 'd' = double

print(f"Compteur initial : {compteur_partage.value}")
print(f"Tableau initial : {list(tableau_partage)}")
# Démonstration de condition de course (conceptuel - mieux visible avec multiprocessing)
from threading import Thread, Lock

compteur = 0
verrou = Lock()

def incrementer_non_securise():
    global compteur
    for _ in range(100000):
        compteur += 1

def incrementer_securise():
    global compteur
    for _ in range(100000):
        with verrou:
            compteur += 1

# Version non sécurisée - peut donner un mauvais résultat
compteur = 0
threads = [Thread(target=incrementer_non_securise) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Compteur non sécurisé (attendu 400000) : {compteur}")

# Version sécurisée - toujours correcte
compteur = 0
threads = [Thread(target=incrementer_securise) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Compteur sécurisé (attendu 400000) : {compteur}")

Question 5.2 : Implémentez un compteur de mots parallèle utilisant la mémoire partagée :

  1. Plusieurs workers lisent différentes parties d’un fichier texte

  2. Chaque worker met à jour un dictionnaire partagé de comptages de mots

  3. Utilisez un verrouillage approprié pour éviter les conditions de course

  4. Comparez les performances avec et sans verrouillage

# TODO: Implémenter le compteur de mots parallèle avec mémoire partagée

Exercice 6 [★★★] - concurrent.futures et Patterns Asynchrones

Le module concurrent.futures fournit une interface de haut niveau pour exécuter des callables de manière asynchrone en utilisant des threads ou des processus.

1. ThreadPoolExecutor vs ProcessPoolExecutor

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time

def tache_intensive_cpu(n):
    """Tâche intensive en CPU"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

def tache_intensive_io(url):
    """Tâche intensive en I/O"""
    time.sleep(0.1)  # Simuler le délai réseau
    return f"Récupéré {url}"

# CPU-bound : ProcessPoolExecutor est plus rapide
donnees = [1000000] * 8

debut = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    resultats = list(executor.map(tache_intensive_cpu, donnees))
print(f"ThreadPool (tâche CPU) : {time.perf_counter() - debut:.2f}s")

debut = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
    resultats = list(executor.map(tache_intensive_cpu, donnees))
print(f"ProcessPool (tâche CPU) : {time.perf_counter() - debut:.2f}s")
# I/O-bound : ThreadPoolExecutor est suffisant et a moins d'overhead
urls = [f"http://exemple.com/page{i}" for i in range(20)]

debut = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    resultats = list(executor.map(tache_intensive_io, urls))
print(f"ThreadPool (tâche I/O) : {time.perf_counter() - debut:.2f}s")

debut = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
    resultats = list(executor.map(tache_intensive_io, urls))
print(f"ProcessPool (tâche I/O) : {time.perf_counter() - debut:.2f}s")

2. Travailler avec les Futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import random

def tache_temps_variable(id_tache):
    """Tâche avec temps d'exécution variable"""
    temps_sommeil = random.uniform(0.1, 1.0)
    time.sleep(temps_sommeil)
    return {"id_tache": id_tache, "duree": temps_sommeil}

# Traiter les résultats dès qu'ils sont prêts
with ThreadPoolExecutor(max_workers=4) as executor:
    # Soumettre toutes les tâches
    futures = {executor.submit(tache_temps_variable, i): i for i in range(10)}
    
    # Traiter les résultats dès qu'ils sont prêts
    for future in as_completed(futures):
        id_tache = futures[future]
        resultat = future.result()
        print(f"Tâche {id_tache} terminée en {resultat['duree']:.2f}s")
# Gestion des exceptions
def peut_echouer(id_tache):
    if random.random() < 0.3:  # 30% de chance d'échec
        raise ValueError(f"Tâche {id_tache} a échoué !")
    return f"Tâche {id_tache} réussie"

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(peut_echouer, i) for i in range(10)]
    
    for future in as_completed(futures):
        try:
            resultat = future.result()
            print(resultat)
        except ValueError as e:
            print(f"Erreur : {e}")

Question 6.1 : Implémentez un planificateur de tâches qui :

  1. Accepte des tâches avec différentes priorités

  2. Traite les tâches de priorité supérieure en premier

  3. Implémente la gestion des timeouts pour les tâches lentes

  4. Fournit un rapport de progression

# TODO: Implémenter le planificateur de tâches avec priorité

3. Introduction à asyncio (Sujet avancé optionnel)

import asyncio

async def tache_async(id_tache, delai):
    """Une tâche async qui simule des I/O"""
    print(f"Tâche {id_tache} démarrée")
    await asyncio.sleep(delai)
    print(f"Tâche {id_tache} terminée")
    return f"Résultat de la tâche {id_tache}"

async def main():
    # Exécuter plusieurs tâches en concurrence
    taches = [
        tache_async(1, 1),
        tache_async(2, 2),
        tache_async(3, 1),
    ]
    resultats = await asyncio.gather(*taches)
    return resultats

# Dans Jupyter, utiliser await directement
resultats = await main()
print("Résultats :", resultats)

Question 6.2 : Comparez les performances de :

  1. Opérations I/O séquentielles

  2. ThreadPoolExecutor

  3. asyncio

Pour télécharger plusieurs pages web. Quelle approche est la meilleure pour les tâches I/O-bound ?

# TODO: Comparer les approches I/O

Exercice 7 [★★★] - Benchmarking et Optimisation des Performances

Comprendre comment mesurer et optimiser le code parallèle est essentiel pour utiliser efficacement ces techniques.

1. Mesurer le temps d’exécution

import time
from functools import wraps

def decorateur_timing(func):
    """Décorateur pour mesurer le temps d'exécution d'une fonction"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        debut = time.perf_counter()
        resultat = func(*args, **kwargs)
        temps_ecoule = time.perf_counter() - debut
        print(f"{func.__name__} a pris {temps_ecoule:.4f}s")
        return resultat
    return wrapper

@decorateur_timing
def fonction_lente():
    time.sleep(0.5)
    return "Terminé"

fonction_lente()
# Utiliser timeit pour des mesures plus précises
import timeit

def fonction_test():
    return sum(i**2 for i in range(1000))

# Timer 1000 exécutions
temps_execution = timeit.timeit(fonction_test, number=1000)
print(f"Temps moyen par appel : {temps_execution/1000*1000:.4f}ms")

2. Loi d’Amdahl

import matplotlib.pyplot as plt
import numpy as np

def loi_amdahl(p, n):
    """
    Calcule l'accélération théorique en utilisant la loi d'Amdahl.
    p: fraction du programme qui peut être parallélisée (0 à 1)
    n: nombre de processeurs
    """
    return 1 / ((1 - p) + p / n)

# Visualiser la loi d'Amdahl
processeurs = np.arange(1, 65)
fractions_paralleles = [0.5, 0.75, 0.9, 0.95, 0.99]

plt.figure(figsize=(10, 6))
for p in fractions_paralleles:
    accelerations = [loi_amdahl(p, n) for n in processeurs]
    plt.plot(processeurs, accelerations, label=f'{p*100:.0f}% parallélisable')

plt.xlabel('Nombre de processeurs')
plt.ylabel('Accélération')
plt.title("Loi d'Amdahl : Limites théoriques d'accélération")
plt.legend()
plt.grid(True)
plt.show()

3. Benchmarking du code parallèle

import multiprocessing as mp
import time

def intensif_cpu(n):
    """Calcul intensif en CPU"""
    total = 0
    for i in range(n):
        total += i ** 0.5
    return total

def benchmark_parallele(func, donnees, max_workers=None):
    """Benchmark une fonction avec différents nombres de workers"""
    if max_workers is None:
        max_workers = mp.cpu_count()
    
    resultats = []
    
    # Ligne de base séquentielle
    debut = time.perf_counter()
    resultats_sequentiels = [func(d) for d in donnees]
    temps_sequentiel = time.perf_counter() - debut
    resultats.append((1, temps_sequentiel, 1.0))
    print(f"Séquentiel : {temps_sequentiel:.4f}s")
    
    # Parallèle avec différents nombres de workers
    for n_workers in range(2, max_workers + 1):
        debut = time.perf_counter()
        with mp.Pool(n_workers) as pool:
            resultats_paralleles = pool.map(func, donnees)
        temps_parallele = time.perf_counter() - debut
        acceleration = temps_sequentiel / temps_parallele
        efficacite = acceleration / n_workers
        resultats.append((n_workers, temps_parallele, acceleration))
        print(f"Workers={n_workers}: {temps_parallele:.4f}s, Accélération={acceleration:.2f}x, Efficacité={efficacite:.1%}")
    
    return resultats

# Exécuter le benchmark
donnees = [1000000] * 16
resultats_benchmark = benchmark_parallele(intensif_cpu, donnees)

Question 7.1 : Créez une suite de benchmarks complète qui :

  1. Teste différentes tailles de chunks

  2. Teste différentes tailles de données

  3. Génère des visualisations comparant les performances

  4. Identifie la configuration optimale pour votre charge de travail

# TODO: Créer une suite de benchmarks complète

4. Profilage et optimisation

import cProfile
import pstats
from io import StringIO

def profiler_fonction(func, *args, **kwargs):
    """Profile une fonction et affiche les résultats"""
    profiler = cProfile.Profile()
    profiler.enable()
    resultat = func(*args, **kwargs)
    profiler.disable()
    
    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats('cumulative')
    stats.print_stats(10)
    print(stream.getvalue())
    
    return resultat

def fonction_a_profiler():
    """Exemple de fonction à profiler"""
    donnees = [i**2 for i in range(100000)]
    filtre = list(filter(lambda x: x % 2 == 0, donnees))
    total = sum(filtre)
    return total

profiler_fonction(fonction_a_profiler)

Question 7.2 : Profilez et optimisez le pipeline de traitement d’images de l’Exercice 4 :

  1. Identifiez les trois plus grands goulots d’étranglement

  2. Appliquez des optimisations à chacun

  3. Mesurez l’amélioration

  4. Documentez votre processus d’optimisation

# TODO: Profiler et optimiser

5. Étude de cas réel

Question 7.3 : Construisez un système complet de traitement de données qui :

  1. Lit des données depuis plusieurs fichiers CSV (100+ fichiers)

  2. Nettoie et transforme chaque fichier

  3. Agrège les résultats de tous les fichiers

  4. Écrit la sortie finale

Implémentez les versions séquentielle et parallèle, benchmarkez-les, et documentez l’accélération obtenue.

# TODO: Implémenter l'étude de cas réel

Résumé

Dans ce TP, vous avez appris :

  1. Programmation fonctionnelle : Utilisation de filter(), map(), reduce() pour les transformations de données

  2. Expressions lambda : Création de fonctions concises en ligne

  3. Générateurs : Traitement efficace en mémoire des grands ensembles de données

  4. Multiprocessing : Exécution parallèle avec Pool et parallélisme basé sur les processus

  5. Communication inter-processus : Utilisation des Queues et de la mémoire partagée

  6. concurrent.futures : Interface de haut niveau pour l’exécution parallèle

  7. Optimisation des performances : Benchmarking, profilage et optimisation du code parallèle

Ces concepts forment la base des frameworks de calcul distribué comme Apache Spark, que vous explorerez dans le TP 5.

Prochaines étapes

Continuez vers le TP 5 : Apache Spark pour le traitement de données massives pour apprendre comment ces concepts de traitement parallèle s’étendent au calcul distribué sur des clusters.