Os modelos generativos às vezes levam algum tempo para retornar um resultado, por isso é interessante aproveitar o streaming de token para ver o resultado aparecer rapidamente na interface do usuário. Aqui está como você pode conseguir um frontend de streaming de texto para seu LLM com Go, FastAPI e Javascript.
Como lembrete, um token é uma entidade única que pode ser uma pequena palavra, parte de uma palavra ou pontuação. Em média, 1 token é composto por 4 caracteres, e 100 tokens são aproximadamente equivalentes a 75 palavras. Os modelos de Processamento de Linguagem Natural precisam de transformar o seu texto em tokens para o processar.
Ao utilizar um modelo de IA de geração de texto (também conhecido como modelo "generativo"), o tempo de resposta pode ser bastante elevado, dependendo do seu hardware e do tamanho do seu modelo. Por exemplo, no caso de um modelo de linguagem grande (também conhecido como "LLM") como o LLaMA 30B, implementado numa GPU NVIDIA A100 em fp16, o modelo gera 100 tokens em cerca de 3 segundos. Assim, se espera que o seu modelo generativo gere um grande texto de centenas ou milhares de palavras, a latência será elevada e terá de esperar talvez mais de 10 segundos para obter a resposta completa.
Esperar muito tempo para obter uma resposta pode ser um problema do ponto de vista da experiência do utilizador. A solução nesse caso é o streaming de tokens!
O streaming de tokens consiste em gerar cada novo token em tempo real, em vez de esperar que toda a resposta esteja pronta. É isto que pode pode ver na aplicação ChatGPT, ou no assistente NLP Cloud ChatDolphin, por exemplo. As palavras aparecem assim que são geradas pelo modelo. Experimente o assistente de IA ChatDolphin aqui.
Transmissão de tokens com o assistente ChatDolphin no NLP Cloud. Experimente aqui.
O primeiro passo será utilizar um motor de inferência que suporte o fluxo de tokens.
Eis algumas opções que pode querer considerar:
Eis um exemplo que utiliza o método HuggingFace generate():
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
Neste exemplo, geramos uma saída com o modelo GPT-2 e imprimimos cada token na consola assim que ele chega.
Agora que escolheu um motor de inferência, terá de servir o seu modelo e devolver os tokens transmitidos.
O seu modelo será muito provavelmente executado num ambiente Python, pelo que necessitará de um servidor Python para devolver os tokens e disponibilizá-los através de uma API HTTP. A FastAPI tornou-se uma escolha de facto para estas situações.
Aqui usamos o Uvicorn e o StreamingResponse da FastAPI para servir cada token assim que ele é gerado. Aqui está um exemplo:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Pode testar o seu servidor de streaming graças ao seguinte comando cURL:
curl -N localhost:8000
Agora temos um modelo de IA funcional que está a devolver corretamente os tokens transmitidos.
Poderíamos ler diretamente estes tokens transmitidos a partir de uma aplicação cliente num browser. Mas não o vamos fazer, por duas razões.
Em primeiro lugar, é importante dissociar o modelo de IA do resto da pilha porque não queremos reiniciar o modelo sempre que vamos fazer uma pequena alteração na API. Lembre-se de que os modelos modernos de IA generativa são muito pesados e muitas vezes levam vários minutos para serem reiniciados.
Uma segunda razão é que Python não é necessariamente a melhor escolha quando se trata de construir uma aplicação concorrente de alto rendimento como a que vamos fazer. Esta escolha pode ser discutida, é claro, e também pode ser uma questão de gosto!
Como já foi referido, é importante acrescentar uma porta de entrada entre o modelo e o cliente final, e Go é uma boa linguagem de programação para esse tipo de aplicação. Em produção, você também pode querer adicionar um proxy reverso entre o gateway Go e o cliente final, e um balanceador de carga entre o gateway Go e o modelo de IA para para distribuir a carga em várias réplicas do seu modelo. Mas isso está fora do escopo do nosso artigo!
A nossa aplicação Go também se encarregará de renderizar a página HTML final.
Esta aplicação faz um pedido à aplicação FastAPI, recebe os tokens transmitidos da FastAPI e encaminha cada token para o front-end usando eventos enviados pelo servidor (SSE). O SSE é mais simples que os websockets porque é unidirecional. Ele é uma boa escolha quando se deseja criar um aplicativo que envia informações para um cliente, sem ouvir uma possível resposta do cliente.
Aqui está o código Go (o modelo HTML/JS/CSS será apresentado na secção seguinte):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Nossa página "/home" renderiza a página HTML/CSS/JS (mostrada mais adiante). A página "/start" recebe um pedido POST da aplicação aplicação JS que acciona um pedido ao nosso modelo de IA. E a nossa página "/generate" devolve o resultado à aplicação JS através de eventos enviados pelo servidor.
Quando a função start() recebe uma solicitação POST do frontend, ela cria automaticamente uma goroutine que fará uma solicitação ao nosso aplicativo FastAPI.
A função generateText() chama a FastAPI e devolve todos os tokens recebidos em tempo real através de um canal dedicado (streamedTextCh). Se o carácter EOF for recebido da FastAPI, isso significa que a geração de texto terminou.
A função generate() é um loop infinito que espera por novos tokens recebidos do canal streamedTextCh. Quando um novo token é recebido, ele é automaticamente enviado para o front-end como um evento enviado pelo servidor. Os eventos enviados pelo servidor precisam seguir uma formatação específica que usa os prefixos "event:" e "data:". daí a função formatServerSentEvent().
Para que o SSE esteja completo, precisamos de um cliente Javascript que seja capaz de ouvir os eventos enviados pelo servidor, subscrevendo a página "generate". Veja a próxima secção para perceber como conseguir isso.
Agora, é necessário criar um diretório "templates" e adicionar um ficheiro "home.html" dentro dele.
Aqui está o conteúdo de "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Como pode ver, ouvir o SSE no browser é bastante simples.
Primeiro, é necessário subscrever o nosso ponto de extremidade SSE (a página "/generate"). Em seguida, é necessário adicionar um ouvinte de eventos que lerá os tokens transmitidos assim que assim que eles forem recebidos.
Os browsers modernos tentam automaticamente restabelecer a ligação a origem do evento em caso de problemas de ligação.
Agora já sabe como criar uma aplicação moderna de IA generativa que dinamicamente transmite texto no browser, à la ChatGPT!
Como se apercebeu, uma aplicação deste tipo não é necessariamente simples, uma vez que estão envolvidas várias camadas estão envolvidas. E, como é óbvio, o código acima está demasiado simplificado para efeitos do exemplo.
O principal desafio do streaming de tokens é lidar com as falhas da rede. A maioria das essas falhas ocorrerão entre o backend Go e o frontend Javascript. Você precisará precisará explorar algumas estratégias de reconexão mais avançadas e garantir que os erros sejam corretamente relatados para a interface do usuário.
Espero que este tutorial lhe tenha sido útil!
Vincent
Advogado do programador na NLP Cloud