Så här utvecklar du ett Token Streaming UI för din LLM med Go, FastAPI och JS

Generativa modeller tar ibland lite tid på sig att returnera ett resultat, så det är intressant att utnyttja token streaming för att se resultatet visas direkt i användargränssnittet. Så här kan du skapa en sådan frontend för textstreaming för din LLM med Go, FastAPI och Javascript.

Developer på PC

Vad är Token Streaming?

En token är en unik enhet som antingen kan vara ett litet ord, en del av ett ord eller ett skiljetecken. I genomsnitt består 1 token av 4 tecken, och 100 tokens motsvarar ungefär 75 ord. Modeller för naturlig språkbehandling måste omvandla din text till tokens för att kunna bearbeta den.

När du använder en AI-modell för textgenerering (även kallad "generativ" modell) kan svarstiden vara ganska hög, beroende på din maskinvara och storleken på din modell. När det gäller en stor språkmodell (även kallad "LLM") som LLaMA 30B, som används på en NVIDIA A100 GPU i fp16, genererar modellen exempelvis 100 tokens på cirka 3 sekunder. Så om du förväntar dig att din generativa modell ska generera en stor text med hundratals eller tusentals ord, kommer latensen att vara hög och du kommer att behöva vänta kanske mer än 10 sekunder för att få det fullständiga svaret.

Att vänta så länge på att få ett svar kan vara ett problem ur användarupplevelsesynpunkt. Lösningen i det fallet är token streaming!

Token streaming handlar om att generera varje ny token i farten istället för att vänta på att hela svaret ska vara klart. Detta är vad du kan kan se i ChatGPT-appen, eller till exempel i NLP Cloud ChatDolphin-assistenten. Ord visas så snart de genereras av modellen. Prova AI-assistenten ChatDolphin här.

Token-streaming med ChatDolphin på NLP Cloud Token-streaming med ChatDolphin-assistenten på NLP Cloud. Prova här.

Välja en inferensmotor som stöder Token Streaming

Det första steget är att använda en inferensmotor som stöder token-streaming.

Här är några alternativ som du kanske vill överväga:

Här är ett exempel som använder metoden generera() för HuggingFace:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

I det här exemplet genererar vi en utmatning med GPT-2-modellen och vi skriver ut varje token i konsolen så snart den anländer.

Strömma svaret med FastAPI

Nu när du har valt en inferensmotor måste du servera din modell och returnera de streamade tokens.

Din modell kommer sannolikt att köras i en Python-miljö, så du behöver en Python-server för att returnera tokens och göra dem tillgängliga via ett HTTP API. FastAPI har blivit ett de facto-val för sådana situationer.

Här använder vi Uvicorn och FastAPI:s StreamingResponse för att servera varje token så snart den genereras. Här är ett exempel:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Du kan testa din streaming-server med hjälp av följande cURL-kommando:

curl -N localhost:8000

Vi har nu en fungerande AI-modell som returnerar streamade tokens på rätt sätt.

Vi skulle kunna läsa dessa streamade tokens direkt från en klientapplikation i en webbläsare. Men vi kommer inte att göra det, av två skäl.

För det första är det viktigt att frikoppla AI-modellen från resten av stacken eftersom vi inte vill starta om modellen varje gång vi ska göra en liten ändring i API:et. Tänk på att moderna generativa AI-modeller är mycket tunga och tar ofta flera minuter att starta om.

En andra anledning är att Python inte nödvändigtvis är det bästa valet när det gäller att bygga en samtidig applikation med hög genomströmning som vi kommer att göra. Detta val kan naturligtvis diskuteras och det kan också vara en smakfråga!

Vidarebefordra tokens genom en Go-gateway

Som nämnts ovan är det viktigt att lägga till en gateway mellan din modell och din slutkund, och Go är ett bra programmeringsspråk för en sådan applikation. I produktion kanske du också vill lägga till en omvänd mellan Go-gatewayen och slutklienten, och en lastbalanserare mellan Go-gatewayen och AI-modellen för att fördela för att sprida belastningen på flera repliker av din modell. Men det ligger utanför ramen för vår artikel!

Vår Go-applikation kommer också att ansvara för renderingen av den slutliga HTML-sidan.

Den här applikationen gör en förfrågan till FastAPI-appen, tar emot de streamade tokens från FastAPI och vidarebefordrar varje token till frontenden med hjälp av SSE (Server Sent Events). SSE är enklare än websockets eftersom det är enkelriktat. Det är är ett bra val när du vill bygga ett program som skickar information till en klient, utan att lyssna på ett potentiellt svar från klienten.

Här är Go-koden (HTML/JS/CSS-mallen visas i nästa avsnitt):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Vår "/home"-sida renderar HTML/CSS/JS-sidan (visas senare). Sidan "/start" tar emot en POST-begäran från JS-applikationen som utlöser en begäran till vår AI-modell. Och vår "/generate"-sida returnerar resultatet till JS-applikationen via serverskickade händelser.

När start()-funktionen tar emot en POST-förfrågan från frontenden skapar den automatiskt en goroutine som gör en förfrågan till vår FastAPI-app.

Funktionen generateText() anropar FastAPI och returnerar varje token som tas emot i farten via en dedikerad kanal (streamedTextCh). Om tecknet EOF tas emot från FastAPI betyder det att textgenereringen är över.

Funktionen generate() är en oändlig loop som väntar på nya tokens som tas emot från streamedTextCh-kanalen. När en ny token har tagits emot, skickas den automatiskt till frontenden som en serverskickad händelse. Serverskickade händelser måste följa en specifik formatering som använder prefixen "event:" och "data:" prefix, därav funktionen formatServerSentEvent().

För att SSE ska vara komplett behöver vi en Javascript-klient som kan lyssna på händelser som skickas till servern genom att prenumerera på sidan "generate". Se nästa avsnitt för att förstå hur du uppnår detta.

Mottagning av tokens med Javascript i webbläsaren

Du måste nu skapa en "templates"-katalog och lägga till en "home.html"-fil i den.

Här är innehållet i "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Som du kan se är det ganska enkelt att lyssna på SSE i webbläsaren.

Först måste du prenumerera på vår SSE-endpoint (sidan "/generate"). Sedan måste du lägga till en event-lyssnare som läser de streamade tokens så snart de tas emot.

Moderna webbläsare försöker automatiskt återansluta händelsekällan om det uppstår problem med anslutningen.

Slutsats

Du vet nu hur man skapar en modern generativ AI-applikation som dynamiskt strömmar text i webbläsaren, à la ChatGPT!

Som du har märkt är en sådan applikation inte nödvändigtvis enkel eftersom flera lager är inblandade. Och naturligtvis är koden ovan alltför förenklad för exemplets skull. exempel.

Den största utmaningen med token streaming är att hantera nätverksfel. De flesta av dessa fel kommer att inträffa mellan Go-backend och Javascript-frontend. Du kommer att behöva utforska några mer avancerade återanslutningsstrategier och se till att fel rapporteras korrekt till användargränssnittet.

Jag hoppas att du tyckte att den här handledningen var användbar!

Vincent
Utvecklare på NLP Cloud