正在为人工智能或全栈开发而苦恼?我们的专家将为您提供指导:量身定制的建议、技术整合等。联系我们 [email protected].

如何使用 Go、FastAPI 和 JS 为您的 LLM 开发令牌流用户界面

生成模型有时需要一些时间才能返回结果,因此,利用令牌流以便在用户界面中即时看到结果是非常有趣的。以下是如何使用 Go、FastAPI 和 Javascript 为您的 LLM 实现这样的文本流前端。

PC 上的开发者

什么是令牌流?

作为提醒,标记是一个独特的实体,可以是一个小词、词的一部分或标点符号。平均来说,1 个标记由 4 个字符组成、 100 个标记大致相当于 75 个单词。自然语言处理模型需要将文本转化为标记才能进行处理。

在使用文本生成人工智能模型(也称为 "生成 "模型)时,响应时间可能相当长,这取决于硬件和模型的大小。 例如,在英伟达 A100 GPU 的 fp16 中部署 LLaMA 30B 这样的大型语言模型(也称为 "LLM")时,该模型生成 100 个标记大约需要 3 秒钟。 因此,如果您希望您的生成式模型生成数百或数千字的大型文本,那么延迟将会很高,您可能需要等待 超过 10 秒才能得到完整的响应。

从用户体验的角度来看,等待很长时间才能得到回应可能是个问题。这种情况下的解决方案就是令牌流!

令牌流是指即时生成每个新令牌,而不是等待整个响应准备就绪。这就是 例如,你可以在 ChatGPT 应用程序或 NLP Cloud ChatDolphin 助手上看到这一点。当模型生成单词时,单词就会立即出现。 在这里试用海豚人工智能助手。

在 NLP 云上使用 ChatDolphin 进行令牌流式传输 在 NLP 云上使用 ChatDolphin 助手进行令牌流。 在这里试试。

选择支持令牌流的推理引擎

第一步是利用支持标记流的推理引擎。

以下是您可能需要考虑的一些选择:

下面是一个使用 HuggingFace 生成()方法的示例:

from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)

# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
    print(new_text)

在本例中,我们使用 GPT-2 模型生成输出,并在每个标记到达时立即在控制台中打印出来。

使用 FastAPI 进行流式响应

既然已经选择了推理引擎,就需要为模型提供服务并返回流式标记。

您的模型很可能在 Python 环境中运行,因此您需要一个 Python 服务器来返回令牌,并通过 HTTP API 并通过 HTTP API 使其可用。在这种情况下,FastAPI 已成为事实上的选择。

在这里,我们使用 Uvicorn 和 FastAPI 的 StreamingResponse,以便在每个令牌生成后立即为其提供服务。下面是一个示例:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")

app = FastAPI()

async def generate():
    inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
    streamer = TextIteratorStreamer(tokenizer)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    for new_text in streamer:
        yield new_text

@app.get("/")
async def main():
    return StreamingResponse(generate())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

您可以使用以下 cURL 命令测试您的流媒体服务器:

curl -N localhost:8000

我们现在有了一个可以正常工作的人工智能模型,它可以正确返回流代币。

我们可以直接从浏览器中的客户端程序读取这些流式令牌。 但我们不会这么做,原因有两个。

首先,必须将人工智能模型与堆栈的其他部分分离开来。 首先,必须将人工智能模型与堆栈的其他部分解耦,因为我们不希望每次 对应用程序接口稍作改动,我们都不想重启模型。请记住,现代生成式人工智能模型非常重 通常需要几分钟才能重启。

第二个原因是,在构建高吞吐量并发应用程序时,Python 不一定是最好的选择。 不一定是构建高吞吐量并发应用程序的最佳选择。这个选择 当然可以讨论,这也可能是个人喜好的问题!

通过 Go 网关转发令牌

如上所述,在模型和最终客户端之间添加一个网关非常重要、 Go 是一种很好的编程语言。在生产中,您可能还想在 Go 网关和最终客户端之间添加一个反向 在 Go 网关和最终客户端之间添加一个反向代理,并在 Go 网关和人工智能模型之间添加一个负载平衡器,以便将负载分散到人工智能模型的多个副本上。 将负载分散到模型的多个副本上。但这不在我们文章的讨论范围之内!

我们的 Go 应用程序还将负责渲染最终的 HTML 页面。

该应用程序向 FastAPI 应用程序发出请求,从 FastAPI 接收流式令牌,并使用服务器发送事件(SSE)将每个令牌转发到前端。 使用服务器发送事件(SSE)将每个令牌转发给前端。SSE 比 websockets 更简单,因为它是单向的。它 当您想构建一个向客户端推送信息的应用程序,而不需要监听潜在的客户端响应时,SSE 是一个不错的选择。 客户端响应。

以下是 Go 代码(HTML/JS/CSS 模板将在下一节展示):

package main

import (
    "bufio"
    "fmt"
    "html/template"
    "io"
    "log"
    "net/http"
    "strings"
)

var (
    templates      *template.Template
    streamedTextCh chan string
)

func init() {
    // Parse all templates in the templates folder.
    templates = template.Must(template.ParseGlob("templates/*.html"))

    streamedTextCh = make(chan string)
}

// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
    var buf io.Reader = nil

    req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
    if err != nil {
        log.Fatal(err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    reader := bufio.NewReader(resp.Body)

outerloop:
    for {
        chunk, err := reader.ReadBytes('\x00')
        if err != nil {
            if err == io.EOF {
                break outerloop
            }
            log.Println(err)
            break outerloop
        }

        output := string(chunk)

        streamedTextCh <- output
    }
}

// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
    sb := strings.Builder{}

    _, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
    if err != nil {
        return "", err
    }
    _, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
    if err != nil {
        return "", err
    }

    return sb.String(), nil
}

// generate is an infinite loop that waits for new tokens received 
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event. 
func generate(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "SSE not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")

    for text := range streamedTextCh {
        event, err := formatServerSentEvent("streamed-text", text)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        _, err = fmt.Fprint(w, event)
        if err != nil {
            http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
            return
        }

        flusher.Flush()
    }
}

// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
    go generateText(streamedTextCh)
}

func home(w http.ResponseWriter, r *http.Request) {
    if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
        log.Println(err.Error())
        http.Error(w, "", http.StatusInternalServerError)
        return
    }
}

func main() {
    http.HandleFunc("/generate", generate)
    http.HandleFunc("/start", start).Methods("POST")
    http.HandleFunc("/", home).Methods("GET")

    log.Fatal(http.ListenAndServe(":8000", r))
}                

我们的"/home "页面渲染 HTML/CSS/JS 页面(稍后显示)。/start "页面接收来自 JS 应用程序发出的 POST 请求,该请求会触发对我们的人工智能模型的请求。我们的"/generate "页面通过服务器发送的事件将结果返回给 JS 应用程序。

一旦 start() 函数接收到来自前端的 POST 请求,它就会自动创建一个 goroutine,该 goroutine 将请求 到我们的 FastAPI 应用程序。

generateText() 函数调用 FastAPI,并通过专用通道(streamedTextCh)返回实时接收到的每个标记。 如果从 FastAPI 收到 EOF 字符,则表示文本生成结束。

generate() 函数是一个无限循环,它会等待从 streamedTextCh 频道接收到新的令牌。一旦收到新的令牌 就会作为服务器发送的事件自动推送到前端。服务器发送的事件需要遵循特定的格式,使用 "event: "和 "data:" 前缀,因此需要使用 formatServerSentEvent() 函数。

为了完成 SSE,我们需要一个 Javascript 客户端,它能够通过订阅 "生成 "页面来监听服务器发送的事件。 请参阅下一节了解如何实现这一点。

在浏览器中使用 Javascript 接收令牌

现在,您需要创建一个 "templates "目录,并在其中添加一个 "home.html "文件。

下面是 "home.html "的内容:

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Our Streamed Tokens App</title>
</head>
<body>
    <div id="response-section"></div>    
    <form method="POST">
        <button onclick="start()">Start</button>
    </form>
</body>
<script>
    // Disable the default behavior of the HTML form.
    document.querySelector('form').addEventListener('submit', function(e) {
        e.preventDefault()
    })

    // Make a request to the /start to trigger the request to the AI model.
    async function start() {
        try {
            const response = await fetch("/start", {
            method: "POST",
            })
        } catch (error) {
            console.error("Error when starting process:", error)
        }
    }

    // Listen to SSE by subscribing to the /generate page, and
    // put the result in the #response-section div.
    const evtSource = new EventSource("generate")
    evtSource.addEventListener("streamed-text", (event) => {
        document.getElementById('response-section').innerHTML = event.data
    })
</script>
</html>

如您所见,在浏览器中监听 SSE 非常简单。

首先,您需要订阅我们的 SSE 端点("/generate "页面)。 然后,您需要添加一个事件监听器,以便在收到流式令牌后立即读取它们。 事件监听器。

现代浏览器会自动尝试重新连接 如果出现连接问题,会自动尝试重新连接事件源。

结论

您现在知道如何创建一个现代生成式 AI 应用程序,在浏览器中动态地 的现代生成式人工智能应用程序!

正如大家所注意到的,这样的应用程序并不一定简单,因为其中涉及多个 层。当然,为了举例说明,上述代码也过于简化了。 示例。

令牌流的主要挑战在于如何处理网络故障。大多数 这些故障将发生在 Go 后端和 Javascript 前端之间。您 需要探索一些更先进的重新连接策略,并确保将错误 正确报告给用户界面。

希望本教程对您有所帮助!

Vincent
NLP Cloud 开发人员倡导者