Generativním modelům někdy trvá nějakou dobu, než vrátí výsledek, takže je zajímavé využít streamování tokenů, aby se výsledek zobrazil v uživatelském rozhraní za běhu. Zde se dozvíte, jak můžete takového frontendu pro streamování textů pro svůj LLM dosáhnout pomocí Go, FastAPI a Javascriptu.
Připomínáme, že token je jedinečná entita, která může být buď malé slovo, část slova, nebo interpunkční znaménko. V průměru se 1 token skládá ze 4 znaků, a 100 tokenů odpovídá zhruba 75 slovům. Modely zpracování přirozeného jazyka musí váš text převést na tokeny, aby jej mohly zpracovat.
Při použití modelu umělé inteligence pro generování textu (známého také jako "generativní" model) může být doba odezvy poměrně dlouhá v závislosti na hardwaru a velikosti modelu. Například v případě velkého jazykového modelu (alsko známého jako "LLM"), jako je LLaMA 30B, nasazeného na grafickém procesoru NVIDIA A100 ve fp16, model vygeneruje 100 tokenů přibližně za 3 sekundy. Pokud tedy očekáváte, že váš generativní model vygeneruje velký kus textu o stovkách nebo tisících slov, latence bude vysoká a budete muset čekat třeba více než 10 sekund, abyste získali úplnou odpověď.
Dlouhé čekání na odpověď může být z hlediska uživatelské zkušenosti problém. Řešením je v takovém případě token streaming!
Token streaming spočívá v tom, že se každý nový token generuje za běhu, místo aby se čekalo, až bude připravena celá odpověď. To je to, co můžete vidět například v aplikaci ChatGPT nebo v asistentu NLP Cloud ChatDolphin. Slova se zobrazují ihned, jakmile je model vygeneruje. Asistenta s umělou inteligencí ChatDolphin si můžete vyzkoušet zde.
Streamování tokenů pomocí asistenta ChatDolphin v NLP Cloud. Zkuste to zde.
Prvním krokem bude využití inferenčního enginu, který podporuje token streaming.
Zde je několik možností, které byste mohli zvážit:
Zde je příklad s použitím metody HuggingFace generate():
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
V tomto příkladu generujeme výstup s modelem GPT-2 a každý token vypíšeme do konzole, jakmile dorazí.
Nyní, když jste si vybrali inferenční engine, je třeba obsluhovat váš model a vracet streamované tokeny.
Váš model bude s největší pravděpodobností spuštěn v prostředí Pythonu, takže pro vrácení tokenů budete potřebovat server Pythonu. a zpřístupnit je prostřednictvím rozhraní HTTP API. FastAPI se pro takové situace stalo de facto volbou.
Zde používáme Uvicorn a FastAPI's StreamingResponse, abychom každý token doručili ihned po jeho vygenerování. Zde je příklad:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Streamovací server můžete otestovat pomocí následujícího příkazu cURL:
curl -N localhost:8000
Nyní máme funkční model umělé inteligence, který správně vrací streamované tokeny.
Tyto streamované tokeny bychom mohli číst přímo z klientské aplikace v prohlížeči. To však neuděláme, a to ze dvou důvodů.
Za prvé, je důležité oddělit model umělé inteligence od zbytku zásobníku, protože nechceme model restartovat pokaždé, když se v něm objeví provedeme malou změnu v rozhraní API. Mějte na paměti, že moderní generativní modely AI jsou velmi těžké a jejich restartování často trvá několik minut.
Druhým důvodem je, že jazyk Python nemusí být nutně tou nejlepší volbou. pokud jde o budování vysoce výkonné souběžné aplikace, jakou se chystáme vytvořit. Tato volba lze samozřejmě diskutovat a může to být také otázka vkusu!
Jak bylo uvedeno výše, je důležité přidat bránu mezi váš model a konečného klienta, a Go je pro takovou aplikaci vhodným programovacím jazykem. V produkčním prostředí byste také mohli chtít přidat zpětnou vazbu. proxy mezi bránou Go a koncovým klientem a load balancer mezi bránou Go a modelem umělé inteligence, abyste mohli použít reverzní proxy mezi bránou Go a koncovým klientem. rozložit zátěž na několik replik vašeho modelu. Ale to je mimo rozsah našeho článku!
Naše aplikace Go bude mít na starosti také vykreslování konečné stránky HTML.
Tato aplikace provede požadavek na aplikaci FastAPI, přijme streamované tokeny od FastAPI a předá každý z nich dál. token pomocí událostí odeslaných serverem (SSE). SSE je jednodušší než webové sokety, protože je jednosměrný. Je to je dobrou volbou, pokud chcete vytvořit aplikaci, která odesílá informace klientovi, aniž by naslouchala potenciálnímu klientovi. odpovědi klienta.
Zde je kód Go (šablona HTML/JS/CSS bude ukázána v další části):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Naše stránka "/home" vykresluje stránku HTML/CSS/JS (ukázáno později). Stránka "/start" obdrží požadavek POST od stránky JS aplikace, která spustí požadavek na náš model AI. A naše stránka "/generate" vrátí výsledek aplikaci JS prostřednictvím událostí zaslaných serverem.
Jakmile funkce start() obdrží požadavek POST z frontendu, automaticky vytvoří goroutine, který provede požadavek. do naší aplikace FastAPI.
Funkce generateText() volá rozhraní FastAPI a vrací každý token přijatý za běhu prostřednictvím vyhrazeného kanálu (streamedTextCh). Pokud je z FastAPI přijat znak EOF, znamená to, že generování textu skončilo.
Funkce generate() je nekonečná smyčka, která čeká na nové tokeny přijaté z kanálu streamedTextCh. Jakmile je přijat nový token, je automaticky odeslán do frontendu jako událost odeslaná serverem. Události odeslané serverem musí dodržovat specifické formátování, které používá "event:" a "data:". proto je zde funkce formatServerSentEvent().
Aby bylo SSE kompletní, potřebujeme klienta Javascriptu, který je schopen naslouchat událostem zasílaným serverem tím, že se přihlásí k odběru stránky "generate". Jak toho dosáhnout, se dozvíte v následující části.
Nyní je třeba vytvořit adresář "templates" a přidat do něj soubor "home.html".
Zde je obsah souboru "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Jak vidíte, poslech SSE v prohlížeči je poměrně jednoduchý.
Nejprve se musíte přihlásit k odběru našeho koncového bodu SSE (stránka "/generate"). pak je třeba přidat posluchače událostí, který bude načítat streamované tokeny, jakmile se objeví jsou přijaty.
Moderní prohlížeče se automaticky pokoušejí o opětovné připojení zdroj událostí v případě problémů s připojením.
Nyní víte, jak vytvořit moderní generativní aplikaci AI, která dynamicky streamuje text v prohlížeči, à la ChatGPT!
Jak jste si všimli, taková aplikace nemusí být nutně jednoduchá, protože několik vrstev. Výše uvedený kód je samozřejmě zjednodušený, aby se dalo příkladu.
Hlavní problém při streamování tokenů spočívá v řešení výpadků sítě. Většina těchto selhání dojde mezi backendem Go a frontendem Javascriptu. Budete budete muset prozkoumat některé pokročilejší strategie opětovného připojení a zajistit, aby chyby byly odstraněny. byly chyby správně hlášeny uživatelskému rozhraní.
Doufám, že pro vás byl tento návod užitečný!
Vincent
Zástupce vývojářů ve společnosti NLP Cloud