Python är ett av de mest populära språken för databehandling och datavetenskap i allmänhet. Ekosystemet ger många bibliotek och ramar som underlättar högpresterande datorer. Att göra parallellprogrammering i Python kan dock vara ganska knepigt.
I denna handledning ska vi studera varför parallellism är svår speciellt i Python-kontexten, och för det kommer vi att gå igenom följande:
De Global Interpreter Lock (GIL) är en av de mest kontroversiella ämnena i Python-världen. I CPython, det mest populära genomförandet av Python, är GIL en mutex som gör sakerna trådlösa. GIL gör det enkelt att integrera med externa bibliotek som inte är trådsäkra, och det gör en icke-parallell kod snabbare. Detta kommer till en kostnad, dock. På grund av GIL kan vi inte uppnå sann parallellitet via multithreading. I grund och botten kan två olika inbyggda trådar av samma process inte köra Python-kod på en gång.
Det är dock inte så illa, och det är därför: saker som händer utanför GIL-riket är fritt att vara parallella. I den här kategorin faller långvariga uppgifter som I / O och, lyckligtvis, bibliotek som numpy
.
Så Python är inte riktigt multithreaded. Men vad är en tråd? Låt oss ta ett steg tillbaka och titta på saker i perspektiv.
En process är en grundläggande operativsystemabstraktion. Det är ett program som körs, med andra ord, kod som körs. Flera processer körs alltid i en dator, och de körs parallellt.
En process kan ha flera trådar. De utför samma kod som tillhör moderprocessen. Helst löper de parallellt, men inte nödvändigtvis. Anledningen till att processer inte räcker är att applikationer måste vara lyhörda och lyssna på användaråtgärder medan uppdatering av skärmen och lagring av en fil.
Om det fortfarande är lite oklart, här är ett cheatsheet:
PROCESSER | TRÅDAR |
---|---|
Processer delar inte minne | Trådar delar minne |
Spawning / switching processer är dyra | Gytnings- / omkopplingstrådar är billigare |
Processer kräver fler resurser | Trådar kräver färre resurser (kallas ibland lätta processer) |
Ingen minnessynkronisering behövs | Du måste använda synkroniseringsmekanismer för att vara säker på att du hanterar data korrekt |
Det finns inte ett recept som rymmer allt. Att välja en är starkt beroende av sammanhanget och den uppgift du försöker uppnå.
Nu går vi ett steg längre och dyker in i samtidighet. Samtidighet är ofta missförstådd och felaktig för parallellitet. Det är inte fallet. Samtidighet innebär att man planerar oberoende kod som ska utföras på ett kooperativt sätt. Utnyttja det faktum att en del kod väntar på I / O-operationer, och under den tiden kör en annan men oberoende del av koden.
I Python kan vi uppnå lätta, samtidiga beteende via gröntar. Ur ett parallelliseringsperspektiv är användningen av trådar eller gröntar ekvivalent eftersom ingen av dem går parallellt. Greenlets är ännu billigare att skapa än trådar. På grund av det är greenlets kraftigt vanliga för att utföra ett stort antal enkla I / O-uppgifter, som de som vanligtvis finns i nätverks- och webbservrar.
Nu när vi känner till skillnaden mellan trådar och processer, parallellt och samtidigt, kan vi illustrera hur olika uppgifter utförs på de två paradigmerna. Det här är vad vi ska göra: vi kommer att springa, flera gånger, en uppgift utanför GIL och en inuti den. Vi kör dem seriellt, använder trådar och använder processer. Låt oss definiera uppgifterna:
import os importtid import threading import multiprocessing NUM_WORKERS = 4 def only_sleep (): "" "Gör ingenting, vänta på en timer för att utgå" "" print ("PID:% s, Processnamn:% s, Trådnamn:% s "% (os.getpid (), multiprocessing.current_process () .namn, threading.current_thread () .namn)) time.sleep (1) def crunch_numbers ():" "" Gör några beräkningar " :% s, Processnamn:% s, Trådnamn:% s "% (os.getpid (), multiprocessing.current_process () .namn, threading.current_thread () .namn)) x = 0 medan x < 10000000: x += 1
Vi har skapat två uppgifter. Båda är långa, men bara crunch_numbers
utför aktivt beräkningar. Låt oss springa only_sleep
seriellt, multithreaded och använder flera processer och jämför resultaten:
## Kör uppgifter seriellt start_time = time.time () för _ inom intervallet (NUM_WORKERS): only_sleep () end_time = time.time () print ("Serietid =", end_time - start_time) # Kör uppgifter med trådar start_time = tid .time () tråden = [threading.Thread (target = only_sleep) för _ inom intervall (NUM_WORKERS)] [thread.start () för tråd i trådar] [thread.join () för tråd i trådar] end_time = time.time () print ("Trådar time =", end_time - start_time) # Kör uppgifter med processer start_time = time.time () processes = [multiprocessing.Process (target = only_sleep ()) för _ inom intervall (NUM_WORKERS)] [process. start () för process i processer] [process.join () för process i processer] end_time = time.time () print ("Parallel tid =", end_time - start_time)
Här är den produktion jag har (din ska vara lik, även om PID och tider varierar lite):
PID: 95726, Processnamn: MainProcess, Trådnamn: MainThread PID: 95726, Processnamn: MainProcess, Trådnamn: MainThread PID: 95726, Processnamn: MainProcess, Trådnamn: MainThread PID: 95726, Processnamn: MainProcess, Trådnamn : MainThread Seriell tid = 4.018089056015015 PID: 95726, Processnamn: MainProcess, Trådnamn: Tråd-1 PID: 95726, Processnamn: MainProcess, Trådnamn: Tråd-2 PID: 95726, Processnamn: MainProcess, Trådnamn: Tråd- 3 PID: 95726, Processnamn: MainProcess, Trådnamn: Tråd-4 Trådar tid = 1.0047411918640137 PID: 95728, Processnamn: Process-1, Trådnamn: MainThread PID: 95729, Processnamn: Process-2, Trådnamn: MainThread PID: 95730, Processnamn: Process-3, Trådnamn: MainThread PID: 95731, Processnamn: Process-4, Trådnamn: MainThread Parallell tid = 1.014023780822754
Här är några observationer:
I fallet med seriell tillvägagångssätt, saker är ganska uppenbara. Vi kör uppgifterna efter varandra. Alla fyra körningarna exekveras av samma tråd i samma process.
Använda processer Vi sänker körtiden till en fjärdedel av den ursprungliga tiden, helt enkelt för att uppgifterna utförs parallellt. Lägg märke till hur varje uppgift utförs i en annan process och på MainThread
av den processen.
Använda trådar Vi utnyttjar det faktum att uppgifterna kan utföras samtidigt. Utförandetiden skärs också ner till kvart, även om ingenting går parallellt. Så här går det: vi krossar den första tråden och det börjar vänta på att tiden går ut. Vi pausar dess körning, så att det väntar på att timern upphör att gälla, och i den här tiden hämtar vi den andra tråden. Vi upprepar detta för alla trådar. Vid ett ögonblick utgår tidpunkten för den första tråden så vi byter körning till det och vi avslutar det. Algoritmen upprepas för den andra och för alla andra trådar. I slutet är resultatet som om sakerna kördes parallellt. Du kommer också märka att de fyra olika trådarna grenar sig från och lever inom samma process: MainProcess
.
Du kan till och med märka att den trådade inställningen är snabbare än den riktigt parallella. Det beror på överhettningsprocesser. Som vi noterade tidigare är gyte- och växlingsprocesser en dyr operation.
Låt oss göra samma rutin men den här gången körs crunch_numbers
uppgift:
start_time = time.time () för _ inom intervallet (NUM_WORKERS): crunch_numbers () end_time = time.time () print ("Serietid =", end_time - start_time) start_time = time.time () thread = [threading.Thread (target = crunch_numbers) för _ inom intervallet (NUM_WORKERS)] [thread.start () för tråd i trådar] [thread.join () för tråd i trådar] end_time = time.time () print ("Trådar time =", end_time - start_time) start_time = time.time () processes = [multiprocessing.Process (target = crunch_numbers) för _ inom intervall (NUM_WORKERS)] [process.start () för process i processer] [process.join () för process i processer] end_time = time.time () print ("Parallell tid =", end_time - start_time)
Här är produktionen jag har:
PID: 96285, Processnamn: MainProcess, Trådnamn: MainThread PID: 96285, Processnamn: MainProcess, Trådnamn: MainThread PID: 96285, Processnamn: MainProcess, Trådnamn: MainThread PID: 96285, Processnamn: MainProcess, Trådnamn : MainThread Seriell tid = 2.705625057220459 PID: 96285, Processnamn: MainProcess, Trådnamn: Tråd-1 PID: 96285, Processnamn: MainProcess, Trådnamn: Tråd-2 PID: 96285, Processnamn: MainProcess, Trådnamn: Tråd- 3 PID: 96285, Processnamn: MainProcess, Trådnamn: Tråd-4 Trådar tid = 2.6961309909820557 PID: 96289, Processnamn: Process-1, Trådnamn: MainThread PID: 96290, Processnamn: Process-2, Trådnamn: MainThread PID: 96291, Processnamn: Process-3, Trådnamn: MainThread PID: 96292, Processnamn: Process-4, Trådnamn: MainThread Parallell tid = 0.8014059066772461
Huvudskillnaden här är resultatet av den multithreaded approachen. Den här gången fungerar den väldigt lik den seriella tillvägagångssättet, och här är varför: Eftersom det utförs beräkningar och Python inte utför verklig parallellitet, går trådarna i grund och botten och kör varandra efter varandra, tills de är färdiga.
Python har rika API för parallell / samtidig programmering. I den här handledningen täcker vi de mest populära, men du måste veta att för något behov du har på den här domänen finns det förmodligen någonting där ute som kan hjälpa dig att nå ditt mål.
I nästa avsnitt bygger vi en praktisk applikation i många former, med alla de presenterade biblioteken. Utan ytterligare ado är här de moduler / bibliotek vi ska täcka:
threading
: Det vanliga sättet att arbeta med trådar i Python. Det är en API-wrapper på högre nivå över funktionaliteten som exponeras av _tråd
modulen, som är ett gränssnitt på låg nivå över operativsystemets trådimplementering.
concurrent.futures
: En moduldel av standardbiblioteket som ger ett jämnare abstraktionsskikt över trådar. Trådarna modelleras som asynkrona uppgifter.
multi
: Liknande till threading
modulen, som erbjuder ett mycket liknande gränssnitt men använder processer istället för trådar.
gevent och greenlets
: Greenlets, även kallade mikro-trådar, är enheter för utförande som kan schemaläggas tillsammans och kan utföra uppgifter samtidigt utan mycket överhuvudtaget.
selleri
: En distribuerad uppdragsköna på hög nivå. Uppgifterna köras och körs samtidigt med olika paradigmer som multi
eller gevent
.
Att veta teorin är bra och bra, men det bästa sättet att lära sig är att bygga något praktiskt, eller hur? I det här avsnittet kommer vi att bygga en klassisk typ av applikation som går igenom alla olika paradigmer.
Låt oss bygga en applikation som kontrollerar upptidstiden för webbplatser. Det finns många sådana lösningar där ute, de mest kända är förmodligen Jetpack Monitor och Uptime Robot. Syftet med dessa appar är att meddela dig när din webbplats är nere så att du snabbt kan vidta åtgärder. Så här fungerar de:
Därför är det viktigt att ta en parallell / samtidig inställning till problemet. Eftersom listan över webbplatser växer går det inte att garantera att varje sida kontrolleras var femte minut eller så genom att gå igenom listan seriellt. Webbplatserna kan vara ner i timmar, och ägaren kommer inte att bli underrättad.
Låt oss börja med att skriva några verktyg:
# utils.py importtid import loggar importförfrågningar klass WebsiteDownException (Undantag): pass def ping_website (adress, timeout = 20): "" "Kontrollera om en webbplats är nere. En webbplats anses vara nere om status_code> = 400 eller om tidsgränsen löper ut Kasta en WebsiteDownException om någon av webbplatsens nedförhållanden är uppfyllda "" "försök: svar = requests.head (adress, timeout = timeout) om response.status_code> = 400: logging.warning (" Website% s returneras status_code =% s "% (adress, response.status_code)) höja WebsiteDownException () utom requests.exceptions.RequestException: logging.warning (" Timeout utgått för webbplats% s "% adress) höja WebsiteDownException () def notify_owner (adress) "" "Skicka ägaren till adressen en anmälan om att deras hemsida är nere För närvarande ska vi bara sova i 0,5 sekunder men det är här du skulle skicka ett mail, push notification eller textmeddelande loggning. info ("Meddelande ägaren till% s hemsida"% adress) time.sleep (0.5) def check_webs ite (adress): "" "Verktygsfunktion: kontrollera om en webbplats är nere, om så, meddela användaren" "" försök: ping_website (adress) utom WebsiteDownException: notify_owner
Vi behöver faktiskt en webbplatslista för att prova vårt system. Skapa din egen lista eller använd min:
# websites.py WEBSITE_LIST = ['http://envato.com', 'http://amazon.co.uk', 'http://amazon.com', 'http://facebook.com', ' http://google.com "," http://google.fr "," http://google.es "," http://google.co.uk "," http://internet.org " , "http://gmail.com", "http://stackoverflow.com", "http://github.com", "http://heroku.com", "http: // really cool- available-domain.com "," http://djangoproject.com "," http://rubyonrails.org "," http://basecamp.com "," http://trello.com "," http: //yiiframework.com "," http://shopify.com "," http://another-really-interesting-domain.co "," http://airbnb.com "," http: // instagram. com "," http://snapchat.com "," http://youtube.com "," http://baidu.com "," http://yahoo.com "," http: // live. com "," http://linkedin.com "," http://yandex.ru "," http://netflix.com "," http://wordpress.com "," http: // bing. com ',]
Normalt skulle du behålla denna lista i en databas tillsammans med information om ägaren, så att du kan kontakta dem. Eftersom detta inte är huvudämnet för denna handledning, och för enkelhets skull, kommer vi bara att använda denna Python-lista.
Om du betalat mycket bra uppmärksamhet, kanske du har märkt två riktigt långa domäner i listan som inte är giltiga webbplatser (jag hoppas ingen köpte dem när du läser detta för att visa mig fel!). Jag lade till dessa två domäner för att vara säker på att vi har några webbplatser på varje körning. Låt oss även namnge vår app UptimeSquirrel.
Låt oss först försöka seriell tillvägagångssätt och se hur illa det utför. Vi kommer att betrakta detta som utgångspunkt.
# serial_squirrel.py importtid start_time = time.time () för adress i WEBSITE_LIST: check_website (adress) end_time = time.time () print ("Tid för SerialSquirrel:% ssecs"% (end_time - start_time)) # VARNING: root : Timeout utgått för webbplatsen http://really-cool-available-domain.com # VARNING: root: Timeout utgått för webbplats http://another-really-interesting-domain.co # VARNING: root: Webbplats http: // bing.com returnerade status_code = 405 # Tid för SerialSquirrel: 15.881232261657715secs
Vi kommer att bli lite mer kreativa med genomförandet av det trådade tillvägagångssättet. Vi använder en kö för att lägga in adresserna och skapa arbetstrådar för att få dem ur kön och bearbeta dem. Vi ska vänta på att köen är tom, vilket innebär att alla adresser har bearbetats av våra arbetstrådar.
# threaded_squirrel.py importtid från kö import Queue from threading import Tråd NUM_WORKERS = 4 task_queue = Queue () def worker (): # Kontrollera köen kontinuerligt för adresser medan True: adress = task_queue.get () check_website (adress) # Mark den bearbetade uppgiften som färdig task_queue.task_done () start_time = time.time () # Skapa arbetstråden trådar = [Tråd (mål = arbetare) för _ inom intervallet (NUM_WORKERS)] Lägg till webbplatser till uppgiftskön [task_queue. put (item) för objekt i WEBSITE_LIST] # Starta alla arbetare [thread.start () för tråd i trådar] # Vänta på att alla uppgifter i kön ska behandlas task_queue.join () end_time = time.time () print ("Time for ThreadedSquirrel:% ssecs"% (end_time - start_time)) # VARNING: root: Timeout utgått för webbplatsen http://really-cool-available-domain.com # VARNING: root: Timeout utgått för webbplatsen http: / /another-really-interesting-domain.co # VARNING: root: Hemsida http://bing.com återvände status_code = 405 # Tid för trådad ekorre: 3.1107530 59387207secs
Som tidigare sagt, concurrent.futures
är ett API på hög nivå för att använda trådar. Tillvägagångssättet vi tar här innebär att man använder a ThreadPoolExecutor
. Vi ska skicka uppgifter till poolen och få tillbaka terminer, vilket är resultat som kommer att finnas tillgängliga för oss i framtiden. Självklart kan vi vänta på att alla framtidar blir verkliga resultat.
# future_squirrel.py importtid import samtidigt.futures NUM_WORKERS = 4 start_time = time.time () med samtident.futures.ThreadPoolExecutor (max_workers = NUM_WORKERS) som exekutör: futures = exekutör.submit (check_website, adress) för adress i WEBSITE_LIST simultant.futures.wait (futures) end_time = time.time () print ("Tid för FutureSquirrel:% ssecs"% (end_time - start_time)) # VARNING: root: Timeout utgått för webbplatsen http: // really cool -domän.com # VARNING: root: Timeout utgått för webbplats http://another-really-interesting-domain.co # VARNING: root: Webbplats http://bing.com returnerad status_code = 405 # Tid för FutureSquirrel: 1.812899112701416secs
De multi
biblioteket tillhandahåller ett API för nästan inlämningsbyte för threading
bibliotek. I det här fallet kommer vi att ta ett tillvägagångssätt som mer liknar concurrent.futures
ett. Vi sätter upp en multiprocessing.Pool
och skicka in uppgifter till den genom att kartlägga en funktion i adresslistan (tänk på den klassiska Python Karta
fungera).
# multiprocessing_squirrel.py importtid importuttag import multiprocessing NUM_WORKERS = 4 start_time = time.time () med multiprocessing.Pool (processer = NUM_WORKERS) som pool: results = pool.map_async (check_website, WEBSITE_LIST) results.wait () end_time = tid .time () print ("Tid för MultiProcessingSquirrel:% ssecs"% (end_time - start_time)) # VARNING: root: Timeout utgått för webbplatsen http://really-cool-available-domain.com # VARNING: root: Timeout utgått för webbplats http://another-really-interesting-domain.co # VARNING: root: Webbplats http://bing.com återvände status_code = 405 # Tid för MultiProcessingSquirrel: 2.82245993614196767secs
Gevent är ett populärt alternativ för att uppnå massiv samtidighet. Det finns några saker du behöver veta innan du använder den:
Kod som utförs samtidigt av greenlets är deterministisk. I motsats till de andra presenterade alternativen garanterar detta paradigm att du för samma två order alltid får samma resultat i samma ordning.
Du behöver apa patch standardfunktioner så att de samarbetar med gevent. Här är vad jag menar med det. Normalt blockerar en socketoperation. Vi väntar på att operationen ska slutföras. Om vi befann oss i en multithreaded miljö skulle schemaläggaren helt enkelt byta till en annan tråd medan den andra väntar på I / O. Eftersom vi inte befinner oss i en multithreaded miljö, levereras patenterade standard-patroner så att de blir blockerande och returnerar kontrollen till den befintliga schemaläggaren.
För att installera, kör du: pip installerat
Så här använder du gevent för att utföra vår uppgift med en gevent.pool.Pool
:
# green_squirrel.py importtid från gevent.pool import Pool från gjord importerad apa # Observera att du kan skaffa många arbetare med bevent eftersom kostnaden för att skapa och byta är mycket låg NUM_WORKERS = 4 # uttagsmodul för HTTP-förfrågningar apa. patch_socket () start_time = time.time () pool = Pool (NUM_WORKERS) för adress i WEBSITE_LIST: pool.spawn (check_website, adress) # Vänta på att saker avslutas pool.join () end_time = time.time () print Tid för GreenSquirrel:% ssecs "% (end_time - start_time)) # Tid för GreenSquirrel: 3.8395519256591797secs
Selleri är ett tillvägagångssätt som mest skiljer sig från vad vi hittills har sett. Det är slagetestat i samband med mycket komplexa och högpresterande miljöer. Att ställa in Selleri kommer att kräva lite mer tinkering än alla ovanstående lösningar.
Först måste vi installera Selleri:
pip installera selleri
Uppgifterna är de centrala begreppen inom Celery-projektet. Allt som du vill köra inuti Selleri måste vara en uppgift. Selleri erbjuder stor flexibilitet för löpande uppgifter: Du kan köra dem synkront eller asynkront, i realtid eller schemalagd, på samma maskin eller på flera maskiner, och använda trådar, processer, Eventlet eller gevent.
Arrangemanget blir lite mer komplext. Selleri använder andra tjänster för att skicka och ta emot meddelanden. Dessa meddelanden är vanligtvis uppgifter eller resultat från uppgifter. Vi ska använda Redis i denna handledning för detta ändamål. Redis är ett bra val eftersom det är väldigt enkelt att installera och konfigurera, och det är verkligen möjligt att du redan använder den i din ansökan för andra ändamål, till exempel caching och pub / sub.
Du kan installera Redis genom att följa anvisningarna på Redis Quick Start-sida. Glöm inte att installera redis
Python bibliotek, pip installera redis
, och bunten som behövs för att använda Redis och selleri: pip installera selleri [redis]
.
Starta Redis-servern så här: $ redis-server
För att komma igång med att bygga saker med selleri måste vi först skapa ett selleriansökan. Därefter måste Selleri veta vilka slags uppgifter den kan utföra. För att uppnå det måste vi registrera uppgifter till selleriansökan. Vi gör det här med hjälp av @ app.task
dekoratör:
# celery_squirrel.py importtid från utils importera check_website från dataimport WEBSITE_LIST från selleri import Selleria från selleria.result import ResultatSet app = Selleri ("celery_squirrel", mäklare = "redis: // localhost: 6379/0", backend = "redis : // localhost: 6379/0 ") @ app.task def check_website_task (adress): returnera check_website (adress) om __name__ ==" __main__ ": start_time = time.time () # Med hjälp av" delay "körs uppgiftssynkroniseringen = ResultatSet ([check_website_task.delay (adress) för adress i WEBSITE_LIST]) # Vänta på att uppgifterna ska slutföras rs.get () end_time = time.time () print ("Selleriaquirrel:", end_time - start_time) # Selleriaquirrel: 2.4979639053344727
Var inte panik om ingenting händer. Kom ihåg att Selleri är en tjänst, och vi måste springa det. Hittills lade vi bara upp uppgifterna i Redis men började inte Selleriet att utföra dem. För att göra det måste vi köra det här kommandot i den mapp där vår kod finns:
selleriarbetare-en gör_celery --loglevel = debug --concurrency = 4
Återupprätta Python-skriptet och se vad som händer. En sak att vara uppmärksam på: Lägg märke till hur vi passerade Redis-adressen till vår Redis-ansökan två gånger. De mäklare
Parametern anger var uppgifterna skickas till Celery, och backend
är där Selleri lägger resultaten så att vi kan använda dem i vår app. Om vi inte anger ett resultat backend
, Det finns inget sätt för oss att veta när uppgiften behandlades och vad resultatet var.
Också vara medveten om att stockarna nu är i standardproduktionen av Selleri processen, så var noga med att kolla in dem i lämplig terminal.
Jag hoppas att det här har varit en intressant resa för dig och en bra introduktion till världen av parallell / samtidig programmering i Python. Detta är slutet på resan, och det finns några slutsatser vi kan dra:
threading
och concurrent.futures
bibliotek.multi
ger ett mycket liknande gränssnitt till threading
men för processer snarare än trådar.Lär dig Python med vår kompletta handledning för pythonhandledning, oavsett om du bara har börjat eller du är en erfaren kodare som vill lära dig nya färdigheter.