Имате проблеми с ИИ или разработката на пълен пакет? Нашите експерти са тук, за да ви напътстват: индивидуални съвети, техническа интеграция и др. Свържете се с [email protected].

Как да разработите потребителски интерфейс за стрийминг на токени за вашия LLM с Go, FastAPI и JS

Понякога отнема известно време да се върне резултат, така че е интересно да се използва поточно предаване на маркери, за да се види как резултатът се появява в движение в потребителския интерфейс. Ето как можете да постигнете такъв frontend за поточно предаване на текстове за вашия LLM с Go, FastAPI и Javascript.

Разработчик на PC

Какво е стрийминг на токени?

Напомняме, че токенът е уникална единица, която може да бъде малка дума, част от дума или препинателен знак. Средно 1 токен се състои от 4 символа, а 100 токена се равняват приблизително на 75 думи. Моделите за обработка на естествен език трябва да превърнат вашия текст в токени, за да го обработят.

Когато използвате модел на изкуствен интелект за генериране на текст (известен също като "генеративен" модел), времето за реакция може да бъде доста голямо в зависимост от хардуера и размера на модела. Например в случай на голям езиков модел (известен още като "LLM") като LLaMA 30B, разгърнат на графичен процесор NVIDIA A100 в fp16, моделът генерира 100 лексеми за около 3 секунди. Така че, ако очаквате вашият генеративен модел да генерира голям текст от стотици или хиляди думи, закъснението ще бъде голямо и ще трябва да изчакате може би повече от 10 секунди, за да получите пълен отговор.

Дългото чакане за получаване на отговор може да бъде проблем от гледна точка на потребителското изживяване. Решението в този случай е стриймингът на символи!

Поточното предаване на токени е свързано с генерирането на всеки нов токен в движение, вместо да се чака целият отговор да бъде готов. Това е, което вие можете да видите например в приложението ChatGPT или в асистента NLP Cloud ChatDolphin. Думите се появяват веднага след като бъдат генерирани от модела. Изпробвайте асистента с изкуствен интелект ChatDolphin тук.

Поточно предаване на токени с ChatDolphin в NLP Cloud Поточно предаване на токени с асистента ChatDolphin в NLP Cloud. Опитайте тук.

Избор на енджин за изводи, който поддържа поток от токени

Първата стъпка ще бъде да използвате двигател за изводи, който поддържа поточно предаване на символи.

Ето няколко варианта, които може да обмислите:

Ето един пример, в който се използва методът HuggingFace generate():

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

В този пример генерираме изход с модела GPT-2 и отпечатваме всеки токен в конзолата веднага щом пристигне.

Поточно предаване на отговора с FastAPI

След като сте избрали двигател за изводи, ще трябва да обслужвате модела си и да връщате поточно предадените токени.

Вашият модел най-вероятно ще работи в среда на Python, така че ще ви е необходим сървър на Python, за да връщате токените. и да ги предоставите чрез HTTP API. FastAPI се превърна в де факто избор за такива ситуации.

Тук използваме Uvicorn и StreamingResponse на FastAPI, за да обслужваме всеки токен веднага след генерирането му. Ето един пример:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Можете да тествате вашия стрийминг сървър чрез следната команда cURL:

curl -N localhost:8000

Вече разполагаме с работещ модел на изкуствен интелект, който правилно връща стриймвани токени.

Можем директно да прочетем тези поточно предавани токени от клиентско приложение в браузър. Но ние няма да направим това по две причини.

Първо, важно е да се отдели на модела на изкуствения интелект от останалата част на стека, защото не искаме да рестартираме модела всеки път, когато ще направим малка промяна в API. Имайте предвид, че съвременните генеративни AI модели са много тежки и често рестартирането им отнема няколко минути.

Втората причина е, че Python не е непременно най-добрият избор когато става въпрос за изграждане на високопроизводително паралелно приложение, каквото ще правим ние. Този избор разбира се, може да бъде обсъден и може да бъде въпрос на вкус!

Препращане на токени през шлюз Go

Както бе споменато по-горе, важно е да добавите портал между модела и крайния клиент, и Go е добър език за програмиране за такова приложение. В производството може да искате да добавите и обратен прокси между шлюза на Go и крайния клиент, както и разпределител на натоварването между шлюза на Go и модела с изкуствен интелект, за да да разпределите натоварването върху няколко реплики на вашия модел. Но това е извън обхвата на нашата статия!

Нашето приложение Go ще отговаря и за визуализирането на крайната HTML страница.

Това приложение подава заявка към приложението FastAPI, получава поточните токени от FastAPI и препраща всеки от тях токена, като използва изпратени от сървъра събития (SSE). SSE е по-просто от уебсокетите, тъй като е еднопосочно. Той е добър избор, когато искате да създадете приложение, което изпраща информация към клиента, без да слуша потенциален клиентски отговор.

Ето кода на Go (шаблонът HTML/JS/CSS ще бъде показан в следващия раздел):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

Нашата страница "/home" визуализира HTML/CSS/JS страницата (показана по-късно). Страницата "/start" получава POST заявка от JS приложение, което задейства заявка към нашия модел за изкуствен интелект. А нашата страница "/generate" връща резултата на JS приложението чрез изпратени от сървъра събития.

След като функцията start() получи POST заявка от frontend, тя автоматично създава goroutine, която ще направи заявка към нашето FastAPI приложение.

Функцията generateText() извиква FastAPI и връща всеки токен, получен в движение чрез специален канал (streamedTextCh). Ако от FastAPI се получи знакът EOF, това означава, че генерирането на текст е приключило.

Функцията generate() е безкраен цикъл, който чака за нови символи, получени от канала streamedTextCh. След като бъде получен нов токен, той автоматично се изпраща към frontend като събитие, изпратено от сървъра. Изпратените от сървъра събития трябва да следват специфично форматиране, което използва "event:" и "data:". префикси, поради което е създадена функцията formatServerSentEvent().

За да може SSE да бъде завършен, се нуждаем от клиент на Javascript, който да може да слуша изпратените от сървъра събития, като се абонира за страницата "генериране". Вижте следващия раздел, за да разберете как да постигнете това.

Получаване на токени с Javascript в браузъра

Сега трябва да създадете директория "templates" и да добавите в нея файл "home.html".

Ето съдържанието на "home.html":

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

Както можете да видите, слушането на SSE в браузъра е доста просто.

Първо трябва да се абонирате за нашата крайна точка SSE (страницата "/generate"). След това трябва да добавите слушател на събития, който ще прочете поточните токени веднага щом бъдат получени.

Съвременните браузъри автоматично се опитват да възстановят връзката източника на събития в случай на проблеми с връзката.

Заключение

Вече знаете как да създадете модерно приложение за генеративен изкуствен интелект, което динамично стриймва текст в браузъра, а ла ChatGPT!

Както сте забелязали, подобно приложение не е непременно просто, тъй като няколко е свързано с много слоеве. И разбира се, горният код е твърде опростен в името на пример.

Основното предизвикателство при стрийминга на символи е свързано с обработката на мрежови повреди. Повечето от тези откази ще се случат между бекенда на Go и фронтенда на Javascript. Вие ще трябва да проучите някои по-усъвършенствани стратегии за повторно свързване и да се уверите, че грешките са да се докладват правилно на потребителския интерфейс.

Надявам се, че този урок ви е бил полезен!

Vincent
Застъпник на разработчиците в NLP Cloud