Kæmper du med AI eller full-stack-udvikling? Vores eksperter er her for at vejlede dig: skræddersyet rådgivning, teknisk integration og meget mere. Kontakt os på [email protected].

Sådan udvikler du en Token Streaming UI til din LLM med Go, FastAPI og JS

Generative modeller tager nogle gange noget tid om at returnere et resultat, så det er interessant at udnytte token-streaming for at se resultatet dukke op i brugergrænsefladen på et øjeblik. Her kan du se, hvordan du kan opnå en sådan tekststreaming-frontend til din LLM med Go, FastAPI og Javascript.

Udvikler på PC

Hvad er Token Streaming?

Som en påmindelse er et token en unik enhed, der enten kan være et lille ord, en del af et ord eller tegnsætning. I gennemsnit består 1 token af 4 tegn, og 100 tokens svarer nogenlunde til 75 ord. Natural Language Processing-modeller er nødt til at omdanne din tekst til tokens for at kunne behandle den.

Når du bruger en AI-model til tekstgenerering (også kendt som "generativ" model), kan responstiden være ret høj, afhængigt af din hardware og størrelsen på din model. For eksempel, i tilfælde af en stor sprogmodel (også kendt som "LLM") som LLaMA 30B, implementeret på en NVIDIA A100 GPU i fp16, genererer modellen 100 tokens på omkring 3 sekunder. Så hvis du forventer, at din generative model skal generere et stort stykke tekst på hundreder eller tusinder af ord, vil latenstiden være høj, og du bliver nødt til at vente måske mere end 10 sekunder for at få det fulde svar.

At vente så længe på at få et svar kan være et problem set fra et brugeroplevelsessynspunkt. Løsningen i det tilfælde er token-streaming!

Token-streaming handler om at generere hvert nyt token på farten i stedet for at vente på, at hele svaret er klar. Det er det, du kan kan se på ChatGPT-appen eller på NLP Cloud ChatDolphin-assistenten for eksempel. Ord vises, så snart de er genereret af modellen. Prøv ChatDolphin AI-assistenten her.

Token-streaming med ChatDolphin på NLP Cloud Token-streaming med ChatDolphin-assistenten på NLP Cloud. Prøv det her.

Valg af en inferensmotor, der understøtter Token Streaming

Det første skridt vil være at bruge en inferensmotor, der understøtter token-streaming.

Her er nogle muligheder, du måske bør overveje:

Her er et eksempel, der bruger HuggingFace generate()-metoden:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

I dette eksempel genererer vi et output med GPT-2-modellen, og vi udskriver hvert token i konsollen, så snart det ankommer.

Streaming af svaret med FastAPI

Nu, hvor du har valgt en inferensmotor, skal du betjene din model og returnere de streamede tokens.

Din model vil sandsynligvis køre i et Python-miljø, så du skal bruge en Python-server for at returnere tokens og gøre dem tilgængelige via en HTTP API. FastAPI er blevet et de facto-valg til sådanne situationer.

Her bruger vi Uvicorn og FastAPI's StreamingResponse til at servere hvert token, så snart det er genereret. Her er et eksempel:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Du kan teste din streamingserver med følgende cURL-kommando:

curl -N localhost:8000

Vi har nu en fungerende AI-model, der returnerer streamede tokens korrekt.

Vi kunne læse disse streamede tokens direkte fra en klientapplikation i en browser. Men det har vi ikke tænkt os at gøre af to grunde.

For det første er det vigtigt at afkoble AI-modellen fra resten af stakken, fordi vi ikke ønsker at genstarte modellen hver gang vi laver en lille ændring i API'en. Husk på, at moderne generative AI-modeller er meget tunge og ofte tager flere minutter at genstarte.

En anden grund er, at Python ikke nødvendigvis er det bedste valg når det handler om at bygge en samtidig applikation med høj kapacitet, som vi har tænkt os at gøre. Dette valg kan selvfølgelig diskuteres, og det kan også være et spørgsmål om smag!

Videresendelse af tokens gennem en Go Gateway

Som nævnt ovenfor er det vigtigt at tilføje en gateway mellem din model og din endelige klient, og Go er et godt programmeringssprog til sådan en applikation. I produktion vil du måske også gerne tilføje en reverse proxy mellem Go-gatewayen og den endelige klient, og en load balancer mellem din Go-gateway og din AI-model for at fordele for at sprede belastningen på flere replikaer af din model. Men det er uden for rammerne af vores artikel!

Vores Go-applikation vil også være ansvarlig for at gengive den endelige HTML-side.

Denne applikation laver en forespørgsel til FastAPI-appen, modtager de streamede tokens fra FastAPI og videresender hvert token til frontenden ved hjælp af Server Sent Events (SSE). SSE er enklere end websockets, fordi det er ensrettet. Det er er et godt valg, når du vil bygge et program, der sender oplysninger til en klient uden at lytte til et potentielt svar fra klienten. svar fra klienten.

Her er Go-koden (HTML/JS/CSS-skabelonen vil blive vist i næste afsnit):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Vores "/home"-side gengiver HTML/CSS/JS-siden (vises senere). "/start"-siden modtager en POST-forespørgsel fra JS-applikationen, der udløser en anmodning til vores AI-model. Og vores "/generate"-side returnerer resultatet til JS-applikationen via server-sendte events.

Når start()-funktionen modtager en POST-anmodning fra frontenden, opretter den automatisk en goroutine, der sender en anmodning til vores FastAPI-app. til vores FastAPI-app.

Funktionen generateText() kalder FastAPI og returnerer hvert token, der modtages på farten gennem en dedikeret kanal (streamedTextCh). Hvis EOF-tegnet modtages fra FastAPI, betyder det, at tekstgenereringen er slut.

Funktionen generate() er en uendelig løkke, der venter på nye tokens modtaget fra streamedTextCh-kanalen. Når et nyt token er modtaget, skubbes det automatisk til frontenden som en server-sendt begivenhed. Server-sendte begivenheder skal følge en bestemt formatering, der bruger præfikserne "event:" og "data:" præfikser, deraf funktionen formatServerSentEvent().

For at SSE skal være komplet, har vi brug for en Javascript-klient, der kan lytte til server-sendte events ved at abonnere på "generer"-siden. Se næste afsnit for at forstå, hvordan vi opnår det.

Modtagelse af tokens med Javascript i browseren

Du skal nu oprette en "templates"-mappe og tilføje en "home.html"-fil i den.

Her er indholdet af "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Som du kan se, er det ret ligetil at lytte til SSE i browseren.

Først skal du abonnere på vores SSE-endpoint (siden "/generate"). Derefter skal du tilføje en event listener, der læser de streamede tokens, så snart de er modtaget.

Moderne browsere forsøger automatisk at genoprette begivenhedskilden i tilfælde af forbindelsesproblemer.

Konklusion

Du ved nu, hvordan man skaber en moderne generativ AI-applikation, der dynamisk dynamisk streamer tekst i browseren, à la ChatGPT!

Som du har bemærket, er en sådan applikation ikke nødvendigvis enkel, da flere lag er involveret. Og selvfølgelig er ovenstående kode forsimplet af hensyn til eksemplet. eksemplets skyld.

Den største udfordring med token-streaming er at håndtere netværksfejl. De fleste af disse fejl vil ske mellem Go-backend og Javascript-frontend. Du bliver nødt til udforske nogle mere avancerede genforbindelsesstrategier og sørge for, at fejl rapporteres rapporteres korrekt til brugergrænsefladen.

Jeg håber, at du fandt denne vejledning nyttig!

Vincent
Udvikleradvokat hos NLP Cloud