Генеративним моделям іноді потрібен деякий час, щоб повернути результат, тому цікаво використовувати потік токенів, щоб результат з'являвся в інтерфейсі "на льоту". Ось як ви можете створити такий фронтенд для текстового потоку для вашого LLM за допомогою Go, FastAPI та Javascript.
Нагадуємо, що токен - це унікальна сутність, яка може бути невеликим словом, частиною слова або розділовим знаком. В середньому, 1 токен складається з 4 символів, а 100 токенів приблизно еквівалентні 75 словам. Моделі обробки природної мови повинні перетворити ваш текст на токени, щоб обробити його.
При використанні моделі ШІ, що генерує текст (також відомої як "генеративна" модель), час відгуку може бути досить високим, залежно від вашого обладнання та розміру моделі. Наприклад, у випадку великої мовної моделі (також відомої як "LLM"), такої як LLaMA 30B, розгорнутої на графічному процесорі NVIDIA A100 в fp16, модель генерує 100 токенів приблизно за 3 секунди. Отже, якщо ви очікуєте, що ваша генеративна модель згенерує великий фрагмент тексту на сотні або тисячі слів, затримка буде високою, і вам доведеться чекати, можливо, більше більше 10 секунд, щоб отримати повну відповідь.
Довге очікування відповіді може бути проблемою з точки зору користувацького досвіду. Рішенням в такому випадку є потокова передача токенів!
Потокова передача токенів - це генерація кожного нового токена на льоту, замість того, щоб чекати, поки вся відповідь буде готова. Це те, що ви ви можете побачити в додатку ChatGPT або, наприклад, у помічнику NLP Cloud ChatDolphin. Слова з'являються, як тільки їх генерує модель. Спробуйте АІ-асистента ChatDolphin тут.
Стрімінг токенів з асистентом ChatDolphin на NLP Cloud. Спробуй тут.
Першим кроком буде використання механізму виведення, який підтримує потік токенів.
Ось кілька варіантів, які ви можете розглянути:
Ось приклад використання методу HuggingFace generate():
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
У цьому прикладі ми генеруємо вивід з моделлю GPT-2 і виводимо кожен токен в консолі, як тільки він надходить.
Тепер, коли ви вибрали механізм виведення, вам потрібно обслужити вашу модель і повернути потокові токени.
Ваша модель, швидше за все, буде працювати в середовищі Python, тому вам знадобиться сервер Python, щоб повертати токени і зробити їх доступними через HTTP API. FastAPI став де-факто вибором для таких ситуацій.
Тут ми використовуємо Uvicorn і StreamingResponse від FastAPI, щоб обслужити кожен токен, як тільки він буде згенерований. Ось приклад:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Ви можете протестувати ваш потоковий сервер за допомогою наступної команди cURL:
curl -N localhost:8000
Тепер у нас є робоча модель штучного інтелекту, яка належним чином повертає потокові токени.
Ми могли б безпосередньо зчитувати ці потокові токени з клієнтської програми в браузері. Але ми не будемо цього робити з двох причин.
По-перше, важливо відокремити модель ШІ від решти стеку, тому що ми не хочемо перезапускати модель щоразу, коли коли ми збираємося внести невелику зміну в API. Майте на увазі, що сучасні генеративні моделі ШІ дуже важкі і часто перезапуск займає кілька хвилин.
Друга причина полягає в тому, що Python не обов'язково є найкращим вибором коли мова йде про створення високопродуктивного паралельного додатку, як ми збираємося зробити. Цей вибір можна, звичайно, обговорити, і це також може бути справою смаку!
Як згадувалося вище, важливо додати шлюз між вашою моделлю та кінцевим клієнтом, і Go є хорошою мовою програмування для такого додатку. У виробництві, можливо, ви також захочете додати зворотний проксі між Go-шлюзом і кінцевим клієнтом, а також балансувальник навантаження між вашим Go-шлюзом і моделлю ШІ, щоб розподілити навантаження на кілька реплік вашої моделі. Але це виходить за рамки нашої статті!
Наш Go-додаток також відповідатиме за рендеринг фінальної HTML-сторінки.
Ця програма робить запит до програми FastAPI, отримує потокові токени від FastAPI і пересилає кожен токен на фронтенд за допомогою Server Sent Events (SSE). SSE простіший, ніж веб-сокети, оскільки є односпрямованим. Це це хороший вибір, коли ви хочете створити додаток, який передає інформацію клієнту, не чекаючи на потенційну відповідь клієнта.
Ось код Go (шаблон HTML/JS/CSS буде показано в наступному розділі):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Наша сторінка "/home" рендерить HTML/CSS/JS сторінку (показано пізніше). Сторінка "/start" отримує POST-запит від JS-додатку, який запускає запит до нашої ШІ-моделі. А наша сторінка "/generate" повертає результат в JS-додаток за допомогою подій, надісланих сервером.
Як тільки функція start() отримує POST-запит від фронтенду, вона автоматично створює підпрограму, яка надсилає запит до нашого FastAPI додатку.
Функція generateText() викликає FastAPI і повертає кожен токен, отриманий "на льоту" через виділений канал (streamedTextCh). Якщо від FastAPI отримано символ EOF, це означає, що генерацію тексту завершено.
Функція generate() є нескінченним циклом, який очікує на нові токени, отримані з каналу streamedTextCh. Як тільки новий токен отримано, він автоматично надсилається на фронтенд як подія, надіслана сервером. Події, що надсилаються сервером, повинні відповідати певному формату, який використовує префікси "event:" та "data:" префікси, звідси і функція formatServerSentEvent().
Для того, щоб SSE був повноцінним, нам потрібен Javascript-клієнт, який може прослуховувати події, що надсилаються сервером, підписавшись на сторінку "generate". Дивіться наступний розділ, щоб зрозуміти, як цього досягти.
Тепер вам потрібно створити каталог "templates" і додати до нього файл "home.html".
Ось вміст файлу "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Як бачите, прослуховувати SSE в браузері досить просто.
Спочатку вам потрібно підписатися на нашу кінцеву точку SSE (сторінка "/generate"). Потім вам потрібно додати слухача подій, який буде читати потокові токени, як тільки вони будуть отримані.
Сучасні браузери автоматично намагаються перепідключитися до джерело події у разі проблем зі з'єднанням.
Тепер ви знаєте, як створити сучасний генеративний AI-додаток, який динамічно потокове передавання тексту в браузері, а-ля ChatGPT!
Як ви помітили, такий додаток не обов'язково простий, оскільки в ньому задіяно кілька шарів. І, звичайно ж, наведений вище код спрощено заради прикладу.
Основна проблема з потоковою передачею токенів полягає в обробці мережевих збоїв. Більшість з них цих збоїв відбуватиметься між бекендом Go і фронтендом Javascript. Вам потрібно вивчити деякі більш просунуті стратегії відновлення з'єднання і переконатися, що помилки належним чином повідомляються в інтерфейс користувача.
Сподіваюся, цей підручник був для вас корисним!
Vincent
Адвокат розробників у NLP Cloud