Objectifs¶
Comprendre les paradigmes du calcul parallèle¶
Exécution séquentielle vs parallèle
Concurrence vs parallélisme
Loi d’Amdahl et calculs d’accélération
Maîtriser la programmation fonctionnelle en Python¶
filter(),map(),reduce()avec expressions lambdaGénérateurs et itérateurs pour un traitement efficace en mémoire
Implémenter le traitement parallèle¶
Module
multiprocessing(Process, Pool, Queue, Pipe)concurrent.futures(ThreadPoolExecutor, ProcessPoolExecutor)Mémoire partagée et primitives de synchronisation
Mesure et optimisation des performances¶
Benchmarking du code parallèle
Identification des goulots d’étranglement
Stratégies de découpage (chunking)
Prérequis¶
Achèvement des TP 0-3
Compréhension des fonctions et structures de données Python
Connaissances de base de l’architecture informatique (cœurs CPU)
Aperçu des exercices¶
| Exercice | Difficulté | Sujets |
|---|---|---|
| Exercice 1 | ★ | Programmation fonctionnelle : filter(), map(), reduce() |
| Exercice 2 | ★ | Expressions lambda et fonctions d’ordre supérieur |
| Exercice 3 | ★★ | Générateurs et itérateurs pour les grandes données |
| Exercice 4 | ★★ | Introduction au multiprocessing |
| Exercice 5 | ★★ | Communication inter-processus : Queues et Pipes |
| Exercice 6 | ★★★ | concurrent.futures et modèles asynchrones |
| Exercice 7 | ★★★ | Benchmarking et optimisation des performances |
Exercice 1 [★] - Programmation fonctionnelle : filter(), map(), reduce()¶
Dans cet exercice, nous explorons les concepts de programmation fonctionnelle qui forment la base du traitement de données parallèle et distribué. Ces modèles sont utilisés intensivement dans des frameworks comme Apache Spark.
1. Introduction à la programmation fonctionnelle¶
La programmation fonctionnelle met l’accent sur :
Fonctions pures : Fonctions qui produisent toujours le même résultat pour la même entrée et n’ont pas d’effets de bord
Immutabilité : Les données ne sont pas modifiées après leur création
Fonctions de première classe : Les fonctions peuvent être passées en arguments et retournées par d’autres fonctions
Ces principes sont essentiels pour le traitement parallèle car ils éliminent l’état partagé et rendent sûre l’exécution concurrente des opérations.
2. La fonction filter()¶
La fonction filter(function, iterable) retourne un itérateur contenant les éléments de l’itérable pour lesquels la fonction retourne True.
?filter# Exemple basique de filter : filtrer les nombres pairs
num = [i for i in range(1, 20)]
print("Liste originale :", num)
def est_pair(item):
return item % 2 == 0
filtre = list(filter(est_pair, num))
print("Nombres pairs :", filtre)# Filter avec None comme fonction - supprime les valeurs fausses (falsy)
mixte = [0, 1, "", "bonjour", None, 42, [], [1, 2]]
print("Original :", mixte)
print("Filtré (uniquement truthy) :", list(filter(None, mixte)))# Filtrer les nombres impairs
def est_impair(item):
return item % 2 != 0
num = [i for i in range(1, 20)]
filtre = list(filter(est_impair, num))
print("Nombres impairs :", filtre)Question 1.1 : Écrivez une fonction est_premier(n) qui retourne True si n est un nombre premier. Ensuite, utilisez filter() pour extraire tous les nombres premiers d’une liste de 1000 entiers aléatoires entre 1 et 1000.
import random
# Générer 1000 entiers aléatoires
nombres_aleatoires = [random.randint(1, 1000) for _ in range(1000)]
# TODO: Implémenter la fonction est_premier et filtrer les nombres premiersQuestion 1.2 : Utilisez filter() pour extraire les nombres divisibles à la fois par 3 et par 5 de la même liste.
# TODO: Filtrer les nombres divisibles par 3 et 5Filtrage avec des structures imbriquées¶
La fonction filter() peut être appliquée à des structures de données complexes comme des listes de dictionnaires.
employes = [
{"nom": "Alice", "age": 28, "departement": "RH", "salaire": 55000},
{"nom": "Bob", "age": 35, "departement": "Ingénierie", "salaire": 85000},
{"nom": "Charlie", "age": 22, "departement": "Marketing", "salaire": 45000},
{"nom": "David", "age": 45, "departement": "Ingénierie", "salaire": 95000},
{"nom": "Eve", "age": 31, "departement": "RH", "salaire": 60000},
{"nom": "Frank", "age": 29, "departement": "Ingénierie", "salaire": 78000},
{"nom": "Grace", "age": 38, "departement": "Marketing", "salaire": 72000},
{"nom": "Henry", "age": 42, "departement": "Ingénierie", "salaire": 92000}
]Question 1.3 : En utilisant filter(), complétez les tâches suivantes :
Créer une liste des employés qui travaillent dans le département “Ingénierie”
Trouver les employés dont l’âge est entre 25 et 40 ans (inclus)
Trouver les employés avec un salaire supérieur à 70000
Trouver les employés dont le nom commence par une voyelle
# TODO: Implémenter les filtresFiltrage avancé de chaînes de caractères¶
phrases = [
"La science des données transforme les industries dans le monde entier.",
"Python est génial.",
"Les algorithmes d'apprentissage automatique nécessitent de grandes quantités de données.",
"Les technologies Big Data permettent le traitement de pétaoctets.",
"L'IA évolue.",
"Le calcul distribué permet la mise à l'échelle horizontale sur les clusters.",
"Le cloud fournit des ressources informatiques élastiques à la demande.",
"Simple fonctionne."
]Question 1.4 : En utilisant filter() :
Sélectionner les phrases avec plus de 6 mots
Sélectionner les phrases contenant le mot “données” (insensible à la casse)
Sélectionner les phrases où la longueur moyenne des mots est supérieure à 5 caractères
# TODO: Implémenter les filtres3. La fonction map()¶
La fonction map(function, iterable, ...) applique une fonction à chaque élément d’un itérable et retourne un itérateur des résultats.
?map# Exemple basique de map : mettre au carré chaque nombre
def carre(x):
return x * x
num = [i for i in range(1, 11)]
carres = list(map(carre, num))
print("Original :", num)
print("Carrés :", carres)# Map avec plusieurs itérables
def multiplier(x, y):
return x * y
liste1 = [1, 2, 3, 4, 5]
liste2 = [10, 20, 30, 40, 50]
produits = list(map(multiplier, liste1, liste2))
print("Produits :", produits)# Map avec trois itérables
def somme_ponderee(a, b, c):
return a + 2*b + 3*c
x = [1, 2, 3]
y = [4, 5, 6]
z = [7, 8, 9]
resultat = list(map(somme_ponderee, x, y, z))
print("Sommes pondérées :", resultat)Question 1.5 : Étant donné une liste de 100 000 chemins de fichiers, utilisez map() pour :
Extraire les extensions de fichiers (ex :
.txt,.csv)Extraire les noms de fichiers sans extensions
Calculer les profondeurs de chemin (nombre de répertoires)
import os
import random
# Générer des chemins de fichiers exemples
extensions = ['.txt', '.csv', '.json', '.xml', '.py', '.md']
repertoires = ['data', 'src', 'docs', 'config', 'tests', 'output']
chemins_fichiers = [
os.path.join(
random.choice(repertoires),
random.choice(repertoires),
f"fichier_{i}{random.choice(extensions)}"
)
for i in range(100000)
]
print("Exemples de chemins :", chemins_fichiers[:5])
# TODO: Utiliser map() pour extraire les extensions, noms et calculer les profondeursQuestion 1.6 : Utilisez map() pour normaliser un ensemble de 50 000 chaînes de texte :
Convertir en minuscules
Supprimer les espaces en début et fin
Supprimer la ponctuation
import string
# Générer des données textuelles exemples
mots_exemples = ["Bonjour", "Monde", "Données", "Science", "Python", "Analyse"]
signes_ponctuation = list("!.,;:?")
donnees_texte = [
f" {random.choice(mots_exemples)}{random.choice(signes_ponctuation)} ".upper()
if random.random() > 0.5 else
f"{random.choice(mots_exemples)} {random.choice(mots_exemples)}{random.choice(signes_ponctuation)}"
for _ in range(50000)
]
print("Données exemples :", donnees_texte[:5])
# TODO: Utiliser map() pour normaliser les données textuelles4. La fonction reduce()¶
La fonction reduce(function, iterable) applique une fonction de deux arguments de manière cumulative aux éléments d’un itérable, de gauche à droite, le réduisant à une seule valeur.
from functools import reduce
?reduce# Somme de nombres avec reduce
from functools import reduce
def addition(x, y):
return x + y
num = [i for i in range(1, 11)]
total = reduce(addition, num)
print(f"Somme de {num} = {total}")# Produit de nombres
def multiplier(x, y):
return x * y
num = [1, 2, 3, 4, 5]
produit = reduce(multiplier, num)
print(f"Produit de {num} = {produit}")# Trouver le maximum avec reduce
def max_de_deux(x, y):
return x if x > y else y
num = [45, 12, 89, 34, 67, 23, 90, 11]
maximum = reduce(max_de_deux, num)
print(f"Maximum de {num} = {maximum}")Question 1.7 : Utilisez reduce() pour :
Trouver la chaîne la plus longue dans une liste de 10 000 chaînes
Aplatir une liste imbriquée (3 niveaux de profondeur)
Concaténer une liste de chaînes avec un séparateur
# Générer des données exemples
mots = ["algorithme", "données", "traitement", "distribué", "calcul",
"parallèle", "optimisation", "scalabilité", "performance"]
liste_chaines = [random.choice(mots) * random.randint(1, 5) for _ in range(10000)]
liste_imbriquee = [
[[1, 2], [3, 4]],
[[5, 6], [7, 8]],
[[9, 10], [11, 12]]
]
# TODO: Implémenter les opérations reduceOpérations matricielles avec map() et reduce()¶
matrices = [
[[1, 2], [3, 4]],
[[5, 6], [7, 8]],
[[9, 10], [11, 12]],
[[13, 14], [15, 16]]
]Question 1.8 : En utilisant map() et reduce() :
Calculer la somme de toutes les matrices
Calculer le produit élément par élément de deux matrices
Filtrer les matrices où tous les éléments sont supérieurs à 5
# TODO: Implémenter les opérations matriciellesTransformation et agrégation de données¶
produits = [
{"nom": "Ordinateur portable", "prix": 1200, "quantite": 3, "categorie": "Électronique"},
{"nom": "Smartphone", "prix": 800, "quantite": 5, "categorie": "Électronique"},
{"nom": "Tablette", "prix": 300, "quantite": 10, "categorie": "Électronique"},
{"nom": "Bureau", "prix": 250, "quantite": 8, "categorie": "Mobilier"},
{"nom": "Chaise", "prix": 150, "quantite": 20, "categorie": "Mobilier"},
{"nom": "Moniteur", "prix": 350, "quantite": 7, "categorie": "Électronique"},
{"nom": "Clavier", "prix": 75, "quantite": 25, "categorie": "Électronique"},
{"nom": "Bibliothèque", "prix": 180, "quantite": 5, "categorie": "Mobilier"}
]Question 1.9 : En utilisant map(), filter() et reduce() :
Calculer la valeur totale de l’inventaire (prix * quantité pour tous les produits)
Trouver les produits avec un prix supérieur à 200
Appliquer une remise de 15% à tous les produits Électronique et retourner la liste mise à jour
Calculer la valeur totale par catégorie
# TODO: Implémenter les opérationsExercice 2 [★] - Expressions Lambda et Fonctions d’Ordre Supérieur¶
Les expressions lambda sont des fonctions anonymes qui peuvent être définies en ligne. Elles sont particulièrement utiles avec filter(), map() et reduce().
1. Bases des expressions lambda¶
# Lambda avec filter
num = list(range(1, 21))
pairs = list(filter(lambda x: x % 2 == 0, num))
print("Nombres pairs :", pairs)# Lambda avec map
carres = list(map(lambda x: x ** 2, num))
print("Carrés :", carres)# Lambda avec plusieurs arguments
liste1 = [1, 2, 3, 4, 5]
liste2 = [10, 20, 30, 40, 50]
produits = list(map(lambda x, y: x * y, liste1, liste2))
print("Produits :", produits)# Lambda avec reduce
from functools import reduce
total = reduce(lambda x, y: x + y, num)
print(f"Somme : {total}")
produit = reduce(lambda x, y: x * y, [1, 2, 3, 4, 5])
print(f"Produit : {produit}")2. Fonctions d’ordre supérieur et fermetures (closures)¶
# Une fermeture - fonction qui mémorise les valeurs de la portée englobante
def creer_multiplicateur(n):
def multiplicateur(x):
return x * n
return multiplicateur
double = creer_multiplicateur(2)
triple = creer_multiplicateur(3)
print("Double de 5 :", double(5))
print("Triple de 5 :", triple(5))
# Utilisation avec map
nombres = [1, 2, 3, 4, 5]
print("Doublés :", list(map(double, nombres)))# Utilisation de functools.partial pour l'application partielle de fonctions
from functools import partial
def puissance(base, exposant):
return base ** exposant
carre = partial(puissance, exposant=2)
cube = partial(puissance, exposant=3)
print("Carré de 5 :", carre(5))
print("Cube de 5 :", cube(5))Question 2.1 : Créez une fonction fabrique de filtres qui génère des prédicats de filtre :
creer_filtre_plage(val_min, val_max)- retourne une fonction qui vérifie si une valeur est dans la plagecreer_filtre_seuil(seuil, comparaison)- retourne une fonction pour les comparaisons de seuilUtilisez-les avec
filter()sur un jeu de données
# TODO: Implémenter les fonctions fabriques de filtres3. Composition de fonctions¶
# Composer des fonctions : f(g(x))
def composer(f, g):
return lambda x: f(g(x))
def ajouter_un(x):
return x + 1
def carre(x):
return x * x
# (x + 1)^2
ajouter_puis_carre = composer(carre, ajouter_un)
print("(3 + 1)^2 =", ajouter_puis_carre(3))
# x^2 + 1
carre_puis_ajouter = composer(ajouter_un, carre)
print("3^2 + 1 =", carre_puis_ajouter(3))Question 2.2 : Implémentez une fonction pipeline() qui compose plusieurs fonctions et utilisez-la pour créer un pipeline de nettoyage de données :
Supprimer les espaces
Convertir en minuscules
Supprimer les caractères spéciaux
Remplacer les espaces multiples par un seul espace
# TODO: Implémenter la fonction pipeline et le pipeline de nettoyage de donnéesAnalyse de texte avec expressions lambda¶
phrases = [
"L'analyse de Big Data transforme les données brutes en insights actionnables.",
"Le cloud computing permet une infrastructure évolutive.",
"Les modèles d'apprentissage automatique nécessitent un entraînement sur de grands ensembles de données.",
"Les pipelines de données automatisent le flux d'information.",
"Les systèmes distribués offrent la tolérance aux pannes et la haute disponibilité.",
"Le traitement en temps réel gère efficacement les flux de données.",
"La gouvernance des données assure la qualité et la conformité.",
"Les API permettent une intégration transparente entre les services."
]Question 2.3 : En utilisant map(), filter(), reduce() avec des expressions lambda :
Compter le nombre total de mots dans toutes les phrases
Trouver la phrase avec le plus de mots
Extraire tous les mots uniques de toutes les phrases
Calculer la longueur moyenne des phrases (en mots)
# TODO: Implémenter l'analyse de texte avec lambdasTraitement de données financières¶
transactions = [
{"date": "2025-01-10", "type": "revenu", "montant": 5000, "categorie": "salaire"},
{"date": "2025-01-11", "type": "depense", "montant": 150, "categorie": "services"},
{"date": "2025-01-12", "type": "depense", "montant": 80, "categorie": "alimentation"},
{"date": "2025-01-13", "type": "revenu", "montant": 200, "categorie": "freelance"},
{"date": "2025-01-14", "type": "depense", "montant": 500, "categorie": "loyer"},
{"date": "2025-01-15", "type": "depense", "montant": 60, "categorie": "transport"},
{"date": "2025-01-16", "type": "revenu", "montant": 150, "categorie": "freelance"},
{"date": "2025-01-17", "type": "depense", "montant": 200, "categorie": "alimentation"},
{"date": "2025-01-18", "type": "depense", "montant": 100, "categorie": "loisirs"}
]Question 2.4 : En utilisant des expressions lambda :
Calculer le solde net (total revenus - total dépenses)
Trouver toutes les dépenses supérieures à 100
Grouper les transactions par catégorie et calculer les totaux
Trouver la plus grande dépense
# TODO: Implémenter le traitement des données financièresExercice 3 [★★] - Générateurs et Itérateurs pour les Grandes Données¶
Lors du traitement de jeux de données massifs, charger tout en mémoire est souvent impossible. Les générateurs fournissent un moyen efficace en mémoire de traiter les données de manière paresseuse.
1. Bases des générateurs¶
# Compréhension de liste vs expression génératrice
import sys
# Liste - stocke toutes les valeurs en mémoire
comp_liste = [x ** 2 for x in range(1000000)]
print(f"Taille de la liste : {sys.getsizeof(comp_liste):,} octets")
# Générateur - calcule les valeurs à la demande
exp_gen = (x ** 2 for x in range(1000000))
print(f"Taille du générateur : {sys.getsizeof(exp_gen):,} octets")# Fonction génératrice utilisant yield
def compter_jusqua(n):
"""Générateur qui yield les nombres de 1 à n"""
i = 1
while i <= n:
yield i
i += 1
# Utilisation du générateur
compteur = compter_jusqua(5)
print("Type :", type(compteur))
print("Valeurs :", list(compteur))# Les générateurs ne peuvent être itérés qu'une seule fois
gen = (x for x in range(5))
print("Première itération :", list(gen))
print("Deuxième itération :", list(gen)) # Vide !Question 3.1 : Créez une fonction génératrice lire_gros_fichier(chemin, taille_chunk) qui :
Lit un fichier par morceaux de
taille_chunklignesYield chaque morceau sous forme de liste de lignes
Ne charge jamais le fichier entier en mémoire
Testez avec un fichier contenant des millions de lignes.
import os
fichier_test = "gros_fichier_test.txt"
with open(fichier_test, 'w') as f:
for i in range(100000):
f.write(f"Ligne {i}: Ceci est une donnée de test pour l'exercice générateur\n")
print(f"Fichier créé avec taille : {os.path.getsize(fichier_test):,} octets")
# TODO: Implémenter le générateur lire_gros_fichier2. Pipelines de générateurs¶
# Chaîner des générateurs crée un pipeline
def lire_lignes(nom_fichier):
"""Générateur qui yield les lignes d'un fichier"""
with open(nom_fichier, 'r') as f:
for ligne in f:
yield ligne.strip()
def filtrer_non_vides(lignes):
"""Générateur qui yield les lignes non vides"""
for ligne in lignes:
if ligne:
yield ligne
def extraire_numeros(lignes):
"""Générateur qui extrait le numéro de ligne"""
for ligne in lignes:
parties = ligne.split(':')
if len(parties) >= 1:
partie_num = parties[0].replace('Ligne ', '')
if partie_num.isdigit():
yield int(partie_num)
# Pipeline: lire -> filtrer -> extraire -> sommer
pipeline = extraire_numeros(filtrer_non_vides(lire_lignes(fichier_test)))
# Traiter les 100 premiers nombres
from itertools import islice
premiers_100 = list(islice(pipeline, 100))
print(f"10 premiers numéros de ligne : {premiers_100[:10]}")
print(f"Somme des 100 premiers : {sum(premiers_100)}")Question 3.2 : Créez un pipeline de générateurs pour l’analyse de fichiers log :
lire_logs(chemin)- yield les entrées de loganalyser_logs(logs)- analyse chaque log en dictionnaire avec timestamp, niveau, messagefiltrer_erreurs(logs)- yield uniquement les logs de niveau ERRORextraire_messages(logs)- yield uniquement le champ message
Utilisez le pipeline pour traiter un gros fichier log sans le charger entièrement.
# Créer un fichier log exemple
import random
from datetime import datetime, timedelta
fichier_log = "logs_exemples.txt"
niveaux = ["INFO", "DEBUG", "WARNING", "ERROR", "INFO", "INFO", "DEBUG"]
messages = [
"Connexion établie",
"Traitement de la requête",
"Requête base de données exécutée",
"Échec de connexion au serveur",
"Timeout survenu",
"Cache miss",
"Utilisateur authentifié",
"Entrée invalide reçue"
]
temps_base = datetime.now()
with open(fichier_log, 'w') as f:
for i in range(50000):
timestamp = (temps_base + timedelta(seconds=i)).strftime("%Y-%m-%d %H:%M:%S")
niveau = random.choice(niveaux)
message = random.choice(messages)
f.write(f"{timestamp} [{niveau}] {message}\n")
print(f"Fichier log créé avec {50000} entrées")
# TODO: Implémenter le pipeline de traitement des logs3. Le module itertools¶
import itertools
# chain - combiner plusieurs itérables
liste1 = [1, 2, 3]
liste2 = [4, 5, 6]
liste3 = [7, 8, 9]
combine = list(itertools.chain(liste1, liste2, liste3))
print("Chaîné :", combine)# islice - découper un itérateur
infini = itertools.count(start=0, step=2) # 0, 2, 4, 6, ...
premiers_10_pairs = list(itertools.islice(infini, 10))
print("10 premiers nombres pairs :", premiers_10_pairs)# groupby - grouper les éléments consécutifs
donnees = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
for cle, groupe in itertools.groupby(donnees, key=lambda x: x[0]):
print(f"Clé : {cle}, Groupe : {list(groupe)}")# product - produit cartésien
couleurs = ['rouge', 'bleu']
tailles = ['S', 'M', 'L']
combinaisons = list(itertools.product(couleurs, tailles))
print("Combinaisons produit :", combinaisons)Question 3.3 : Utilisez itertools pour :
Générer toutes les paires de deux listes sans créer de listes intermédiaires
Créer des lots de 100 éléments à partir d’un grand itérateur
Implémenter une fenêtre glissante de taille 3 sur un itérateur
Trouver les 1000 premiers nombres divisibles par 7 et 11
# TODO: Implémenter les exercices itertools# Nettoyage des fichiers de test
import os
for f in [fichier_test, fichier_log]:
if os.path.exists(f):
os.remove(f)
print(f"Supprimé {f}")Exercice 4 [★★] - Introduction au Multiprocessing¶
Le Global Interpreter Lock (GIL) de Python empêche l’exécution parallèle réelle des threads. Pour les tâches intensives en CPU, nous utilisons le module multiprocessing pour atteindre le parallélisme via des processus séparés.
1. Comprendre le parallélisme¶
import multiprocessing as mp
# Obtenir le nombre de CPU
nb_cpu = mp.cpu_count()
print(f"Nombre de cœurs CPU : {nb_cpu}")# Une fonction intensive en CPU
def est_premier(n):
"""Vérifie si n est premier (intensif en CPU pour les grands n)"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(n**0.5) + 1, 2):
if n % i == 0:
return False
return True
def compter_premiers_dans_plage(debut, fin):
"""Compte les nombres premiers dans une plage"""
return sum(1 for n in range(debut, fin) if est_premier(n))import time
# Exécution séquentielle
temps_debut = time.perf_counter()
resultat_sequentiel = compter_premiers_dans_plage(2, 100000)
temps_sequentiel = time.perf_counter() - temps_debut
print(f"Séquentiel : Trouvé {resultat_sequentiel} nombres premiers en {temps_sequentiel:.2f} secondes")2. Utilisation de multiprocessing.Pool¶
def wrapper_verifier_premier(n):
"""Wrapper qui retourne un tuple (n, est_premier)"""
return (n, est_premier(n))
# Exécution parallèle avec Pool
nombres = list(range(2, 100))
temps_debut = time.perf_counter()
with mp.Pool(processes=nb_cpu) as pool:
resultats = pool.map(wrapper_verifier_premier, nombres)
resultat_parallele = sum(1 for _, est_p in resultats if est_p)
temps_parallele = time.perf_counter() - temps_debut
print(f"Parallèle ({nb_cpu} cœurs) : Trouvé {resultat_parallele} premiers en {temps_parallele:.2f} secondes")
print(f"Accélération : {temps_sequentiel / temps_parallele:.2f}x")3. Comparaison des méthodes Pool¶
def carre_lent(x):
"""Une fonction lente pour la démonstration"""
time.sleep(0.01) # Simuler du travail
return x * x
nombres = list(range(100))
# pool.map - bloque jusqu'à ce que tous les résultats soient prêts, ordonné
debut = time.perf_counter()
with mp.Pool(4) as pool:
resultats_map = pool.map(carre_lent, nombres)
print(f"pool.map : {time.perf_counter() - debut:.2f}s, 5 premiers : {resultats_map[:5]}")# pool.imap - retourne un itérateur, ordonné, peut commencer le traitement avant que tout soit fait
debut = time.perf_counter()
with mp.Pool(4) as pool:
resultats_imap = list(pool.imap(carre_lent, nombres))
print(f"pool.imap : {time.perf_counter() - debut:.2f}s, 5 premiers : {resultats_imap[:5]}")# pool.imap_unordered - retourne un itérateur, non ordonné (plus rapide pour les charges inégales)
debut = time.perf_counter()
with mp.Pool(4) as pool:
resultats_non_ordonnes = list(pool.imap_unordered(carre_lent, nombres))
print(f"pool.imap_unordered : {time.perf_counter() - debut:.2f}s")
print(f"Les résultats ne sont pas ordonnés : {resultats_non_ordonnes[:5]}")Question 4.1 : Comparez les performances de pool.map, pool.imap et pool.imap_unordered sur une charge de travail où différents éléments prennent des temps différents. Quand chaque méthode est-elle la plus appropriée ?
# TODO: Comparer les méthodes pool avec une charge de travail inégale4. Stratégies de découpage (Chunking)¶
def travail_simple(x):
return x * x
donnees = list(range(100000))
# Tester différentes tailles de chunks
tailles_chunk = [1, 10, 100, 1000, 10000]
for taille_chunk in tailles_chunk:
debut = time.perf_counter()
with mp.Pool(4) as pool:
resultats = pool.map(travail_simple, donnees, chunksize=taille_chunk)
temps_ecoule = time.perf_counter() - debut
print(f"Taille chunk {taille_chunk:5d} : {temps_ecoule:.4f}s")Question 4.2 : Expérimentez avec différentes tailles de chunks pour une tâche intensive en CPU. Trouvez la taille de chunk optimale et expliquez pourquoi elle est la plus performante.
# TODO: Trouver la taille de chunk optimale5. Application pratique : Traitement de fichiers en parallèle¶
import requests
import json
import os
def telecharger_entite_wikidata(id_entite):
"""Télécharge une entité Wikidata et retourne ses données"""
url = f"https://www.wikidata.org/wiki/Special:EntityData/{id_entite}.json"
try:
response = requests.get(url, timeout=10)
if response.status_code == 200:
return {"id": id_entite, "statut": "succes", "taille": len(response.content)}
else:
return {"id": id_entite, "statut": "erreur", "code": response.status_code}
except Exception as e:
return {"id": id_entite, "statut": "erreur", "message": str(e)}
# Télécharger des entités en parallèle
entites = [f"Q{i}" for i in range(1, 21)] # Q1 à Q20
print("Téléchargement des entités Wikidata...")
debut = time.perf_counter()
with mp.Pool(4) as pool:
resultats = pool.map(telecharger_entite_wikidata, entites)
temps_ecoule = time.perf_counter() - debut
print(f"\nTéléchargé {len(resultats)} entités en {temps_ecoule:.2f}s")
for r in resultats[:5]:
print(f" {r['id']}: {r['statut']}")Question 4.3 : Écrivez un programme qui :
Télécharge 50 pages Wikipedia en parallèle
Compte le nombre de liens sur chaque page
Retourne la page avec le plus de liens
Implémentez avec une gestion d’erreurs appropriée et une limitation de débit.
# TODO: Implémenter le web scraping parallèleQuestion 4.4 : Créez un pipeline de traitement d’images parallèle qui :
Lit des images depuis un répertoire
Redimensionne chaque image en 256x256
Convertit en niveaux de gris
Sauvegarde les images traitées
Comparez les temps d’exécution séquentielle vs parallèle.
# TODO: Implémenter le traitement d'images parallèleExercice 5 [★★] - Communication Inter-processus : Queues et Pipes¶
Quand les processus ont besoin de partager des données, nous utilisons des mécanismes de communication inter-processus (IPC) comme les Queues et les Pipes.
1. multiprocessing.Queue¶
from multiprocessing import Process, Queue
import time
def producteur(queue, n_elements):
"""Produit des éléments et les met dans la queue"""
for i in range(n_elements):
element = f"element_{i}"
queue.put(element)
print(f"Produit : {element}")
time.sleep(0.1)
queue.put(None) # Sentinelle pour signaler la fin
def consommateur(queue):
"""Consomme les éléments de la queue"""
while True:
element = queue.get()
if element is None:
break
print(f"Consommé : {element}")
# Note: Ce pattern fonctionne mieux en tant que script, pas dans Jupyter
# Dans Jupyter, nous simulerons avec des threads pour la démonstration# Démonstration simplifiée de Queue avec des threads (fonctionne dans Jupyter)
from queue import Queue
from threading import Thread
def producteur_thread(queue, n_elements):
for i in range(n_elements):
element = f"element_{i}"
queue.put(element)
queue.put(None)
def consommateur_thread(queue, resultats):
while True:
element = queue.get()
if element is None:
break
resultats.append(f"traite_{element}")
q = Queue()
resultats = []
prod = Thread(target=producteur_thread, args=(q, 10))
cons = Thread(target=consommateur_thread, args=(q, resultats))
prod.start()
cons.start()
prod.join()
cons.join()
print("Résultats :", resultats)2. Pattern Producteur-Consommateur¶
from queue import Queue
from threading import Thread
import time
def producteur_donnees(queue, source_donnees, n_workers):
"""Produit des éléments de données depuis une source"""
for element in source_donnees:
queue.put(element)
# Envoyer le signal de terminaison pour chaque worker
for _ in range(n_workers):
queue.put(None)
def processeur_donnees(queue, resultats, id_worker):
"""Traite les éléments de données de la queue"""
resultats_locaux = []
while True:
element = queue.get()
if element is None:
break
# Simuler le traitement
traite = element ** 2
resultats_locaux.append(traite)
resultats[id_worker] = resultats_locaux
# Créer la queue de travail et le stockage des résultats
queue_travail = Queue()
n_workers = 4
resultats = {}
donnees = list(range(100))
# Démarrer les workers
workers = []
for i in range(n_workers):
w = Thread(target=processeur_donnees, args=(queue_travail, resultats, i))
w.start()
workers.append(w)
# Démarrer le producteur
thread_producteur = Thread(target=producteur_donnees, args=(queue_travail, donnees, n_workers))
thread_producteur.start()
# Attendre la fin
thread_producteur.join()
for w in workers:
w.join()
# Combiner les résultats
tous_resultats = []
for resultats_worker in resultats.values():
tous_resultats.extend(resultats_worker)
print(f"Traité {len(tous_resultats)} éléments")
print(f"Résultats exemples : {sorted(tous_resultats)[:10]}")Question 5.1 : Implémentez un système producteur-consommateur qui :
A un producteur générant des nombres aléatoires
A 4 workers traitant les nombres (vérifier si premier)
A un agrégateur collectant et résumant les résultats
Utilise des queues bornées pour éviter les problèmes de mémoire
# TODO: Implémenter producteur-consommateur avec agrégateur3. Mémoire partagée¶
from multiprocessing import Value, Array, Lock
# Valeur partagée (valeur unique)
compteur_partage = Value('i', 0) # 'i' = entier
# Tableau partagé
tableau_partage = Array('d', [0.0] * 10) # 'd' = double
print(f"Compteur initial : {compteur_partage.value}")
print(f"Tableau initial : {list(tableau_partage)}")# Démonstration de condition de course (conceptuel - mieux visible avec multiprocessing)
from threading import Thread, Lock
compteur = 0
verrou = Lock()
def incrementer_non_securise():
global compteur
for _ in range(100000):
compteur += 1
def incrementer_securise():
global compteur
for _ in range(100000):
with verrou:
compteur += 1
# Version non sécurisée - peut donner un mauvais résultat
compteur = 0
threads = [Thread(target=incrementer_non_securise) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Compteur non sécurisé (attendu 400000) : {compteur}")
# Version sécurisée - toujours correcte
compteur = 0
threads = [Thread(target=incrementer_securise) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Compteur sécurisé (attendu 400000) : {compteur}")Question 5.2 : Implémentez un compteur de mots parallèle utilisant la mémoire partagée :
Plusieurs workers lisent différentes parties d’un fichier texte
Chaque worker met à jour un dictionnaire partagé de comptages de mots
Utilisez un verrouillage approprié pour éviter les conditions de course
Comparez les performances avec et sans verrouillage
# TODO: Implémenter le compteur de mots parallèle avec mémoire partagéeExercice 6 [★★★] - concurrent.futures et Patterns Asynchrones¶
Le module concurrent.futures fournit une interface de haut niveau pour exécuter des callables de manière asynchrone en utilisant des threads ou des processus.
1. ThreadPoolExecutor vs ProcessPoolExecutor¶
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
def tache_intensive_cpu(n):
"""Tâche intensive en CPU"""
total = 0
for i in range(n):
total += i ** 2
return total
def tache_intensive_io(url):
"""Tâche intensive en I/O"""
time.sleep(0.1) # Simuler le délai réseau
return f"Récupéré {url}"
# CPU-bound : ProcessPoolExecutor est plus rapide
donnees = [1000000] * 8
debut = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
resultats = list(executor.map(tache_intensive_cpu, donnees))
print(f"ThreadPool (tâche CPU) : {time.perf_counter() - debut:.2f}s")
debut = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
resultats = list(executor.map(tache_intensive_cpu, donnees))
print(f"ProcessPool (tâche CPU) : {time.perf_counter() - debut:.2f}s")# I/O-bound : ThreadPoolExecutor est suffisant et a moins d'overhead
urls = [f"http://exemple.com/page{i}" for i in range(20)]
debut = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
resultats = list(executor.map(tache_intensive_io, urls))
print(f"ThreadPool (tâche I/O) : {time.perf_counter() - debut:.2f}s")
debut = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as executor:
resultats = list(executor.map(tache_intensive_io, urls))
print(f"ProcessPool (tâche I/O) : {time.perf_counter() - debut:.2f}s")2. Travailler avec les Futures¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
def tache_temps_variable(id_tache):
"""Tâche avec temps d'exécution variable"""
temps_sommeil = random.uniform(0.1, 1.0)
time.sleep(temps_sommeil)
return {"id_tache": id_tache, "duree": temps_sommeil}
# Traiter les résultats dès qu'ils sont prêts
with ThreadPoolExecutor(max_workers=4) as executor:
# Soumettre toutes les tâches
futures = {executor.submit(tache_temps_variable, i): i for i in range(10)}
# Traiter les résultats dès qu'ils sont prêts
for future in as_completed(futures):
id_tache = futures[future]
resultat = future.result()
print(f"Tâche {id_tache} terminée en {resultat['duree']:.2f}s")# Gestion des exceptions
def peut_echouer(id_tache):
if random.random() < 0.3: # 30% de chance d'échec
raise ValueError(f"Tâche {id_tache} a échoué !")
return f"Tâche {id_tache} réussie"
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(peut_echouer, i) for i in range(10)]
for future in as_completed(futures):
try:
resultat = future.result()
print(resultat)
except ValueError as e:
print(f"Erreur : {e}")Question 6.1 : Implémentez un planificateur de tâches qui :
Accepte des tâches avec différentes priorités
Traite les tâches de priorité supérieure en premier
Implémente la gestion des timeouts pour les tâches lentes
Fournit un rapport de progression
# TODO: Implémenter le planificateur de tâches avec priorité3. Introduction à asyncio (Sujet avancé optionnel)¶
import asyncio
async def tache_async(id_tache, delai):
"""Une tâche async qui simule des I/O"""
print(f"Tâche {id_tache} démarrée")
await asyncio.sleep(delai)
print(f"Tâche {id_tache} terminée")
return f"Résultat de la tâche {id_tache}"
async def main():
# Exécuter plusieurs tâches en concurrence
taches = [
tache_async(1, 1),
tache_async(2, 2),
tache_async(3, 1),
]
resultats = await asyncio.gather(*taches)
return resultats
# Dans Jupyter, utiliser await directement
resultats = await main()
print("Résultats :", resultats)Question 6.2 : Comparez les performances de :
Opérations I/O séquentielles
ThreadPoolExecutor
asyncio
Pour télécharger plusieurs pages web. Quelle approche est la meilleure pour les tâches I/O-bound ?
# TODO: Comparer les approches I/OExercice 7 [★★★] - Benchmarking et Optimisation des Performances¶
Comprendre comment mesurer et optimiser le code parallèle est essentiel pour utiliser efficacement ces techniques.
1. Mesurer le temps d’exécution¶
import time
from functools import wraps
def decorateur_timing(func):
"""Décorateur pour mesurer le temps d'exécution d'une fonction"""
@wraps(func)
def wrapper(*args, **kwargs):
debut = time.perf_counter()
resultat = func(*args, **kwargs)
temps_ecoule = time.perf_counter() - debut
print(f"{func.__name__} a pris {temps_ecoule:.4f}s")
return resultat
return wrapper
@decorateur_timing
def fonction_lente():
time.sleep(0.5)
return "Terminé"
fonction_lente()# Utiliser timeit pour des mesures plus précises
import timeit
def fonction_test():
return sum(i**2 for i in range(1000))
# Timer 1000 exécutions
temps_execution = timeit.timeit(fonction_test, number=1000)
print(f"Temps moyen par appel : {temps_execution/1000*1000:.4f}ms")2. Loi d’Amdahl¶
import matplotlib.pyplot as plt
import numpy as np
def loi_amdahl(p, n):
"""
Calcule l'accélération théorique en utilisant la loi d'Amdahl.
p: fraction du programme qui peut être parallélisée (0 à 1)
n: nombre de processeurs
"""
return 1 / ((1 - p) + p / n)
# Visualiser la loi d'Amdahl
processeurs = np.arange(1, 65)
fractions_paralleles = [0.5, 0.75, 0.9, 0.95, 0.99]
plt.figure(figsize=(10, 6))
for p in fractions_paralleles:
accelerations = [loi_amdahl(p, n) for n in processeurs]
plt.plot(processeurs, accelerations, label=f'{p*100:.0f}% parallélisable')
plt.xlabel('Nombre de processeurs')
plt.ylabel('Accélération')
plt.title("Loi d'Amdahl : Limites théoriques d'accélération")
plt.legend()
plt.grid(True)
plt.show()3. Benchmarking du code parallèle¶
import multiprocessing as mp
import time
def intensif_cpu(n):
"""Calcul intensif en CPU"""
total = 0
for i in range(n):
total += i ** 0.5
return total
def benchmark_parallele(func, donnees, max_workers=None):
"""Benchmark une fonction avec différents nombres de workers"""
if max_workers is None:
max_workers = mp.cpu_count()
resultats = []
# Ligne de base séquentielle
debut = time.perf_counter()
resultats_sequentiels = [func(d) for d in donnees]
temps_sequentiel = time.perf_counter() - debut
resultats.append((1, temps_sequentiel, 1.0))
print(f"Séquentiel : {temps_sequentiel:.4f}s")
# Parallèle avec différents nombres de workers
for n_workers in range(2, max_workers + 1):
debut = time.perf_counter()
with mp.Pool(n_workers) as pool:
resultats_paralleles = pool.map(func, donnees)
temps_parallele = time.perf_counter() - debut
acceleration = temps_sequentiel / temps_parallele
efficacite = acceleration / n_workers
resultats.append((n_workers, temps_parallele, acceleration))
print(f"Workers={n_workers}: {temps_parallele:.4f}s, Accélération={acceleration:.2f}x, Efficacité={efficacite:.1%}")
return resultats
# Exécuter le benchmark
donnees = [1000000] * 16
resultats_benchmark = benchmark_parallele(intensif_cpu, donnees)Question 7.1 : Créez une suite de benchmarks complète qui :
Teste différentes tailles de chunks
Teste différentes tailles de données
Génère des visualisations comparant les performances
Identifie la configuration optimale pour votre charge de travail
# TODO: Créer une suite de benchmarks complète4. Profilage et optimisation¶
import cProfile
import pstats
from io import StringIO
def profiler_fonction(func, *args, **kwargs):
"""Profile une fonction et affiche les résultats"""
profiler = cProfile.Profile()
profiler.enable()
resultat = func(*args, **kwargs)
profiler.disable()
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(10)
print(stream.getvalue())
return resultat
def fonction_a_profiler():
"""Exemple de fonction à profiler"""
donnees = [i**2 for i in range(100000)]
filtre = list(filter(lambda x: x % 2 == 0, donnees))
total = sum(filtre)
return total
profiler_fonction(fonction_a_profiler)Question 7.2 : Profilez et optimisez le pipeline de traitement d’images de l’Exercice 4 :
Identifiez les trois plus grands goulots d’étranglement
Appliquez des optimisations à chacun
Mesurez l’amélioration
Documentez votre processus d’optimisation
# TODO: Profiler et optimiser5. Étude de cas réel¶
Question 7.3 : Construisez un système complet de traitement de données qui :
Lit des données depuis plusieurs fichiers CSV (100+ fichiers)
Nettoie et transforme chaque fichier
Agrège les résultats de tous les fichiers
Écrit la sortie finale
Implémentez les versions séquentielle et parallèle, benchmarkez-les, et documentez l’accélération obtenue.
# TODO: Implémenter l'étude de cas réelRésumé¶
Dans ce TP, vous avez appris :
Programmation fonctionnelle : Utilisation de
filter(),map(),reduce()pour les transformations de donnéesExpressions lambda : Création de fonctions concises en ligne
Générateurs : Traitement efficace en mémoire des grands ensembles de données
Multiprocessing : Exécution parallèle avec
Poolet parallélisme basé sur les processusCommunication inter-processus : Utilisation des Queues et de la mémoire partagée
concurrent.futures : Interface de haut niveau pour l’exécution parallèle
Optimisation des performances : Benchmarking, profilage et optimisation du code parallèle
Ces concepts forment la base des frameworks de calcul distribué comme Apache Spark, que vous explorerez dans le TP 5.
Prochaines étapes¶
Continuez vers le TP 5 : Apache Spark pour le traitement de données massives pour apprendre comment ces concepts de traitement parallèle s’étendent au calcul distribué sur des clusters.