Τα παραγωγικά μοντέλα χρειάζονται μερικές φορές αρκετό χρόνο για να επιστρέψουν ένα αποτέλεσμα, οπότε είναι ενδιαφέρον να αξιοποιήσουμε τη ροή συμβόλων για να δούμε το αποτέλεσμα να εμφανίζεται στο UI. Δείτε πώς μπορείτε να επιτύχετε ένα τέτοιο frontend ροής κειμένου για το LLM σας με Go, FastAPI και Javascript.
Ως υπενθύμιση, ένα token είναι μια μοναδική οντότητα που μπορεί να είναι είτε μια μικρή λέξη, είτε μέρος μιας λέξης, είτε σημείο στίξης. Κατά μέσο όρο, 1 token αποτελείται από 4 χαρακτήρες, και 100 μάρκες αντιστοιχούν περίπου σε 75 λέξεις. Τα μοντέλα επεξεργασίας φυσικής γλώσσας πρέπει να μετατρέψουν το κείμενό σας σε tokens προκειμένου να το επεξεργαστούν.
Όταν χρησιμοποιείτε ένα μοντέλο τεχνητής νοημοσύνης παραγωγής κειμένου (επίσης γνωστό ως "παραγωγικό" μοντέλο), ο χρόνος απόκρισης μπορεί να είναι αρκετά υψηλός, ανάλογα με το υλικό σας και το μέγεθος του μοντέλου σας. Για παράδειγμα, στην περίπτωση ενός μεγάλου γλωσσικού μοντέλου (γνωστού και ως "LLM") όπως το LLaMA 30B, που αναπτύσσεται σε μια GPU NVIDIA A100 σε fp16, το μοντέλο παράγει 100 tokens σε περίπου 3 δευτερόλεπτα. Έτσι, αν περιμένετε από το παραγωγικό σας μοντέλο να δημιουργήσει ένα μεγάλο κομμάτι κειμένου εκατοντάδων ή χιλιάδων λέξεων, η καθυστέρηση θα είναι υψηλή και θα πρέπει να περιμένετε ίσως περισσότερο από 10 δευτερόλεπτα για να λάβετε την πλήρη απάντηση.
Η αναμονή τόσο πολύ για να λάβετε μια απάντηση μπορεί να είναι πρόβλημα από την άποψη της εμπειρίας του χρήστη. Η λύση σε αυτή την περίπτωση είναι το token streaming!
Το Token streaming αφορά τη δημιουργία κάθε νέου token on the fly αντί να περιμένετε να είναι έτοιμη ολόκληρη η απάντηση. Αυτό είναι που σας μπορείτε να δείτε στην εφαρμογή ChatGPT ή στον βοηθό NLP Cloud ChatDolphin, για παράδειγμα. Οι λέξεις εμφανίζονται αμέσως μόλις δημιουργηθούν από το μοντέλο. Δοκιμάστε τον βοηθό τεχνητής νοημοσύνης ChatDolphin εδώ.
Ροή Token με τον βοηθό ChatDolphin στο NLP Cloud. Δοκιμάστε το εδώ.
Το πρώτο βήμα θα είναι να αξιοποιήσετε μια μηχανή εξαγωγής συμπερασμάτων που υποστηρίζει τη ροή συμβόλων.
Ακολουθούν ορισμένες επιλογές που ίσως θέλετε να εξετάσετε:
Ακολουθεί ένα παράδειγμα που χρησιμοποιεί τη μέθοδο generate() της HuggingFace:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
print(new_text)
Σε αυτό το παράδειγμα, δημιουργούμε μια έξοδο με το μοντέλο GPT-2 και εκτυπώνουμε κάθε token στην κονσόλα μόλις φτάσει.
Τώρα που έχετε επιλέξει μια μηχανή εξαγωγής συμπερασμάτων, θα πρέπει να εξυπηρετήσετε το μοντέλο σας και να επιστρέψετε τα tokens της ροής.
Το μοντέλο σας πιθανότατα θα εκτελείται σε περιβάλλον Python, οπότε θα χρειαστείτε έναν διακομιστή Python για να επιστρέψετε τα tokens. και να τα καταστήσετε διαθέσιμα μέσω ενός API HTTP. Το FastAPI έχει γίνει μια de facto επιλογή για τέτοιες καταστάσεις.
Εδώ χρησιμοποιούμε το Uvicorn και το StreamingResponse της FastAPI για να εξυπηρετήσουμε κάθε token μόλις δημιουργηθεί. Ακολουθεί ένα παράδειγμα:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread
model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
app = FastAPI()
async def generate():
inputs = tokenizer(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextIteratorStreamer(tokenizer)
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
for new_text in streamer:
yield new_text
@app.get("/")
async def main():
return StreamingResponse(generate())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Μπορείτε να δοκιμάσετε τον διακομιστή ροής χάρη στην ακόλουθη εντολή cURL:
curl -N localhost:8000
Τώρα έχουμε ένα λειτουργικό μοντέλο AI που επιστρέφει σωστά τα streamed tokens.
Θα μπορούσαμε να διαβάσουμε απευθείας αυτά τα streamed tokens από μια εφαρμογή-πελάτη σε ένα πρόγραμμα περιήγησης. Αλλά δεν πρόκειται να το κάνουμε αυτό, για 2 λόγους.
Πρώτον, είναι σημαντικό να αποσυνδεθούν το μοντέλο ΤΝ από την υπόλοιπη στοίβα, επειδή δεν θέλουμε να επανεκκινούμε το μοντέλο κάθε φορά που πρόκειται να κάνουμε μια μικρή αλλαγή στο API. Λάβετε υπόψη σας ότι τα σύγχρονα παραγωγικά μοντέλα τεχνητής νοημοσύνης είναι πολύ βαριά και συχνά χρειάζονται αρκετά λεπτά για να επανεκκινήσουν.
Ένας δεύτερος λόγος είναι ότι η Python δεν είναι απαραίτητα η καλύτερη επιλογή όταν πρόκειται για τη δημιουργία μιας ταυτόχρονης εφαρμογής υψηλής απόδοσης, όπως αυτή που πρόκειται να κάνουμε. Αυτή η επιλογή μπορεί να συζητηθεί φυσικά και μπορεί να είναι και θέμα γούστου!
Όπως αναφέρθηκε παραπάνω, η προσθήκη μιας πύλης μεταξύ του μοντέλου σας και του τελικού σας πελάτη είναι σημαντική, και η Go είναι μια καλή γλώσσα προγραμματισμού για μια τέτοια εφαρμογή. Στην παραγωγή, μπορεί επίσης να θέλετε να προσθέσετε μια αντίστροφη μεσολάβησης μεταξύ της πύλης Go και του τελικού πελάτη και έναν εξισορροπιστή φορτίου μεταξύ της πύλης Go και του μοντέλου AI, προκειμένου να να κατανέμετε το φορτίο σε διάφορα αντίγραφα του μοντέλου σας. Αλλά αυτό είναι εκτός του πεδίου εφαρμογής του άρθρου μας!
Η εφαρμογή μας Go θα είναι επίσης υπεύθυνη για την απόδοση της τελικής σελίδας HTML.
Αυτή η εφαρμογή κάνει ένα αίτημα στην εφαρμογή FastAPI, λαμβάνει τα streamed tokens από το FastAPI και προωθεί κάθε ένα από αυτά. token στο frontend χρησιμοποιώντας Server Sent Events (SSE). Το SSE είναι απλούστερο από τα websockets επειδή είναι μονόδρομος. Είναι είναι μια καλή επιλογή όταν θέλετε να δημιουργήσετε μια εφαρμογή που να στέλνει πληροφορίες σε έναν πελάτη, χωρίς να ακούει ένα πιθανό απάντηση του πελάτη.
Εδώ είναι ο κώδικας Go (το πρότυπο HTML/JS/CSS θα παρουσιαστεί στην επόμενη ενότητα):
package main
import (
"bufio"
"fmt"
"html/template"
"io"
"log"
"net/http"
"strings"
)
var (
templates *template.Template
streamedTextCh chan string
)
func init() {
// Parse all templates in the templates folder.
templates = template.Must(template.ParseGlob("templates/*.html"))
streamedTextCh = make(chan string)
}
// generateText calls FastAPI and returns every token received on the fly through
// a dedicated channel (streamedTextCh).
// If the EOF character is received from FastAPI, it means that text generation is over.
func generateText(streamedTextCh chan<- string) {
var buf io.Reader = nil
req, err := http.NewRequest("GET", "http://127.0.0.1:8000", buf)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
outerloop:
for {
chunk, err := reader.ReadBytes('\x00')
if err != nil {
if err == io.EOF {
break outerloop
}
log.Println(err)
break outerloop
}
output := string(chunk)
streamedTextCh <- output
}
}
// formatServerSentEvent creates a proper SSE compatible body.
// Server sent events need to follow a specific formatting that
// uses "event:" and "data:" prefixes.
func formatServerSentEvent(event, data string) (string, error) {
sb := strings.Builder{}
_, err := sb.WriteString(fmt.Sprintf("event: %s\n", event))
if err != nil {
return "", err
}
_, err = sb.WriteString(fmt.Sprintf("data: %v\n\n", data))
if err != nil {
return "", err
}
return sb.String(), nil
}
// generate is an infinite loop that waits for new tokens received
// from the streamedTextCh. Once a new token is received,
// it is automatically pushed to the frontend as a server sent event.
func generate(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
for text := range streamedTextCh {
event, err := formatServerSentEvent("streamed-text", text)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
_, err = fmt.Fprint(w, event)
if err != nil {
http.Error(w, "Cannot format SSE message", http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
// start starts an asynchronous request to the AI engine.
func start(w http.ResponseWriter, r *http.Request) {
go generateText(streamedTextCh)
}
func home(w http.ResponseWriter, r *http.Request) {
if err := templates.ExecuteTemplate(w, "home.html", nil); err != nil {
log.Println(err.Error())
http.Error(w, "", http.StatusInternalServerError)
return
}
}
func main() {
http.HandleFunc("/generate", generate)
http.HandleFunc("/start", start).Methods("POST")
http.HandleFunc("/", home).Methods("GET")
log.Fatal(http.ListenAndServe(":8000", r))
}
Η σελίδα μας "/home" αποδίδει τη σελίδα HTML/CSS/JS (που παρουσιάζεται αργότερα). Η σελίδα "/start" λαμβάνει ένα αίτημα POST από το JS εφαρμογή που ενεργοποιεί ένα αίτημα στο μοντέλο AI μας. Και η σελίδα μας "/generate" επιστρέφει το αποτέλεσμα στην εφαρμογή JS μέσω γεγονότων που αποστέλλονται από τον διακομιστή.
Μόλις η συνάρτηση start() λάβει ένα αίτημα POST από το frontend, δημιουργεί αυτόματα μια goroutine που θα κάνει ένα αίτημα στην εφαρμογή FastAPI.
Η συνάρτηση generateText() καλεί το FastAPI και επιστρέφει κάθε token που λαμβάνεται εν κινήσει μέσω ενός ειδικού καναλιού (streamedTextCh). Εάν ληφθεί ο χαρακτήρας EOF από το FastAPI, αυτό σημαίνει ότι η παραγωγή κειμένου έχει τελειώσει.
Η συνάρτηση generate() είναι ένας ατέρμονος βρόχος που περιμένει για νέα tokens που λαμβάνονται από το κανάλι streamedTextCh. Μόλις ληφθεί ένα νέο κουπόνι, μεταφέρεται αυτόματα στο frontend ως γεγονός που αποστέλλεται από τον διακομιστή. Τα γεγονότα που αποστέλλονται από τον διακομιστή πρέπει να ακολουθούν μια συγκεκριμένη μορφοποίηση που χρησιμοποιεί τα "event:" και "data:" προθέματα, εξ ου και η συνάρτηση formatServerSentEvent().
Για να είναι ολοκληρωμένο το SSE, χρειαζόμαστε ένα πρόγραμμα-πελάτη Javascript που να είναι σε θέση να ακούει τα γεγονότα που στέλνει ο διακομιστής, εγγραφόμενος στη σελίδα "generate". Δείτε την επόμενη ενότητα για να καταλάβετε πώς θα το πετύχετε αυτό.
Τώρα πρέπει να δημιουργήσετε έναν κατάλογο "templates" και να προσθέσετε ένα αρχείο "home.html" μέσα σε αυτόν.
Εδώ είναι το περιεχόμενο του "home.html":
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Our Streamed Tokens App</title>
</head>
<body>
<div id="response-section"></div>
<form method="POST">
<button onclick="start()">Start</button>
</form>
</body>
<script>
// Disable the default behavior of the HTML form.
document.querySelector('form').addEventListener('submit', function(e) {
e.preventDefault()
})
// Make a request to the /start to trigger the request to the AI model.
async function start() {
try {
const response = await fetch("/start", {
method: "POST",
})
} catch (error) {
console.error("Error when starting process:", error)
}
}
// Listen to SSE by subscribing to the /generate page, and
// put the result in the #response-section div.
const evtSource = new EventSource("generate")
evtSource.addEventListener("streamed-text", (event) => {
document.getElementById('response-section').innerHTML = event.data
})
</script>
</html>
Όπως μπορείτε να δείτε, η ακρόαση του SSE στο πρόγραμμα περιήγησης είναι αρκετά απλή.
Πρώτα πρέπει να εγγραφείτε στο τελικό σημείο SSE (η σελίδα "/generate"). Στη συνέχεια, πρέπει να προσθέσετε έναν ακροατή συμβάντων που θα διαβάζει τα streamed tokens μόλις λαμβάνονται.
Τα σύγχρονα προγράμματα περιήγησης προσπαθούν αυτόματα να επανασυνδεθούν την πηγή συμβάντων σε περίπτωση προβλημάτων σύνδεσης.
Τώρα ξέρετε πώς να δημιουργήσετε μια σύγχρονη γεννητική εφαρμογή τεχνητής νοημοσύνης που δυναμικά ροή κειμένου στο πρόγραμμα περιήγησης, αλά ChatGPT!
Όπως παρατηρήσατε, μια τέτοια εφαρμογή δεν είναι απαραίτητα απλή, καθώς αρκετές επίπεδα εμπλέκονται. Και φυσικά ο παραπάνω κώδικας είναι υπεραπλουστευμένος για χάρη της παραδείγματος.
Η κύρια πρόκληση με το token streaming είναι ο χειρισμός των αποτυχιών του δικτύου. Τα περισσότερα από τα αυτών των αποτυχιών θα συμβούν μεταξύ του Go backend και του Javascript frontend. Θα πρέπει να εξερευνήσετε κάποιες πιο προηγμένες στρατηγικές επανασύνδεσης και να βεβαιωθείτε ότι τα σφάλματα είναι αναφερθούν σωστά στο περιβάλλον εργασίας.
Ελπίζω να βρήκατε αυτό το σεμινάριο χρήσιμο!
Vincent
Συνήγορος προγραμματιστών στο NLP Cloud