`gemini.model.ts` is 960 lines and mixes HTTP transport, SSE parsing, cumulative/fragmented dedup, S3 upload orchestration, backpressure handling, and business logic in one class. The raw `fetch` approach (bypassing the `@google/genai` SDK) is necessary for memory efficiency — the SDK buffers entire request bodies via `JSON.stringify` and parses full response objects per chunk — but the implementation can be decomposed into composable layers.
All transforms are functions: `(input: AsyncIterable<A>) → AsyncGenerator<B>`.

- **Backpressure is free.** AsyncGenerators are pull-based — if the consumer doesn't call `.next()`, the producer pauses at `yield`. This propagates through the entire chain back to `response.body`.
- **One paradigm.** No mixing Node Transform classes, Web Streams, or custom consumer interfaces. Node `Readable` only appears at the S3 boundary via `Readable.from()`.
- **Composable via variables.** No pipe helper needed — just name each step.
```ts
const sse = parseSseStream(response.body)
const events = parseGeminiEvents(sse)
const uploaded = uploadImageChunks(events, uploadConfig)
const blocks = collectText(uploaded)
for await (const block of blocks) { ... }
```

### src/common/sse-stream.ts
```ts
parseSseStream(body: NodeJS.ReadableStream): AsyncGenerator<SseEvent>

type SseEvent = {
  data: string // raw data field(s), joined by \n if multi-line
  event?: string // event: field
  id?: string // id: field
}
```

Generic, spec-compliant SSE parser (WHATWG EventSource spec). No JSON parsing, no Gemini knowledge.
Handles:

- `\r\n`, `\r`, `\n` line endings
- blank-line event boundaries
- `:` comment lines (skipped)
- multi-line `data:` field concatenation
- BOM stripping
- `[DONE]` sentinel → stops iteration
Note: the `@google/genai` SDK's internal SSE parser (`responseLineRE`) uses a single regex that doesn't handle all of these edge cases. This replaces it.
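The core parse loop could be sketched as a single buffered generator. This is a simplified sketch, not the final implementation: BOM stripping and the `[DONE]` sentinel from the list above are omitted, and a production version would also defer a trailing lone `\r` at a chunk boundary before treating it as a line ending.

```typescript
type SseEvent = { data: string; event?: string; id?: string }

// Sketch of the parse loop: buffer partial lines across chunks, split on
// \r\n / \r / \n, dispatch an event at each blank line.
export async function* parseSseStream(
  body: AsyncIterable<Buffer | string> | Iterable<Buffer | string>,
): AsyncGenerator<SseEvent> {
  let buffer = ''
  let dataLines: string[] = []
  let event: string | undefined
  let id: string | undefined // persists across events (last-event-id semantics)

  const flush = (): SseEvent | undefined => {
    if (dataLines.length === 0) return undefined // no data → no dispatch (per spec)
    const out = { data: dataLines.join('\n'), event, id }
    dataLines = []
    event = undefined
    return out
  }

  for await (const chunk of body) {
    buffer += chunk.toString()
    let idx: number
    while ((idx = buffer.search(/\r\n|\r|\n/)) !== -1) {
      const line = buffer.slice(0, idx)
      const sepLen = buffer[idx] === '\r' && buffer[idx + 1] === '\n' ? 2 : 1
      buffer = buffer.slice(idx + sepLen)
      if (line === '') {
        const ev = flush()
        if (ev) yield ev
      } else if (line.startsWith(':')) {
        // comment line — skip
      } else if (line.startsWith('data:')) {
        dataLines.push(line.slice(5).replace(/^ /, ''))
      } else if (line.startsWith('event:')) {
        event = line.slice(6).replace(/^ /, '')
      } else if (line.startsWith('id:')) {
        id = line.slice(3).replace(/^ /, '')
      }
    }
  }
  const ev = flush() // dispatch a final event if the stream ended mid-block
  if (ev) yield ev
}
```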
`src/common/sse-stream.spec.ts` — test against all the above edge cases plus chunked delivery (data split across multiple body reads).
### src/ai/custom-providers/google/gemini-stream-events.ts
```ts
parseGeminiEvents(sse: AsyncIterable<SseEvent>): AsyncGenerator<GeminiStreamEvent>

type GeminiStreamEvent =
  | { type: 'image-start'; mimeType: string }
  | { type: 'image-delta'; data: string } // base64 chunk, already deduped
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }

type GeminiTokenUsage = {
  promptTokenCount: number
  candidatesTokenCount: number
  totalTokenCount: number
}
```

Responsibilities:
- `JSON.parse` each `SseEvent.data` into the raw Gemini response shape
- Throw on an `error` field in the response
- Track `usageMetadata` and `finishReason` (typically only on the final chunk)
- Detect cumulative vs fragmented inline data delivery (Vertex sends fragments, ML Dev sends cumulative) and deduplicate — consumers only see new bytes in `image-delta`
- Emit `image-start` on the first inline data part (with mimeType)
- Emit `done` when iteration ends, with final usage/finishReason
The cumulative/fragmented detection uses a prefix-sample comparison (the existing `extractUnseenBase64Chunk` logic, moved here).
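The detection could work roughly like this hypothetical sketch — the names (`createInlineDataDeduper`, `extractUnseen`) and the 64-byte sample length are illustrative; the real logic is the existing `extractUnseenBase64Chunk`:

```typescript
// Hypothetical sketch of cumulative-vs-fragmented detection.
type DeliveryMode = 'unknown' | 'cumulative' | 'fragmented'

export function createInlineDataDeduper() {
  let mode: DeliveryMode = 'unknown'
  let seenLength = 0
  let prefixSample = ''

  // Returns only the bytes the consumer hasn't seen yet.
  return function extractUnseen(chunk: string): string {
    if (mode === 'unknown' && seenLength > 0) {
      // If the second chunk repeats the start of the first, the provider is
      // sending cumulative payloads (ML Dev); otherwise each chunk is a
      // fresh fragment (Vertex).
      const sample = prefixSample.slice(0, Math.min(64, chunk.length))
      mode = chunk.startsWith(sample) ? 'cumulative' : 'fragmented'
    }
    if (seenLength === 0) prefixSample = chunk.slice(0, 64)

    if (mode === 'cumulative') {
      const unseen = chunk.slice(seenLength)
      seenLength = chunk.length
      return unseen
    }
    // fragmented (or first chunk, mode still unknown): everything is new
    seenLength += chunk.length
    return chunk
  }
}
```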
`src/ai/custom-providers/google/gemini-stream-events.spec.ts` — test cumulative mode, fragmented mode, auto-detection, text-only responses, mixed text+image, error responses.
### src/ai/custom-providers/google/gemini-upload-transform.ts
```ts
uploadImageChunks(
  events: AsyncIterable<GeminiStreamEvent>,
  config: {
    s3Client: AWSS3ClientService
    bucket: string
    keyPrefix: string
  },
): AsyncGenerator<AfterUploadEvent>

type ImageBlock = {
  type: 'image'
  url: string
  s3Key: string
  s3Bucket: string
  width: number
  height: number
  format: string
  contentType: string
  contentLength: number
}

type AfterUploadEvent =
  | ImageBlock
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }
```

`image-start` + `image-delta` events are consumed and resolved into a single `ImageBlock` yield. Everything else passes through unchanged.
Internally, on `image-start`:

- Creates an inner `drainImageChunks()` async generator that pulls `image-delta` events from the shared iterator until `done`
- Pipes through `base64Decode()` → `dimensionProbe.transform()` → `Readable.from()` → S3 upload
- Awaits upload + dimension probe
- Yields `ImageBlock`
- Re-emits any non-image events that arrived during streaming (buffered aside)
Memory at any moment: ~1 base64 chunk + ~1 decoded buffer + dimension probe buffer (≤256KB) + S3 SDK buffer. The full image is never in the JS heap.
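The drain-from-a-shared-iterator pattern can be sketched in isolation. In this simplified sketch the event set is reduced, the "upload" is just string concatenation, and only a single interleaved non-image event is re-emitted (the real transform buffers them aside and pipes drained chunks through base64 decode, the dimension probe, and S3):

```typescript
type Evt =
  | { type: 'image-start'; mimeType: string }
  | { type: 'image-delta'; data: string }
  | { type: 'text-delta'; text: string }
  | { type: 'done' }

// Sketch: consume image-start/image-delta runs from a shared iterator and
// resolve each run into one yielded value; pass everything else through.
export async function* resolveImageRuns(
  events: AsyncIterable<Evt>,
): AsyncGenerator<Evt | { type: 'image'; mimeType: string; data: string }> {
  const it = events[Symbol.asyncIterator]()
  while (true) {
    const { value, done } = await it.next()
    if (done) break
    if (value.type !== 'image-start') {
      yield value // non-image events pass through unchanged
      continue
    }
    // Inner drain: pull image-delta events until something else arrives.
    let data = ''
    let leftover: Evt | undefined
    while (true) {
      const next = await it.next()
      if (next.done) break
      if (next.value.type === 'image-delta') data += next.value.data
      else {
        leftover = next.value
        break
      }
    }
    yield { type: 'image', mimeType: value.mimeType, data }
    if (leftover) yield leftover // re-emit the event that ended the run
  }
}
```

Because the inner drain shares the outer iterator, backpressure still propagates: nothing is pulled from upstream faster than the resolved blocks are consumed.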
`src/ai/custom-providers/google/gemini-upload-transform.spec.ts`
### src/common/base64-decode.ts

```ts
base64Decode(chunks: AsyncIterable<string>): AsyncGenerator<Buffer>
```

Handles carry bytes across chunk boundaries for padding alignment. Generic, not Gemini-specific.
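The carry logic is small enough to sketch in full — a minimal version, assuming each decode should only ever see a character count that is a multiple of 4 (complete base64 quanta):

```typescript
// Sketch: decode a stream of base64 string chunks into Buffers, carrying
// leftover characters across chunk boundaries so decoding never splits a
// 4-character base64 quantum.
export async function* base64Decode(
  chunks: AsyncIterable<string>,
): AsyncGenerator<Buffer> {
  let carry = ''
  for await (const chunk of chunks) {
    const combined = carry + chunk
    const usable = combined.length - (combined.length % 4)
    carry = combined.slice(usable) // 0–3 chars held for the next chunk
    if (usable > 0) yield Buffer.from(combined.slice(0, usable), 'base64')
  }
  // Flush any trailing (possibly padded) remainder.
  if (carry.length > 0) yield Buffer.from(carry, 'base64')
}
```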
### src/media/dimension-probe.ts
```ts
createDimensionProbe(opts?: { maxProbeBytes?: number }): {
  transform: (chunks: AsyncIterable<Buffer>) => AsyncGenerator<Buffer>
  dimensions: Promise<{ width: number; height: number }>
}
```

Passthrough transform. Buffers up to `maxProbeBytes` (default 256KB) to call `imageSize()`. Resolves the `dimensions` promise when found, or falls back to the full probe buffer at flush. Side-channel via the returned promise.
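The passthrough-plus-side-channel shape could look like this sketch. To keep it self-contained, the `probe` function is injected rather than importing `imageSize()`; the extra parameter is an assumption of the sketch, not the proposed signature:

```typescript
// Sketch: passthrough transform with a dimensions side-channel promise.
// `probe` stands in for imageSize(); it returns dimensions once it has
// seen enough bytes, or undefined otherwise.
export function createDimensionProbe(
  probe: (buf: Buffer) => { width: number; height: number } | undefined,
  opts: { maxProbeBytes?: number } = {},
) {
  const maxProbeBytes = opts.maxProbeBytes ?? 256 * 1024
  let resolve!: (d: { width: number; height: number }) => void
  let reject!: (e: Error) => void
  const dimensions = new Promise<{ width: number; height: number }>((res, rej) => {
    resolve = res
    reject = rej
  })

  async function* transform(chunks: AsyncIterable<Buffer>): AsyncGenerator<Buffer> {
    let probeBuf = Buffer.alloc(0)
    let resolved = false
    for await (const chunk of chunks) {
      if (!resolved && probeBuf.length < maxProbeBytes) {
        probeBuf = Buffer.concat([probeBuf, chunk])
        const dims = probe(probeBuf)
        if (dims) {
          resolved = true
          resolve(dims)
          probeBuf = Buffer.alloc(0) // release probe memory early
        }
      }
      yield chunk // passthrough — bytes reach the consumer unchanged
    }
    if (!resolved) {
      // Flush fallback: last-chance probe over whatever was buffered.
      const dims = probe(probeBuf)
      if (dims) resolve(dims)
      else reject(new Error('Could not determine image dimensions'))
    }
  }

  return { transform, dimensions }
}
```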
### src/ai/custom-providers/google/gemini-text-transform.ts
```ts
collectText(events: AsyncIterable<AfterUploadEvent>): AsyncGenerator<GeminiOutputBlock>

type TextBlock = { type: 'text'; text: string }

type GeminiOutputBlock =
  | ImageBlock
  | TextBlock
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }
```

Accumulates `text-delta` events into a single `TextBlock`, emitted when a non-text event arrives or iteration ends. Everything else passes through.
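The whole transform is a short fold — a sketch with the event types simplified to their discriminants:

```typescript
// Sketch: fold consecutive text-delta events into one text block, emitted
// when a non-text event arrives or the stream ends.
type In = { type: 'text-delta'; text: string } | { type: 'image' } | { type: 'done' }
type Out = { type: 'text'; text: string } | { type: 'image' } | { type: 'done' }

export async function* collectText(events: AsyncIterable<In>): AsyncGenerator<Out> {
  let text = ''
  for await (const event of events) {
    if (event.type === 'text-delta') {
      text += event.text
      continue
    }
    if (text) {
      yield { type: 'text', text } // flush accumulated text first
      text = ''
    }
    yield event
  }
  if (text) yield { type: 'text', text } // flush if the stream ended on text
}
```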
### src/ai/custom-providers/google/gemini-api-adapter.ts

Wraps `GoogleGenAI` from `@google/genai` to expose the private `apiClient` methods we need for raw streaming. The only file that uses `as any`.
```ts
interface GeminiApiAdapter {
  buildRequestUrl(modelVersion: string, action: string): URL
  getHeaders(url: string): Promise<Record<string, string>>
  isVertexAI(): boolean
}

class GoogleGenAIApiAdapter implements GeminiApiAdapter {
  constructor(private client: GoogleGenAI) {}
  // accesses (this.client as any).apiClient internally
}
```

Why we bypass the SDK's public `request()`/`requestStream()`: they `JSON.stringify` the entire request body (buffering multi-MB base64 style reference images) and parse full response objects per SSE chunk (holding the entire image base64 in memory).
Methods quarantined:

- `apiClient.constructUrl(path, httpOptions, prependProjectLocation)` → returns `URL`
- `apiClient.getHeadersInternal(httpOptions, url)` → returns `Headers` (includes auth)
- `apiClient.shouldPrependVertexProjectPath({path, httpMethod})` → returns `boolean`
- `apiClient.isVertexAI()` → returns `boolean`
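Assuming those four internals (names from the list above), the wrapper could look like this rough sketch. The `RawApiClient` structural type and the request path shape are illustrative — they are not the SDK's actual private declarations:

```typescript
// Structural stand-in for the private SDK internals listed above.
interface RawApiClient {
  constructUrl(path: string, httpOptions: unknown, prependProjectLocation: boolean): URL
  getHeadersInternal(httpOptions: unknown, url: string): Promise<Headers>
  shouldPrependVertexProjectPath(req: { path: string; httpMethod: string }): boolean
  isVertexAI(): boolean
}

export interface GeminiApiAdapter {
  buildRequestUrl(modelVersion: string, action: string): URL
  getHeaders(url: string): Promise<Record<string, string>>
  isVertexAI(): boolean
}

export class GoogleGenAIApiAdapter implements GeminiApiAdapter {
  // In the real file the client is a GoogleGenAI instance.
  constructor(private client: { apiClient?: unknown }) {}

  private get api(): RawApiClient {
    // The single quarantined cast into SDK internals.
    return (this.client as { apiClient: RawApiClient }).apiClient
  }

  buildRequestUrl(modelVersion: string, action: string): URL {
    const path = `models/${modelVersion}:${action}` // path shape illustrative
    const prepend = this.api.shouldPrependVertexProjectPath({ path, httpMethod: 'POST' })
    return this.api.constructUrl(path, undefined, prepend)
  }

  async getHeaders(url: string): Promise<Record<string, string>> {
    const headers = await this.api.getHeadersInternal(undefined, url)
    return Object.fromEntries(headers.entries())
  }

  isVertexAI(): boolean {
    return this.api.isVertexAI()
  }
}
```

The payoff of the structural interface is that specs can pass a plain stub object instead of constructing a real `GoogleGenAI` client.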
`src/ai/custom-providers/google/gemini-api-adapter.spec.ts`
### src/ai/custom-providers/google/gemini-streaming-client.ts
```ts
class GeminiStreamingClient {
  constructor(private adapter: GeminiApiAdapter) {}

  async fetch(params: {
    modelVersion: string
    contents: Part[]
    config: GeminiStreamRequestConfig
    signal?: AbortSignal
  }): Promise<{ body: NodeJS.ReadableStream }>
}
```

Responsibilities:
- Build the `:streamGenerateContent?alt=sse` URL via the adapter
- Serialize the request body as a streaming `Readable` (chunked JSON with 64KB base64 slices — avoids `JSON.stringify` buffering the full request)
- `fetch()` with the provided `AbortSignal`
- Return the response body stream
Does NOT parse SSE or know about events — that's the pipeline's job.
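A rough sketch of the streamed body serializer. The `serializeRequestBody` name and the exact wire wrapper (`role`/`parts`, `generationConfig` key) are illustrative assumptions here; the point is that JSON is emitted piecewise and large base64 strings are sliced, so the full body is never materialized:

```typescript
type Part = { text: string } | { inlineData: { mimeType: string; data: string } }

const BASE64_SLICE = 64 * 1024

// Sketch: yield the request body as JSON fragments. Small parts go through
// JSON.stringify; base64 payloads are sliced (base64 is JSON-safe, so no
// escaping is needed inside the string literal).
export function* serializeRequestBody(payload: {
  contents: Part[]
  generationConfig?: unknown
}): Generator<string> {
  yield '{"contents":[{"role":"user","parts":['
  for (let i = 0; i < payload.contents.length; i++) {
    if (i > 0) yield ','
    const part = payload.contents[i]
    if ('text' in part) {
      yield JSON.stringify(part)
    } else {
      yield `{"inlineData":{"mimeType":${JSON.stringify(part.inlineData.mimeType)},"data":"`
      const data = part.inlineData.data
      for (let off = 0; off < data.length; off += BASE64_SLICE) {
        yield data.slice(off, off + BASE64_SLICE) // never copies the whole payload
      }
      yield '"}}'
    }
  }
  yield ']}]'
  if (payload.generationConfig !== undefined) {
    yield `,"generationConfig":${JSON.stringify(payload.generationConfig)}`
  }
  yield '}'
}
```

The generator can then be handed to `Readable.from(serializeRequestBody(payload))` to become the `fetch()` request body.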
`src/ai/custom-providers/google/gemini-streaming-client.spec.ts`
`aiDeployments.getClient(modelId)` returns `{ client: GoogleGenAI, providerRegion }`. The `GoogleGenAI` instance is constructed in `deployments.ts` with either Vertex AI or API key config. `GeminiModel` creates the adapter and streaming client from this:
```ts
// In gemini.model.ts — generate() / masklessEdit()
const deployment = await aiDeployments.getClient(aiModelId, {
  featureFlags: regionFlags,
})

// Wrap the SDK client — adapter quarantines the `as any` access
const adapter = new GoogleGenAIApiAdapter(deployment.client)
const streamingClient = new GeminiStreamingClient(adapter)

const response = await streamingClient.fetch({
  modelVersion,
  contents,
  config,
  signal: AbortSignal.timeout(timeoutMs),
})
// ... pipeline starts from response.body
```

The adapter + streaming client are cheap to construct (no state, no connections), so creating them per-request is fine. No need for NestJS DI or module-level singletons — the `GoogleGenAI` instance from `aiDeployments` already handles connection pooling and auth token caching internally.
If we later want to share a single `GeminiStreamingClient` instance (e.g. injected via NestJS), the adapter could be created from the deployment in a factory provider. But per-request construction is simpler and there's no performance reason to change it.
```
src/
  common/
    sse-stream.ts                    generic SSE parser
    sse-stream.spec.ts
    base64-decode.ts                 async generator transform
    base64-decode.spec.ts
  ai/custom-providers/google/
    gemini-api-adapter.ts            typed interface + GoogleGenAI wrapper (`as any` quarantine)
    gemini-api-adapter.spec.ts
    gemini-streaming-client.ts       fetch + streamed request body serializer
    gemini-streaming-client.spec.ts
    gemini-stream-events.ts          SSE → typed events, cumul/frag dedup
    gemini-stream-events.spec.ts
    gemini-stream-types.ts           all event/block type definitions
    gemini-upload-transform.ts       image-* events → S3 uploaded ImageBlock
    gemini-upload-transform.spec.ts
    gemini-text-transform.ts         text-delta → TextBlock
    gemini-text-transform.spec.ts
  media/
    dimension-probe.ts               transform + side-channel promise
    dimension-probe.spec.ts
  ai-image-generation/
    gemini.model.ts                  slimmed: build contents, wire pipeline, map to AIGeneratedImage
```
```ts
async generate(user: User, input: ImageGenerateModelInput): Promise<AIGeneratedImage> {
  const aiModelId = input.model as GeminiImageModelId
  const modelVersion = GEMINI_IMAGE_MODELS[aiModelId]
  const deployment = await aiDeployments.getClient(aiModelId, {
    featureFlags: {
      'google-vertex-regions': await this.featureFlagService.getFlagValue(
        'google-vertex-regions',
        user,
      ),
    },
  })

  const span = getActiveSpan()
  span?.addTags({
    model: aiModelId,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    ...input,
    styleReferenceImageData: input.styleReferenceImageData
      ? `[${input.styleReferenceImageData.length} images]`
      : undefined,
  })

  // Build contents (style reference images + prompt)
  const contents = await this.buildGenerateContents(input, span)

  // Build image config (aspect ratio + size)
  const imageSize = GEMINI_IMAGE_SIZES[aiModelId]
  const imageConfig: GeminiStreamRequestConfig['imageConfig'] = {
    aspectRatio: ASPECT_RATIO_MAPPING[input.aspectRatio],
  }
  if (imageSize) {
    imageConfig.imageSize = imageSize
  }

  // Log request summary (no base64)
  const inlineDataParts = contents.filter((c) => 'inlineData' in c).length
  const textPart = contents.find((c) => 'text' in c)?.text
  this.logger.debug('Gemini streamGenerateContent request', {
    model: modelVersion,
    inlineDataParts,
    textPart,
    imageConfig,
  })

  // Wire up adapter + streaming client from the deployment's GoogleGenAI instance
  const adapter = new GoogleGenAIApiAdapter(deployment.client)
  const streamingClient = new GeminiStreamingClient(adapter)
  const response = await streamingClient.fetch({
    modelVersion,
    contents,
    config: { responseModalities: ['Image'], imageConfig },
    signal: AbortSignal.timeout(getModelTimeoutMs(input.model)),
  })

  // Pipeline: SSE bytes → typed events → uploaded image + collected text
  const sse = parseSseStream(response.body)
  const events = parseGeminiEvents(sse)
  const uploaded = uploadImageChunks(events, {
    s3Client: this.s3Client,
    bucket: this.cdnBucket,
    keyPrefix: `${input.workspaceId}/generated-images`,
  })
  const blocks = collectText(uploaded)

  let image: ImageBlock | undefined
  let done: DoneBlock | undefined
  for await (const block of blocks) {
    if (block.type === 'image') image = block
    if (block.type === 'done') done = block
  }
  if (!image) throw new Error('No image data received from Gemini')

  const cost = this.getCost(aiModelId)
  span?.addTags({
    cost,
    width: image.width,
    height: image.height,
  })

  return {
    uploadedMedia: {
      url: image.url,
      s3Key: image.s3Key,
      s3Bucket: image.s3Bucket,
      contentType: image.contentType,
      contentLength: image.contentLength,
    },
    width: image.width,
    height: image.height,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    providerRequest: {
      model: modelVersion,
      config: { responseModalities: ['Image'], imageConfig },
      contents: { inlineDataParts, textPart },
      tokenUsage: done?.usage,
      finishReason: done?.finishReason,
    },
    format: image.format,
    cost,
  }
}

@AISpan('ai.image.edit.maskless')
async masklessEdit(
  user: User,
  input: Omit<MasklessEditInput, 'operation' | 'requiredCredits'>,
): Promise<AIGeneratedImage> {
  const aiModelId = input.editParams.model as GeminiImageModelId
  const imageModelId = GEMINI_IMAGE_MODELS[aiModelId]
  const deployment = await aiDeployments.getClient(aiModelId, {
    featureFlags: {
      'google-vertex-regions': await this.featureFlagService.getFlagValue(
        'google-vertex-regions',
        user,
      ),
    },
  })

  const span = tracer.getActiveSpan()
  span?.setAttributes({
    model: aiModelId,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    ...input,
    url: input.url.startsWith('data:') ? '[data URI]' : input.url,
  })

  // Download and resize the source image for editing
  const resizeParameters =
    this.imageDownloadService.resizeParametersForImageEditing(input.url)
  const imageData =
    await this.imageDownloadService.downloadAsBase64WithDimensions(
      input.url,
      resizeParameters,
    )
  const contents: Part[] = [
    { text: input.editParams.editPrompt },
    {
      inlineData: {
        mimeType: formatToImageMimeType(resizeParameters.format),
        data: imageData.base64,
      },
    },
  ]

  // Wire up adapter + streaming client
  const adapter = new GoogleGenAIApiAdapter(deployment.client)
  const streamingClient = new GeminiStreamingClient(adapter)
  const response = await streamingClient.fetch({
    modelVersion: imageModelId,
    contents,
    config: { responseModalities: ['Text', 'Image'] },
    signal: AbortSignal.timeout(
      getModelTimeoutMs(
        input.editParams.model as Parameters<typeof getModelTimeoutMs>[0],
      ),
    ),
  })

  // Pipeline: same transforms, different destination
  const sse = parseSseStream(response.body)
  const events = parseGeminiEvents(sse)
  const uploaded = uploadImageChunks(events, {
    s3Client: this.s3Client,
    bucket: this.cdnBucket,
    keyPrefix: `${input.workspaceId}/edited-images`,
  })
  const blocks = collectText(uploaded)

  let image: ImageBlock | undefined
  let done: DoneBlock | undefined
  for await (const block of blocks) {
    if (block.type === 'image') image = block
    if (block.type === 'done') done = block
  }
  if (!image) throw new Error('No image data received from Gemini')

  span?.setAttributes({
    cost: IMAGE_COST,
  })

  return {
    uploadedMedia: {
      url: image.url,
      s3Key: image.s3Key,
      s3Bucket: image.s3Bucket,
      contentType: image.contentType,
      contentLength: image.contentLength,
    },
    width: image.width,
    height: image.height,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    providerRequest: {
      editPrompt: input.editParams.editPrompt,
      image: input.url.startsWith('data:') ? '[data URI]' : input.url,
      tokenUsage: done?.usage,
      finishReason: done?.finishReason,
    },
    format: image.format,
    cost: IMAGE_COST,
  }
}
```

What stays in `gemini.model.ts`:

- `generate()`, `masklessEdit()` — business logic, span tagging, cost calculation, mapping to `AIGeneratedImage`
- `buildGenerateContents()` — style reference image download + prompt assembly
- `enhancePromptWithStyleReferenceImages()` — AI JSX prompt rendering
- `getSupportedAspectRatios()`, `getInputDimensions()`, `getCost()` — static config
- Model/aspect ratio/size constant maps
What moves out:

- SSE parsing → `src/common/sse-stream.ts`
- Gemini event parsing + cumulative/fragmented dedup → `gemini-stream-events.ts`
- Raw fetch + request body serialization → `gemini-streaming-client.ts` + `gemini-api-adapter.ts`
- Image upload orchestration → `gemini-upload-transform.ts`
- Text accumulation → `gemini-text-transform.ts`
- Base64 decode transform → `src/common/base64-decode.ts`
- Dimension probing → `src/media/dimension-probe.ts`
- `writeBase64Chunk` backpressure handling → gone (AsyncGenerator backpressure replaces it)
- `toGeminiStreamError` → absorbed into `gemini-streaming-client.ts` error handling
Not in scope:

- Migrating `design-image-helpers.ts` (`callGeminiImageEdit`) — uses the SDK's `generateContent` (non-streaming). Can adopt later by using the same pipeline without `uploadImageChunks`.
- Migrating `google.provider.ts` — uses `@ai-sdk/google`, a different SDK entirely.
- Style reference LRU cache (separate concern, already in Paul's PR).
Open questions:

- Is `collectText` worth having as a separate transform, or should we just inline `let text = ''` in the for-await loop at the call site?
- `uploadImageChunks` takes `s3Client` directly — should it take an abstract upload function for easier testing?
- Should `base64Decode` live in `common/` since it's generic, or be colocated with google since it's the only consumer right now?