Swift: Streaming OpenAI API Response (Chunked Encoding Transfer)
Streaming chunked-encoded data is useful when we expect to receive a large amount of data from a URLRequest. For example, when we request a completion from OpenAI, we might want to start updating our UI with the partial response before the entire completion is generated.
In this article, we will take a look at streaming and handling chunked-encoded data. Specifically, we will cover:
- how to receive our response as an actual stream (not data that merely looks streamed)
- how to read and parse the stream, data chunk by data chunk
- how to update the UI on each data chunk received
We will be using the OpenAI completion API with streaming as an example, but the same idea applies to any endpoint that has chunked Transfer-Encoding enabled.
By the end of this article, we will have something like the following!
I bet you can ask ChatGPT to do something more interesting than just saying This is a test, but you get the idea!
Set Up Models for Sending Request
Before we move on to actually sending the request and streaming the response, let's create a couple of structs for sending our request to OpenAI. The format of the request body for the Completion API can be found in the official documentation.
Here is what we have for curl.
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Say this is a test!"}],
"temperature": 0.7
}'
Let's translate the body JSON into Swift structs. Note two small differences from the curl example: we default the model to gpt-4, and we add stream: true, which is what tells the API to send the response back as a stream of server-sent events instead of a single JSON body.
// Request models
struct CompletionRequest: Codable {
var model: String = "gpt-4"
var stream: Bool = true
var messages: [Message]
}
struct Message: Codable {
var role: Role
var content: String
}
enum Role: String, Codable {
case user
case assistant
case system
}
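As a quick sanity check, we can encode our struct and compare it with the curl body above. This is just an optional sketch; note that JSONEncoder does not guarantee key order, so your printed JSON may be ordered differently.
let sanityCheck = CompletionRequest(
    messages: [Message(role: .user, content: "Say this is a test!")]
)
if let data = try? JSONEncoder().encode(sanityCheck),
   let json = String(data: data, encoding: .utf8) {
    // Prints something like:
    // {"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"Say this is a test!"}]}
    print(json)
}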
RequestManager
This will be the class responsible for handling our URLRequests as well as updating our UI.
Here is what we have as a starting point. Remember to replace API_KEY_PLACEHOLDER with your own key!
import SwiftUI
class RequestManager: ObservableObject {
private let url = "https://api.openai.com/v1/chat/completions"
private let API_KEY = "API_KEY_PLACEHOLDER"
enum ServiceError: Error {
case urlCreation
case timeout
case parsing
case badRequest
}
private func createRequest() throws -> URLRequest {
guard let url = URL(string: url) else { throw ServiceError.urlCreation }
var request = URLRequest(url: url)
request.httpMethod = "POST"
let headers: [String: String] = [
"Content-Type": "application/json",
"Authorization": "Bearer \(API_KEY)",
"Accept": "*/*"
]
request.allHTTPHeaderFields = headers
let requestStruct = CompletionRequest(
messages: [
Message(
role: .user,
content: "Say this is a test for 10 times!"
)
]
)
let jsonData = try JSONEncoder().encode(requestStruct)
request.httpBody = jsonData
return request
}
}
What we have above simply creates a URLRequest like we always do! Nothing special!
I will simply ask GPT to say This is a test 10 times (as I showed you in the beginning), but obviously, you can also take in user input and send it as your content.
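As an aside, hardcoding the key is fine for a quick demo, but a slightly safer sketch, assuming you define a hypothetical OPENAI_API_KEY environment variable in your Xcode scheme, is to read it at runtime instead:
private var apiKey: String {
    // Falls back to the placeholder when the environment variable is not set
    // (scheme environment variables are only available when running from Xcode).
    ProcessInfo.processInfo.environment["OPENAI_API_KEY"] ?? "API_KEY_PLACEHOLDER"
}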
Send Request with URLSession
MOST important takeaway here!!!
Use URLSession.shared.bytes when sending the request: either bytes(for request: URLRequest) or bytes(from url: URL). This function returns a byte stream that conforms to the AsyncSequence protocol and is based entirely on async/await, so there is no need to implement any delegate methods.
Let's add the following function to our RequestManager and give it a try!
func sendCompletionRequest() async throws {
let request = try createRequest()
var asyncBytes: URLSession.AsyncBytes!
var response: URLResponse!
do {
(asyncBytes, response) = try await URLSession.shared.bytes(for: request)
} catch {
throw ServiceError.timeout
}
guard let response = response as? HTTPURLResponse else { throw ServiceError.badRequest } // unknown error
let statusCode = response.statusCode
if !(200..<300 ~= statusCode) {
throw ServiceError.badRequest
}
for try await line in asyncBytes.lines {
print("raw: --------")
print(line)
}
}
I have created a simple Button just to trigger the function.
import SwiftUI
struct StreamingRequestDemo: View {
@StateObject var requestManager = RequestManager()
var body: some View {
Button(action: {
Task {
do {
try await requestManager.sendCompletionRequest()
} catch(let error) {
print("error: \(error)")
}
}
}, label: {
Text("Send [Say Test * 10] Request!")
})
}
}
And here is what gets printed to our console.
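The exact IDs and timestamps will differ, but the output should look roughly like this, one chat.completion.chunk arriving per line (illustrative and abbreviated):
raw: --------
data: {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1715388043,"model":"gpt-4-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"This"},"logprobs":null,"finish_reason":null}]}
raw: --------
data: {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1715388043,"model":"gpt-4-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" is"},"logprobs":null,"finish_reason":null}]}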
Yeah! Streaming!
Important Note!
URLSession.shared.data does NOT stream!
Give the following a try:
let (data, _) = try await URLSession.shared.data(for: request)
let dataString = String(data: data, encoding: .utf8)
print(dataString ?? "")
and you will see the dataString is indeed streamed-looking. BUT all of the data is actually printed out as a single chunk, i.e. it is just streamed-looking but not actually streamed! It might be hard to recognize for short responses, so try changing the prompt to something like "Say this is a test for 30 times!" instead.
Parsing Lines
We are ready to parse our server-sent events. Note that parsing these events is non-trivial and should be done with caution: naive strategies like simply splitting on newlines may result in parsing errors. I have done some research on the existing libraries, and here is what I have for you!
Data Structure Overview
Let's first take a more detailed look at the structure of the response we get. Each line (data chunk) in asyncBytes.lines has a general structure that looks like the one below.
data:
{"id":"chatcmpl-9NV51DcBlWbvUaijuoSqdYXK0wVMJ",
"object":"chat.completion.chunk",
"created":1715388043,
"model":"gpt-4-0613",
"system_fingerprint":null,
"choices":[{"index":0,"delta":{"content":".)"},"logprobs":null,"finish_reason":"stop"}]
}
However, delta can also sometimes be an empty object, i.e. "delta":{}. And the end of the stream is marked by data: [DONE].
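For context, a typical stream starts with a chunk whose delta carries only the role, and finishes with a chunk whose finish_reason is set, followed by the [DONE] marker. An abbreviated, illustrative sequence (IDs elided):
data: {"id":"...","object":"chat.completion.chunk","model":"gpt-4-0613","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"...","object":"chat.completion.chunk","model":"gpt-4-0613","choices":[{"index":0,"delta":{"content":"This"},"finish_reason":null}]}
data: {"id":"...","object":"chat.completion.chunk","model":"gpt-4-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]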
Decoding Models
With that in mind, let's create our Codable structs for parsing the data chunk. To make sure that an empty delta object can be decoded to nil, we have added our own init(from decoder: Decoder) here. One more detail: the API sends finish_reason in snake_case, so we also map it explicitly via CodingKeys.
// Response Model
struct CompletionChunkResponse: Codable {
var id: String
var object: String
var model: String
var choices: [Choice]
}
struct Choice: Codable {
var index: Int
var delta: Delta?
var finishReason: String?
// map the API's snake_case "finish_reason" key to our camelCase property
enum CodingKeys: String, CodingKey {
case index
case delta
case finishReason = "finish_reason"
}
// we want to decode empty delta to nil
init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
index = try container.decode(Int.self, forKey: .index)
finishReason = try? container.decode(String.self, forKey: .finishReason)
delta = try? container.decode(Delta.self, forKey: .delta)
}
}
struct Delta: Codable {
var content: String
}
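Before wiring this into the stream, we can verify the custom init behaves as intended with a quick standalone check. This is just a sketch, assuming the models above are in scope; the sample strings are hand-written for illustration:
let decoder = JSONDecoder()
let normalChunk = #"{"id":"c1","object":"chat.completion.chunk","model":"gpt-4","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}"#
let endChunk = #"{"id":"c2","object":"chat.completion.chunk","model":"gpt-4","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#
do {
    let first = try decoder.decode(CompletionChunkResponse.self, from: Data(normalChunk.utf8))
    print(first.choices.first?.delta?.content ?? "") // "Hi"
    let last = try decoder.decode(CompletionChunkResponse.self, from: Data(endChunk.utf8))
    print(last.choices.first?.delta == nil)          // true: the empty delta became nil
    print(last.choices.first?.finishReason ?? "")    // "stop"
} catch {
    print("decoding failed: \(error)")
}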
Also, OpenAI sometimes sends error responses in JSON format, so let's add a Codable for that as well.
// Error response
struct APIErrorResponse: Error, Codable {
public let error: APIError
}
struct APIError: Codable {
public let message: String
public let type: String
public let param: String?
public let code: String?
init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
if let string = try? container.decode(String.self, forKey: .message) {
self.message = string
} else if let array = try? container.decode([String].self, forKey: .message) {
self.message = array.joined(separator: "\n")
} else {
throw DecodingError.typeMismatch(String.self, .init(codingPath: [CodingKeys.message], debugDescription: "message: expected String or [String]"))
}
self.type = try container.decode(String.self, forKey: .type)
self.param = try container.decodeIfPresent(String.self, forKey: .param)
self.code = try container.decodeIfPresent(String.self, forKey: .code)
}
}
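For reference, an OpenAI error body generally looks like the following (the values here are illustrative), and the structs above decode it directly:
{
"error": {
"message": "Incorrect API key provided: sk-…",
"type": "invalid_request_error",
"param": null,
"code": "invalid_api_key"
}
}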
Enough setup! Let's parse our lines!
Parse!
class RequestManager: ObservableObject {
private var previousChunkBuffer = ""
private let streamingCompletionMarker = "[DONE]"
//...stuff we had above
func processLine(_ line: String) throws {
if line.isEmpty {
return
}
let jsonStringArray = "\(previousChunkBuffer)\(line)"
.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: "data:")
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
.filter { $0.isEmpty == false }
previousChunkBuffer = ""
guard !jsonStringArray.isEmpty else {
return
}
guard jsonStringArray.first != streamingCompletionMarker else {
return
}
for index in 0..<jsonStringArray.count {
let jsonString = jsonStringArray[index]
guard !jsonString.isEmpty else {
continue
}
guard jsonString != streamingCompletionMarker else {
return
}
guard let jsonData = jsonString.data(using: .utf8) else {
print("bad conversion to data")
throw ServiceError.parsing
}
let decoder = JSONDecoder()
do {
let completionChunk = try decoder.decode(CompletionChunkResponse.self, from: jsonData)
let text = completionChunk.choices.first?.delta?.content ?? ""
print(text)
} catch(let error) {
print(error)
if let decodedError = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
throw decodedError
} else if index == jsonStringArray.count - 1 {
previousChunkBuffer = "data: \(jsonString)" // Chunk ends in a partial JSON
} else {
throw ServiceError.parsing
}
}
}
}
}
That’s a lot of code all at once! Let’s take a more detailed look at what we have above.
We first have previousChunkBuffer. I have not encountered this case myself yet, but it seems to be possible for one JSON chunk to be split across multiple data:… chunks.
Our jsonStringArray is created by first combining the previousChunkBuffer and the current line into a single String, trimming whitespace and newlines, and splitting it into an Array of String separated by data:.
let jsonStringArray = "\(previousChunkBuffer)\(line)"
.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: "data:")
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
.filter { $0.isEmpty == false }
We then check whether it is empty or whether its first element is our streamingCompletionMarker. If so, we return from here.
We will then decode each String in our jsonStringArray, provided it is neither empty nor our streamingCompletionMarker, by first converting it to Data and then decoding it to our CompletionChunkResponse.
Note!
When decoding jsonData into CompletionChunkResponse fails, there are multiple possible reasons, and we need to handle each one of them individually!
- APIError: we decode the error to obtain the message and throw the error!
- Badly formatted JSON, due to the case I mentioned above where one JSON chunk is split across multiple data:… byte chunks. We set it as our previousChunkBuffer and continue on the next line! (There is no explicit break or continue here because this is the last element in the array anyway.)
- Other errors: we throw!
At this point, we are simply printing out the response extracted from the line.
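To see the parser in action before hooking it up to the network, we can feed it a couple of hand-written lines (illustrative values only; this uses the version above, where processLine is not yet private), including a chunk split across two lines:
let manager = RequestManager()
do {
    // A complete chunk on a single line: prints "Hello".
    try manager.processLine(#"data: {"id":"c1","object":"chat.completion.chunk","model":"gpt-4","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}"#)
    // A chunk split in two: the first call fails to decode (it prints the
    // decoding error, as written) and buffers the partial JSON...
    try manager.processLine(#"data: {"id":"c2","object":"chat.completion.chunk","#)
    // ...and the second call completes it and prints " world".
    try manager.processLine(#""model":"gpt-4","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}"#)
} catch {
    print("parse error: \(error)")
}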
This is, technically speaking, all there is to streaming chunked-encoded data!
But just in case you are interested, let's create a simple UI and update it as we stream our response!
Update UI on Streaming
Let's first add a couple of @Published variables to our RequestManager that we will be able to watch from our Views.
- isStreaming: true while we are making our request and waiting for the streaming to complete. The user should not be able to send another request while we are streaming!
- contents: the streamed response, updated on every line decoded. Reset to an empty String at the beginning of each request.
class RequestManager: ObservableObject {
@Published var isStreaming: Bool = false
@Published var contents: String = ""
//...
}
And a couple of UI-related functions.
We should make sure to update our UI in DispatchQueue.main.async. We don't want to do it from a background thread!
private func startStreaming() {
DispatchQueue.main.async {
self.isStreaming = true
self.contents = ""
}
}
private func streamingFinish() {
DispatchQueue.main.async {
self.isStreaming = false
}
}
private func updateContent(_ text: String) {
DispatchQueue.main.async {
self.contents = "\(self.contents)\(text)"
}
}
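If you prefer structured concurrency over GCD here, an equivalent sketch, assuming iOS 15+ and Swift 5.5, is to isolate these helpers to the main actor instead; callers in an async context then await them:
// Each helper runs on the main actor, so the @Published updates are main-thread safe.
@MainActor private func startStreaming() {
    isStreaming = true
    contents = ""
}
@MainActor private func streamingFinish() {
    isStreaming = false
}
@MainActor private func updateContent(_ text: String) {
    contents += text
}
// Inside sendCompletionRequest, the calls then become e.g. `await startStreaming()`.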
We will then add the functions above to our sendCompletionRequest and processLine functions so that we can update our UI correspondingly!
RequestManager (Final version!)
import SwiftUI
class RequestManager: ObservableObject {
@Published var isStreaming: Bool = false
@Published var contents: String = ""
private let url = "https://api.openai.com/v1/chat/completions"
private let API_KEY = "API_KEY_PLACEHOLDER"
enum ServiceError: Error {
case urlCreation
case timeout
case parsing
case badRequest
}
private var previousChunkBuffer = ""
private let streamingCompletionMarker = "[DONE]"
func sendCompletionRequest() async throws {
startStreaming()
// Make sure the UI is unlocked even if an error is thrown mid-stream.
defer { streamingFinish() }
let request = try createRequest()
var asyncBytes: URLSession.AsyncBytes!
var response: URLResponse!
do {
(asyncBytes, response) = try await URLSession.shared.bytes(for: request)
} catch {
throw ServiceError.timeout
}
guard let response = response as? HTTPURLResponse else { throw ServiceError.badRequest } // unknown error
let statusCode = response.statusCode
if !(200..<300 ~= statusCode) {
throw ServiceError.badRequest
}
for try await line in asyncBytes.lines {
print("raw: --------")
print(line)
try processLine(line)
}
}
private func startStreaming() {
DispatchQueue.main.async {
self.isStreaming = true
self.contents = ""
}
}
private func streamingFinish() {
DispatchQueue.main.async {
self.isStreaming = false
}
}
private func updateContent(_ text: String) {
DispatchQueue.main.async {
self.contents = "\(self.contents)\(text)"
}
}
private func processLine(_ line: String) throws {
if line.isEmpty {
return
}
let jsonStringArray = "\(previousChunkBuffer)\(line)"
.trimmingCharacters(in: .whitespacesAndNewlines)
.components(separatedBy: "data:")
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
.filter { $0.isEmpty == false }
previousChunkBuffer = ""
guard !jsonStringArray.isEmpty else {
return
}
guard jsonStringArray.first != streamingCompletionMarker else {
streamingFinish()
return
}
for index in 0..<jsonStringArray.count {
let jsonString = jsonStringArray[index]
guard !jsonString.isEmpty else {
continue
}
guard jsonString != streamingCompletionMarker else {
streamingFinish()
return
}
guard let jsonData = jsonString.data(using: .utf8) else {
print("bad conversion to data")
throw ServiceError.parsing
}
let decoder = JSONDecoder()
do {
let completionChunk = try decoder.decode(CompletionChunkResponse.self, from: jsonData)
let text = completionChunk.choices.first?.delta?.content ?? ""
updateContent(text)
} catch(let error) {
print(error)
if let decodedError = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
throw decodedError
} else if index == jsonStringArray.count - 1 {
previousChunkBuffer = "data: \(jsonString)" // Chunk ends in a partial JSON
} else {
throw ServiceError.parsing
}
}
}
}
private func createRequest() throws -> URLRequest {
guard let url = URL(string: url) else { throw ServiceError.urlCreation }
var request = URLRequest(url: url)
request.httpMethod = "POST"
let headers: [String: String] = [
"Content-Type": "application/json",
"Authorization": "Bearer \(API_KEY)",
"Accept": "*/*"
]
request.allHTTPHeaderFields = headers
let requestStruct = CompletionRequest(
messages: [
Message(
role: .user,
content: "Say this is a test for 10 times!"
)
]
)
let jsonData = try JSONEncoder().encode(requestStruct)
request.httpBody = jsonData
return request
}
}
StreamingRequestDemo View
Let's now add a couple of elements to our StreamingRequestDemo View, watching for changes to the @Published variables in our requestManager.
import SwiftUI
struct StreamingRequestDemo: View {
@StateObject var requestManager = RequestManager()
var body: some View {
VStack(spacing: 50) {
Text("Streaming? \(requestManager.isStreaming)")
.foregroundStyle(Color.white)
.padding()
.background(RoundedRectangle(cornerRadius: 16)
)
Button(action: {
Task {
do {
try await requestManager.sendCompletionRequest()
} catch(let error) {
print("error: \(error)")
}
}
}, label: {
Text("Send [Say Test * 10] Request!")
})
.foregroundStyle(Color.white)
.padding()
.background(
RoundedRectangle(cornerRadius: 16).fill(Color.blue.opacity(requestManager.isStreaming ? 0.3 : 0.8))
)
.disabled(requestManager.isStreaming)
(Text("Your AI's Response: \n") + Text("\(requestManager.contents.isEmpty ? "Nothing Yet!" : requestManager.contents)"))
.font(.system(size: 16))
.lineSpacing(10)
.frame(maxWidth: .infinity, alignment: .leading)
.padding()
.background(
RoundedRectangle(cornerRadius: 16)
.fill(Color.clear)
.stroke(Color.blue, style: StrokeStyle())
)
.padding()
}
.frame(maxWidth: .infinity, maxHeight: .infinity, alignment: .top)
.padding(.vertical, 30)
.background(Color.gray.opacity(0.2))
}
}
And here we go! A super simple streaming Chat Completion App that is only able to say This is a test!
Thank you for reading! That’s all I have for today!
I don't like waiting without knowing what is actually happening on the backend! So I definitely appreciate getting updates on the current progress rather than simply waiting for the full completion to finish!
Happy streaming!