生成模型有时需要一些时间才能返回结果,因此,利用令牌流以便在用户界面中即时看到结果是非常有趣的。以下是如何使用 Go、FastAPI 和 Javascript 为您的 LLM 实现这样的文本流前端。
作为提醒,标记是一个独特的实体,可以是一个小词、词的一部分或标点符号。平均来说,1 个标记由 4 个字符组成、 100 个标记大致相当于 75 个单词。自然语言处理模型需要将文本转化为标记才能进行处理。
在使用文本生成人工智能模型(也称为 "生成 "模型)时,响应时间可能相当长,这取决于硬件和模型的大小。 例如,在英伟达 A100 GPU 的 fp16 中部署 LLaMA 30B 这样的大型语言模型(也称为 "LLM")时,该模型生成 100 个标记大约需要 3 秒钟。 因此,如果您希望您的生成式模型生成数百或数千字的大型文本,那么延迟将会很高,您可能需要等待 超过 10 秒才能得到完整的响应。
从用户体验的角度来看,等待很长时间才能得到回应可能是个问题。这种情况下的解决方案就是令牌流!
令牌流是指即时生成每个新令牌,而不是等待整个响应准备就绪。这就是 例如,你可以在 ChatGPT 应用程序或 NLP Cloud ChatDolphin 助手上看到这一点。当模型生成单词时,单词就会立即出现。 在这里试用海豚人工智能助手。
在 NLP 云上使用 ChatDolphin 助手进行令牌流。 在这里试试。
第一步是利用支持标记流的推理引擎。
以下是您可能需要考虑的一些选择:
下面是一个使用 HuggingFace 生成()方法的示例:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
在本例中,我们使用 GPT-2 模型生成输出,并在每个标记到达时立即在控制台中打印出来。
既然已经选择了推理引擎,就需要为模型提供服务并返回流式标记。
您的模型很可能在 Python 环境中运行,因此您需要一个 Python 服务器来返回令牌,并通过 HTTP API 并通过 HTTP API 使其可用。在这种情况下,FastAPI 已成为事实上的选择。
在这里,我们使用 Uvicorn 和 FastAPI 的 StreamingResponse,以便在每个令牌生成后立即为其提供服务。下面是一个示例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
您可以使用以下 cURL 命令测试您的流媒体服务器:
curl -N localhost:8000
我们现在有了一个可以正常工作的人工智能模型,它可以正确返回流代币。
我们可以直接从浏览器中的客户端程序读取这些流式令牌。 但我们不会这么做,原因有两个。
首先,必须将人工智能模型与堆栈的其他部分分离开来。 首先,必须将人工智能模型与堆栈的其他部分解耦,因为我们不希望每次 对应用程序接口稍作改动,我们都不想重启模型。请记住,现代生成式人工智能模型非常重 通常需要几分钟才能重启。
第二个原因是,在构建高吞吐量并发应用程序时,Python 不一定是最好的选择。 不一定是构建高吞吐量并发应用程序的最佳选择。这个选择 当然可以讨论,这也可能是个人喜好的问题!
如上所述,在模型和最终客户端之间添加一个网关非常重要、 Go 是一种很好的编程语言。在生产中,您可能还想在 Go 网关和最终客户端之间添加一个反向 在 Go 网关和最终客户端之间添加一个反向代理,并在 Go 网关和人工智能模型之间添加一个负载平衡器,以便将负载分散到人工智能模型的多个副本上。 将负载分散到模型的多个副本上。但这不在我们文章的讨论范围之内!
我们的 Go 应用程序还将负责渲染最终的 HTML 页面。
该应用程序向 FastAPI 应用程序发出请求,从 FastAPI 接收流式令牌,并使用服务器发送事件(SSE)将每个令牌转发到前端。 使用服务器发送事件(SSE)将每个令牌转发给前端。SSE 比 websockets 更简单,因为它是单向的。它 当您想构建一个向客户端推送信息的应用程序,而不需要监听潜在的客户端响应时,SSE 是一个不错的选择。 客户端响应。
以下是 Go 代码(HTML/JS/CSS 模板将在下一节展示):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
我们的"/home "页面渲染 HTML/CSS/JS 页面(稍后显示)。/start "页面接收来自 JS 应用程序发出的 POST 请求,该请求会触发对我们的人工智能模型的请求。我们的"/generate "页面通过服务器发送的事件将结果返回给 JS 应用程序。
一旦 start() 函数接收到来自前端的 POST 请求,它就会自动创建一个 goroutine,该 goroutine 将请求 到我们的 FastAPI 应用程序。
generateText() 函数调用 FastAPI,并通过专用通道(streamedTextCh)返回实时接收到的每个标记。 如果从 FastAPI 收到 EOF 字符,则表示文本生成结束。
generate() 函数是一个无限循环,它会等待从 streamedTextCh 频道接收到新的令牌。一旦收到新的令牌 就会作为服务器发送的事件自动推送到前端。服务器发送的事件需要遵循特定的格式,使用 "event: "和 "data:" 前缀,因此需要使用 formatServerSentEvent() 函数。
为了完成 SSE,我们需要一个 Javascript 客户端,它能够通过订阅 "生成 "页面来监听服务器发送的事件。 请参阅下一节了解如何实现这一点。
现在,您需要创建一个 "templates "目录,并在其中添加一个 "home.html "文件。
下面是 "home.html "的内容:
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
如您所见,在浏览器中监听 SSE 非常简单。
首先,您需要订阅我们的 SSE 端点("/generate "页面)。 然后,您需要添加一个事件监听器,以便在收到流式令牌后立即读取它们。 事件监听器。
现代浏览器会自动尝试重新连接 如果出现连接问题,会自动尝试重新连接事件源。
您现在知道如何创建一个现代生成式 AI 应用程序,在浏览器中动态地 的现代生成式人工智能应用程序!
正如大家所注意到的,这样的应用程序并不一定简单,因为其中涉及多个 层。当然,为了举例说明,上述代码也过于简化了。 示例。
令牌流的主要挑战在于如何处理网络故障。大多数 这些故障将发生在 Go 后端和 Javascript 前端之间。您 需要探索一些更先进的重新连接策略,并确保将错误 正确报告给用户界面。
希望本教程对您有所帮助!
Vincent
NLP Cloud 开发人员倡导者