Noen ganger tar det litt tid før generative modeller returnerer et resultat, og da er det interessant å utnytte token-streaming for å se resultatet vises direkte i brukergrensesnittet. Slik kan du lage en slik frontend for tekststrømming for LLM-modellen din med Go, FastAPI og Javascript.
Som en påminnelse er et token en unik enhet som enten kan være et lite ord, en del av et ord eller tegnsetting. I gjennomsnitt består 1 token av 4 tegn, og 100 tokens tilsvarer omtrent 75 ord. Modeller for naturlig språkbehandling må gjøre teksten om til tokens for å kunne behandle den.
Når du bruker en AI-modell for tekstgenerering (også kjent som "generativ" modell), kan responstiden være ganske høy, avhengig av maskinvaren og størrelsen på modellen. Hvis du for eksempel bruker en stor språkmodell (også kjent som "LLM") som LLaMA 30B på en NVIDIA A100 GPU i fp16, genererer modellen 100 tokens på rundt 3 sekunder. Så hvis du forventer at den generative modellen skal generere en stor tekst på hundrevis eller tusenvis av ord, vil ventetiden være høy, og du må kanskje vente i mer enn 10 sekunder på å få hele teksten. mer enn 10 sekunder for å få hele svaret.
Det kan være problematisk for brukeropplevelsen å vente så lenge på å få svar. Løsningen i slike tilfeller er token streaming!
Token-streaming handler om å generere hvert nytt token underveis i stedet for å vente på at hele svaret skal være klart. Det er dette du kan kan se i ChatGPT-appen, eller for eksempel i NLP Cloud ChatDolphin-assistenten. Ordene vises så snart de er generert av modellen. Prøv ChatDolphin AI-assistenten her.
Token-strømming med ChatDolphin-assistenten på NLP Cloud. Prøv den her.
Det første du må gjøre, er å bruke en inferensmotor som støtter token-strømming.
Her er noen alternativer du bør vurdere:
Her er et eksempel på bruk av HuggingFace generate()-metoden:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
I dette eksemplet genererer vi en utdata med GPT-2-modellen, og vi skriver ut hvert token i konsollen så snart det ankommer.
Nå som du har valgt en inferensmotor, må du betjene modellen og returnere de strømmede tokens.
Modellen din vil sannsynligvis kjøres i et Python-miljø, så du trenger en Python-server for å kunne returnere tokens og gjøre dem tilgjengelige via et HTTP-API. FastAPI har blitt et de facto-valg for slike situasjoner.
Her bruker vi Uvicorn og FastAPIs StreamingResponse for å servere hvert token så snart det er generert. Her er et eksempel:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Du kan teste strømmeserveren med følgende cURL-kommando:
curl -N localhost:8000
Vi har nå en fungerende AI-modell som returnerer streamede tokens på riktig måte.
Vi kan lese disse streamede tokens direkte fra en klientapplikasjon i en nettleser. Men det skal vi ikke gjøre av to grunner.
For det første er det viktig å frikoble AI-modellen AI-modellen fra resten av stacken, fordi vi ikke ønsker å starte modellen på nytt hver gang vi skal vi skal gjøre en liten endring i API-et. Husk at moderne generative AI-modeller er veldig tunge og ofte tar flere og tar ofte flere minutter å starte på nytt.
En annen grunn er at Python ikke nødvendigvis er det beste valget når det gjelder å bygge når det gjelder å bygge en samtidig applikasjon med høy gjennomstrømning, slik vi skal gjøre. Dette valget kan selvsagt diskuteres, og det kan også være en smakssak!
Som nevnt ovenfor er det viktig å legge til en gateway mellom modellen og den endelige klienten, og Go er et godt programmeringsspråk for en slik applikasjon. I produksjonssammenheng kan det også være lurt å legge til en reversert proxy mellom Go-gatewayen og sluttklienten, og en lastbalanseringsenhet mellom Go-gatewayen og AI-modellen for å fordele for å spre belastningen på flere replikaer av modellen. Men det er utenfor artikkelens omfang!
Go-applikasjonen vår har også ansvaret for å rendere den endelige HTML-siden.
Denne applikasjonen sender en forespørsel til FastAPI-appen, mottar de strømmede tokens fra FastAPI og videresender hvert token til frontend ved hjelp av token til frontenden ved hjelp av SSE (Server Sent Events). SSE er enklere enn websockets fordi det er enveis. Det er er et godt valg når du vil bygge en applikasjon som sender informasjon til en klient uten å lytte til et eventuelt svar fra klienten. svar fra klienten.
Her er Go-koden (HTML/JS/CSS-malen vises i neste avsnitt):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Vår "/home"-side gjengir HTML/CSS/JS-siden (vises senere). "/start"-siden mottar en POST-forespørsel fra JS-applikasjonen som JS-applikasjonen som utløser en forespørsel til AI-modellen vår. Og vår "/generate"-side returnerer resultatet til JS-appen via hendelser sendt fra serveren.
Når start()-funksjonen mottar en POST-forespørsel fra frontenden, oppretter den automatisk en goroutine som sender en forespørsel til FastAPI-appen vår. til FastAPI-appen vår.
GenerateText()-funksjonen kaller FastAPI og returnerer hvert token som mottas underveis gjennom en dedikert kanal (streamedTextCh). Hvis EOF-tegnet mottas fra FastAPI, betyr det at tekstgenereringen er over.
Generere()-funksjonen er en uendelig løkke som venter på nye tokens som mottas fra streamedTextCh-kanalen. Når et nytt token er mottatt, sendes det automatisk til frontend som en server-sendt hendelse. Server-sendte hendelser må følge en bestemt formatering som bruker prefiksene "event:" og "data:". prefikser, derav funksjonen formatServerSentEvent().
For at SSE skal være komplett, trenger vi en Javascript-klient som kan lytte til hendelser fra serveren ved å abonnere på "generer"-siden. Se neste avsnitt for å forstå hvordan du gjør det.
Du må nå opprette en "templates"-katalog og legge til en "home.html"-fil i den.
Her er innholdet i "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Som du ser, er det ganske enkelt å lytte til SSE i nettleseren.
Først må du abonnere på SSE-endepunktet vårt (siden "/generate"). Deretter må du legge til en hendelseslytter som leser de strømmede tokens så snart de mottas. de mottas.
Moderne nettlesere prøver automatisk å koble til hendelseskilden i tilfelle tilkoblingsproblemer.
Nå vet du hvordan du kan lage en moderne generativ AI-applikasjon som dynamisk strømmer tekst i nettleseren, à la ChatGPT!
Som du sikkert har lagt merke til, er en slik applikasjon ikke nødvendigvis enkel, siden den består av flere lag er involvert. Og selvfølgelig er koden ovenfor forenklet for eksemplets skyld. eksemplets skyld.
Den største utfordringen med token-strømming er håndtering av nettverksfeil. De fleste av disse feilene vil skje mellom Go-backend og Javascript-frontend. Du må mer avanserte strategier for gjenoppretting av tilkoblinger og sørge for at feil rapporteres til brukergrensesnittet på feil rapporteres på riktig måte til brukergrensesnittet.
Jeg håper at du fant denne veiledningen nyttig!
Vincent
Utvikleradvokat hos NLP Cloud