Haben Sie Probleme mit KI oder Full-Stack-Entwicklung? Unsere Experten sind für Sie da: maßgeschneiderte Beratung, technische Integration und mehr. Erreichen Sie uns unter [email protected].

Wie man mit Go, FastAPI und JS eine Token-Streaming-UI für sein LLM entwickelt

Generative Modelle brauchen manchmal einige Zeit, um ein Ergebnis zu liefern. Daher ist es interessant, Token-Streaming zu nutzen, um das Ergebnis sofort in der Benutzeroberfläche erscheinen zu lassen. Hier erfahren Sie, wie Sie ein solches Text-Streaming-Frontend für Ihr LLM mit Go, FastAPI und Javascript realisieren können.

Entwickler auf PC

Was ist Token-Streaming?

Zur Erinnerung: Ein Token ist eine eindeutige Einheit, die entweder ein kleines Wort, ein Teil eines Wortes oder eine Interpunktion sein kann. Im Durchschnitt besteht 1 Token aus 4 Zeichen, und 100 Token entsprechen ungefähr 75 Wörtern. Modelle zur Verarbeitung natürlicher Sprache müssen Ihren Text in Token umwandeln, um ihn verarbeiten zu können.

Bei der Verwendung eines KI-Modells zur Texterzeugung (auch als "generatives" Modell bezeichnet) kann die Reaktionszeit je nach Hardware und Größe des Modells recht hoch sein. Im Falle eines großen Sprachmodells (auch bekannt als "LLM") wie LLaMA 30B, das auf einem NVIDIA A100-Grafikprozessor in fp16 eingesetzt wird, generiert das Modell beispielsweise 100 Token in etwa 3 Sekunden. Wenn Sie also erwarten, dass Ihr generatives Modell einen großen Text mit Hunderten oder Tausenden von Wörtern generiert, wird die Latenzzeit hoch sein und Sie müssen vielleicht mehr als 10 Sekunden warten, um die vollständige Antwort zu erhalten.

Wenn man so lange auf eine Antwort warten muss, kann das aus Sicht der Benutzer ein Problem darstellen. Die Lösung in diesem Fall ist Token-Streaming!

Beim Token-Streaming geht es darum, jeden neuen Token sofort zu generieren, anstatt zu warten, bis die gesamte Antwort fertig ist. Das ist es, was Sie in der ChatGPT-App oder im NLP Cloud ChatDolphin-Assistenten zu sehen. Die Wörter erscheinen, sobald sie vom Modell generiert werden. Probieren Sie den KI-Assistenten ChatDolphin hier aus.

Token-Streaming mit ChatDolphin auf der NLP Cloud Token-Streaming mit dem ChatDolphin-Assistenten auf der NLP Cloud. Versuchen Sie es hier.

Auswahl einer Inferenzmaschine, die das Token-Streaming unterstützt

Der erste Schritt besteht darin, eine Inferenzmaschine zu nutzen, die Token-Streaming unterstützt.

Hier sind einige Optionen, die Sie vielleicht in Betracht ziehen sollten:

Hier ist ein Beispiel, das die Methode HuggingFace generate() verwendet:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

In diesem Beispiel erzeugen wir eine Ausgabe mit dem GPT-2-Modell und geben jedes Token in der Konsole aus, sobald es eintrifft.

Streaming der Antwort mit FastAPI

Nachdem Sie nun eine Inferenzmaschine ausgewählt haben, müssen Sie Ihr Modell bedienen und die gestreamten Token zurückgeben.

Ihr Modell wird höchstwahrscheinlich in einer Python-Umgebung laufen, so dass Sie einen Python-Server benötigen, um die Token zurückzugeben und sie über eine HTTP-API verfügbar zu machen. FastAPI hat sich in solchen Situationen als Standardlösung etabliert.

Hier verwenden wir Uvicorn und FastAPI's StreamingResponse, um jedes Token zu liefern, sobald es generiert wurde. Hier ist ein Beispiel:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Sie können Ihren Streaming-Server mit dem folgenden cURL-Befehl testen:

curl -N localhost:8000

Wir haben jetzt ein funktionierendes KI-Modell, das ordnungsgemäß gestreamte Token zurückgibt.

Wir könnten diese gestreamten Token direkt von einer Client-Anwendung in einem Browser lesen. Aber das werden wir aus zwei Gründen nicht tun.

Erstens ist es wichtig, das KI-Modell vom Rest des Stacks zu entkoppeln vom Rest des Stacks zu entkoppeln, denn wir wollen das Modell nicht jedes Mal neu starten, wenn wir wenn wir eine kleine Änderung an der API vornehmen. Denken Sie daran, dass moderne generative KI-Modelle sehr schwer sind und oft mehrere Minuten für einen Neustart benötigen.

Ein zweiter Grund ist, dass Python nicht unbedingt die beste Wahl ist ist, wenn es darum geht, eine gleichzeitige Anwendung mit hohem Durchsatz zu erstellen, wie wir es vorhaben. Diese Wahl kann natürlich diskutiert werden und ist vielleicht auch eine Frage des Geschmacks!

Weiterleitung von Token durch ein Go-Gateway

Wie bereits erwähnt, ist es wichtig, ein Gateway zwischen Ihrem Modell und Ihrem Endkunden einzubauen, und Go ist eine gute Programmiersprache für eine solche Anwendung. In der Produktion möchten Sie vielleicht auch einen Reverse Proxy zwischen dem Go-Gateway und dem endgültigen Client sowie einen Load Balancer zwischen dem Go-Gateway und dem KI-Modell um die Last auf mehrere Replikate Ihres Modells zu verteilen. Aber das würde den Rahmen dieses Artikels sprengen!

Unsere Go-Anwendung wird auch für das Rendern der endgültigen HTML-Seite zuständig sein.

Diese Anwendung stellt eine Anfrage an die FastAPI-Anwendung, empfängt die gestreamten Token von FastAPI und leitet jedes Token über Server Sent Events (SSE) an das Frontend weiter. SSE ist einfacher als Websockets, da es unidirektional ist. Es ist eine gute Wahl, wenn Sie eine Anwendung erstellen wollen, die Informationen an einen Client sendet, ohne auf eine mögliche Antwort des Clients zu hören.

Hier ist der Go-Code (die HTML/JS/CSS-Vorlage wird im nächsten Abschnitt gezeigt):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Unsere "/home"-Seite rendert die HTML/CSS/JS-Seite (siehe später). Die Seite "/start" empfängt eine POST-Anfrage von der JS-Anwendung, die eine Anfrage an unser KI-Modell auslöst. Und unsere "/generate"-Seite gibt das Ergebnis über vom Server gesendete Ereignisse an die JS-Anwendung zurück.

Sobald die start()-Funktion eine POST-Anfrage vom Frontend erhält, erstellt sie automatisch eine Goroutine, die eine Anfrage an unsere FastAPI-Anwendung stellt.

Die Funktion generateText() ruft FastAPI auf und gibt jedes Token zurück, das über einen speziellen Kanal (streamedTextCh) empfangen wurde. Wenn das EOF-Zeichen von FastAPI empfangen wird, bedeutet dies, dass die Texterzeugung beendet ist.

Die Funktion generate() ist eine Endlosschleife, die auf neue Token wartet, die vom streamedTextCh-Kanal empfangen werden. Sobald ein neues Token empfangen wird, wird es automatisch als ein vom Server gesendetes Ereignis an das Frontend weitergeleitet. Vom Server gesendete Ereignisse müssen einer bestimmten Formatierung folgen, die die Präfixe "event:" und "data:" Präfixe verwenden, daher die Funktion formatServerSentEvent().

Damit SSE vollständig ist, benötigen wir einen Javascript-Client, der in der Lage ist, auf vom Server gesendete Ereignisse zu hören, indem er die Seite "generate" abonniert. Im nächsten Abschnitt erfahren Sie, wie Sie das erreichen können.

Empfang von Token mit Javascript im Browser

Sie müssen nun ein Verzeichnis "templates" erstellen und darin eine Datei "home.html" einfügen.

Hier ist der Inhalt von "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Wie Sie sehen, ist das Abhören von SSE im Browser recht einfach.

Zunächst müssen Sie unseren SSE-Endpunkt abonnieren (die Seite "/generate"). Dann müssen Sie einen Ereignislistener hinzufügen, der die gestreamten Token liest, sobald sie empfangen werden.

Moderne Browser versuchen automatisch, die Verbindung die Ereignisquelle im Falle von Verbindungsproblemen neu zu verbinden.

Schlussfolgerung

Sie wissen jetzt, wie man eine moderne generative KI-Anwendung erstellt, die dynamisch die dynamisch Text im Browser streamt, à la ChatGPT!

Wie Sie feststellen konnten, ist eine solche Anwendung nicht unbedingt einfach, da mehrere Schichten beteiligt sind. Und natürlich ist der obige Code zugunsten des Beispiels stark vereinfacht. Beispiels.

Die größte Herausforderung beim Token-Streaming ist der Umgang mit Netzwerkausfällen. Die meisten dieser dieser Ausfälle werden zwischen dem Go-Backend und dem Javascript-Frontend auftreten. Sie werden müssen Sie einige fortgeschrittene Wiederverbindungsstrategien erforschen und sicherstellen, dass Fehler ordnungsgemäß an die Benutzeroberfläche gemeldet werden.

Ich hoffe, dass Sie diese Anleitung nützlich fanden!

Vincent
Berater für Entwickler bei NLP Cloud