Está a ter dificuldades com a IA ou com o desenvolvimento full-stack? Os nossos especialistas estão aqui para o orientar: aconselhamento personalizado, integração técnica e muito mais. Entre em contacto com [email protected].

Como desenvolver uma interface de usuário de streaming de token para seu LLM com Go, FastAPI e JS

Os modelos generativos às vezes levam algum tempo para retornar um resultado, por isso é interessante aproveitar o streaming de token para ver o resultado aparecer rapidamente na interface do usuário. Aqui está como você pode conseguir um frontend de streaming de texto para seu LLM com Go, FastAPI e Javascript.

Programador no PC

O que é o Token Streaming?

Como lembrete, um token é uma entidade única que pode ser uma pequena palavra, parte de uma palavra ou pontuação. Em média, 1 token é composto por 4 caracteres, e 100 tokens são aproximadamente equivalentes a 75 palavras. Os modelos de Processamento de Linguagem Natural precisam de transformar o seu texto em tokens para o processar.

Ao utilizar um modelo de IA de geração de texto (também conhecido como modelo "generativo"), o tempo de resposta pode ser bastante elevado, dependendo do seu hardware e do tamanho do seu modelo. Por exemplo, no caso de um modelo de linguagem grande (também conhecido como "LLM") como o LLaMA 30B, implementado numa GPU NVIDIA A100 em fp16, o modelo gera 100 tokens em cerca de 3 segundos. Assim, se espera que o seu modelo generativo gere um grande texto de centenas ou milhares de palavras, a latência será elevada e terá de esperar talvez mais de 10 segundos para obter a resposta completa.

Esperar muito tempo para obter uma resposta pode ser um problema do ponto de vista da experiência do utilizador. A solução nesse caso é o streaming de tokens!

O streaming de tokens consiste em gerar cada novo token em tempo real, em vez de esperar que toda a resposta esteja pronta. É isto que pode pode ver na aplicação ChatGPT, ou no assistente NLP Cloud ChatDolphin, por exemplo. As palavras aparecem assim que são geradas pelo modelo. Experimente o assistente de IA ChatDolphin aqui.

Streaming de tokens com ChatDolphin no NLP Cloud Transmissão de tokens com o assistente ChatDolphin no NLP Cloud. Experimente aqui.

Seleção de um motor de inferência que suporte o fluxo de tokens

O primeiro passo será utilizar um motor de inferência que suporte o fluxo de tokens.

Eis algumas opções que pode querer considerar:

Eis um exemplo que utiliza o método HuggingFace generate():

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

Neste exemplo, geramos uma saída com o modelo GPT-2 e imprimimos cada token na consola assim que ele chega.

Transmissão em fluxo contínuo da resposta com FastAPI

Agora que escolheu um motor de inferência, terá de servir o seu modelo e devolver os tokens transmitidos.

O seu modelo será muito provavelmente executado num ambiente Python, pelo que necessitará de um servidor Python para devolver os tokens e disponibilizá-los através de uma API HTTP. A FastAPI tornou-se uma escolha de facto para estas situações.

Aqui usamos o Uvicorn e o StreamingResponse da FastAPI para servir cada token assim que ele é gerado. Aqui está um exemplo:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Pode testar o seu servidor de streaming graças ao seguinte comando cURL:

curl -N localhost:8000

Agora temos um modelo de IA funcional que está a devolver corretamente os tokens transmitidos.

Poderíamos ler diretamente estes tokens transmitidos a partir de uma aplicação cliente num browser. Mas não o vamos fazer, por duas razões.

Em primeiro lugar, é importante dissociar o modelo de IA do resto da pilha porque não queremos reiniciar o modelo sempre que vamos fazer uma pequena alteração na API. Lembre-se de que os modelos modernos de IA generativa são muito pesados e muitas vezes levam vários minutos para serem reiniciados.

Uma segunda razão é que Python não é necessariamente a melhor escolha quando se trata de construir uma aplicação concorrente de alto rendimento como a que vamos fazer. Esta escolha pode ser discutida, é claro, e também pode ser uma questão de gosto!

Encaminhamento de tokens através de um gateway Go

Como já foi referido, é importante acrescentar uma porta de entrada entre o modelo e o cliente final, e Go é uma boa linguagem de programação para esse tipo de aplicação. Em produção, você também pode querer adicionar um proxy reverso entre o gateway Go e o cliente final, e um balanceador de carga entre o gateway Go e o modelo de IA para para distribuir a carga em várias réplicas do seu modelo. Mas isso está fora do escopo do nosso artigo!

A nossa aplicação Go também se encarregará de renderizar a página HTML final.

Esta aplicação faz um pedido à aplicação FastAPI, recebe os tokens transmitidos da FastAPI e encaminha cada token para o front-end usando eventos enviados pelo servidor (SSE). O SSE é mais simples que os websockets porque é unidirecional. Ele é uma boa escolha quando se deseja criar um aplicativo que envia informações para um cliente, sem ouvir uma possível resposta do cliente.

Aqui está o código Go (o modelo HTML/JS/CSS será apresentado na secção seguinte):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Nossa página "/home" renderiza a página HTML/CSS/JS (mostrada mais adiante). A página "/start" recebe um pedido POST da aplicação aplicação JS que acciona um pedido ao nosso modelo de IA. E a nossa página "/generate" devolve o resultado à aplicação JS através de eventos enviados pelo servidor.

Quando a função start() recebe uma solicitação POST do frontend, ela cria automaticamente uma goroutine que fará uma solicitação ao nosso aplicativo FastAPI.

A função generateText() chama a FastAPI e devolve todos os tokens recebidos em tempo real através de um canal dedicado (streamedTextCh). Se o carácter EOF for recebido da FastAPI, isso significa que a geração de texto terminou.

A função generate() é um loop infinito que espera por novos tokens recebidos do canal streamedTextCh. Quando um novo token é recebido, ele é automaticamente enviado para o front-end como um evento enviado pelo servidor. Os eventos enviados pelo servidor precisam seguir uma formatação específica que usa os prefixos "event:" e "data:". daí a função formatServerSentEvent().

Para que o SSE esteja completo, precisamos de um cliente Javascript que seja capaz de ouvir os eventos enviados pelo servidor, subscrevendo a página "generate". Veja a próxima secção para perceber como conseguir isso.

Receber tokens com Javascript no navegador

Agora, é necessário criar um diretório "templates" e adicionar um ficheiro "home.html" dentro dele.

Aqui está o conteúdo de "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Como pode ver, ouvir o SSE no browser é bastante simples.

Primeiro, é necessário subscrever o nosso ponto de extremidade SSE (a página "/generate"). Em seguida, é necessário adicionar um ouvinte de eventos que lerá os tokens transmitidos assim que assim que eles forem recebidos.

Os browsers modernos tentam automaticamente restabelecer a ligação a origem do evento em caso de problemas de ligação.

Conclusão

Agora já sabe como criar uma aplicação moderna de IA generativa que dinamicamente transmite texto no browser, à la ChatGPT!

Como se apercebeu, uma aplicação deste tipo não é necessariamente simples, uma vez que estão envolvidas várias camadas estão envolvidas. E, como é óbvio, o código acima está demasiado simplificado para efeitos do exemplo.

O principal desafio do streaming de tokens é lidar com as falhas da rede. A maioria das essas falhas ocorrerão entre o backend Go e o frontend Javascript. Você precisará precisará explorar algumas estratégias de reconexão mais avançadas e garantir que os erros sejam corretamente relatados para a interface do usuário.

Espero que este tutorial lhe tenha sido útil!

Vincent
Advogado do programador na NLP Cloud