Go、FastAPI、JSを使用してLLM用のトークン・ストリーミングUIを開発する方法

生成モデルは結果を返すのに時間がかかることがあるので、トークン・ストリーミングを活用してUIに結果がその場で表示されるようにするのは面白い。ここでは、Go、FastAPI、Javascriptを使って、LLMのテキスト・ストリーミング・フロントエンドを実現する方法を紹介します。

PCの開発者

トークン・ストリーミングとは何か?

注意点として、トークンとは、小さな単語、単語の一部、または句読点のいずれかであるユニークなエンティティです。平均して、1トークンは4文字で構成されています、 100トークンは75ワードにほぼ相当します。自然言語処理モデルは、テキストを処理するためにテキストをトークンに変換する必要があります。

テキスト生成AIモデル(「生成」モデルとも呼ばれる)を使用する場合、ハードウェアやモデルのサイズによっては、応答時間が非常に長くなることがあります。 例えば、LLaMA 30Bのような大規模な言語モデル(「LLM」とも呼ばれる)をNVIDIA A100 GPUにfp16でデプロイした場合、このモデルは100個のトークンを約3秒で生成します。 そのため、生成モデルが数百語から数千語の大きなテキストを生成することを期待する場合、レイテンシは高くなり、完全なテキストを得るには10秒以上待つ必要があります。 秒以上待つ必要がある。

レスポンスを得るために長い時間待つことは、ユーザーエクスペリエンスの観点から問題があります。その場合の解決策がトークン・ストリーミングだ!

トークン・ストリーミングとは、レスポンス全体の準備ができるのを待つのではなく、新しいトークンをその場で生成することである。これは ChatGPTアプリやNLP Cloud ChatDolphinアシスタントなどで見ることができます。単語はモデルによって生成されるとすぐに表示されます。 ドルフィンのAIアシスタントを試してみる

NLPクラウド上のChatDolphinによるトークン・ストリーミング NLPクラウド上のChatDolphinアシスタントによるトークンストリーミング。 ここで試してみよう。

トークン・ストリーミングをサポートする推論エンジンの選択

最初のステップは、トークン・ストリーミングをサポートする推論エンジンを活用することだ。

以下は、あなたが検討したいかもしれないいくつかのオプションです:

以下は、HuggingFace generate()メソッドを使った例である:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

この例では、GPT-2モデルで出力を生成し、各トークンが到着するとすぐにコンソールに表示します。

FastAPIによるレスポンスのストリーミング

推論エンジンを選んだので、モデルを処理してストリームされたトークンを返す必要がある。

あなたのモデルはPython環境で実行される可能性が高いので、トークンを返すためにPythonサーバーが必要になります。 を返し、HTTP APIで利用できるようにするためにPythonサーバーが必要になります。FastAPI は、そのような状況での事実上の選択肢となっています。

ここでは、UvicornとFastAPIのStreamingResponseを使って、トークンが生成されるとすぐに各トークンを提供する。以下はその例です:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

以下のcURLコマンドを使って、ストリーミング・サーバーをテストすることができる:

curl -N localhost:8000

私たちは今、ストリームされたトークンを適切に返す、動作するAIモデルを手に入れた。

ブラウザーのクライアント・アプリケーションから、ストリームされたトークンを直接読み取ることもできる。 しかし、ここでは2つの理由から、そのようなことはしない。

まず、AIモデルを他のスタックから切り離すことが重要だ。 なぜなら、APIにちょっとした変更を加えるたびにモデルを再起動させたくないからだ。 なぜなら、APIにちょっとした変更を加えるたびにモデルを再起動させたくないからだ。最近の生成AIモデルは非常に重く、再起動に数分かかることが多い。 再起動に数分かかることが多い。

第二の理由は、我々がやろうとしているような高スループットの並行処理アプリケーションを構築する場合、Pythonは必ずしも最適な選択ではないということだ。 ということです。この選択 はもちろん議論できるし、好みの問題かもしれない!

Goゲートウェイを介したトークンの転送

上述したように、モデルと最終的なクライアントの間にゲートウェイを追加することは重要だ、 Goはそのようなアプリケーションに適したプログラミング言語です。Goはこのようなアプリケーションに適したプログラミング言語です。 プロキシを追加し、GoゲートウェイとAIモデルの間にロードバランサーを追加するのもよいでしょう。 モデルの複数のレプリカに負荷を分散させるためです。しかし、それはこの記事の範囲外です!

私たちのGoアプリケーションは、最終的なHTMLページのレンダリングも担当する。

このアプリケーションは FastAPI アプリにリクエストを行い、FastAPI からストリーム トークンを受信し、サーバー送信イベント(SSE)を使用して各トークンをフロントエンドに転送します。 Server Sent Events (SSE) を使用して各トークンをフロントエンドに転送します。SSE は一方向なので、ウェブソケットより単純です。これは クライアントの潜在的な応答をリッスンすることなく、クライアントに情報をプッシュするアプリケーションを構築する場合に適しています。 プッシュするアプリケーションを構築したい場合に適しています。

これがGoコードです(HTML/JS/CSSテンプレートは次のセクションで紹介します):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

私たちの「/home」ページは、HTML/CSS/JSページ(後述)をレンダリングする。start "ページは、JSアプリケーションからPOSTリクエストを受け取る。 JSアプリケーションからのPOSTリクエストを受け取ります。そして、"/generate "ページは、サーバーから送られたイベントを通してJSアプリに結果を返します。

start() 関数がフロントエンドから POST リクエストを受信すると、 FastAPI アプリにリクエストを送信するゴルーチンを 自動的に作成します。 を作成します。

generateText() 関数は FastAPI を呼び出し、専用チャネル (streamedTextCh) を介してオンザフライで受信したすべてのトークンを返します。 FastAPI から EOF 文字を受信した場合は、テキスト生成が終了したことを意味します。

generate() 関数は、streamedTextCh チャネルから受信した新しいトークンを待機する無限ループです。新しいトークンが受信されると として自動的にフロントエンドにプッシュされます。サーバー送信イベントは、"event: "と "data: "という接頭辞を使った特定のフォーマットに従う必要がある。 そのため、formatServerSentEvent() 関数が用意されています。

SSEを完成させるためには、"generate "ページを購読することによって、サーバーから送られるイベントをリッスンできるJavascriptクライアントが必要である。 それを実現する方法については、次のセクションを参照してください。

ブラウザのJavascriptでトークンを受け取る

templates "ディレクトリを作成し、その中に "home.html "ファイルを追加する必要がある。

以下は "home.html "の内容である:

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

ご覧のように、ブラウザでSSEを聴くのはとても簡単だ。

まず、SSEエンドポイント("/generate "ページ)をサブスクライブする必要があります。 次に、ストリームされたトークンを受信したらすぐに読み込むイベント・リスナーを追加する必要があります。 を追加する必要があります。

最近のブラウザは自動的に再接続を試みます を試みます。

結論

これで、ブラウザ上でテキストを動的にストリームする最新の生成AIアプリケーションの作り方がわかりました。 を作成する方法がわかりました!

お気づきのように、このようなアプリケーションは必ずしも単純ではない。 層が関与している。そしてもちろん、上記のコードは例のために単純化しすぎている。 のために単純化しすぎています。

トークン・ストリーミングの主な課題は、ネットワーク障害への対応だ。そのほとんどは これらの障害は、GoバックエンドとJavascriptフロントエンドの間で発生します。そのため より高度な再接続戦略を検討し、エラーがUIに適切に報告されるようにする必要があります。 UIに適切に報告されるようにする必要があります。

このチュートリアルがお役に立てば幸いです!

Vincent
NLPクラウドのデベロッパー・アドボケイト