Hogyan fejlesszünk Token Streaming UI-t az LLM-hez Go, FastAPI és JS segítségével?

A generatív modelleknek néha időbe telik, amíg eredményt adnak, ezért érdekes kihasználni a token streaminget, hogy az eredmény menet közben jelenjen meg a felhasználói felületen. Íme, hogyan érhet el egy ilyen szövegstreaming frontendet az LLM-hez Go, FastAPI és Javascript segítségével.

Fejlesztő a PC-n

Mi az a Token Streaming?

Emlékeztetőül, a token egy egyedi entitás, amely lehet egy kis szó, egy szó része vagy írásjel. Átlagosan 1 token 4 karakterből áll, és 100 token nagyjából 75 szónak felel meg. A természetes nyelvfeldolgozó modelleknek tokenekké kell alakítaniuk a szöveget, hogy feldolgozhassák azt.

Szöveggeneráló mesterséges intelligenciamodell (más néven "generatív" modell) használata esetén a válaszidő a hardvertől és a modell méretétől függően meglehetősen magas lehet. Például egy nagyméretű nyelvi modell (más néven "LLM"), például az NVIDIA A100 GPU-n fp16-ban telepített LLaMA 30B esetében a modell 100 tokent körülbelül 3 másodperc alatt generál. Ha tehát arra számít, hogy a generatív modell egy nagy, több száz vagy több ezer szóból álló szöveget generál, akkor a késleltetés nagy lesz, és várnia kell, hogy talán 10 másodpercnél is többet kell várni a teljes válaszra.

A válaszadásra való hosszú várakozás a felhasználói élmény szempontjából problémát jelenthet. A megoldás ebben az esetben a token streaming!

A token streaming arról szól, hogy minden új tokent menet közben generálnak, ahelyett, hogy megvárnák, amíg az egész válasz elkészül. Ez az, amit láthatjuk például a ChatGPT alkalmazáson vagy az NLP Cloud ChatDolphin asszisztensén. A szavak azonnal megjelennek, amint a modell generálja őket. Próbálja ki a ChatDolphin AI asszisztenst itt.

Token streaming a ChatDolphin segítségével az NLP Cloudon Token streaming a ChatDolphin asszisztenssel az NLP Cloudon. Próbálja ki itt.

Token streaminget támogató következtetési motor kiválasztása

Az első lépés az lesz, hogy olyan következtetési motort használjon, amely támogatja a token streaminget.

Íme néhány lehetőség, amelyet érdemes megfontolni:

Íme egy példa az HuggingFace generate() metódus használatával:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

Ebben a példában a GPT-2 modellel generálunk egy kimenetet, és minden egyes tokent kiírunk a konzolra, amint megérkezik.

A válasz streamelése a FastAPI segítségével

Most, hogy kiválasztott egy következtetési motort, ki kell szolgálnia a modelljét, és vissza kell adnia a streamelt tokeneket.

A modell valószínűleg Python környezetben fog futni, így szükséged lesz egy Python szerverre a tokenek visszaküldéséhez. és egy HTTP API-n keresztül elérhetővé tegye őket. A FastAPI de facto választássá vált az ilyen helyzetekben.

Itt az Uvicornt és a FastAPI StreamingResponse-t használjuk, hogy minden egyes tokent azonnal kiszolgáljunk, amint az generálódik. Íme egy példa:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

A következő cURL parancs segítségével tesztelheti a streaming szerverét:

curl -N localhost:8000

Most már van egy működő AI modellünk, amely megfelelően adja vissza a streaming tokeneket.

Ezeket a streamelt tokeneket közvetlenül egy böngészőben lévő ügyfélalkalmazásból olvashatjuk. De ezt nem fogjuk megtenni, 2 okból kifolyólag.

Először is, fontos, hogy szétválasszuk a az AI modellt a verem többi részétől, mert nem akarjuk a modellt minden alkalommal újraindítani, amikor egy apró változtatást hajtunk végre az API-n. Ne feledjük, hogy a modern generatív AI modellek nagyon nehézkesek. és gyakran több percig tart az újraindításuk.

A második ok az, hogy a Python nem feltétlenül a legjobb választás. ha egy olyan nagy átviteli sebességű, párhuzamos alkalmazás építéséről van szó, mint amilyet mi fogunk csinálni. Ez a választás természetesen megvitatható, és lehet, hogy ízlés kérdése is!

Tokenek továbbítása egy Go Gateway-n keresztül

Mint fentebb említettük, fontos, hogy a modell és a végső ügyfél között átjárót hozzon létre, és a Go egy jó programozási nyelv egy ilyen alkalmazáshoz. A gyártás során érdemes lehet egy fordított proxy-t a Go átjáró és a végső ügyfél közé, valamint egy terheléskiegyenlítőt a Go átjáró és az AI modell között, hogy hogy a terhelést a modell több replikájára ossza el. De ez nem tartozik cikkünk tárgykörébe!

A Go alkalmazásunk lesz felelős a végső HTML oldal megjelenítéséért is.

Ez az alkalmazás kérést intéz a FastAPI alkalmazáshoz, megkapja a FastAPI-tól a streamelt tokeneket, és minden egyes tokent továbbít a FastAPI alkalmazáshoz. tokeneket a frontendhez a Server Sent Events (SSE) segítségével. Az SSE egyszerűbb, mint a websockets, mivel egyirányú. A jó választás, ha olyan alkalmazást szeretne létrehozni, amely információt küld az ügyfélnek, anélkül, hogy meghallgatná a potenciális kliens válaszát.

Itt van a Go kód (a HTML/JS/CSS sablon a következő részben lesz látható):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

A "/home" oldalunk a HTML/CSS/JS oldalt rendereli (később mutatjuk be). A "/start" oldal egy POST kérést kap a JS alkalmazástól, amely egy kérést indít az AI modellünkhöz. A "/generate" oldalunk pedig a szerver által küldött eseményeken keresztül küldi vissza az eredményt a JS alkalmazásnak.

Amint a start() függvény megkapja a POST kérést a frontendtől, automatikusan létrehoz egy goroutine-t, amely egy kérést fog végrehajtani. a FastAPI alkalmazásunknak.

A generateText() függvény meghívja a FastAPI-t, és egy külön csatornán (streamedTextCh) keresztül visszaad minden menet közben kapott tokent. Ha a FastAPI-tól EOF karakter érkezik, az azt jelenti, hogy a szöveggenerálásnak vége.

A generate() függvény egy végtelen ciklus, amely a streamedTextCh csatornáról érkező új tokenekre vár. Amint egy új token érkezik, az automatikusan a frontendre kerül, mint egy kiszolgáló által küldött esemény. A kiszolgáló által küldött eseményeknek egy meghatározott formázást kell követniük, amely az "event:" és "data:" kifejezéseket használja. előtagokat használ, ezért van a formatServerSentEvent() függvény.

Ahhoz, hogy az SSE teljes legyen, szükségünk van egy Javascript kliensre, amely képes figyelni a szerver által küldött eseményeket a "generál" oldalra való feliratkozással. Lásd a következő szakaszt, hogy megértsd, hogyan érheted el ezt.

Tokenek fogadása Javascript segítségével a böngészőben

Most létre kell hoznod egy "sablonok" könyvtárat, és hozzá kell adnod egy "home.html" fájlt.

Íme a "home.html" tartalma:

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Amint láthatod, az SSE hallgatása a böngészőben elég egyszerű.

Először is fel kell iratkoznia az SSE végpontunkra (a "/generate" oldalra). Aztán hozzá kell adnod egy eseményhallgatót, amely beolvassa a streamelt tokeneket, amint érkeznek.

A modern böngészők automatikusan megpróbálnak újracsatlakozni az eseményforrást, ha kapcsolati problémák merülnek fel.

Következtetés

Most már tudja, hogyan hozzon létre egy modern generatív AI alkalmazást, amely dinamikusan szövegfolyamot a böngészőben, à la ChatGPT!

Mint észrevetted, egy ilyen alkalmazás nem feltétlenül egyszerű, mivel számos rétegekből áll. És persze a fenti kódot túlságosan leegyszerűsítettük a példa kedvéért.

A token streaming fő kihívása a hálózati hibák kezelése. A legtöbb ezek a hibák a Go backend és a Javascript frontend között következnek be. A meg kell vizsgálnia néhány fejlettebb újrakapcsolási stratégiát, és gondoskodnia kell arról, hogy a hibák megfelelően jelezzük a hibákat a felhasználói felületnek.

Remélem, hogy hasznosnak találtad ezt a bemutatót!

Vincent
Fejlesztői tanácsadó az NLP Cloudnál