Aveți probleme cu inteligența artificială sau cu dezvoltarea full-stack? Experții noștri sunt aici pentru a vă ghida: consiliere personalizată, integrare tehnică și multe altele. Contactați-ne la [email protected].

Cum să dezvoltați o interfață de utilizare a fluxului de token pentru LLM cu Go, FastAPI și JS

Modelele generative durează uneori ceva timp pentru a returna un rezultat, astfel încât este interesant să se utilizeze streamingul de token-uri pentru a vedea rezultatul apărând din mers în interfața utilizatorului. Iată cum puteți realiza un astfel de frontend de streaming de text pentru LLM-ul dvs. cu Go, FastAPI și Javascript.

Dezvoltator pe PC

Ce este Token Streaming?

Pentru a vă reaminti, un token este o entitate unică care poate fi un cuvânt mic, o parte a unui cuvânt sau un semn de punctuație. În medie, un token este format din 4 caractere, iar 100 de jetoane sunt aproximativ echivalente cu 75 de cuvinte. Modelele de procesare a limbajului natural trebuie să transforme textul dumneavoastră în token-uri pentru a-l putea procesa.

Atunci când se utilizează un model de inteligență artificială pentru generarea de text (cunoscut și sub numele de model "generativ"), timpul de răspuns poate fi destul de mare, în funcție de hardware și de dimensiunea modelului. De exemplu, în cazul unui model lingvistic de mari dimensiuni (cunoscut și sub numele de "LLM"), cum ar fi LLaMA 30B, implementat pe un GPU NVIDIA A100 în fp16, modelul generează 100 de token-uri în aproximativ 3 secunde. Prin urmare, dacă vă așteptați ca modelul generativ să genereze un text mare de sute sau mii de cuvinte, latența va fi mare și va trebui să așteptați poate mai mult de 10 secunde pentru a obține un răspuns complet.

Așteptarea atât de lungă pentru a primi un răspuns poate fi o problemă din punctul de vedere al experienței utilizatorului. Soluția în acest caz este token streaming!

Fluxul de jetoane constă în generarea din mers a fiecărui jeton nou, în loc să se aștepte ca întregul răspuns să fie gata. Aceasta este ceea ce trebuie să faceți poate vedea în aplicația ChatGPT sau în asistentul NLP Cloud ChatDolphin, de exemplu. Cuvintele apar imediat ce sunt generate de model. Încercați asistentul ChatDolphin AI aici.

Streaming de jetoane cu ChatDolphin pe NLP Cloud Streaming de jetoane cu asistentul ChatDolphin pe NLP Cloud. Încearcă aici.

Selectarea unui motor de inferență care acceptă fluxul de jetoane

Primul pas va fi să folosiți un motor de inferență care acceptă streamingul de token-uri.

Iată câteva opțiuni pe care ați putea dori să le luați în considerare:

Iată un exemplu care utilizează metoda HuggingFace generate():

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

În acest exemplu, generăm o ieșire cu modelul GPT-2 și imprimăm fiecare jeton în consolă imediat ce sosește.

Transmiterea în flux a răspunsului cu FastAPI

Acum, după ce ați ales un motor de inferență, va trebui să vă serviți modelul și să returnați token-urile transmise în flux.

Modelul dvs. va rula, cel mai probabil, într-un mediu Python, astfel încât veți avea nevoie de un server Python pentru a returna token-urile. și să le puneți la dispoziție prin intermediul unui API HTTP. FastAPI a devenit o alegere de facto pentru astfel de situații.

Aici folosim Uvicorn și StreamingResponse de la FastAPI pentru a servi fiecare token imediat ce este generat. Iată un exemplu:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Puteți testa serverul de streaming cu ajutorul următoarei comenzi cURL:

curl -N localhost:8000

Acum avem un model de inteligență artificială funcțional care returnează corect jetoanele transmise.

Am putea citi direct aceste jetoane transmise în flux dintr-o aplicație client într-un browser. Dar nu vom face acest lucru, din două motive.

În primul rând, este important să se decupleze modelul de inteligență artificială de restul stivei, deoarece nu dorim să repornim modelul de fiecare dată când când vom face o mică modificare în API. Rețineți că modelele generative AI moderne sunt foarte grele și deseori durează câteva minute pentru a fi repornite.

Un al doilea motiv este că Python nu este neapărat cea mai bună alegere atunci când vine vorba de construirea unei aplicații concurente de mare randament, așa cum vom face noi. Această alegere poate fi discutată, desigur, și ar putea fi, de asemenea, o chestiune de gust!

Redirecționarea jetoanelor prin intermediul unei gateway Go

După cum am menționat mai sus, este important să adăugați o poartă de acces între modelul dvs. și clientul final, iar Go este un limbaj de programare bun pentru o astfel de aplicație. În producție, este posibil să doriți, de asemenea, să adăugați o interfață inversă proxy invers între gateway-ul Go și clientul final, precum și un load balancer între gateway-ul Go și modelul AI, pentru a pentru a repartiza sarcina pe mai multe replici ale modelului dumneavoastră. Dar acest lucru iese din sfera de aplicare a articolului nostru!

Aplicația noastră Go va fi, de asemenea, responsabilă de redarea paginii HTML finale.

Această aplicație face o cerere către aplicația FastAPI, primește token-urile transmise de la FastAPI și transmite fiecare token de la FastAPI către FastAPI. token către frontend folosind Server Sent Events (SSE). SSE este mai simplu decât websockets, deoarece este unidirecțional. Acesta este o alegere bună atunci când doriți să construiți o aplicație care împinge informații către un client, fără a asculta un potențial răspuns al clientului.

Iată codul Go (șablonul HTML/JS/CSS va fi prezentat în secțiunea următoare):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Pagina noastră "/home" redă pagina HTML/CSS/JS (prezentată mai târziu). Pagina "/start" primește o cerere POST de la aplicația aplicație JS care declanșează o cerere către modelul nostru AI. Iar pagina noastră "/generate" returnează rezultatul către aplicația JS prin intermediul evenimentelor trimise de server.

Odată ce funcția start() primește o cerere POST de la frontend, aceasta creează automat o goroutine care va face o cerere către aplicația noastră FastAPI.

Funcția generateText() apelează FastAPI și returnează fiecare token primit din mers prin intermediul unui canal dedicat (streamedTextCh). Dacă se primește caracterul EOF de la FastAPI, înseamnă că generarea textului s-a încheiat.

Funcția generate() este o buclă infinită care așteaptă noi jetoane primite de la canalul streamedTextCh. Odată ce se primește un nou token, acesta este transmis automat către frontend ca un eveniment trimis de server. Evenimentele trimise de server trebuie să urmeze o formatare specifică care utilizează "event:" și "data:" prefixe, de unde și funcția formatServerSentEvent().

Pentru ca SSE să fie completă, avem nevoie de un client Javascript care să poată asculta evenimentele trimise de server prin abonarea la pagina "generate". Consultați secțiunea următoare pentru a înțelege cum să realizați acest lucru.

Primirea de jetoane cu Javascript în browser

Acum trebuie să creați un director "templates" și să adăugați în el un fișier "home.html".

Iată conținutul fișierului "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

După cum puteți vedea, ascultarea SSE în browser este destul de simplă.

Mai întâi trebuie să vă abonați la punctul final SSE (pagina "/generate"). apoi trebuie să adăugați un ascultător de evenimente care va citi token-urile transmise în flux imediat ce acestea sunt primite.

Browserele moderne încearcă automat să se reconecteze sursa evenimentului în cazul în care apar probleme de conexiune.

Concluzie

Acum știi cum să creezi o aplicație modernă de inteligență artificială generativă care să creeze dinamic fluxuri de text în browser, à la ChatGPT!

După cum ați observat, o astfel de aplicație nu este neapărat simplă, deoarece mai multe straturi sunt implicate. Și, bineînțeles, codul de mai sus este suprasimplificat de dragul exemplului.

Principala provocare a fluxului de token-uri constă în gestionarea defecțiunilor de rețea. Cele mai multe dintre aceste defecțiuni se vor produce între backend-ul Go și frontend-ul Javascript. Veți trebui să explorați câteva strategii de reconectare mai avansate și să vă asigurați că erorile sunt raportate în mod corespunzător către interfață.

Sper că ați găsit acest tutorial util!

Vincent
Developer Advocate la NLP Cloud