La programmation asynchrone avec python

La programmation asynchrone permet de lancer plusieurs processus simultanément . Cela permet dans certains cas un gain important de temps.

La programmation asynchrone en Python est un moyen de faire en sorte que votre programme puisse faire plusieurs choses en même temps, au lieu de devoir attendre que chaque chose soit terminée avant de passer à la suivante. Cela peut être utile lorsque vous avez des tâches qui prennent du temps, comme lire ou écrire des données sur Internet, car cela permet de ne pas bloquer le reste de votre programme pendant que ces tâches sont en cours.

Voici comment ça fonctionne : votre programme principal (appelé le "thread principal") crée de nouveaux threads (appelés "threads secondaires") pour exécuter les tâches asynchrones. Le thread principal continue d'exécuter le reste de votre programme pendant que les threads secondaires effectuent leurs tâches. Lorsqu'une tâche asynchrone est terminée, elle envoie une notification au thread principal pour qu'il puisse utiliser les résultats de la tâche.

Il y a plusieurs manières de faire de la programmation asynchrone en Python. L'une d'elles est d'utiliser des "coroutines", qui sont des fonctions spéciales qui peuvent "s'endormir" et reprendre à un point précis plus tard. Vous pouvez également utiliser des "tâches" (appelées "asyncio.Task" en Python), qui sont des objets qui représentent des tâches asynchrones et qui peuvent être lancées et surveillées de manière asynchrone.

Il y a aussi des bibliothèques tierces (c'est-à-dire des bibliothèques que vous devez télécharger et installer séparément) qui peuvent vous aider à faire de la programmation asynchrone en Python, comme "asyncio" et "Twisted". Ces bibliothèques vous permettent de créer des programmes asynchrones de manière plus facile et plus efficace.

La programmation asynchrone peut être un peu compliquée à comprendre au début, mais c'est une technique très utile qui peut vous aider à créer des programmes plus rapides et plus efficaces

Un exemple est souvent plus parlant. Voici un script qui scan votre réseau local:

# coding: utf-8

import os
import subprocess
import re

# on prépare la regex
regex = re.compile(r"(?P<received>\d+) received")

# la fonction qui assure le ping
def ping(hostname):
    p = subprocess.Popen(["ping", "-c1", "-w100", hostname], stdout=subprocess.PIPE).stdout.read()
    r = regex.search(p.decode())
    try:
        if(r.group("received") == "1"):
            print("L'adresse %s existe!" % hostname) 
    except:
        pass

# on boucle sur les adresses du réseau local
for i in range(254):
    hostname = "192.168.0.%i" % (i+1)
    ping(hostname)

Comme vous pouvez le constater le programme met un certain temps à s'exécuter puisque si un ping ne repond pas, on doit attendre le temps obligatoire avant de passer au test suivant. Pour gagner du temps on peut donc passer à la programmation asynchrone, c'est à dire de lancer toutes les instructions en même temps:

# coding: utf-8

import os
import subprocess
import re
import threading

# on prépare la regex
regex = re.compile(r"(?P<received>\d+) received")

# la fonction qui assure le ping
def ping(hostname):
    p = subprocess.Popen(["ping", "-c1", "-w100", hostname], stdout=subprocess.PIPE).stdout.read()
    r = regex.search(p.decode())
    try:
        if(r.group("received") == "1"):
            print("L'adresse %s existe!" % hostname) 
    except:
        pass

# on boucle sur les adresses du réseau local
for i in range(254):
    hostname = "192.168.0.%i" % (i+1)
    threading.Thread(target=ping,args=(hostname,)).start()

Vous pouvez passer par une classe également:

# coding: utf-8

import os
import subprocess
import re
import threading

regex = re.compile(r"(?P<received>\d+) received")

class Ping(threading.Thread):

    def __init__(self, hostname):
        threading.Thread.__init__(self)
        self.hostname = hostname

    def run(self):
        p = subprocess.Popen(["ping", "-c1", "-w100", self.hostname], stdout=subprocess.PIPE).stdout.read()
        r = regex.search(p.decode())
        try:
            if(r.group("received") == "1"):
                print("L'adresse %s existe!" % self.hostname) 
        except:
            pass

for i in range(254):
    hostname = "192.168.0.%i" % (i+1)
    background = Ping(hostname)
    background.start()


Apprendre programmation cours python 3
Django internet web - Documentation débutant et expert
Version anglaise