Генеративным моделям иногда требуется некоторое время, чтобы выдать результат, поэтому интересно использовать потоковую передачу токенов, чтобы видеть результат на лету в пользовательском интерфейсе. Вот как можно реализовать такой фронтенд для потоковой передачи текста в LLM с помощью Go, FastAPI и Javascript.
Напомним, что токен - это уникальный объект, который может быть либо небольшим словом, либо частью слова, либо знаком препинания. В среднем 1 токен состоит из 4 символов, а 100 лексем примерно эквивалентны 75 словам. Для обработки текста моделям обработки естественного языка необходимо превратить его в лексемы.
При использовании ИИ-модели, генерирующей текст (также известной как "генеративная" модель), время отклика может быть достаточно высоким, в зависимости от аппаратного обеспечения и размера модели. Например, в случае большой языковой модели (также известной как "LLM"), такой как LLaMA 30B, развернутой на графическом процессоре NVIDIA A100 в режиме fp16, модель генерирует 100 лексем примерно за 3 секунды. Таким образом, если вы ожидаете, что ваша генеративная модель сгенерирует большой фрагмент текста из сотен или тысяч слов, задержка будет высокой, и вам придется подождать, возможно, более 10 секунд, чтобы получить полный текст. более 10 секунд, чтобы получить полный ответ.
Длительное ожидание ответа может стать проблемой с точки зрения пользовательского опыта. Решением в этом случае является потоковая передача токенов!
Потоковая передача токенов - это генерация каждого нового токена на лету, вместо того чтобы ждать, пока будет готов весь ответ. Это то, что вы можно увидеть в приложении ChatGPT или, например, в ассистенте NLP Cloud ChatDolphin. Слова появляются сразу же, как только их генерирует модель. Попробуйте воспользоваться ИИ-ассистентом ChatDolphin здесь.
Токен-стриминг с помощником ChatDolphin на NLP Cloud. Попробуйте сделать это здесь.
Первым шагом будет использование механизма вывода, поддерживающего потоковую передачу токенов.
Вот несколько вариантов, которые вы можете рассмотреть:
Приведем пример с использованием метода HuggingFace generate():
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
В данном примере мы генерируем вывод по модели GPT-2 и выводим в консоль каждый токен по мере его поступления.
Теперь, когда вы выбрали механизм вывода, необходимо обслужить вашу модель и вернуть потоковые токены.
Скорее всего, ваша модель будет работать в среде Python, поэтому вам потребуется сервер Python для возврата токенов и сделать их доступными через HTTP API. FastAPI стал де-факто выбором для таких ситуаций.
Здесь мы используем Uvicorn и StreamingResponse от FastAPI для того, чтобы обслуживать каждый токен сразу после его генерации. Вот пример:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Протестировать свой потоковый сервер можно с помощью следующей команды cURL:
curl -N localhost:8000
Теперь у нас есть работающая модель ИИ, которая корректно возвращает потоковые токены.
Мы могли бы напрямую читать эти потоковые токены из клиентского приложения в браузере. Но мы не будем этого делать по двум причинам.
Во-первых, важно отделить модель искусственного интеллекта от остальной части стека, поскольку мы не хотим перезапускать модель каждый раз, когда мы не хотим перезапускать модель каждый раз, когда собираемся внести небольшое изменение в API. Следует помнить, что современные генеративные модели ИИ очень тяжелы и их перезапуск часто занимает несколько минут.
Вторая причина заключается в том, что Python не всегда является лучшим выбором когда речь идет о создании высокопроизводительного параллельного приложения, как мы собираемся сделать. Этот выбор можно, конечно, обсудить, и это может быть делом вкуса!
Как уже говорилось выше, добавление шлюза между моделью и конечным клиентом очень важно, и Go является хорошим языком программирования для такого приложения. В производственных условиях, возможно, потребуется добавить обратный прокси между шлюзом Go и конечным клиентом, а также балансировщик нагрузки между шлюзом Go и моделью ИИ, чтобы распределить нагрузку на несколько копий. чтобы распределить нагрузку на несколько копий модели. Но это уже выходит за рамки нашей статьи!
Наше приложение Go также будет отвечать за рендеринг конечной HTML-страницы.
Это приложение делает запрос к приложению FastAPI, получает потоковые токены от FastAPI и пересылает каждый токен, используя Server Sent Events (SSE). токен на фронтенд с помощью Server Sent Events (SSE). SSE проще, чем websockets, поскольку является однонаправленным. Это Это хороший выбор, если необходимо создать приложение, которое передает информацию клиенту, не прослушивая потенциальный ответа клиента.
Здесь приведен код Go (шаблон HTML/JS/CSS будет показан в следующем разделе):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
На нашей странице "/home" выполняется рендеринг HTML/CSS/JS-страницы (показано далее). Страница "/start" получает POST-запрос от JS-приложения, который запускает запрос к нашей AI-модели. А страница "/generate" возвращает результат в JS-приложение через события, отправленные сервером.
После того как функция start() получает POST-запрос от фронтенда, она автоматически создает горутину, которая будет выполнять запрос к нашему приложению FastAPI.
Функция generateText() вызывает FastAPI и возвращает каждый полученный на лету токен по выделенному каналу (streamedTextCh). Если от FastAPI получен символ EOF, то это означает, что генерация текста закончена.
Функция generate() представляет собой бесконечный цикл, который ожидает получения новых токенов из канала streamedTextCh. Как только новый токен получен, он автоматически отправляется на фронтенд как событие, отправленное сервером. События, отправляемые сервером, должны иметь определенное форматирование, в котором используются префиксы "event:" и "data:". префиксы, поэтому и появилась функция formatServerSentEvent().
Для того чтобы SSE был полноценным, нам необходим Javascript-клиент, способный прослушивать события, посылаемые сервером, подписываясь на страницу "generate". О том, как этого добиться, читайте в следующем разделе.
Теперь необходимо создать каталог "templates" и добавить в него файл "home.html".
Вот содержимое файла "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Как видите, прослушивание SSE в браузере довольно простое.
Сначала необходимо подписаться на нашу конечную точку SSE (страница "/generate"). Затем необходимо добавить слушатель событий, который будет считывать потоковые токены, как только как только они будут получены.
Современные браузеры автоматически пытаются переподключить источник событий в случае проблем с соединением.
Теперь вы знаете, как создать современное генеративное AI-приложение, которое динамически динамически передавать текст в браузере, а-ля ChatGPT!
Как вы заметили, такое приложение не всегда является простым, поскольку в нем задействовано несколько слоев. И, конечно, приведенный выше код упрощен в целях примера.
Основная проблема, связанная с потоковой передачей токенов, заключается в обработке сетевых сбоев. Большинство таких сбоев будет происходить между бэкендом Go и фронтендом Javascript. Вам необходимо изучить несколько более продвинутых стратегий переподключения и убедиться в том, что ошибки правильно сообщаются в пользовательский интерфейс. чтобы об ошибках правильно сообщалось в пользовательский интерфейс.
Я надеюсь, что этот учебник оказался для вас полезным!
Vincent
Защитник разработчиков в NLP Cloud