Swift: Streaming OpenAI API Response (Chunked Encoding Transfer)

Itsuki · Published in Stackademic · May 11, 2024

Streaming chunked-encoded data is useful when a large amount of data is expected from a URLRequest. For example, when we request a completion from OpenAI, we might want to start updating our UI with the partial response before the entire completion is generated.

In this article, we will take a look at streaming and handling chunked-encoded data. Specifically, we will look at

  • how we can receive our response as an actual stream (not data that merely looks streamed)
  • reading and parsing the stream (data chunks)
  • updating the UI on each data chunk received

We will be using the OpenAI Chat Completions API with streaming as an example, but the same idea applies to any endpoint that has chunked Transfer-Encoding enabled.

By the end of this article, we will have something like the following!

I bet you can ask ChatGPT to do something more interesting than just saying This is a test, but you get the idea!

Set Up Models for Sending Request

Before we move on to actually sending the request and streaming the response, let’s create a couple of structs for sending our request to OpenAI. The format of the request body for the Chat Completions API can be found in the official documentation.

Here is what we have for curl.

curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
  "model": "gpt-3.5-turbo",
  "messages": [{"role": "user", "content": "Say this is a test!"}],
  "temperature": 0.7
}'
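By the way, if you would like to preview the raw stream from the command line first, you can add "stream": true to the body and pass -N to curl so it does not buffer the output. This is just a quick optional check against the same endpoint as above:

curl https://api.openai.com/v1/chat/completions \
-N \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{
  "model": "gpt-3.5-turbo",
  "messages": [{"role": "user", "content": "Say this is a test!"}],
  "stream": true
}'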

Translating the body JSON into Swift structs. Note that, compared to the curl example, we also add a stream field (defaulting to true) to tell the API to stream the response back:

// Request models
struct CompletionRequest: Codable {
    var model: String = "gpt-4"
    var stream: Bool = true
    var messages: [Message]
}

struct Message: Codable {
    var role: Role
    var content: String
}

enum Role: String, Codable {
    case user
    case assistant
    case system
}
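As a quick optional sanity check (a sketch; the prompt here is just an example), we can encode the struct and print the JSON to confirm it matches the body we had for curl, plus the stream field:

// Optional sanity check: encode the request struct and inspect the JSON body.
let requestStruct = CompletionRequest(
    messages: [Message(role: .user, content: "Say this is a test!")]
)
if let data = try? JSONEncoder().encode(requestStruct),
   let json = String(data: data, encoding: .utf8) {
    // prints something like (key order may vary):
    // {"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"Say this is a test!"}]}
    print(json)
}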

RequestManager

This will be the class responsible for handling our URLRequests as well as updating our UI.

Here is what we have as a starting point. Remember to replace API_KEY_PLACEHOLDER with your own key!


import SwiftUI

class RequestManager: ObservableObject {

    private let url = "https://api.openai.com/v1/chat/completions"
    private let API_KEY = "API_KEY_PLACEHOLDER"

    enum ServiceError: Error {
        case urlCreation
        case timeout
        case parsing
        case badRequest
    }

    private func createRequest() throws -> URLRequest {
        guard let url = URL(string: url) else { throw ServiceError.urlCreation }

        var request = URLRequest(url: url)
        request.httpMethod = "POST"

        let headers: [String: String] = [
            "Content-Type": "application/json",
            "Authorization": "Bearer \(API_KEY)",
            "Accept": "*/*"
        ]
        request.allHTTPHeaderFields = headers

        let requestStruct = CompletionRequest(
            messages: [
                Message(
                    role: .user,
                    content: "Say this is a test for 10 times!"
                )
            ]
        )
        let jsonData = try JSONEncoder().encode(requestStruct)
        request.httpBody = jsonData

        return request
    }
}

What we have above simply creates a URLRequest like we always do! Nothing special!

I will simply ask GPT to say This is a test 10 times (as shown in the beginning), but obviously, you can also take in a user input and send it as your content.

Send Request with URLSession

MOST important takeaway here!!!

Use URLSession.shared.bytes when sending the request, either bytes(for: URLRequest) or bytes(from: URL). This function returns a byte stream that conforms to the AsyncSequence protocol and is entirely async/await based, so there is no need to implement any delegate methods.

Let’s add the following function to our RequestManager and give it a try!

func sendCompletionRequest() async throws {

    let request = try createRequest()

    var asyncBytes: URLSession.AsyncBytes!
    var response: URLResponse!

    do {
        (asyncBytes, response) = try await URLSession.shared.bytes(for: request)
    } catch {
        throw ServiceError.timeout
    }

    // unknown error
    guard let response = response as? HTTPURLResponse else { throw ServiceError.badRequest }

    let statusCode = response.statusCode

    if !(200...299 ~= statusCode) {
        throw ServiceError.badRequest
    }

    for try await line in asyncBytes.lines {
        print("raw: --------")
        print(line)
    }
}

I have created a simple Button just to trigger the function.


import SwiftUI

struct StreamingRequestDemo: View {
    @StateObject var requestManager = RequestManager()

    var body: some View {
        Button(action: {
            Task {
                do {
                    try await requestManager.sendCompletionRequest()
                } catch(let error) {
                    print("error: \(error)")
                }
            }
        }, label: {
            Text("Send [Say Test * 10] Request!")
        })
    }
}

And here is what gets printed to our console.

Yeah! Streaming!

Important Note!

URLSession.shared.data does NOT stream!

Give the following a try

let (data, response) = try await URLSession.shared.data(for: request)
let dataString = String(data: data, encoding: .utf8)
print(dataString ?? "")

and you will see that the dataString is indeed streamed-looking.

BUT all of the data is actually printed out as a single chunk, i.e., it is just streamed-looking but not actually streamed!

It might be hard to recognize for short responses, so try changing the prompt to something like “Say this is a test for 30 times!” instead.
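If you want to verify this yourself, here is a minimal sketch (assuming the request built by our createRequest() above; note that it sends two separate requests): print a timestamp next to each line. With bytes, the timestamps spread out over the generation; with data, every line carries nearly the same timestamp, because the call only returns once the full body has arrived.

// `request` is the URLRequest built by createRequest() above.

// Streaming: timestamps increase as chunks arrive over the network.
let (asyncBytes, _) = try await URLSession.shared.bytes(for: request)
for try await line in asyncBytes.lines {
    print(Date.now, line)
}

// Not streaming: the same lines, but all printed at (nearly) the same time.
let (data, _) = try await URLSession.shared.data(for: request)
for line in String(decoding: data, as: UTF8.self).split(separator: "\n") {
    print(Date.now, line)
}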

Parsing Lines

We are ready to parse our server-sent events. Note that these events are non-trivial and should be handled with caution: simple strategies like splitting on a newline may result in parsing errors, for example when one JSON chunk arrives split across multiple reads. I have done some research on the existing libraries, and here is what I have for you!

Data Structure Overview

Let’s first take a more detailed look at the structure of the response we get.

For each line (data chunk) in asyncBytes.lines, we have a general structure that looks like the one below (wrapped across multiple lines here for readability; on the wire, the JSON follows data: on a single line).

data: 
{"id":"chatcmpl-9NV51DcBlWbvUaijuoSqdYXK0wVMJ",
"object":"chat.completion.chunk",
"created":1715388043,
"model":"gpt-4-0613",
"system_fingerprint":null,
"choices":[{"index":0,"delta":{"content":".)"},"logprobs":null,"finish_reason":"stop"}]
}

However, delta can also sometimes be an empty object, i.e. "delta":{}.

And the end of the stream is marked by data: [DONE].
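Putting it together, the tail of a stream looks roughly like this (an illustrative transcript; ids and contents will differ):

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1715388043,"model":"gpt-4-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" test"},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1715388043,"model":"gpt-4-0613","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

data: [DONE]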

Decoding Models

Having that in mind, let’s create our Codable structs for parsing the data chunks. To make sure that an empty delta object is decoded to nil, we add our own init(from decoder: Decoder) here. We also need explicit CodingKeys on Choice, since the API sends finish_reason while our property is named finishReason.

// Response Model
struct CompletionChunkResponse: Codable {
    var id: String
    var object: String
    var model: String
    var choices: [Choice]
}

struct Choice: Codable {
    var index: Int
    var delta: Delta?
    var finishReason: String?

    // map the snake_case key used by the API
    enum CodingKeys: String, CodingKey {
        case index
        case delta
        case finishReason = "finish_reason"
    }

    // we want to decode an empty delta to nil
    init(from decoder: Decoder) throws {
        let container = try decoder.container(keyedBy: CodingKeys.self)
        index = try container.decode(Int.self, forKey: .index)
        finishReason = try? container.decode(String.self, forKey: .finishReason)
        delta = try? container.decode(Delta.self, forKey: .delta)
    }
}

struct Delta: Codable {
    var content: String
}
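To confirm that an empty delta really decodes to nil (and that finish_reason maps to finishReason), here is a quick check with a hard-coded sample chunk (illustrative values; run it in a playground or wrap it in a do/catch):

// Quick check: a chunk whose delta is the empty object.
let sample = #"{"id":"chatcmpl-123","object":"chat.completion.chunk","model":"gpt-4-0613","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}"#

let chunk = try JSONDecoder().decode(CompletionChunkResponse.self, from: Data(sample.utf8))
print(chunk.choices.first?.delta as Any)        // nil
print(chunk.choices.first?.finishReason as Any) // Optional("stop")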

Also, OpenAI sometimes sends an error response in JSON format, so let’s also add Codable structs for that.

// Error response
struct APIErrorResponse: Error, Codable {
    public let error: APIError
}

struct APIError: Codable {
    public let message: String
    public let type: String
    public let param: String?
    public let code: String?

    init(from decoder: Decoder) throws {
        let container = try decoder.container(keyedBy: CodingKeys.self)
        // message can be either a single String or an array of Strings
        if let string = try? container.decode(String.self, forKey: .message) {
            self.message = string
        } else if let array = try? container.decode([String].self, forKey: .message) {
            self.message = array.joined(separator: "\n")
        } else {
            throw DecodingError.typeMismatch(String.self, .init(codingPath: [CodingKeys.message], debugDescription: "message: expected String or [String]"))
        }

        self.type = try container.decode(String.self, forKey: .type)
        self.param = try container.decodeIfPresent(String.self, forKey: .param)
        self.code = try container.decodeIfPresent(String.self, forKey: .code)
    }
}
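For reference, an OpenAI error body generally looks like the snippet hard-coded below (the values are just an example), and here is how the structs above decode it:

// Decode a typical error body (illustrative values).
let errorJSON = #"{"error":{"message":"Incorrect API key provided.","type":"invalid_request_error","param":null,"code":"invalid_api_key"}}"#

if let decoded = try? JSONDecoder().decode(APIErrorResponse.self, from: Data(errorJSON.utf8)) {
    print(decoded.error.message) // Incorrect API key provided.
}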

Enough setup! Let’s parse our lines!

Parse!


class RequestManager: ObservableObject {
    private var previousChunkBuffer = ""
    private let streamingCompletionMarker = "[DONE]"

    // ...stuff we had above

    func processLine(_ line: String) throws {
        if line.isEmpty {
            return
        }

        let jsonStringArray = "\(previousChunkBuffer)\(line)"
            .trimmingCharacters(in: .whitespacesAndNewlines)
            .components(separatedBy: "data:")
            .map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
            .filter { $0.isEmpty == false }

        previousChunkBuffer = ""

        guard !jsonStringArray.isEmpty else {
            return
        }

        guard jsonStringArray.first != streamingCompletionMarker else {
            return
        }

        for index in 0..<jsonStringArray.count {
            let jsonString = jsonStringArray[index]

            guard !jsonString.isEmpty else {
                continue
            }

            guard jsonString != streamingCompletionMarker else {
                return
            }

            guard let jsonData = jsonString.data(using: .utf8) else {
                print("bad conversion to data")
                throw ServiceError.parsing
            }

            let decoder = JSONDecoder()
            do {
                let completionChunk = try decoder.decode(CompletionChunkResponse.self, from: jsonData)
                let text = completionChunk.choices.first?.delta?.content ?? ""
                print(text)
            } catch(let error) {
                print(error)
                if let decodedError = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
                    throw decodedError
                } else if index == jsonStringArray.count - 1 {
                    previousChunkBuffer = "data: \(jsonString)" // chunk ends in a partial JSON
                } else {
                    throw ServiceError.parsing
                }
            }
        }
    }
}

That’s a lot of code all at once! Let’s take a more detailed look at what we have above.

We first have previousChunkBuffer. I have not encountered this case myself yet, but it seems to be possible for one JSON chunk to be split across multiple data: … chunks.

Our jsonStringArray is created by first combining previousChunkBuffer and the current line into a single String, trimming whitespace and newlines, splitting it on data:, and trimming and filtering each of the resulting Strings.

let jsonStringArray = "\(previousChunkBuffer)\(line)"
    .trimmingCharacters(in: .whitespacesAndNewlines)
    .components(separatedBy: "data:")
    .map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
    .filter { $0.isEmpty == false }

We then check whether the array is empty, or whether its first element is our streamingCompletionMarker. If so, we return from here.

We then decode each String in our jsonStringArray, as long as it is neither empty nor our streamingCompletionMarker, by first converting it to Data and then decoding it to our CompletionChunkResponse.

Note!

When decoding jsonData to CompletionChunkResponse fails, there are multiple possible reasons, and we need to handle each one of them individually! (There is a small illustration of the second case right after this list.)

  • APIError: we decode the error to obtain the message and throw the error!
  • We have badly formatted JSON due to what I mentioned above, where one JSON chunk is split across multiple data: … byte chunks. We set it as our previousChunkBuffer and continue with the next line. (There is no break or continue here because this can only happen on the last element in the array anyway.)
  • Other errors: we throw!
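To see the buffering in action, here is a small illustration (a hypothetical split; I have not captured a real one) that feeds processLine one JSON chunk divided across two lines, the way it could arrive over the network. Run it in a playground or wrap it in a do/catch:

let manager = RequestManager()

// First half: decoding fails (it also prints the decoding error, which is
// expected), so the text is stored in previousChunkBuffer.
try manager.processLine(#"data: {"id":"chatcmpl-123","object":"chat.completion.chunk","#)

// Second half: prepended with the buffer, the combined string now decodes.
try manager.processLine(#""model":"gpt-4-0613","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}"#)
// prints: Hi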

At this point, we are simply printing out the text extracted from each line.

This is, technically speaking, all there is to streaming chunked-encoded data!

But just in case you are interested, let’s create a simple UI and update it as we stream our response!

Update UI on Streaming

Let’s first add a couple of @Published variables to our RequestManager that we will be able to watch from our Views.

  • isStreaming: true while we are making our request and waiting for the streaming to complete. The user should not be able to send another request while we are streaming!
  • contents: the streamed response, updated on every line decoded. Reset to an empty String at the beginning of each request.

class RequestManager: ObservableObject {
    @Published var isStreaming: Bool = false
    @Published var contents: String = ""

    //...
}

And a couple of UI-related functions.

We should make sure to update our UI inside DispatchQueue.main.async. We don’t want to do it from a background thread!

private func startStreaming() {
    DispatchQueue.main.async {
        self.isStreaming = true
        self.contents = ""
    }
}

private func streamingFinish() {
    DispatchQueue.main.async {
        self.isStreaming = false
    }
}

private func updateContent(_ text: String) {
    DispatchQueue.main.async {
        self.contents = "\(self.contents)\(text)"
    }
}
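As a side note, if you prefer structured concurrency over GCD here, the same idea can be expressed by isolating these helpers to the main actor. This is just a sketch, not what the rest of this article uses; synchronous callers like processLine would then need to wrap the calls in a Task:

// Alternative sketch: main-actor isolation instead of DispatchQueue.main.async.
@MainActor
private func startStreaming() {
    isStreaming = true
    contents = ""
}

@MainActor
private func updateContent(_ text: String) {
    contents += text
}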

We then add the three helper functions above to our sendCompletionRequest and processLine functions so that we can update our UI correspondingly!

RequestManager (Final version!)


import SwiftUI

class RequestManager: ObservableObject {
    @Published var isStreaming: Bool = false
    @Published var contents: String = ""

    private let url = "https://api.openai.com/v1/chat/completions"
    private let API_KEY = "API_KEY_PLACEHOLDER"

    enum ServiceError: Error {
        case urlCreation
        case timeout
        case parsing
        case badRequest
    }

    private var previousChunkBuffer = ""
    private let streamingCompletionMarker = "[DONE]"

    func sendCompletionRequest() async throws {

        startStreaming()

        let request = try createRequest()

        var asyncBytes: URLSession.AsyncBytes!
        var response: URLResponse!

        do {
            (asyncBytes, response) = try await URLSession.shared.bytes(for: request)
        } catch {
            streamingFinish() // don't leave isStreaming stuck on true on failure
            throw ServiceError.timeout
        }

        // unknown error
        guard let response = response as? HTTPURLResponse else {
            streamingFinish()
            throw ServiceError.badRequest
        }

        let statusCode = response.statusCode

        if !(200...299 ~= statusCode) {
            streamingFinish()
            throw ServiceError.badRequest
        }

        for try await line in asyncBytes.lines {
            print("raw: --------")
            print(line)
            try processLine(line)
        }
    }

    private func startStreaming() {
        DispatchQueue.main.async {
            self.isStreaming = true
            self.contents = ""
        }
    }

    private func streamingFinish() {
        DispatchQueue.main.async {
            self.isStreaming = false
        }
    }

    private func updateContent(_ text: String) {
        DispatchQueue.main.async {
            self.contents = "\(self.contents)\(text)"
        }
    }

    private func processLine(_ line: String) throws {
        if line.isEmpty {
            return
        }

        let jsonStringArray = "\(previousChunkBuffer)\(line)"
            .trimmingCharacters(in: .whitespacesAndNewlines)
            .components(separatedBy: "data:")
            .map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
            .filter { $0.isEmpty == false }

        previousChunkBuffer = ""

        guard !jsonStringArray.isEmpty else {
            return
        }

        guard jsonStringArray.first != streamingCompletionMarker else {
            streamingFinish()
            return
        }

        for index in 0..<jsonStringArray.count {
            let jsonString = jsonStringArray[index]

            guard !jsonString.isEmpty else {
                continue
            }

            guard jsonString != streamingCompletionMarker else {
                streamingFinish()
                return
            }

            guard let jsonData = jsonString.data(using: .utf8) else {
                print("bad conversion to data")
                throw ServiceError.parsing
            }

            let decoder = JSONDecoder()
            do {
                let completionChunk = try decoder.decode(CompletionChunkResponse.self, from: jsonData)
                let text = completionChunk.choices.first?.delta?.content ?? ""
                updateContent(text)
            } catch(let error) {
                print(error)
                if let decodedError = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
                    throw decodedError
                } else if index == jsonStringArray.count - 1 {
                    previousChunkBuffer = "data: \(jsonString)" // chunk ends in a partial JSON
                } else {
                    throw ServiceError.parsing
                }
            }
        }
    }

    private func createRequest() throws -> URLRequest {
        guard let url = URL(string: url) else { throw ServiceError.urlCreation }

        var request = URLRequest(url: url)
        request.httpMethod = "POST"

        let headers: [String: String] = [
            "Content-Type": "application/json",
            "Authorization": "Bearer \(API_KEY)",
            "Accept": "*/*"
        ]
        request.allHTTPHeaderFields = headers

        let requestStruct = CompletionRequest(
            messages: [
                Message(
                    role: .user,
                    content: "Say this is a test for 10 times!"
                )
            ]
        )
        let jsonData = try JSONEncoder().encode(requestStruct)
        request.httpBody = jsonData

        return request
    }
}

StreamingRequestDemo View

Let’s now add a couple of elements to our StreamingRequestDemo View, watching for changes to the @Published variables in our requestManager.


import SwiftUI

struct StreamingRequestDemo: View {
    @StateObject var requestManager = RequestManager()

    var body: some View {
        VStack(spacing: 50) {

            Text("Streaming? \(requestManager.isStreaming)")
                .foregroundStyle(Color.white)
                .padding()
                .background(RoundedRectangle(cornerRadius: 16))

            Button(action: {
                Task {
                    do {
                        try await requestManager.sendCompletionRequest()
                    } catch(let error) {
                        print("error: \(error)")
                    }
                }
            }, label: {
                Text("Send [Say Test * 10] Request!")
            })
            .foregroundStyle(Color.white)
            .padding()
            .background(
                RoundedRectangle(cornerRadius: 16).fill(Color.blue.opacity(requestManager.isStreaming ? 0.3 : 0.8))
            )
            .disabled(requestManager.isStreaming)

            (Text("Your AI's Response: \n") + Text("\(requestManager.contents.isEmpty ? "Nothing Yet!" : requestManager.contents)"))
                .font(.system(size: 16))
                .lineSpacing(10)
                .frame(maxWidth: .infinity, alignment: .leading)
                .padding()
                .background(
                    RoundedRectangle(cornerRadius: 16)
                        .fill(Color.clear)
                        .stroke(Color.blue, style: StrokeStyle())
                )
                .padding()
        }
        .frame(maxWidth: .infinity, maxHeight: .infinity, alignment: .top)
        .padding(.vertical, 30)
        .background(Color.gray.opacity(0.2))
    }
}

And here we go! A super simple streaming chat completion app that is only able to say This is a test!

Thank you for reading! That’s all I have for today!

I don’t like waiting without knowing what is actually happening on the backend! So I definitely appreciate any update on the current progress over simply waiting for the full completion to finish!

Happy streaming!
