제너레이티브 모델은 결과를 반환하는 데 시간이 걸리는 경우가 있으므로 토큰 스트리밍을 활용하여 UI에 즉시 결과를 표시하는 것이 흥미롭습니다. 다음은 Go, FastAPI 및 Javascript를 사용하여 LLM을 위한 텍스트 스트리밍 프론트엔드를 구현하는 방법입니다.
토큰은 작은 단어, 단어의 일부 또는 구두점이 될 수 있는 고유한 개체를 말합니다. 평균적으로 1개의 토큰은 4개의 문자로 구성됩니다, 100개의 토큰은 대략 75개의 단어에 해당합니다. 자연어 처리 모델은 텍스트를 처리하기 위해 텍스트를 토큰으로 변환해야 합니다.
텍스트 생성 AI 모델("생성" 모델이라고도 함)을 사용할 경우, 하드웨어와 모델의 크기에 따라 응답 시간이 상당히 길어질 수 있습니다. 예를 들어, fp16의 NVIDIA A100 GPU에 배포된 LLaMA 30B와 같은 대규모 언어 모델("LLM"이라고도 함)의 경우, 이 모델은 약 3초 만에 100개의 토큰을 생성합니다. 따라서 생성 모델이 수백 또는 수천 단어의 큰 텍스트를 생성할 것으로 예상되는 경우 지연 시간이 길어지고 전체 텍스트를 얻기 위해 10초 이상 10초 이상 기다려야 합니다.
응답을 받기 위해 너무 오래 기다리는 것은 사용자 경험 측면에서 문제가 될 수 있습니다. 이 경우 해결책은 토큰 스트리밍입니다!
토큰 스트리밍은 전체 응답이 준비될 때까지 기다리지 않고 모든 새 토큰을 즉시 생성하는 것입니다. 이것이 바로 ChatGPT 앱이나 NLP Cloud ChatDolphin 어시스턴트에서 볼 수 있습니다. 단어는 모델에서 생성되는 즉시 나타납니다. 여기에서 돌핀 AI 어시스턴트를 사용해 보세요.
NLP 클라우드에서 돌핀 어시스턴트를 사용한 토큰 스트리밍. 여기에서 사용해 보세요.
첫 번째 단계는 토큰 스트리밍을 지원하는 추론 엔진을 활용하는 것입니다.
다음은 고려할 수 있는 몇 가지 옵션입니다:
다음은 HuggingFace 생성() 메서드를 사용한 예시입니다:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
이 예제에서는 GPT-2 모델로 출력을 생성하고 토큰이 도착하는 즉시 콘솔에서 각 토큰을 인쇄합니다.
이제 추론 엔진을 선택했으므로 모델을 제공하고 스트리밍된 토큰을 반환해야 합니다.
모델은 파이썬 환경에서 실행될 가능성이 높으므로 토큰을 반환하려면 파이썬 서버가 필요합니다. 토큰을 반환하고 HTTP API를 통해 사용할 수 있도록 하려면 파이썬 서버가 필요합니다. 이러한 상황에서 FastAPI는 사실상 선택이 되었습니다.
여기서는 각 토큰이 생성되는 즉시 서비스를 제공하기 위해 Uvicorn과 FastAPI의 StreamingResponse를 사용합니다. 다음은 예시입니다:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
다음 cURL 명령으로 스트리밍 서버를 테스트할 수 있습니다:
curl -N localhost:8000
이제 스트리밍된 토큰을 제대로 반환하는 작동하는 AI 모델이 생겼습니다.
브라우저의 클라이언트 애플리케이션에서 스트리밍된 토큰을 직접 읽을 수도 있습니다. 하지만 두 가지 이유로 그렇게 하지 않을 것입니다.
첫째, 매번 모델을 다시 시작하고 싶지 않기 때문에 매번 모델을 다시 시작하고 싶지 않기 때문입니다. 매번 모델을 다시 시작하고 싶지 않기 때문입니다. 최신 제너레이티브 AI 모델은 매우 무겁고 재시작하는 데 몇 분이 걸리는 경우가 많습니다.
두 번째 이유는 파이썬이 반드시 최선의 선택은 아니기 때문입니다. 처리량이 많은 동시 애플리케이션을 구축할 때 파이썬이 반드시 최선의 선택은 아니라는 것입니다. 이 선택 에 대해서는 당연히 논의할 수 있으며 취향의 문제일 수도 있습니다!
위에서 언급했듯이 모델과 최종 클라이언트 사이에 게이트웨이를 추가하는 것이 중요합니다, Go는 이러한 애플리케이션에 적합한 프로그래밍 언어입니다. 프로덕션 환경에서는 Go 게이트웨이와 최종 클라이언트 사이에 리버스 프록시를 추가하고 Go 게이트웨이와 최종 클라이언트 사이에 로드 밸런서를 추가할 수도 있습니다. 모델의 여러 복제본에 부하를 분산시킬 수 있습니다. 하지만 이는 이 글의 범위를 벗어납니다!
Go 애플리케이션은 최종 HTML 페이지 렌더링도 담당합니다.
이 애플리케이션은 FastAPI 앱에 요청을 하고, FastAPI로부터 스트리밍된 토큰을 받은 다음, 각 토큰을 토큰을 프론트엔드로 전달합니다. SSE는 단방향이기 때문에 웹소켓보다 간단합니다. It 잠재적인 클라이언트 응답을 기다리지 않고 정보를 클라이언트에 푸시하는 애플리케이션을 구축하려는 경우 좋은 선택입니다. 클라이언트 응답을 듣지 않고 정보를 푸시하는 애플리케이션을 구축하려는 경우에 적합합니다.
다음은 Go 코드입니다(HTML/JS/CSS 템플릿은 다음 섹션에서 보여드리겠습니다):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
"/home" 페이지는 HTML/CSS/JS 페이지(나중에 표시됨)를 렌더링합니다. "/start" 페이지는 AI 모델에 요청을 트리거하는 JS 애플리케이션으로부터 요청을 받습니다. 그리고 "/generate" 페이지는 서버에서 전송된 이벤트를 통해 JS 앱에 결과를 반환합니다.
start() 함수가 프론트엔드에서 POST 요청을 받으면 자동으로 FastAPI 앱에 요청을 보내는 고루틴을 요청하는 고루틴을 자동으로 생성합니다.
생성 텍스트() 함수는 FastAPI를 호출하고 전용 채널(streamedTextCh)을 통해 즉석에서 수신한 모든 토큰을 반환합니다. FastAPI에서 EOF 문자가 수신되면 텍스트 생성이 끝났다는 의미입니다.
생성() 함수는 스트리밍된 텍스트 채널에서 새 토큰이 수신될 때까지 기다리는 무한 루프입니다. 새 토큰이 수신되면 서버가 보낸 이벤트로 프론트엔드에 자동으로 푸시됩니다. 서버 전송 이벤트는 "event:" 및 "data:" 접두사를 사용하는 특정 형식을 따라야 하므로 formatServerSentEvent() 함수를 사용해야 합니다.
SSE가 완료되려면 '생성' 페이지를 구독하여 서버에서 보낸 이벤트를 수신할 수 있는 자바스크립트 클라이언트가 필요합니다. 이를 달성하는 방법은 다음 섹션을 참조하세요.
이제 "템플릿" 디렉터리를 만들고 그 안에 "home.html" 파일을 추가해야 합니다.
다음은 "home.html"의 내용입니다:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
보시다시피 브라우저에서 SSE를 듣는 것은 매우 간단합니다.
먼저 SSE 엔드포인트("/generate" 페이지)를 구독해야 합니다. 그런 다음 스트리밍된 토큰을 수신하는 즉시 읽을 이벤트 리스너를 추가해야 합니다. 이벤트 리스너를 추가해야 합니다.
최신 브라우저는 연결 문제가 발생하면 자동으로 재연결을 시도합니다. 이벤트 소스에 자동으로 재연결을 시도합니다.
이제 브라우저에서 텍스트를 동적으로 브라우저에서 텍스트를 동적으로 스트리밍하는 최신 생성형 AI 애플리케이션을 만드는 방법을 알게 되었습니다!
아시다시피 이러한 응용 프로그램은 여러 계층이 관련되어 있기 때문에 반드시 간단하지는 않습니다. 레이어가 관련되어 있기 때문입니다. 물론 위의 코드는 예제를 위해 지나치게 단순화되었습니다. 예시입니다.
토큰 스트리밍의 주요 과제는 네트워크 장애를 처리하는 것입니다. 대부분의 이러한 장애는 Go 백엔드와 자바스크립트 프론트엔드 사이에서 발생합니다. 따라서 몇 가지 고급 재연결 전략을 탐색하고 오류가 UI에 UI에 제대로 보고되는지 확인해야 합니다.
이 튜토리얼이 도움이 되셨기를 바랍니다!
Vincent
NLP 클라우드의 개발자 옹호자