Máte problémy s AI alebo vývojom celého balíka? Naši odborníci sú tu, aby vám poradili: poradenstvo na mieru, technická integrácia a ďalšie služby. Obráťte sa na [email protected].

Ako vytvoriť používateľské rozhranie s tokenovým streamovaním pre váš LLM pomocou Go, FastAPI a JS

Generatívnym modelom niekedy trvá určitý čas, kým vrátia výsledok, preto je zaujímavé využiť streamovanie tokenov, aby sa výsledok zobrazoval priebežne v používateľskom rozhraní. Tu sa dozviete, ako môžete dosiahnuť takýto frontend na streamovanie textov pre váš LLM pomocou Go, FastAPI a Javascriptu.

Vývojár na PC

Čo je tokenový streaming?

Pripomíname, že token je jedinečná jednotka, ktorá môže byť buď malé slovo, časť slova alebo interpunkcia. V priemere sa 1 token skladá zo 4 znakov, a 100 tokenov približne zodpovedá 75 slovám. Modely spracovania prirodzeného jazyka potrebujú na spracovanie vášho textu premeniť ho na tokeny.

Pri používaní modelu umelej inteligencie na generovanie textu (známeho aj ako "generatívny" model) môže byť čas odozvy pomerne vysoký v závislosti od hardvéru a veľkosti modelu. Napríklad v prípade veľkého jazykového modelu (alsko známeho ako "LLM"), ako je LLaMA 30B, nasadeného na grafickom procesore NVIDIA A100 vo fp16, model vygeneruje 100 tokenov za približne 3 sekundy. Ak teda očakávate, že váš generatívny model vygeneruje veľký text pozostávajúci zo stoviek alebo tisícok slov, latencia bude vysoká a budete musieť čakať možno viac ako 10 sekúnd, aby ste dostali úplnú odpoveď.

Dlhé čakanie na odpoveď môže byť z hľadiska používateľského zážitku problémom. Riešením v takom prípade je token streaming!

Token streaming je o generovaní každého nového tokenu za behu namiesto čakania na celú odpoveď. To je to, čo ste môžete vidieť napríklad v aplikácii ChatGPT alebo v asistentovi NLP Cloud ChatDolphin. Slová sa zobrazujú hneď, ako ich model vygeneruje. Vyskúšajte asistenta s umelou inteligenciou ChatDolphin tu.

Tokenový streaming s ChatDolphinom v službe NLP Cloud Tokenový tok s asistentom ChatDolphin v službe NLP Cloud. Skúste to tu.

Výber odvodzovacieho mechanizmu, ktorý podporuje tokenový tok

Prvým krokom bude využitie inferenčného enginu, ktorý podporuje token streaming.

Tu je niekoľko možností, ktoré by ste mohli zvážiť:

Tu je príklad s použitím metódy HuggingFace generate():

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

V tomto príklade generujeme výstup s modelom GPT-2 a každý token vypíšeme do konzoly hneď, ako príde.

Streamovanie odpovede pomocou FastAPI

Teraz, keď ste si vybrali inferenčný motor, musíte svoj model obsluhovať a vrátiť streamované tokeny.

Váš model bude s najväčšou pravdepodobnosťou bežať v prostredí Python, takže na vrátenie tokenov budete potrebovať server Python a sprístupniť ich prostredníctvom API HTTP. FastAPI sa stalo de facto voľbou pre takéto situácie.

Tu používame Uvicorn a StreamingResponse FastAPI na to, aby sme každý token doručili hneď po jeho vygenerovaní. Tu je príklad:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Streamingový server môžete otestovať pomocou nasledujúceho príkazu cURL:

curl -N localhost:8000

Teraz máme funkčný model umelej inteligencie, ktorý správne vracia tokeny.

Tieto streamované tokeny by sme mohli priamo čítať z klientskej aplikácie v prehliadači. To však nebudeme robiť z dvoch dôvodov.

Po prvé, je dôležité oddeliť model umelej inteligencie od zvyšku zásobníka, pretože nechceme model reštartovať zakaždým, keď vykonáme malú zmenu v rozhraní API. Majte na pamäti, že moderné generatívne modely AI sú veľmi ťažké a ich reštartovanie často trvá niekoľko minút.

Druhým dôvodom je, že Python nie je nevyhnutne najlepšou voľbou keď ide o budovanie vysoko priepustnej súbežnej aplikácie, akú sa chystáme vytvoriť. Táto voľba sa dá samozrejme diskutovať a môže to byť aj otázka vkusu!

Presmerovanie tokenov cez bránu Go

Ako už bolo spomenuté vyššie, je dôležité pridať bránu medzi modelom a konečným klientom, a Go je pre takúto aplikáciu vhodným programovacím jazykom. V produkcii možno budete chcieť pridať aj reverzný proxy medzi bránou Go a koncovým klientom a load balancer medzi bránou Go a modelom AI, aby ste mohli rozložiť záťaž na niekoľko replik vášho modelu. Ale to je mimo rozsahu nášho článku!

Naša aplikácia Go bude mať na starosti aj vykresľovanie konečnej stránky HTML.

Táto aplikácia zadá požiadavku aplikácii FastAPI, prijme prúdové tokeny od FastAPI a prepošle každý token pomocou udalosti odoslanej serverom (SSE). SSE je jednoduchšie ako websockety, pretože je jednosmerné. Je to je dobrou voľbou, keď chcete vytvoriť aplikáciu, ktorá posiela informácie klientovi bez toho, aby počúvala potenciálne odpovede klienta.

Tu je kód Go (šablóna HTML/JS/CSS bude zobrazená v ďalšej časti):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Naša stránka "/home" vykresľuje stránku HTML/CSS/JS (zobrazené neskôr). Stránka "/start" dostane požiadavku POST od JS aplikácie, ktorá spustí požiadavku na náš model AI. A naša stránka "/generate" vráti výsledok aplikácii JS prostredníctvom udalostí odoslaných serverom.

Keď funkcia start() prijme požiadavku POST z frontendu, automaticky vytvorí goroutine, ktorý vykoná požiadavku do našej aplikácie FastAPI.

Funkcia generateText() volá FastAPI a vracia každý token prijatý za behu prostredníctvom vyhradeného kanála (streamedTextCh). Ak je z FastAPI prijatý znak EOF, znamená to, že generovanie textu sa skončilo.

Funkcia generate() je nekonečná slučka, ktorá čaká na nové tokeny prijaté z kanála streamedTextCh. Po prijatí nového tokenu, je automaticky odoslaný na frontend ako udalosť odoslaná serverom. Udalosti odoslané serverom musia dodržiavať špecifické formátovanie, ktoré používa "event:" a "data:". preto je tu funkcia formatServerSentEvent().

Aby bolo SSE kompletné, potrebujeme Javascriptového klienta, ktorý je schopný počúvať udalosti odoslané serverom prihlásením sa na stránku "generate". Pozrite si nasledujúcu časť, aby ste pochopili, ako to dosiahnuť.

Prijímanie tokenov pomocou Javascriptu v prehliadači

Teraz musíte vytvoriť adresár "templates" a pridať do neho súbor "home.html".

Tu je obsah stránky "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Ako vidíte, počúvanie SSE v prehliadači je pomerne jednoduché.

Najprv sa musíte prihlásiť na náš koncový bod SSE (stránka "/generate"). potom musíte pridať poslucháča udalostí, ktorý bude čítať streamované tokeny, akonáhle sú prijaté.

Moderné prehliadače sa automaticky pokúšajú o opätovné pripojenie zdroj udalosti v prípade problémov s pripojením.

Záver

Teraz viete, ako vytvoriť modernú generatívnu aplikáciu umelej inteligencie, ktorá dynamicky streamuje text v prehliadači, à la ChatGPT!

Ako ste si všimli, takáto aplikácia nie je nevyhnutne jednoduchá, pretože niekoľko vrstiev. A samozrejme, uvedený kód je zjednodušený kvôli príkladu.

Hlavnou výzvou pri streamovaní tokenov je riešenie zlyhaní siete. Väčšina z nich týchto zlyhaní nastane medzi backendom Go a frontendom Javascriptu. Budete budete musieť preskúmať niektoré pokročilejšie stratégie opätovného pripojenia a zabezpečiť, aby sa chyby boli správne hlásené používateľskému rozhraniu.

Dúfam, že sa vám tento návod zdal užitočný!

Vincent
Zástupca vývojárov v službe NLP Cloud