Modele generatywne czasami potrzebują trochę czasu, aby zwrócić wynik, więc interesujące jest wykorzystanie strumieniowania tokenów, aby zobaczyć wynik w locie w interfejsie użytkownika. Oto, w jaki sposób można uzyskać taki frontend strumieniowania tekstu dla LLM za pomocą Go, FastAPI i Javascript.
Dla przypomnienia, token jest unikalną jednostką, która może być małym słowem, częścią słowa lub interpunkcją. Średnio 1 token składa się z 4 znaków, a 100 tokenów odpowiada mniej więcej 75 słowom. Modele przetwarzania języka naturalnego muszą przekształcić tekst w tokeny, aby go przetworzyć.
W przypadku korzystania z modelu sztucznej inteligencji do generowania tekstu (znanego również jako model "generatywny"), czas reakcji może być dość wysoki, w zależności od posiadanego sprzętu i rozmiaru modelu. Na przykład, w przypadku dużego modelu językowego (znanego również jako "LLM"), takiego jak LLaMA 30B, wdrożonego na procesorze graficznym NVIDIA A100 w fp16, model generuje 100 tokenów w około 3 sekundy. Jeśli więc spodziewasz się, że Twój model generatywny wygeneruje duży fragment tekstu składający się z setek lub tysięcy słów, opóźnienie będzie wysokie i będziesz musiał poczekać być może dłużej niż 10 sekund, aby uzyskać pełną odpowiedź.
Długie oczekiwanie na odpowiedź może stanowić problem z punktu widzenia użytkownika. Rozwiązaniem w takim przypadku jest streaming tokenów!
Token streaming polega na generowaniu każdego nowego tokena w locie, zamiast czekać, aż cała odpowiedź będzie gotowa. To jest to, co można zobaczyć na przykład w aplikacji ChatGPT lub w asystencie NLP Cloud ChatDolphin. Słowa pojawiają się, gdy tylko zostaną wygenerowane przez model. Wypróbuj asystenta ChatDolphin AI tutaj.
Token streaming z asystentem ChatDolphin na NLP Cloud. Wypróbuj tutaj.
Pierwszym krokiem będzie wykorzystanie silnika wnioskowania, który obsługuje strumieniowe przesyłanie tokenów.
Oto kilka opcji, które warto rozważyć:
Oto przykład wykorzystujący metodę generate() HuggingFace:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
W tym przykładzie generujemy dane wyjściowe za pomocą modelu GPT-2 i wypisujemy każdy token w konsoli, gdy tylko się pojawi.
Teraz, gdy wybrałeś silnik wnioskowania, będziesz musiał obsłużyć swój model i zwrócić przesyłane strumieniowo tokeny.
Twój model będzie najprawdopodobniej działał w środowisku Python, więc będziesz potrzebował serwera Python, aby zwrócić tokeny i udostępniania ich za pośrednictwem interfejsu API HTTP. FastAPI stał się de facto wyborem w takich sytuacjach.
Tutaj używamy Uvicorn i FastAPI StreamingResponse, aby obsłużyć każdy token, gdy tylko zostanie wygenerowany. Oto przykład:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Możesz przetestować swój serwer streamingowy dzięki następującemu poleceniu cURL:
curl -N localhost:8000
Mamy teraz działający model sztucznej inteligencji, który prawidłowo zwraca przesyłane strumieniowo tokeny.
Moglibyśmy bezpośrednio odczytać te strumieniowane tokeny z aplikacji klienckiej w przeglądarce. Ale nie zamierzamy tego robić z dwóch powodów.
Po pierwsze, ważne jest, aby oddzielić model sztucznej inteligencji od reszty stosu, ponieważ nie chcemy restartować modelu za każdym razem, gdy zamierzamy wprowadzić niewielką zmianę w API. Należy pamiętać, że nowoczesne generatywne modele AI są bardzo ciężkie i często ich ponowne uruchomienie zajmuje kilka minut.
Drugim powodem jest to, że Python niekoniecznie jest najlepszym wyborem jeśli chodzi o tworzenie aplikacji współbieżnych o wysokiej przepustowości, tak jak zamierzamy to zrobić. Ten wybór można oczywiście przedyskutować i może to być również kwestia gustu!
Jak wspomniano powyżej, dodanie bramki między modelem a klientem końcowym jest ważne, a Go jest dobrym językiem programowania dla takiej aplikacji. W środowisku produkcyjnym warto również dodać odwrotne proxy proxy między bramą Go a klientem końcowym oraz load balancer między bramą Go a modelem AI w celu aby rozłożyć obciążenie na kilka replik modelu. Jest to jednak poza zakresem naszego artykułu!
Nasza aplikacja Go będzie również odpowiedzialna za renderowanie końcowej strony HTML.
Ta aplikacja wysyła żądanie do aplikacji FastAPI, odbiera przesyłane strumieniowo tokeny z FastAPI i przekazuje każdy token do frontendu za pomocą Server Sent Events (SSE). SSE jest prostsze niż websockets, ponieważ jest jednokierunkowe. Jest to jest dobrym wyborem, gdy chcesz zbudować aplikację, która przesyła informacje do klienta, bez nasłuchiwania potencjalnej odpowiedzi klienta. odpowiedź klienta.
Oto kod Go (szablon HTML/JS/CSS zostanie pokazany w następnej sekcji):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Nasza strona "/home" renderuje stronę HTML/CSS/JS (pokazaną później). Strona "/start" otrzymuje żądanie POST z aplikacji JS, które wyzwala żądanie do naszego modelu AI. Nasza strona "/generate" zwraca wynik do aplikacji JS za pośrednictwem zdarzeń wysyłanych przez serwer.
Gdy funkcja start() otrzyma żądanie POST z frontendu, automatycznie utworzy goroutine, który wykona żądanie do naszej aplikacji do naszej aplikacji FastAPI.
Funkcja generateText() wywołuje FastAPI i zwraca każdy token otrzymany w locie przez dedykowany kanał (streamedTextCh). Jeśli z FastAPI zostanie odebrany znak EOF, oznacza to, że generowanie tekstu zostało zakończone.
Funkcja generate() jest nieskończoną pętlą, która czeka na nowe tokeny otrzymane z kanału streamedTextCh. Po otrzymaniu nowego tokena, jest on automatycznie przesyłany do interfejsu użytkownika jako zdarzenie wysłane przez serwer. Zdarzenia wysyłane przez serwer muszą być zgodne z określonym formatowaniem, które wykorzystuje prefiksy "event:" i "data:" stąd funkcja formatServerSentEvent().
Aby SSE było kompletne, potrzebujemy klienta Javascript, który jest w stanie nasłuchiwać zdarzeń wysyłanych przez serwer poprzez subskrybowanie strony "generate". Zobacz następną sekcję, aby zrozumieć, jak to osiągnąć.
Teraz należy utworzyć katalog "templates" i dodać do niego plik "home.html".
Oto zawartość pliku "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Jak widać, słuchanie SSE w przeglądarce jest dość proste.
Najpierw należy zasubskrybować nasz punkt końcowy SSE (strona "/generate"). Następnie należy dodać detektor zdarzeń, który będzie odczytywał przesyłane strumieniowo tokeny, gdy tylko gdy tylko zostaną odebrane.
Nowoczesne przeglądarki automatycznie próbują połączyć się ponownie źródła zdarzeń w przypadku problemów z połączeniem.
Wiesz już, jak stworzyć nowoczesną aplikację generatywnej sztucznej inteligencji, która dynamicznie strumieniuje tekst w przeglądarce, à la ChatGPT!
Jak zauważyłeś, taka aplikacja niekoniecznie jest prosta, ponieważ w grę wchodzi kilka warstw. Oczywiście powyższy kod jest nadmiernie uproszczony na potrzeby przykładu. przykładu.
Głównym wyzwaniem związanym ze streamingiem tokenów jest obsługa awarii sieci. Większość tych awarii wystąpi między backendem Go a frontendem Javascript. Będziesz musiał trzeba będzie zbadać bardziej zaawansowane strategie ponownego łączenia i upewnić się, że błędy są prawidłowo zgłaszane do interfejsu użytkownika.
Mam nadzieję, że ten poradnik okazał się przydatny!
Vincent
Rzecznik programistów w NLP Cloud