Cómo desarrollar una interfaz de usuario de flujo de tokens para su LLM con Go, FastAPI y JS

Los modelos generativos a veces tardan algún tiempo en devolver un resultado, por lo que es interesante aprovechar el streaming de tokens para ver el resultado aparecer sobre la marcha en la interfaz de usuario. A continuación se muestra cómo se puede lograr un frontend de streaming de texto para su LLM con Go, FastAPI y Javascript.

Desarrollador en PC

¿Qué es Token Streaming?

Como recordatorio, un token es una entidad única que puede ser una palabra pequeña, parte de una palabra o un signo de puntuación. Por término medio, un token se compone de 4 caracteres, y 100 tokens equivalen aproximadamente a 75 palabras. Los modelos de Procesamiento del Lenguaje Natural necesitan convertir su texto en tokens para poder procesarlo.

Cuando se utiliza un modelo de IA de generación de texto (también conocido como modelo "generativo"), el tiempo de respuesta puede ser bastante elevado, dependiendo de tu hardware y del tamaño de tu modelo. Por ejemplo, en el caso de un modelo lingüístico de gran tamaño (también conocido como "LLM") como el LLaMA 30B, implementado en una GPU NVIDIA A100 en fp16, el modelo genera 100 tokens en unos 3 segundos. Así que si esperas que tu modelo generativo genere un texto grande de cientos o miles de palabras, la latencia será alta y tendrás que esperar quizás más de 10 segundos para obtener la respuesta completa.

Esperar tanto tiempo para obtener una respuesta puede ser un problema desde el punto de vista de la experiencia del usuario. La solución en ese caso es el streaming de tokens.

El streaming de tokens consiste en generar cada nuevo token sobre la marcha en lugar de esperar a que toda la respuesta esté lista. Esto es lo que puede ver en la aplicación ChatGPT, o en el asistente ChatDolphin de NLP Cloud, por ejemplo. Las palabras aparecen tan pronto como son generadas por el modelo. Prueba el asistente ChatDolphin AI aquí.

Token streaming con ChatDolphin en NLP Cloud Token streaming con el asistente ChatDolphin en NLP Cloud. Pruébelo aquí.

Selección de un motor de inferencia compatible con el flujo de tokens

El primer paso será aprovechar un motor de inferencia que admita el flujo de tokens.

He aquí algunas opciones que puede considerar:

He aquí un ejemplo que utiliza el método HuggingFace generate():

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

En este ejemplo, generamos una salida con el modelo GPT-2 e imprimimos cada token en la consola en cuanto llega.

Transmisión de la respuesta con FastAPI

Ahora que ha elegido un motor de inferencia, tendrá que servir su modelo y devolver los tokens transmitidos.

Lo más probable es que tu modelo se ejecute en un entorno Python, por lo que necesitarás un servidor Python para devolver los tokens y ponerlos a disposición a través de una API HTTP. FastAPI se ha convertido en una opción de facto para este tipo de situaciones.

Aquí usamos Uvicorn y FastAPI's StreamingResponse para servir cada token tan pronto como es generado. He aquí un ejemplo:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Puede probar su servidor de streaming gracias al siguiente comando cURL:

curl -N localhost:8000

Ahora tenemos un modelo de IA que funciona y devuelve correctamente los tokens transmitidos.

Podríamos leer directamente estos tokens transmitidos desde una aplicación cliente en un navegador. Pero no vamos a hacer eso, por 2 razones.

En primer lugar, es importante desacoplar el modelo de IA del resto de la pila porque no queremos reiniciar el modelo cada vez que vamos a hacer un pequeño cambio en la API. Tenga en cuenta que los modelos modernos de IA generativa son muy pesados y a menudo tardan varios minutos en reiniciarse.

Una segunda razón es que Python no es necesariamente la mejor opción cuando se trata de construir una aplicación concurrente de alto rendimiento como la que vamos a hacer. Esta elección puede ser discutida, por supuesto, y también puede ser una cuestión de gustos.

Reenvío de tokens a través de una pasarela Go

Como se mencionó anteriormente, es importante añadir una pasarela entre tu modelo y tu cliente final, y Go es un buen lenguaje de programación para una aplicación de este tipo. En producción, puede que también quieras añadir un proxy inverso entre la pasarela Go y el cliente final, y un balanceador de carga entre tu modelo y el cliente final. entre la pasarela Go y el cliente final, y un equilibrador de carga entre la pasarela Go y el modelo de IA para distribuir la carga en varias réplicas del modelo. para distribuir la carga en varias réplicas de tu modelo. Pero esto está fuera del alcance de nuestro artículo.

Nuestra aplicación Go también se encargará de renderizar la página HTML final.

Esta aplicación realiza una solicitud a la aplicación FastAPI, recibe los tokens transmitidos desde FastAPI y reenvía cada token al frontend mediante eventos enviados desde el servidor (SSE). token al frontend utilizando Server Sent Events (SSE). SSE es más sencillo que websockets porque es unidireccional. En Es una buena opción cuando quieres construir una aplicación que envíe información a un cliente, sin escuchar una potencial respuesta del cliente. respuesta del cliente.

Aquí está el código Go (la plantilla HTML/JS/CSS se mostrará en la siguiente sección):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Nuestra página "/home" renderiza la página HTML/CSS/JS (se muestra más adelante). La página "/start" recibe una petición POST de la aplicación JS que lanza una petición a nuestro modelo AI. Y nuestra página "/generate" devuelve el resultado a la aplicación JS a través de eventos enviados por el servidor.

Una vez que la función start() recibe una petición POST desde el frontend, crea automáticamente una goroutine que hará una petición a nuestra aplicación FastAPI.

La función generateText() llama a FastAPI y devuelve cada token recibido sobre la marcha a través de un canal dedicado (streamedTextCh). Si se recibe el carácter EOF de FastAPI, significa que la generación de texto ha terminado.

La función generate() es un bucle infinito que espera nuevos tokens recibidos del canal streamedTextCh. Una vez que se recibe un nuevo token, se envía automáticamente al frontend como un evento enviado por el servidor. Los eventos enviados por el servidor deben seguir un formato específico que utiliza los prefijos "event:" y "data:". de ahí la función formatServerSentEvent().

Para que SSE sea completo, necesitamos un cliente Javascript que sea capaz de escuchar los eventos enviados por el servidor suscribiéndose a la página "generar". Vea la siguiente sección para entender cómo lograrlo.

Recepción de tokens con Javascript en el navegador

Ahora necesitas crear un directorio "templates" y añadir un archivo "home.html" dentro de él.

Este es el contenido de "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Como puede ver, escuchar SSE en el navegador es bastante sencillo.

Primero necesitas suscribirte a nuestro endpoint SSE (la página "/generate"). A continuación, debe añadir un receptor de eventos que leerá los tokens transmitidos tan pronto como se reciban. en cuanto se reciban.

Los navegadores modernos intentan automáticamente reconectar la fuente de eventos en caso de problemas de conexión.

Conclusión

Ahora ya sabes cómo crear una moderna aplicación de IA generativa que dinámicamente en el navegador, ¡a la ChatGPT!

Como habrá podido comprobar, una aplicación de este tipo no es necesariamente sencilla, ya que intervienen varias capas. capas. Y, por supuesto, el código anterior se ha simplificado en exceso para el ejemplo. ejemplo.

El principal reto del streaming de tokens es la gestión de los fallos de red. La mayoría de estos fallos ocurrirán entre el backend Go y el frontend Javascript. Usted necesitará estrategias de reconexión más avanzadas y asegurarte de que los errores se comunican correctamente a la interfaz de usuario. correctamente a la interfaz de usuario.

Espero que este tutorial te haya resultado útil.

Vincent
Defensor del desarrollador en NLP Cloud