Skip to content

Instantly share code, notes, and snippets.

@ptoffy
Created September 4, 2025 09:58
Show Gist options
  • Select an option

  • Save ptoffy/53a38db280f99369433e0ec41a0b6e73 to your computer and use it in GitHub Desktop.

Select an option

Save ptoffy/53a38db280f99369433e0ec41a0b6e73 to your computer and use it in GitHub Desktop.
AsyncSequence CSV parser
import Fluent
import Vapor
/// A type that can be constructed from an intermediate CSV data-transfer object.
protocol CSVDecodable {
/// The DTO type this value is built from; it must conform to `CSVDecodableDTO`.
associatedtype DTO: CSVDecodableDTO
/// Creates an instance from an already-decoded DTO.
/// - Throws: Whatever error the conforming type raises when the DTO cannot be converted.
init(from: DTO) throws
}
/// A `Sendable` value that can be created from a single CSV line.
protocol CSVDecodableDTO: Sendable {
/// Associated repository type, constrained to `CSVRepository<Self>`.
/// NOTE(review): `CSVRepository` is not declared in the visible source; its role cannot be confirmed from here.
associatedtype Repository: CSVRepository<Self>
/// Creates a value from a raw `Line`.
/// A protocol extension in this file provides a default that splits the line and forwards to `init(elements:)`.
init(line: Line) throws
/// Creates a value from the already-split fields of one line.
init(elements: [String]) throws
}
extension CSVDecodableDTO {
    /// Default implementation: splits `line` with the default separator and
    /// hands the resulting fields to `init(elements:)`.
    /// - Parameter line: The raw CSV line to decode.
    /// - Throws: Any error thrown by `init(elements:)`.
    init(line: Line) throws {
        let fields = line.split()
        try self.init(elements: fields)
    }
}
/// Errors raised while turning a CSV row into a DTO.
enum CSVDecodableError: Error {
/// A field could not be used. Carries a `String` payload
/// (presumably the offending value or a message — confirm with the throwing sites).
case invalidValue(String)
/// The row had the wrong number of fields. The unlabeled `Int` is presumably the
/// count that was found (confirm with the throwing sites); `expected` is the required count.
case invalidNumberOfElements(Int, expected: Int)
/// The row is empty. `CSVParsingAsyncSequence.AsyncIterator` catches this case and skips the row.
case emptyRow
}
extension CSVParser {
/// Errors specific to `CSVParser`.
enum Error: Swift.Error {
/// NOTE(review): not thrown anywhere in the visible source; presumably meant to signal
/// input that ends in the middle of a record — confirm intended use.
case unexpectedEndOfFile
}
}
import Vapor
/// Incremental line splitter for CSV bytes.
///
/// Bytes are fed in with `append(buffer:)` and complete lines are pulled out with `next()`.
/// Line endings that appear inside double quotes do not terminate a line
/// (see `ByteBuffer.readUntilEOL(eolIndicator:)`).
struct CSVParser {
    /// Internal parser state.
    enum State {
        /// Still accepting input; the payload holds the bytes not yet emitted as a line.
        case parsing(ByteBuffer)
        /// No further input is accepted and `next()` reports `.eof`.
        case finished
    }

    /// Outcome of a single `next()` call.
    enum ReadResult {
        /// A complete line was extracted.
        case line(Line)
        /// The buffered bytes do not contain a complete line yet.
        case needMoreData
        /// The parser is in the `.finished` state.
        case eof
    }

    /// Byte that terminates a line (defaults to 10, i.e. `\n`).
    let lineEnding: UInt8

    /// Current state; readable from outside, mutated only by the parser itself.
    private(set) var state: State

    /// Creates a parser with an empty pending buffer.
    /// - Parameter lineEnding: The byte treated as end-of-line.
    init(lineEnding: UInt8 = 10) {
        self.lineEnding = lineEnding
        self.state = .parsing(ByteBuffer())
    }

    /// Appends `buffer` to the pending bytes. Does nothing once the parser is finished.
    mutating func append(buffer: ByteBuffer) {
        guard case .parsing(var pending) = state else { return }
        pending.writeImmutableBuffer(buffer)
        state = .parsing(pending)
    }

    /// Tries to extract the next line from the pending bytes.
    /// - Returns: `.line` when a full line is available, `.needMoreData` when the pending
    ///   bytes are empty or hold no unquoted line ending, `.eof` when finished.
    mutating func next() -> ReadResult {
        guard case .parsing(let pending) = state else { return .eof }
        if pending.readableBytes == 0 {
            return .needMoreData
        }
        return nextLine(from: pending)
    }

    /// Cuts the first line off `buffer` and stores the remainder as the new pending bytes.
    /// The emitted line text includes the line-ending byte itself.
    private mutating func nextLine(from buffer: ByteBuffer) -> ReadResult {
        guard case .success(let eolIndex) = buffer.readUntilEOL(eolIndicator: lineEnding) else {
            return .needMoreData
        }
        let bytes = buffer.readableBytesView
        let text = String(decoding: bytes[...eolIndex], as: UTF8.self)
        state = .parsing(ByteBuffer(bytes[(eolIndex + 1)...]))
        return .line(Line(data: text))
    }
}
extension ByteBuffer {
    /// Result of scanning the readable bytes for an end-of-line marker.
    enum EndOfLineIndexResult {
        /// An unquoted line ending was found; `eolIndex` is its index within `readableBytesView`.
        case success(eolIndex: Int)
        /// No unquoted line ending exists in the readable bytes.
        case needMoreData
    }

    /// Finds the first `eolIndicator` byte that is not enclosed in double quotes.
    ///
    /// The scan covers every readable byte. Iterating the view's own indices keeps the scan
    /// correct when `readerIndex` is not zero; the previous bound (`readableBytes`) is a count,
    /// not an index, and cut the scan short in that case.
    ///
    /// - Parameter eolIndicator: The byte that terminates a line.
    /// - Returns: `.success` with the index (in `readableBytesView` index space) of the line
    ///   ending, or `.needMoreData` when none was found.
    func readUntilEOL(eolIndicator: UInt8) -> EndOfLineIndexResult {
        let quote = UInt8(ascii: "\"")
        let view = readableBytesView
        var isInsideQuotes = false

        for index in view.indices {
            let byte = view[index]
            // Quote handling runs before the EOL check, exactly as before.
            if byte == quote {
                isInsideQuotes.toggle()
            }
            if byte == eolIndicator && !isInsideQuotes {
                return .success(eolIndex: index)
            }
        }
        return .needMoreData
    }
}
import Vapor
/// Adapts an async sequence of `ByteBuffer` chunks into an async sequence of decoded CSV rows.
///
/// The first line of the input is treated as a header and is never decoded. Rows whose
/// decoding throws `CSVDecodableError.emptyRow` are skipped; any other error is rethrown.
struct CSVParsingAsyncSequence<BackingSequence, TableElement>: AsyncSequence, Sendable
where BackingSequence: AsyncSequence & Sendable, BackingSequence.Element == ByteBuffer, TableElement: CSVDecodableDTO {
    /// Source of raw CSV bytes.
    let backingSequence: BackingSequence
    /// Template parser copied into every iterator.
    let parser: CSVParser = .init()

    struct AsyncIterator: AsyncIteratorProtocol {
        typealias Element = TableElement

        /// Line splitter holding the bytes received so far.
        var parser: CSVParser
        /// Iterator over the upstream byte chunks.
        var iterator: BackingSequence.AsyncIterator
        /// Becomes `true` once the header line has been consumed.
        var headerEmitted = false
        /// Becomes `true` once the upstream is drained; afterwards `next()` always returns `nil`.
        var isExhausted: Bool = false

        /// Returns the next decoded row, or `nil` when the input is exhausted.
        /// - Throws: Errors from the upstream iterator, or decoding errors other than
        ///   `CSVDecodableError.emptyRow`.
        mutating func next() async throws -> Element? {
            while !isExhausted {
                switch parser.next() {
                case .line(let line):
                    // The first complete line is the header: consume it without decoding.
                    guard headerEmitted else {
                        headerEmitted = true
                        continue
                    }
                    do {
                        return try Element(line: line)
                    } catch CSVDecodableError.emptyRow {
                        continue
                    }

                case .eof:
                    isExhausted = true

                case .needMoreData:
                    if let chunk = try await iterator.next() {
                        parser.append(buffer: chunk)
                        continue
                    }

                    // Upstream is drained: never poll it again, whatever happens below.
                    isExhausted = true

                    guard case .parsing(let buffer) = parser.state, buffer.readableBytes > 0 else {
                        return nil
                    }
                    // Leftover bytes with no complete line seen yet are the header of a
                    // header-only file lacking a trailing newline; they are not a data row.
                    guard headerEmitted else {
                        headerEmitted = true
                        return nil
                    }

                    // Final row of a file that does not end with a newline.
                    let line = Line(data: String(decoding: buffer.readableBytesView, as: UTF8.self))
                    do {
                        return try Element(line: line)
                    } catch CSVDecodableError.emptyRow {
                        return nil
                    }
                }
            }
            return nil
        }
    }

    /// Creates an iterator that starts from a copy of `parser` and a fresh upstream iterator.
    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(parser: parser, iterator: backingSequence.makeAsyncIterator())
    }
}
/// One raw line of CSV text.
struct Line {
    /// The line's text, possibly including its trailing line ending.
    let data: String

    /// Splits the line into fields.
    ///
    /// - Separators inside double quotes do not split the field.
    /// - The enclosing quotes are removed; a doubled quote (`""`) inside a quoted field
    ///   yields one literal `"` (RFC 4180 escaping). Previously both quotes were dropped.
    /// - Every field is trimmed of surrounding whitespace and newlines, quoted or not.
    ///
    /// - Parameter separator: The field delimiter; defaults to a comma.
    /// - Returns: The fields in order. An empty line yields a single empty field.
    func split(separator: Character = ",") -> [String] {
        var elements: [String] = []
        var currentElement = ""
        var isInsideQuotes = false

        var index = data.startIndex
        while index < data.endIndex {
            let character = data[index]
            let following = data.index(after: index)

            switch character {
            case "\"":
                // An escaped quote inside a quoted field: emit one quote, skip both characters.
                if isInsideQuotes, following < data.endIndex, data[following] == "\"" {
                    currentElement.append("\"")
                    index = data.index(after: following)
                    continue
                }
                isInsideQuotes.toggle()
            case separator where !isInsideQuotes:
                elements.append(currentElement.trimmingCharacters(in: .whitespacesAndNewlines))
                currentElement = ""
            default:
                currentElement.append(character)
            }
            index = following
        }

        elements.append(currentElement.trimmingCharacters(in: .whitespacesAndNewlines))
        return elements
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment