A generatív modelleknek néha időbe telik, amíg eredményt adnak, ezért érdekes kihasználni a token streaminget, hogy az eredmény menet közben jelenjen meg a felhasználói felületen. Íme, hogyan érhet el egy ilyen szövegstreaming frontendet az LLM-hez Go, FastAPI és Javascript segítségével.
Emlékeztetőül, a token egy egyedi entitás, amely lehet egy kis szó, egy szó része vagy írásjel. Átlagosan 1 token 4 karakterből áll, és 100 token nagyjából 75 szónak felel meg. A természetes nyelvfeldolgozó modelleknek tokenekké kell alakítaniuk a szöveget, hogy feldolgozhassák azt.
Szöveggeneráló mesterséges intelligenciamodell (más néven "generatív" modell) használata esetén a válaszidő a hardvertől és a modell méretétől függően meglehetősen magas lehet. Például egy nagyméretű nyelvi modell (más néven "LLM"), például az NVIDIA A100 GPU-n fp16-ban telepített LLaMA 30B esetében a modell 100 tokent körülbelül 3 másodperc alatt generál. Ha tehát arra számít, hogy a generatív modell egy nagy, több száz vagy több ezer szóból álló szöveget generál, akkor a késleltetés nagy lesz, és várnia kell, hogy talán 10 másodpercnél is többet kell várni a teljes válaszra.
A válaszadásra való hosszú várakozás a felhasználói élmény szempontjából problémát jelenthet. A megoldás ebben az esetben a token streaming!
A token streaming arról szól, hogy minden új tokent menet közben generálnak, ahelyett, hogy megvárnák, amíg az egész válasz elkészül. Ez az, amit láthatjuk például a ChatGPT alkalmazáson vagy az NLP Cloud ChatDolphin asszisztensén. A szavak azonnal megjelennek, amint a modell generálja őket. Próbálja ki a ChatDolphin AI asszisztenst itt.
Az első lépés az lesz, hogy olyan következtetési motort használjon, amely támogatja a token streaminget.
Íme néhány lehetőség, amelyet érdemes megfontolni:
Íme egy példa az HuggingFace generate() metódus használatával:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
Ebben a példában a GPT-2 modellel generálunk egy kimenetet, és minden egyes tokent kiírunk a konzolra, amint megérkezik.
Most, hogy kiválasztott egy következtetési motort, ki kell szolgálnia a modelljét, és vissza kell adnia a streamelt tokeneket.
A modell valószínűleg Python környezetben fog futni, így szükséged lesz egy Python szerverre a tokenek visszaküldéséhez. és egy HTTP API-n keresztül elérhetővé tegye őket. A FastAPI de facto választássá vált az ilyen helyzetekben.
Itt az Uvicornt és a FastAPI StreamingResponse-t használjuk, hogy minden egyes tokent azonnal kiszolgáljunk, amint az generálódik. Íme egy példa:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
A következő cURL parancs segítségével tesztelheti a streaming szerverét:
curl -N localhost:8000
Most már van egy működő AI modellünk, amely megfelelően adja vissza a streaming tokeneket.
Ezeket a streamelt tokeneket közvetlenül egy böngészőben lévő ügyfélalkalmazásból olvashatjuk. De ezt nem fogjuk megtenni, 2 okból kifolyólag.
Először is, fontos, hogy szétválasszuk a az AI modellt a verem többi részétől, mert nem akarjuk a modellt minden alkalommal újraindítani, amikor egy apró változtatást hajtunk végre az API-n. Ne feledjük, hogy a modern generatív AI modellek nagyon nehézkesek. és gyakran több percig tart az újraindításuk.
A második ok az, hogy a Python nem feltétlenül a legjobb választás. ha egy olyan nagy átviteli sebességű, párhuzamos alkalmazás építéséről van szó, mint amilyet mi fogunk csinálni. Ez a választás természetesen megvitatható, és lehet, hogy ízlés kérdése is!
Mint fentebb említettük, fontos, hogy a modell és a végső ügyfél között átjárót hozzon létre, és a Go egy jó programozási nyelv egy ilyen alkalmazáshoz. A gyártás során érdemes lehet egy fordított proxy-t a Go átjáró és a végső ügyfél közé, valamint egy terheléskiegyenlítőt a Go átjáró és az AI modell között, hogy hogy a terhelést a modell több replikájára ossza el. De ez nem tartozik cikkünk tárgykörébe!
A Go alkalmazásunk lesz felelős a végső HTML oldal megjelenítéséért is.
Ez az alkalmazás kérést intéz a FastAPI alkalmazáshoz, megkapja a FastAPI-tól a streamelt tokeneket, és minden egyes tokent továbbít a FastAPI alkalmazáshoz. tokeneket a frontendhez a Server Sent Events (SSE) segítségével. Az SSE egyszerűbb, mint a websockets, mivel egyirányú. A jó választás, ha olyan alkalmazást szeretne létrehozni, amely információt küld az ügyfélnek, anélkül, hogy meghallgatná a potenciális kliens válaszát.
Itt van a Go kód (a HTML/JS/CSS sablon a következő részben lesz látható):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
A "/home" oldalunk a HTML/CSS/JS oldalt rendereli (később mutatjuk be). A "/start" oldal egy POST kérést kap a JS alkalmazástól, amely egy kérést indít az AI modellünkhöz. A "/generate" oldalunk pedig a szerver által küldött eseményeken keresztül küldi vissza az eredményt a JS alkalmazásnak.
Amint a start() függvény megkapja a POST kérést a frontendtől, automatikusan létrehoz egy goroutine-t, amely egy kérést fog végrehajtani. a FastAPI alkalmazásunknak.
A generateText() függvény meghívja a FastAPI-t, és egy külön csatornán (streamedTextCh) keresztül visszaad minden menet közben kapott tokent. Ha a FastAPI-tól EOF karakter érkezik, az azt jelenti, hogy a szöveggenerálásnak vége.
A generate() függvény egy végtelen ciklus, amely a streamedTextCh csatornáról érkező új tokenekre vár. Amint egy új token érkezik, az automatikusan a frontendre kerül, mint egy kiszolgáló által küldött esemény. A kiszolgáló által küldött eseményeknek egy meghatározott formázást kell követniük, amely az "event:" és "data:" kifejezéseket használja. előtagokat használ, ezért van a formatServerSentEvent() függvény.
Ahhoz, hogy az SSE teljes legyen, szükségünk van egy Javascript kliensre, amely képes figyelni a szerver által küldött eseményeket a "generál" oldalra való feliratkozással. Lásd a következő szakaszt, hogy megértsd, hogyan érheted el ezt.
Most létre kell hoznod egy "sablonok" könyvtárat, és hozzá kell adnod egy "home.html" fájlt.
Íme a "home.html" tartalma:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Amint láthatod, az SSE hallgatása a böngészőben elég egyszerű.
Először is fel kell iratkoznia az SSE végpontunkra (a "/generate" oldalra). Aztán hozzá kell adnod egy eseményhallgatót, amely beolvassa a streamelt tokeneket, amint érkeznek.
A modern böngészők automatikusan megpróbálnak újracsatlakozni az eseményforrást, ha kapcsolati problémák merülnek fel.
Most már tudja, hogyan hozzon létre egy modern generatív AI alkalmazást, amely dinamikusan szövegfolyamot a böngészőben, à la ChatGPT!
Mint észrevetted, egy ilyen alkalmazás nem feltétlenül egyszerű, mivel számos rétegekből áll. És persze a fenti kódot túlságosan leegyszerűsítettük a példa kedvéért.
A token streaming fő kihívása a hálózati hibák kezelése. A legtöbb ezek a hibák a Go backend és a Javascript frontend között következnek be. A meg kell vizsgálnia néhány fejlettebb újrakapcsolási stratégiát, és gondoskodnia kell arról, hogy a hibák megfelelően jelezzük a hibákat a felhasználói felületnek.
Remélem, hogy hasznosnak találtad ezt a bemutatót!
Vincent
Fejlesztői tanácsadó az NLP Cloudnál