I den här artikeln kommer du att lära dig grunderna för parallellitet i Elixir och se hur man kan spawna processer, skicka och ta emot meddelanden och skapa långsiktiga processer. Du kommer också att lära dig om GenServer, se hur det kan användas i din ansökan och upptäck några godis som det ger dig.
Som du säkert vet är Elixir ett funktionellt språk som används för att bygga feltoleranta, samtidiga system som hanterar många samtidiga förfrågningar. BEAM (Erlang virtuell maskin) använder processer att utföra olika uppgifter samtidigt, vilket innebär att till exempel att betjäna en förfrågan inte blockerar någon annan. Processerna är lätta och isolerade, vilket innebär att de inte delar något minne och även om en process kraschar, kan andra fortsätta att köra.
BEAM-processer skiljer sig mycket från Operativsystem. I grund och botten kör BEAM i en OS-process och använder sin egen schemaläggare. Varje schemaläggare upptar en CPU-kärna, körs i en separat tråd och kan hantera tusentals processer samtidigt (det blir omvänd att utföra). Du kan läsa lite mer om BEAM och multithreading på StackOverflow.
Så, som du ser, BEAM-processer (jag kommer bara säga "processer" från och med nu) är mycket viktiga i Elixir. Språket ger dig några verktyg på låg nivå för att manuellt spawna processer, behålla staten och hantera förfrågningarna. Men få människor använder dem - det är vanligare att lita på Open Telecom Platform (OTP) ram för att göra det.
OTP har numera ingenting att göra med telefoner - det är en generell ram för att bygga komplexa samtidiga system. Det definierar hur dina applikationer ska struktureras och ger en databas samt en massa mycket användbara verktyg för att skapa serverprocesser, återhämta sig från fel, utföra loggning etc. I den här artikeln kommer vi att prata om en serverns beteende kallad GenServer som tillhandahålls av OTP.
Du kan tänka på GenServer som en abstraktion eller en hjälpare som förenklar arbetet med serverprocesser. För det första kommer du att se hur man krossar processer med hjälp av vissa lågnivåfunktioner. Då kommer vi att byta till GenServer och se hur det förenklar saker för oss genom att ta bort behovet att skriva tråkiga (och ganska generiska) kod varje gång. Låt oss börja!
Om du frågade mig hur man skapar en process i Elixir, skulle jag svara: rom Det! spawn / 1 är en funktion definierad inuti Kärna
modul som returnerar en ny process. Denna funktion accepterar en lambda som kommer att utföras i den skapade processen. Så snart genomförandet har slutförts, avslutas processen också:
spawn (fn -> IO.puts ("hi") slutet) |> IO.inspect # => hi # => #PID<0.72.0>
Så här rom
returnerade ett nytt process-id. Om du lägger till en fördröjning till lambda kommer strängen "hej" att skrivas ut efter en tid:
spawn (fn ->: timer.sleep (5000) IO.puts ("hi") slutet) |> IO.inspect # => #PID<0.82.0> # => (efter 5 sekunder) "hej"
Nu kan vi gissa så många processer som vi vill, och de kommer att köras samtidigt:
spawn_it = fn (num) -> spawn (fn ->: timer.sleep (5000) IO.puts ("hej # num") slut) slut Enum.each (1 ... 10, fn (_) -> spawn_it . (: rand.uniform (100)) slutet) # => (alla tryckta ut samtidigt, efter 5 sekunder) # => hej 5 # => hej 10 etc ...
Här gyter vi tio processer och skriver ut en teststräng med ett slumptal. :rand
är en modul som tillhandahålls av Erlang, så namnet är en atom. Vad är coolt är att alla meddelanden kommer att skrivas ut samtidigt, efter fem sekunder. Det händer eftersom alla tio processerna genomförs samtidigt.
Jämför det med följande exempel som utför samma uppgift men utan att använda rom / 1
:
dont_spawn_it = fn (num) ->: timer.sleep (5000) IO.puts ("hej # num") slutar Enum.each (1 ... 10, fn (_) -> dont_spawn_it. 100)) slutet) # => (efter 5 sekunder) hej 70 # => (efter ytterligare 5 sekunder) hej 45 # => etc ...
Medan denna kod körs kan du gå till köket och göra en annan kopp kaffe eftersom det tar nästan en minut att slutföra. Varje meddelande visas i följd, vilket givetvis inte är optimalt!
Du kanske frågar: "Hur mycket minne förbrukar en process?" Tja, det beror på, men i första hand upptar det ett par kilobytes, vilket är ett mycket litet antal (även min gamla bärbara dator har 8 GB minne, för att inte tala om coola moderna servrar).
Än så länge är allt bra. Innan vi börjar jobba med GenServer, låt oss dock diskutera ytterligare en viktig sak: passera och ta emot meddelanden.
Det är ingen överraskning att processer (som är isolerade, som du kommer ihåg) behöver kommunicera på något sätt, särskilt när det gäller att bygga mer eller mindre komplexa system. För att uppnå detta kan vi använda meddelanden.
Ett meddelande kan skickas med en funktion med ganska uppenbart namn: skicka / 2. Den accepterar en destination (port, process-id eller ett procesnamn) och det faktiska meddelandet. När meddelandet har skickats visas det i brevlåda av en process och kan bearbetas. Som du ser är den allmänna tanken väldigt lik vår vardagliga verksamhet att utbyta e-postmeddelanden.
En brevlåda är i grunden en "första i första ut" (FIFO) -kön. Efter att meddelandet har bearbetats tas det bort från köen. För att börja ta emot meddelanden behöver du-gissa vad! -A få makro. Detta makro innehåller en eller flera klausuler och ett meddelande matchas mot dem. Om en matchning hittas, behandlas meddelandet. Annars sätts meddelandet tillbaka i brevlådan. Dessutom kan du ställa in en tillval efter
klausul som löper om ett meddelande inte mottogs under den angivna tiden. Du kan läsa mer om skicka / 2
och motta
i de officiella dokumenten.
Okej nog med teorin - låt oss försöka arbeta med meddelandena. Först och främst, skicka något till den nuvarande processen:
skicka (själv), "hej!")
Själva / 0-makroen ger en pid av anropsprocessen, vilket är exakt vad vi behöver. Lämna inte bort parenteser efter funktionen så får du en varning om tvetydighetskampanjen.
Nu får du meddelandet medan du ställer in efter
klausul:
mottaga gör msg -> IO.puts "Yay, ett meddelande: # msg" msg efter 1000 -> IO.puts: stderr, "Jag vill ha meddelanden!" slutet |> IO.puts # => Yay, ett meddelande: hej! # => hej!
Observera att klausulen returnerar resultatet av att utvärdera den sista raden, så vi får "hej!" sträng.
Kom ihåg att du kan införa så många klausuler som behövs:
Skicka (self), : ok, "hej!") mottaga gör : ok, msg -> IO.puts "Yay, ett meddelande: # msg" msg : error, msg -> IO .puts: stderr, "Åh nej, något dåligt har hänt: # msg" _ -> IO.puts "Jag vet inte vad det här meddelandet är ..." efter 1000 -> IO.puts: stderr, "Jag vill ha meddelanden!" slutet |> IO.puts
Här har vi fyra klausuler: en för att hantera ett succémeddelande, en annan att hantera fel, och sedan en "fallback" -klausul och en timeout.
Om meddelandet inte stämmer överens med någon av klausulerna, förvaras det i brevlådan, vilket inte alltid är önskvärt. Varför? För varje gång ett nytt meddelande anländer behandlas de gamla i det första huvudet (eftersom brevlådan är en FIFO-kö), saktar programmet ner. Därför kan en "fallback" -klausul komma till nytta.
Nu när du vet hur man ska krossa processer, skicka och ta emot meddelanden, låt oss ta en titt på ett något mer komplext exempel som innebär att du skapar en enkel server som svarar på olika meddelanden.
I det föregående exemplet skickade vi bara ett meddelande, mottog det och utförde lite arbete. Det är bra, men inte väldigt funktionellt. Vanligtvis händer det att vi har en server som kan svara på olika meddelanden. Med "server" menar jag en långsiktig process byggd med en återkommande funktion. Låt oss till exempel skapa en server för att utföra några matematiska ekvationer. Det kommer att få ett meddelande som innehåller den begärda operationen och några argument.
Börja med att skapa servern och looping-funktionen:
defmodule MathServer gör def start gör kasta och lyssna / 0 slut defp lyssna mottaga gör : sqrt, caller, arg -> IO.puts arg _ -> IO.puts: stderr, "Not implemented." slutet lyssna () slutet slutet
Så vi spawnar en process som fortsätter att lyssna på inkommande meddelanden. Efter att meddelandet har tagits emot, lyssna / 0
funktionen kallas igen, vilket skapar en ändlös slinga. Inuti lyssna / 0
funktion, lägger vi till stöd för : sqrt
meddelande, som kommer att beräkna kvadratroten av ett tal. De arg
kommer att innehålla det faktiska numret för att utföra operationen mot. Vi definierar också en återgångsklausul.
Du kan nu starta servern och tilldela dess process-id till en variabel:
math_server = MathServer.start IO.inspect math_server # => #PID<0.85.0>
Lysande! Låt oss nu lägga till en implementeringsfunktion att faktiskt utföra beräkningen:
defmodule MathServer gör # ... def sqrt (server, arg) skicka (: some_name, : sqrt, self (), arg) slutet
Använd den här funktionen nu:
MathServer.sqrt (math_server, 3) # => 3
För nu skriver det bara ut det övergångna argumentet, så tweak din kod så här för att utföra den matematiska operationen:
defmodule MathServer gör # ... defp lyssna mottaga gör : sqrt, caller, arg -> skicka (: någotnamn, : resultat, do_sqrt (arg)) _ -> IO.puts: stderr, "Not implemented." avsluta lyssna () sluta defp do_sqrt (arg) gör: math.sqrt (arg) slutet slutet
Nu skickas ett annat meddelande till servern som innehåller resultatet av beräkningen.
Vad är intressant är att sqrt / 2
funktionen skickar helt enkelt ett meddelande till servern som ber om att utföra en operation utan att vänta på resultatet. Så, i grunden, utför den en asynkront samtal.
Självklart vill vi ta tag i resultatet vid någon tidpunkt, så koda en annan offentlig funktion:
def grab_result mottaga gör : resultat, resultat -> resultat efter 5000 -> IO.puts: stderr, "Timeout" slutet slutet
Använd nu det:
math_server = MathServer.start MathServer.sqrt (math_server, 3) MathServer.grab_result |> IO.puts # => 1.7320508075688772
Det fungerar! Självklart kan du även skapa en pool av servrar och distribuera uppgifter mellan dem och uppnå samtidighet. Det är bekvämt när förfrågningarna inte relaterar till varandra.
Okej, vi har täckt en handfull funktioner som gör att vi kan skapa långvariga serverns processer och skicka och ta emot meddelanden. Det här är bra, men vi måste skriva för mycket pannkodskod som startar en serielöga (start / 0
), svarar på meddelanden (lyssna / 0
privat funktion), och returnerar ett resultat (grab_result / 0
). I mer komplicerade situationer kan vi också behöva upprätthålla ett delat tillstånd eller hantera felen.
Som jag sa i början av artikeln är det inte nödvändigt att uppfinna en cykel igen. I stället kan vi använda GenServer-beteende som redan tillhandahåller all källkodskod för oss och har stort stöd för serverprocesser (som vi såg i föregående avsnitt).
Beteende i Elixir är en kod som implementerar ett gemensamt mönster. För att kunna använda GenServer måste du definiera en speciell återuppringningsmodul som uppfyller avtalet som diktat av beteendet. Specifikt bör det genomföra några återuppringningsfunktioner, och det faktiska genomförandet är upp till dig. Efter återkallelserna är skrivna beteendemodul kan använda dem.
Som anges av docs kräver GenServer sex återuppringningar som ska genomföras, även om de har standardinställningar också. Det innebär att du bara kan omdefiniera de som kräver viss anpassad logik.
Första saker först: vi måste starta servern innan du gör något annat, så fortsätt till nästa avsnitt!
För att visa användningen av GenServer, låt oss skriva en CalcServer
som tillåter användare att tillämpa olika operationer till ett argument. Resultatet av operationen lagras i a serverstatus, och sedan kan en annan operation appliceras på det också. Eller en användare kan få ett slutresultat av beräkningarna.
Först och främst använd använd makroet för att ansluta GenServer:
defmodule CalcServer använder GenServer-änden
Nu måste vi omdefiniera några återuppringningar.
Den första är init / 1, som påkallas när en server startas. Det överförda argumentet används för att ställa in en första serverns tillstånd. I det enklaste fallet ska denna återuppringning returnera : ok, initial_state
tuple, men det finns andra möjliga returvärden som : stop, reason
, vilket gör att servern omedelbart slutar.
Jag tror att vi kan tillåta användare att definiera det ursprungliga tillståndet för vår server. Vi måste dock kontrollera att det överlämnade argumentet är ett nummer. Så använd en skyddsklausul för det:
defmodule CalcServer använder GenServer def init (initial_value) när is_number (initial_value) gör : ok, initial_value slut def init (_) gör : stop, "Värdet måste vara ett heltal!" slutet
Nu startar du bara servern genom att använda start / 3-funktionen och ge din CalcServer
som en återkopplingsmodul (det första argumentet). Det andra argumentet kommer att vara det ursprungliga tillståndet:
GenServer.start (CalcServer, 5.1) |> IO.inspect # => : ok, #PID<0.85.0>
Om du försöker skicka ett icke-nummer som ett andra argument, kommer servern inte att startas, vilket är exakt vad vi behöver.
Bra! Nu när vår server körs kan vi börja kodning av matematiska operationer.
Asynkrona förfrågningar kallas avgjutningar i GenServers termer. För att utföra en sådan förfrågan, använd cast / 2-funktionen, som accepterar en server och den faktiska begäran. Det liknar sqrt / 2
funktion som vi kodade när vi pratade om serverprocesser. Det använder också "brand och glöm" -metoden, vilket innebär att vi inte väntar på att begäran ska slutföras.
För att hantera de asynkrona meddelandena används en handtagen_cast / 2 återuppringning. Den accepterar en förfrågan och ett tillstånd och ska svara med en tupel : noreply, new_state
i det enklaste fallet (eller : stop, reason, new_state
för att stoppa serverns loop). Låt oss exempelvis hantera en asynkron : sqrt
kasta:
def handle_cast (: sqrt, state) gör : noreply,: math.sqrt (state) slutet
Så här behåller vi vår servers tillstånd. I början var numret (skickat när servern startades) var 5,1
. Nu uppdaterar vi staten och ställer den till : Math.sqrt (5,1)
.
Koda gränssnittsfunktionen som använder cast / 2
:
def sqrt (pid) gör GenServer.cast (pid,: sqrt) slut
För mig liknar detta en ond trollkarl som kastar en stavning men bryr sig inte om den inverkan den orsakar.
Observera att vi behöver ett process-ID för att utföra casten. Kom ihåg att när en server startas framgångsrikt, en tuple : ok, pid
returneras. Låt oss därför använda mönstermatchning för att extrahera process-id:
: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid)
Trevlig! Samma tillvägagångssätt kan användas för att implementera, säga multiplikation. Koden blir lite mer komplex eftersom vi måste passera det andra argumentet, en multiplikator:
def multiplicera (pid, multiplikator) slutar GenServer.cast (pid, : multiplicera, multiplikator)
De kasta
funktionen stöder bara två argument, så jag måste bygga en tupel och ge ett ytterligare argument där.
Nu återkallelsen:
def handle_cast (: multiplicera, multiplikator, state) gör : noreply, state * multiplikator slut
Vi kan också skriva en singel handle_cast
återuppringning som stöder operation samt stoppar servern om operationen är okänd:
def hand_cast (operation, state) gör falloperation göra: sqrt -> : noreply: math.sqrt (state) : multiplicera multiplikator -> : noreply, state * multiplikator _ -> : stop, "Ej implementerat", state slutet slutet
Använd nu den nya gränssnittsfunktionen:
CalcServer.multiply (pid, 2)
Bra, men för närvarande finns det inget sätt att få ett resultat av beräkningarna. Därför är det dags att definiera ännu en återuppringning.
Om asynkrona förfrågningar är gjorda heter de synkrona samtal. För att köra sådana förfrågningar, använd samtal / 3-funktionen, som accepterar en server, förfrågan och en valfri timeout som motsvarar fem sekunder som standard.
Synkrona förfrågningar används när vi vill vänta tills svaret faktiskt kommer från servern. Det typiska användningsfallet får information som ett resultat av beräkningar, som i dagens exempel (kom ihåg det grab_result / 0
funktion från en av de föregående sektionerna).
För att behandla synkrona förfrågningar, a handle_call / 3
återuppringning används. Den accepterar en förfrågan, en tupel som innehåller serverns pid och en term som identifierar samtalet samt det aktuella tillståndet. I det enklaste fallet ska det reagera med en tupel : svara, svara, new_state
.
Koda denna återuppringning nu:
def handle_call (: result, _ state) gör : svar, state, state slutet
Som ni ser, inget komplicerat. De svar
och det nya tillståndet är lika med det nuvarande tillståndet eftersom jag inte vill ändra något efter att resultatet returnerats.
Nu gränssnittet resultat / 1
fungera:
def result (pid) gör GenServer.call (pid,: result) slutet
Detta är det! Den slutliga användningen av CalcServer visas nedan:
: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid) CalcServer.multiply (pid, 2) CalcServer.result (pid) |> IO.puts # => 4.516635916254486
Det blir lite tråkigt att alltid tillhandahålla ett process-id när man ringer gränssnittsfunktionerna. Lyckligtvis är det möjligt att ge din process ett namn, eller en alias. Detta görs vid start av servern genom inställning namn
:
GenServer.start (CalcServer, 5.1, namn:: calc) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts
Observera att jag inte lagrar pid nu, men du kanske vill göra mönstermatchning så att servern verkligen startades.
Nu blir gränssnittsfunktionerna lite enklare:
def sqrt gör GenServer.cast (: calc,: sqrt) end def multiplicera (multiplicerare) gör GenServer.cast (: calc, : multiplicera, multiplikator) slutresultatet gör GenServer.call (: calc,: result) slut
Glöm inte att du inte kan starta två servrar med samma alias.
Alternativt kan du introducera ytterligare en gränssnittsfunktion start / 1
inuti din modul och utnyttja __MODULE __ / 0-makroet, som returnerar nuvarande modulens namn som en atom:
defmodule CalcServer använder GenServer def start (initial_value) gör GenServer.start (CalcServer, initial_value, namn: __MODULE__) slut def sqrt gör GenServer.cast (__ MODULE__,: sqrt) slut def multiplicera (multiplikator) gör GenServer.cast (__ MODULE__, : multiplicera, multiplikator) avsluta def resultat gör GenServer.call (__ MODULE__,: result) slutet # ... slutet CalcServer.start (6.1) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts
En annan återuppringning som kan omdefinieras i din modul heter terminate / 2. Den accepterar en anledning och det aktuella tillståndet, och det kallas när en server håller på att avsluta. Detta kan hända när du till exempel skickar ett felaktigt argument till multiplicera / 1
gränssnittsfunktion:
# ... CalcServer.multiply (2)
Återuppringningen kan se ut så här:
def terminate (_reason, _state) gör IO.puts "Slutet på serverns slut"
I den här artikeln har vi täckt grunderna för samtidighet i Elixir och diskuterat funktioner och makron som rom
, motta
, och skicka
. Du har lärt dig vilka processer som är, hur man skapar dem och hur man skickar och tar emot meddelanden. Vi har också sett hur man bygger en enkel långsiktig serverprocess som svarar på både synkrona och asynkrona meddelanden.
Utöver det har vi diskuterat GenServer-beteendet och sett hur det förenklar koden genom att införa olika callbacks. Vi har arbetat med i det
, avsluta
, handle_call
och handle_cast
callbacks och skapade en enkel beräknings server. Om något tycktes oklart för dig, tveka inte att skicka dina frågor!
Det finns mer till GenServer, och det är naturligtvis omöjligt att täcka allt i en artikel. I mitt nästa inlägg kommer jag att förklara vad handledare är och hur du kan använda dem för att övervaka dina processer och återställa dem från fel. Fram till dess lycklig kodning!