How To Develop A Token Streaming UI For Your LLM With Go, FastAPI And JS

Generative models can take a long time to return a result, so token streaming is useful: it lets the result appear on the fly in the UI. Here is how you can build such a text streaming frontend for your LLM with Go, FastAPI and JavaScript.

What Is Token Streaming?

As a reminder, a token is a unique entity that can either be a small word, part of a word, or punctuation. On average, 1 token is made up of 4 characters, and 100 tokens are roughly equivalent to 75 words. Natural Language Processing models need to turn your text into tokens in order to process it.
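
To make these rules of thumb concrete, here is a minimal Python sketch that turns them into rough estimates. The function names are hypothetical and the ratios are only the averages quoted above; a real tokenizer will give different counts depending on the model and the language.

```python
import math

# Averages quoted above. Real tokenizers vary by model and language.
CHARS_PER_TOKEN = 4
WORDS_PER_100_TOKENS = 75


def estimate_tokens_from_chars(text: str) -> int:
    """Rough token count based on the number of characters."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def estimate_tokens_from_words(word_count: int) -> int:
    """Rough token count based on a number of words."""
    return math.ceil(word_count * 100 / WORDS_PER_100_TOKENS)
```

With these averages, a 75-word paragraph is estimated at 100 tokens.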

When using a text generation AI model (also known as a "generative" model), the response time can be quite high, depending on your hardware and the size of your model. For example, a large language model (also known as an "LLM") like LLaMA 30B, deployed on an NVIDIA A100 GPU in fp16, generates 100 tokens in around 3 seconds. So if you expect your generative model to produce a large piece of text of hundreds or thousands of words, the latency will be high and you may need to wait more than 10 seconds to get the full response.
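
As a back-of-the-envelope check, the figures above can be combined in a few lines of Python. This is only a sketch: the function name is hypothetical, the throughput is the single data point quoted above (100 tokens in about 3 seconds), and it assumes generation speed stays constant, which is not guaranteed in practice.

```python
# Throughput quoted above for LLaMA 30B on an A100 in fp16.
TOKENS_PER_SECOND = 100 / 3

# Average quoted earlier: 100 tokens are roughly 75 words.
TOKENS_PER_WORD = 100 / 75


def estimate_generation_seconds(word_count: int) -> float:
    """Rough time needed to generate a text of `word_count` words."""
    tokens = word_count * TOKENS_PER_WORD
    return tokens / TOKENS_PER_SECOND
```

Under these assumptions, a 750-word answer (about 1,000 tokens) takes around 30 seconds, and even 250 words take around 10 seconds without streaming.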

Waiting for so long in order to get a response can be a problem from a user experience standpoint. The solution in that case is token streaming!

Token streaming is about generating every new token on the fly instead of waiting for the whole response to be ready. This is what you can see on the ChatGPT app, or on NLP Cloud's ChatDolphin assistant for example. Words appear as soon as they are generated by the model. Try the ChatDolphin AI assistant here.

Token streaming with the ChatDolphin assistant on NLP Cloud. Try it here.

Note that you can easily use token streaming through the NLP Cloud API. See our documentation for more details.

Selecting An Inference Engine That Supports Token Streaming

The first step will be for you to leverage an inference engine that supports token streaming.

Here are some options you might want to consider:

Here is an example using the HuggingFace generate() method:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

In this example, we generate an output with the GPT-2 model and we print each token in the console as soon as it arrives.
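
If the thread-plus-iterator pattern looks unusual, the following standard-library sketch imitates it without any model. It is a simplified imitation and not the library's actual code: QueueStreamer, fake_generate and stream_pieces are hypothetical names, and the "model" simply pushes pre-defined pieces of text.

```python
import queue
import threading

_END = object()  # sentinel that marks the end of the stream


class QueueStreamer:
    """Hypothetical stand-in for a token streamer: an iterator fed by another thread."""

    def __init__(self):
        self._queue = queue.Queue()

    def put(self, text):
        self._queue.put(text)

    def end(self):
        self._queue.put(_END)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._queue.get()  # blocks until the producer pushes something
        if item is _END:
            raise StopIteration
        return item


def fake_generate(streamer, pieces):
    """Pretend to be the model: push pieces one by one, then signal the end."""
    for piece in pieces:
        streamer.put(piece)
    streamer.end()


def stream_pieces(pieces):
    """Run the producer in a thread and yield each piece as soon as it is available."""
    streamer = QueueStreamer()
    thread = threading.Thread(target=fake_generate, args=(streamer, pieces))
    thread.start()
    for text in streamer:
        yield text
    thread.join()
```

The consumer loop never needs to know how long generation takes: it just blocks on the queue until the next piece arrives or the end marker shows up.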

Streaming The Response With FastAPI

Now that you have chosen an inference engine, you will need to serve your model and return the streamed tokens.

Your model will most likely run in a Python environment so you will need a Python server in order to return the tokens and make them available through an HTTP API. FastAPI has become a de facto choice for such situations.

Here we use Uvicorn and FastAPI's StreamingResponse in order to serve each token as soon as it is generated. Here is an example:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

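def generate():
    # A plain (non-async) generator: iterating over the streamer blocks while
    # waiting for the next token. StreamingResponse runs plain generators in a
    # thread pool, so the event loop is not blocked in the meantime.
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate(), media_type="text/plain")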

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

You can test your streaming server thanks to the following cURL command:

curl -N localhost:8000
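
If you prefer to test from Python, here is a minimal standard-library sketch that does roughly the same thing as the cURL command. The names decode_stream and read_stream are hypothetical. The incremental decoder is there because a multi-byte UTF-8 character can be split across two network reads.

```python
import codecs
import urllib.request


def decode_stream(raw, chunk_size=1024):
    """Yield decoded text from a binary stream as soon as bytes are available."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    while True:
        # read1() returns whatever is available, up to chunk_size bytes,
        # instead of waiting for the whole body.
        chunk = raw.read1(chunk_size)
        if not chunk:
            break
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush the decoder (raises if the stream ended in the middle of a character).
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


def read_stream(url):
    """Open the URL and yield the streamed text chunk by chunk."""
    with urllib.request.urlopen(url) as resp:
        yield from decode_stream(resp)
```

Assuming the FastAPI server above is running locally, you could then loop over read_stream("http://localhost:8000") and print each chunk with print(text, end="", flush=True).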

We now have a working AI model that is properly returning streamed tokens.

We could directly read these streamed tokens from a client application in a browser. But we are not going to do that, for 2 reasons.

First, it is important to decouple the AI model from the rest of the stack because we do not want to restart the model every time we are going to make a small change to the API. Keep in mind that modern generative AI models are very heavy and often take several minutes to restart.

A second reason is that Python is not necessarily the best choice when it comes to building a high throughput concurrent application like we are going to do. This choice can be discussed of course and it might also be a matter of taste!

Forwarding Tokens Through A Go Gateway

As mentioned above, adding a gateway between your model and your final client is important, and Go is a good programming language for such an application. In production, you might also want to add a reverse proxy between the Go gateway and the final client, and a load balancer between your Go gateway and your AI model in order to spread the load across several replicas of your model. But this is outside the scope of this article!

Our Go application will also be in charge of rendering the final HTML page.

This application makes a request to the FastAPI app, receives the streamed tokens from FastAPI, and forwards each token to the frontend using Server Sent Events (SSE). SSE is simpler than websockets because it is unidirectional. It is a good choice when you want to build an application that pushes information to a client, without listening to a potential client response.

Here is the Go code (the HTML/JS/CSS template will be shown in the next section):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and forwards every chunk of text received on the fly
// through a dedicated channel (streamedTextCh).
// When the response body reaches its end (io.EOF), text generation is over.
func generateText(streamedTextCh chan<- string) {
    resp, err := http.Get("http://127.0.0.1:8000")
    if err != nil {
        // Do not use log.Fatal here: it would stop the whole gateway.
        log.Println(err)
        return
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)
    buf := make([]byte, 1024)

    for {
        // Read returns as soon as some bytes are available, so each chunk
        // sent by FastAPI is forwarded without waiting for the full response.
        n, err := reader.Read(buf)
        if n > 0 {
            streamedTextCh <- string(buf[:n])
        }
        if err != nil {
            if err != io.EOF {
                log.Println(err)
            }
            return
        }
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is a loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
// The loop stops when the client disconnects.
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    for {
        select {
        case <-r.Context().Done():
            // The client went away: stop consuming tokens for this connection.
            return
        case text := <-streamedTextCh:
            event, err := formatServerSentEvent("streamed-text", text)
            if err != nil {
                log.Println(err)
                return
            }

            // The response headers are already sent at this point, so on
            // failure we can only log the error and stop.
            if _, err = fmt.Fprint(w, event); err != nil {
                log.Println(err)
                return
            }

            flusher.Flush()
        }
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

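func main() {
    // Method-based patterns like "POST /start" require Go 1.22 or later.
    http.HandleFunc("GET /generate", generate)
    http.HandleFunc("POST /start", start)
    http.HandleFunc("GET /", home)

    // FastAPI already listens on port 8000, so the gateway uses another port.
    log.Fatal(http.ListenAndServe(":8080", nil))
}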

Our "/" page renders the HTML/CSS/JS page (shown later). The "/start" endpoint receives a POST request from the JS application, which triggers a request to our AI model. And our "/generate" endpoint returns the result to the JS app through server sent events.

Once the start() function receives a POST request from the frontend, it automatically creates a goroutine that will make a request to our FastAPI app.

The generateText() function calls FastAPI and forwards every chunk of text received on the fly through a dedicated channel (streamedTextCh). When the response body coming from FastAPI reaches its end (io.EOF in Go), it means that text generation is over.

The generate() function is a loop that waits for new tokens received from the streamedTextCh channel. Once a new token is received, it is automatically pushed to the frontend as a server sent event. Server sent events need to follow a specific formatting that uses "event:" and "data:" prefixes, hence the formatServerSentEvent() function.
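
To make the SSE wire format more tangible, here is a small Python sketch of the same formatting logic. The name format_sse is hypothetical. Unlike the Go function, this sketch also splits a payload that contains line breaks into several "data:" lines, which is how the SSE format represents multi-line data. That matters because generated text can contain newlines.

```python
def format_sse(event: str, data: str) -> str:
    """Build one server sent event: an "event:" line, "data:" lines, then a blank line."""
    lines = [f"event: {event}"]
    # Each line of the payload gets its own "data:" prefix. The client joins
    # them back together with "\n".
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Note that only the first space after "data:" is stripped by the client, so a token that starts with a space (which is common) keeps it.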

In order for SSE to be complete, we need a JavaScript client that is able to listen to server sent events by subscribing to the "generate" page. See the next section to understand how to achieve that.

Receiving Tokens With JavaScript In The Browser

You now need to create a "templates" directory and add a "home.html" file inside it.

Here is the content of "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').textContent += event.data
    })
</script>
</html>

As you can see, listening to SSE in the browser is quite straightforward.

First you need to subscribe to our SSE endpoint (the "/generate" page). Then you need to add an event listener that will read the streamed tokens as soon as they are received.
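
To see what the browser does behind the scenes, here is a simplified Python sketch of the parsing an event source performs on the raw text it receives. The name parse_sse is hypothetical, and the sketch only handles the "event:" and "data:" fields and comment lines. A real client also deals with "id:" and "retry:" fields, other line endings, and data arriving in pieces.

```python
def parse_sse(stream_text):
    """Yield (event, data) tuples from raw SSE text (simplified)."""
    event, data_lines = "message", []
    for line in stream_text.split("\n"):
        if line == "":
            # A blank line ends the current event.
            data = "\n".join(data_lines)
            if data:
                yield event, data
            event, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, ignored
        else:
            field, _, value = line.partition(":")
            # Only one leading space after the colon is stripped.
            if value.startswith(" "):
                value = value[1:]
            if field == "event":
                event = value
            elif field == "data":
                data_lines.append(value)
```

This is why the listener in the page receives event.data already stripped of the "data:" prefix, under the "streamed-text" event name.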

Modern browsers automatically try to reconnect the event source in case of connection problems.

Conclusion

You now know how to create a modern generative AI application that dynamically streams text in the browser, à la ChatGPT!

As you have noticed, such an application is not necessarily simple, as several layers are involved. And of course the above code is oversimplified for the sake of the example.

The main challenge with token streaming is handling network failures. Most of these failures will happen between the Go backend and the JavaScript frontend. You will need to explore some more advanced reconnection strategies and make sure that errors are properly reported to the UI.
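
As one possible starting point (it is not part of the code above), a common reconnection strategy is exponential backoff: wait a little longer after each failed attempt, up to a maximum. Here is a minimal Python sketch. The function name and the default values are hypothetical, and a production version would usually add some random jitter too.

```python
def backoff_delays(attempts, base=1.0, factor=2.0, cap=30.0):
    """Return the waiting time (in seconds) before each reconnection attempt."""
    return [min(cap, base * factor ** i) for i in range(attempts)]
```

With the defaults, six attempts wait 1, 2, 4, 8, 16 and then 30 seconds. The SSE format also has a "retry:" field that lets the server suggest a reconnection delay to the browser, which may be worth exploring as well.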

I hope that you found this tutorial useful!

Vincent
Developer Advocate at NLP Cloud