Why We Need Streaming Applications with LLMs!
There is something different about LLMs compared to other machine learning models: they take a long time to return a full answer, often many seconds. So what do you do? The solution to that peculiar problem is to stream the response to the user as it is being generated.
Server-Sent Events
One particularity of LLM applications is the latency in obtaining the full output from an LLM. It can take several seconds, and it makes for a poor user experience if the user has to wait that long every time. One strategy to improve the experience is to start streaming the LLM response before text generation completes. This allows the user to start reading the response right away.
By the time the LLM finishes the response, the user is almost done reading it.
There are a few strategies we can use to deliver a response from the LLM server. The first one is a typical HTTP request that leads to a single response. The problem is that the connection is closed as soon as the response is sent, so it does not allow multiple updates. Simple HTTP requests are therefore not great candidates for streaming.
Another option is a WebSocket connection. The connection starts with an HTTP request that gets upgraded into a bidirectional channel. Once the channel is established, both the client and the server can send messages at any time. This is a great option when real users are chatting with each other, because either side can send a message at any moment. However, LLM applications tend to only respond to user requests, so WebSockets are not the best fit for this streaming use case.
The typical solution is to use Server-Sent Events (SSE). The connection is initiated by a regular HTTP request, but instead of closing after a single response, the server keeps it open and pushes updates as a stream of events. On the client side, we need an API that keeps listening for those updates after the initial response arrives.
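Concretely, the body of an SSE response is plain text: each update is a data: line followed by a blank line. For the endpoint we build below, the stream could look like this (the JSON payload and the [DONE] marker are conventions we adopt later in this post, not part of the SSE standard):

data: {"text": "I'm"}

data: {"text": "I'm functioning well"}

data: [DONE]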
On the server side, FastAPI's StreamingResponse class can be used to send SSEs by streaming the events with the text/event-stream media type.
On the client side, the browser's EventSource API is the standard JavaScript interface for receiving SSEs.
Implementing Streaming
Let’s implement a quick streaming FastAPI endpoint. vLLM is a great library for implementing LLM inference and serving endpoints.
For example, I can instantiate the Phi-3-mini-4k-instruct model this way:
from vllm import AsyncLLMEngine, SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs

engine = AsyncLLMEngine.from_engine_args(
    AsyncEngineArgs(
        model='microsoft/Phi-3-mini-4k-instruct',
    )
)

sampling_params = SamplingParams(
    temperature=0.7,
    max_tokens=100
)
And we can iterate through the outputs generated from a prompt, printing the text generated so far at each decoding step (the last iteration contains the full response):
prompt = "How are you?"
async for output in engine.generate(prompt, sampling_params, 1):
print(output.outputs[0].text)
I'm functioning well, thank you for asking. As an AI assistant, I don't have feelings in the way humans do, but I'm operating as intended and ready to help with any questions or tasks you may have. Is there something specific I can assist you with today?
To use this in a streaming endpoint, we need to wrap the generation loop in a generator that yields SSE events:
import json
import uuid

sampling_params = SamplingParams(
    temperature=0.7,
    max_tokens=100
)

async def generate_stream(prompt: str):
    request_id = str(uuid.uuid4())
    async for output in engine.generate(
        prompt,
        sampling_params,
        request_id
    ):
        # Each SSE event carries the generated text as a JSON payload
        text = json.dumps({'text': output.outputs[0].text})
        yield f"data: {text}\n\n"
    # Signal the end of the stream to the client
    yield "data: [DONE]\n\n"
At every iteration of the decoding process, the generator yields a new SSE event. Now, we can pass this generator to a FastAPI endpoint:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class GenerationRequest(BaseModel):
    prompt: str

@app.post("/generate-stream")
async def generate_text(request: GenerationRequest):
    return StreamingResponse(
        generate_stream(request.prompt),
        media_type="text/event-stream"
    )
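Before wiring up a frontend, we can sanity-check the endpoint with a small Python client. This is a sketch that assumes the app is served locally on port 8000 (for example with uvicorn main:app) and that the httpx library is installed:

import httpx

# Stream the response and print each SSE data event as it arrives
with httpx.stream(
    "POST",
    "http://localhost:8000/generate-stream",
    json={"prompt": "How are you?"},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line[len("data: "):])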
To connect to that endpoint on the frontend, we first need to send an HTTP request:
const url = 'http://theaiedge.io/generate-stream';

fetch(url, {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
    },
    body: JSON.stringify({ prompt: prompt }),
});
Once we receive a response, we can attach an EventSource listener to that endpoint:
const url = 'http://theaiedge.io/generate-stream';

function connectToStream(prompt) {
    fetch(url, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({ prompt: prompt }),
    })
    .then(response => {
        if (response.ok) {
            const eventSource = new EventSource(url);
            // Listen for messages
            eventSource.onmessage = (event) => {
                console.log('Received chunk:', event.data);
            };
        }
    });
}
We can replace the eventSource.onmessage handler with any function we want to use to process each update, for example appending the received text to the page and calling eventSource.close() when the [DONE] event arrives. One caveat: the browser's EventSource API always issues GET requests and cannot carry a POST body, so to use it exactly as above, the /generate-stream endpoint would also need to accept GET requests (for example, with the prompt passed as a query parameter). With a POST-only endpoint, you would instead read the streamed fetch response body directly.