@jordangarcia
Created March 11, 2026 22:24
Gemini Streaming Refactor Plan

Motivation

gemini.model.ts is 960 lines and mixes HTTP transport, SSE parsing, cumulative/fragmented dedup, S3 upload orchestration, backpressure handling, and business logic in one class. The raw fetch approach (bypassing the @google/genai SDK) is necessary for memory efficiency — the SDK buffers entire request bodies via JSON.stringify and parses full response objects per chunk — but the implementation can be decomposed into composable layers.

Streaming primitive: AsyncIterable<T> → AsyncGenerator<U>

All transforms are functions: (input: AsyncIterable<A>) → AsyncGenerator<B>.

  • Backpressure is free. AsyncGenerators are pull-based — if the consumer doesn't call .next(), the producer pauses at yield. This propagates through the entire chain back to response.body.
  • One paradigm. No mixing Node Transform classes, Web Streams, or custom consumer interfaces. Node Readable only appears at the S3 boundary via Readable.from().
  • Composable via variables. No pipe helper needed — just name each step.
const sse      = parseSseStream(response.body)
const events   = parseGeminiEvents(sse)
const uploaded = uploadImageChunks(events, uploadConfig)
const blocks   = collectText(uploaded)

for await (const block of blocks) { ... }

Layers and types

Layer 0 — SSE parsing

src/common/sse-stream.ts

parseSseStream(body: NodeJS.ReadableStream): AsyncGenerator<SseEvent>
type SseEvent = {
  data: string      // raw data field(s), joined by \n if multi-line
  event?: string    // event: field
  id?: string       // id: field
}

Generic, spec-compliant SSE parser (WHATWG EventSource spec). No JSON parsing, no Gemini knowledge.

Handles:

  • \r\n, \r, \n line endings
  • blank-line event boundaries
  • : comment lines (skip)
  • multi-line data: field concatenation
  • BOM stripping
  • [DONE] sentinel → stops iteration

Note: the @google/genai SDK's internal SSE parser (responseLineRE) uses a single regex that doesn't handle all edge cases. This replaces it.
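The parsing core can be sketched as a small state machine that the AsyncGenerator wrapper feeds decoded chunks. This is an illustrative sketch only (the class name and push() API are hypothetical), and it omits the BOM stripping and [DONE] sentinel handling listed above:

```typescript
type SseEvent = { data: string; event?: string; id?: string }

// Illustrative sketch of the incremental parser core. The real parseSseStream
// would wrap this in an AsyncGenerator that decodes body chunks and yields
// whatever events push() returns.
class SseParser {
  private buffer = ''
  private dataLines: string[] = []
  private eventType?: string
  private lastId?: string

  // Feed one decoded chunk; returns any events it completes.
  push(chunk: string): SseEvent[] {
    this.buffer += chunk
    const events: SseEvent[] = []
    let idx: number
    while ((idx = this.buffer.search(/\r\n|\r|\n/)) !== -1) {
      // A trailing \r might be half of a \r\n split across chunks: wait for more.
      if (this.buffer[idx] === '\r' && idx === this.buffer.length - 1) break
      const sepLen = this.buffer[idx] === '\r' && this.buffer[idx + 1] === '\n' ? 2 : 1
      const line = this.buffer.slice(0, idx)
      this.buffer = this.buffer.slice(idx + sepLen)
      const event = this.processLine(line)
      if (event) events.push(event)
    }
    return events
  }

  private processLine(line: string): SseEvent | undefined {
    if (line === '') {
      // Blank line dispatches the pending event, if any data accumulated.
      const pending = this.dataLines.length
        ? { data: this.dataLines.join('\n'), event: this.eventType, id: this.lastId }
        : undefined
      this.dataLines = []
      this.eventType = undefined
      return pending
    }
    if (line.startsWith(':')) return undefined // comment line, skip
    const colon = line.indexOf(':')
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? '' : line.slice(colon + 1)
    if (value.startsWith(' ')) value = value.slice(1) // spec: strip one leading space
    if (field === 'data') this.dataLines.push(value)
    else if (field === 'event') this.eventType = value
    else if (field === 'id') this.lastId = value
    return undefined
  }
}
```

Keeping the line/field logic synchronous like this makes the chunked-delivery edge cases testable without streams.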

src/common/sse-stream.spec.ts — test against all the above edge cases plus chunked delivery (data split across multiple body reads).


Layer 1 — Gemini event parsing

src/ai/custom-providers/google/gemini-stream-events.ts

parseGeminiEvents(sse: AsyncIterable<SseEvent>): AsyncGenerator<GeminiStreamEvent>
type GeminiStreamEvent =
  | { type: 'image-start'; mimeType: string }
  | { type: 'image-delta'; data: string }       // base64 chunk, already deduped
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }

type GeminiTokenUsage = {
  promptTokenCount: number
  candidatesTokenCount: number
  totalTokenCount: number
}

Responsibilities:

  • JSON.parse each SseEvent.data into raw Gemini response shape
  • Throw on error field in response
  • Track usageMetadata and finishReason (typically only on final chunk)
  • Detect cumulative vs fragmented inline data delivery (Vertex sends fragments, ML Dev sends cumulative) and deduplicate — consumers only see new bytes in image-delta
  • Emit image-start on first inline data part (with mimeType)
  • Emit done when iteration ends, with final usage/finishReason

The cumul/frag detection uses a prefix sample comparison (existing extractUnseenBase64Chunk logic, moved here).
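The idea can be illustrated with a simplified sketch. This is not the actual extractUnseenBase64Chunk code; the names, state shape, and 64-character sample size below are all hypothetical:

```typescript
// Simplified illustration of cumulative-vs-fragmented detection.
type DedupState = {
  seenLength: number
  prefixSample: string
  mode?: 'cumulative' | 'fragmented'
}

// Returns only the base64 the consumer has not seen yet.
function extractUnseen(state: DedupState, incoming: string): string {
  if (state.seenLength === 0) {
    // First chunk: keep a prefix sample to classify later chunks against.
    state.prefixSample = incoming.slice(0, 64)
    state.seenLength = incoming.length
    return incoming
  }
  if (!state.mode) {
    // Cumulative payloads restart from the beginning; fragments do not.
    state.mode = incoming.startsWith(state.prefixSample) ? 'cumulative' : 'fragmented'
  }
  if (state.mode === 'fragmented') {
    state.seenLength += incoming.length
    return incoming
  }
  // Cumulative: the payload repeats everything so far; emit only the tail.
  const unseen = incoming.slice(state.seenLength)
  state.seenLength = incoming.length
  return unseen
}
```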

src/ai/custom-providers/google/gemini-stream-events.spec.ts — test cumulative mode, fragmented mode, auto-detection, text-only responses, mixed text+image, error responses.


Layer 2 — Image upload transform

src/ai/custom-providers/google/gemini-upload-transform.ts

uploadImageChunks(
  events: AsyncIterable<GeminiStreamEvent>,
  config: {
    s3Client: AWSS3ClientService
    bucket: string
    keyPrefix: string
  },
): AsyncGenerator<AfterUploadEvent>
type ImageBlock = {
  type: 'image'
  url: string
  s3Key: string
  s3Bucket: string
  width: number
  height: number
  format: string
  contentType: string
  contentLength: number
}

type AfterUploadEvent =
  | ImageBlock
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }

Consumes image-start + image-delta events and resolves them into a single ImageBlock yield. Everything else passes through unchanged.

Internally on image-start:

  1. Creates an inner drainImageChunks() async generator that pulls image-delta events from the shared iterator until done
  2. Pipes through base64Decode() → dimensionProbe.transform() → Readable.from() → S3 upload
  3. Awaits upload + dimension probe
  4. Yields ImageBlock
  5. Re-emits any non-image events that arrived during streaming (buffered aside)

Memory at any moment: ~1 base64 chunk + ~1 decoded buffer + dimension probe (≤256KB) + S3 SDK buffer. Full image never in JS heap.
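The draining control flow could look roughly like this: a simplified sketch with an injected upload function in place of the real S3 + base64 + dimension-probe plumbing, assuming a single image per response. All names are illustrative.

```typescript
type GeminiStreamEvent =
  | { type: 'image-start'; mimeType: string }
  | { type: 'image-delta'; data: string }
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string }

type AfterUploadEvent =
  | { type: 'image'; mimeType: string; contentLength: number }
  | { type: 'text-delta'; text: string }
  | { type: 'done'; finishReason?: string }

async function* uploadImageChunks(
  events: AsyncIterable<GeminiStreamEvent>,
  upload: (chunks: AsyncIterable<string>) => Promise<{ contentLength: number }>,
): AsyncGenerator<AfterUploadEvent> {
  const iterator = events[Symbol.asyncIterator]()
  const aside: AfterUploadEvent[] = [] // non-image events seen mid-upload
  while (true) {
    const { done, value } = await iterator.next()
    if (done) return
    if (value.type === 'image-delta') continue // only valid inside an image
    if (value.type !== 'image-start') {
      yield value
      continue
    }
    // Inner generator: pulls image-delta events off the shared iterator
    // until the image ends, feeding them straight into the uploader.
    async function* drainImageChunks(): AsyncGenerator<string> {
      while (true) {
        const next = await iterator.next()
        if (next.done) return
        if (next.value.type === 'image-delta') { yield next.value.data; continue }
        if (next.value.type === 'text-delta') aside.push(next.value)
        if (next.value.type === 'done') { aside.push(next.value); return }
      }
    }
    const result = await upload(drainImageChunks())
    yield { type: 'image', mimeType: value.mimeType, contentLength: result.contentLength }
    while (aside.length) yield aside.shift()! // re-emit buffered events
  }
}
```

Because the uploader pulls from drainImageChunks(), which pulls from the shared iterator, backpressure still propagates all the way back to the response body during the upload.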

src/ai/custom-providers/google/gemini-upload-transform.spec.ts


Layer 2 helpers — base64 decode + dimension probe

src/common/base64-decode.ts

base64Decode(chunks: AsyncIterable<string>): AsyncGenerator<Buffer>

Handles carry bytes across chunk boundaries for padding alignment. Generic, not Gemini-specific.
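A minimal sketch of the carry logic, assuming chunks are plain base64 strings (the real implementation may differ in details):

```typescript
// Buffer leftover characters so each decode call gets a multiple of
// 4 base64 characters, keeping 3-byte quantum alignment across chunks.
async function* base64Decode(chunks: AsyncIterable<string>): AsyncGenerator<Buffer> {
  let carry = ''
  for await (const chunk of chunks) {
    const combined = carry + chunk
    const usable = combined.length - (combined.length % 4)
    carry = combined.slice(usable)
    if (usable > 0) yield Buffer.from(combined.slice(0, usable), 'base64')
  }
  if (carry.length > 0) yield Buffer.from(carry, 'base64') // trailing remainder
}
```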

src/media/dimension-probe.ts

createDimensionProbe(opts?: { maxProbeBytes?: number }): {
  transform: (chunks: AsyncIterable<Buffer>) => AsyncGenerator<Buffer>
  dimensions: Promise<{ width: number; height: number }>
}

Passthrough transform. Buffers up to maxProbeBytes (default 256KB) to call imageSize(). Resolves the dimensions promise as soon as dimensions are found, with one last attempt on the full buffered sample at flush. Dimensions travel out of band via the returned promise.
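The passthrough-with-side-channel shape could be sketched like this, with the dimension parser injected (standing in for a real parser such as the image-size package) to keep the sketch self-contained; names are illustrative:

```typescript
type Dimensions = { width: number; height: number }

function createDimensionProbe(
  probe: (buf: Buffer) => Dimensions | undefined,
  opts?: { maxProbeBytes?: number },
) {
  const maxProbeBytes = opts?.maxProbeBytes ?? 256 * 1024
  let resolve!: (d: Dimensions) => void
  let reject!: (e: Error) => void
  const dimensions = new Promise<Dimensions>((res, rej) => { resolve = res; reject = rej })
  let probed: Buffer[] = []
  let probedBytes = 0
  let settled = false

  const tryProbe = () => {
    if (settled) return
    const dims = probe(Buffer.concat(probed))
    if (dims) { settled = true; probed = []; resolve(dims) }
  }

  async function* transform(chunks: AsyncIterable<Buffer>): AsyncGenerator<Buffer> {
    for await (const chunk of chunks) {
      if (!settled && probedBytes < maxProbeBytes) {
        probed.push(chunk)
        probedBytes += chunk.length
        tryProbe()
      }
      yield chunk // passthrough: downstream sees every byte unchanged
    }
    tryProbe() // flush: one last attempt on the full buffered sample
    if (!settled) { settled = true; reject(new Error('dimensions not found in probe window')) }
  }

  return { transform, dimensions }
}
```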


Layer 3 — Text collection

src/ai/custom-providers/google/gemini-text-transform.ts

collectText(events: AsyncIterable<AfterUploadEvent>): AsyncGenerator<GeminiOutputBlock>
type TextBlock = { type: 'text'; text: string }

type GeminiOutputBlock =
  | ImageBlock
  | TextBlock
  | { type: 'done'; finishReason?: string; usage?: GeminiTokenUsage }

Accumulates text-delta events into a single TextBlock, emitted when a non-text event arrives or iteration ends. Everything else passes through.
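A minimal sketch of the accumulation logic, with the event unions abbreviated; the function name mirrors the plan but the body is illustrative:

```typescript
type TextBlock = { type: 'text'; text: string }
type InEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'image'; url: string }
  | { type: 'done' }
type OutBlock = TextBlock | { type: 'image'; url: string } | { type: 'done' }

async function* collectText(events: AsyncIterable<InEvent>): AsyncGenerator<OutBlock> {
  let text = ''
  for await (const ev of events) {
    if (ev.type === 'text-delta') {
      text += ev.text
      continue
    }
    // Any non-text event closes the current text run.
    if (text) { yield { type: 'text', text }; text = '' }
    yield ev
  }
  if (text) yield { type: 'text', text } // trailing text at end of iteration
}
```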


Transport — Gemini API adapter

src/ai/custom-providers/google/gemini-api-adapter.ts

Wraps GoogleGenAI from @google/genai to expose the private apiClient methods we need for raw streaming. The only file that uses as any.

interface GeminiApiAdapter {
  buildRequestUrl(modelVersion: string, action: string): URL
  getHeaders(url: string): Promise<Record<string, string>>
  isVertexAI(): boolean
}

class GoogleGenAIApiAdapter implements GeminiApiAdapter {
  constructor(private client: GoogleGenAI) {}
  // accesses (this.client as any).apiClient internally
}

Why we bypass the SDK's public request()/requestStream(): they JSON.stringify the entire request body (buffering multi-MB base64-encoded style reference images) and parse full response objects per SSE chunk (holding the entire image's base64 in memory).

Methods quarantined:

  • apiClient.constructUrl(path, httpOptions, prependProjectLocation) → returns URL
  • apiClient.getHeadersInternal(httpOptions, url) → returns Headers (includes auth)
  • apiClient.shouldPrependVertexProjectPath({path, httpMethod}) → returns boolean
  • apiClient.isVertexAI() → returns boolean

src/ai/custom-providers/google/gemini-api-adapter.spec.ts


Transport — Streaming client

src/ai/custom-providers/google/gemini-streaming-client.ts

class GeminiStreamingClient {
  constructor(private adapter: GeminiApiAdapter) {}

  async fetch(params: {
    modelVersion: string
    contents: Part[]
    config: GeminiStreamRequestConfig
    signal?: AbortSignal
  }): Promise<{ body: NodeJS.ReadableStream }>
}

Responsibilities:

  • Build the :streamGenerateContent?alt=sse URL via adapter
  • Serialize request body as a streaming Readable (chunked JSON with 64KB base64 slices — avoids JSON.stringify buffering the full request)
  • fetch() with the provided AbortSignal
  • Return the response body stream

Does NOT parse SSE or know about events — that's the pipeline's job.
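The body serializer could be sketched like this. All names are hypothetical, and the real body would also carry generationConfig and other fields omitted here; the point is that JSON framing is emitted as literal strings and inline base64 is sliced, so JSON.stringify never sees a multi-MB payload:

```typescript
type Part = { text: string } | { inlineData: { mimeType: string; data: string } }

const SLICE_SIZE = 64 * 1024

async function* serializeRequestBody(contents: Part[]): AsyncGenerator<string> {
  yield '{"contents":[{"parts":['
  for (let i = 0; i < contents.length; i++) {
    if (i > 0) yield ','
    const part = contents[i]
    if ('text' in part) {
      yield JSON.stringify(part) // small parts can be stringified whole
      continue
    }
    // Large inline data: emit the JSON scaffold, then the base64 in slices.
    yield `{"inlineData":{"mimeType":${JSON.stringify(part.inlineData.mimeType)},"data":"`
    const data = part.inlineData.data
    for (let off = 0; off < data.length; off += SLICE_SIZE) {
      yield data.slice(off, off + SLICE_SIZE) // base64 needs no JSON escaping
    }
    yield '"}}'
  }
  yield ']}]}'
}
```

A generator like this can be handed to Readable.from() to become the fetch request body.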

src/ai/custom-providers/google/gemini-streaming-client.spec.ts


Wiring — how GeminiModel gets a GeminiStreamingClient

aiDeployments.getClient(modelId) returns { client: GoogleGenAI, providerRegion }. The GoogleGenAI instance is constructed in deployments.ts with either Vertex AI or API key config. GeminiModel creates the adapter and streaming client from this:

// In gemini.model.ts — generate() / masklessEdit()

const deployment = await aiDeployments.getClient(aiModelId, {
  featureFlags: regionFlags,
})

// Wrap the SDK client — adapter quarantines the `as any` access
const adapter = new GoogleGenAIApiAdapter(deployment.client)
const streamingClient = new GeminiStreamingClient(adapter)

const response = await streamingClient.fetch({
  modelVersion,
  contents,
  config,
  signal: AbortSignal.timeout(timeoutMs),
})

// ... pipeline starts from response.body

The adapter + streaming client are cheap to construct (no state, no connections), so creating them per request is fine. No need for NestJS DI or module-level singletons — the GoogleGenAI instance from aiDeployments already handles connection pooling and auth token caching internally.

If we later want to share a single GeminiStreamingClient instance (e.g. injected via NestJS), the adapter could be created from the deployment in a factory provider. But per-request construction is simpler and there's no performance reason to change it.


File layout

src/
  common/
    sse-stream.ts                         generic SSE parser
    sse-stream.spec.ts
    base64-decode.ts                      async generator transform
    base64-decode.spec.ts

  ai/custom-providers/google/
    gemini-api-adapter.ts                 typed interface + GoogleGenAI wrapper (`as any` quarantine)
    gemini-api-adapter.spec.ts
    gemini-streaming-client.ts            fetch + streamed request body serializer
    gemini-streaming-client.spec.ts
    gemini-stream-events.ts               SSE → typed events, cumul/frag dedup
    gemini-stream-events.spec.ts
    gemini-stream-types.ts                all event/block type definitions
    gemini-upload-transform.ts            image-* events → S3 uploaded ImageBlock
    gemini-upload-transform.spec.ts
    gemini-text-transform.ts              text-delta → TextBlock
    gemini-text-transform.spec.ts

  media/
    dimension-probe.ts                    transform + side-channel promise
    dimension-probe.spec.ts
    ai-image-generation/
      gemini.model.ts                     slimmed: build contents, wire pipeline, map to AIGeneratedImage

What gemini.model.ts becomes

generate()

async generate(user: User, input: ImageGenerateModelInput): Promise<AIGeneratedImage> {
  const aiModelId = input.model as GeminiImageModelId
  const modelVersion = GEMINI_IMAGE_MODELS[aiModelId]

  const deployment = await aiDeployments.getClient(aiModelId, {
    featureFlags: {
      'google-vertex-regions': await this.featureFlagService.getFlagValue(
        'google-vertex-regions',
        user,
      ),
    },
  })

  const span = getActiveSpan()
  span?.addTags({
    model: aiModelId,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    ...input,
    styleReferenceImageData: input.styleReferenceImageData
      ? `[${input.styleReferenceImageData.length} images]`
      : undefined,
  })

  // Build contents (style reference images + prompt)
  const contents = await this.buildGenerateContents(input, span)

  // Build image config (aspect ratio + size)
  const imageSize = GEMINI_IMAGE_SIZES[aiModelId]
  const imageConfig: GeminiStreamRequestConfig['imageConfig'] = {
    aspectRatio: ASPECT_RATIO_MAPPING[input.aspectRatio],
  }
  if (imageSize) {
    imageConfig.imageSize = imageSize
  }

  // Log request summary (no base64)
  const inlineDataParts = contents.filter((c) => 'inlineData' in c).length
  const textPart = contents.find((c) => 'text' in c)?.text
  this.logger.debug('Gemini streamGenerateContent request', {
    model: modelVersion,
    inlineDataParts,
    textPart,
    imageConfig,
  })

  // Wire up adapter + streaming client from the deployment's GoogleGenAI instance
  const adapter = new GoogleGenAIApiAdapter(deployment.client)
  const streamingClient = new GeminiStreamingClient(adapter)

  const response = await streamingClient.fetch({
    modelVersion,
    contents,
    config: { responseModalities: ['Image'], imageConfig },
    signal: AbortSignal.timeout(getModelTimeoutMs(input.model)),
  })

  // Pipeline: SSE bytes → typed events → uploaded image + collected text
  const sse      = parseSseStream(response.body)
  const events   = parseGeminiEvents(sse)
  const uploaded = uploadImageChunks(events, {
    s3Client: this.s3Client,
    bucket: this.cdnBucket,
    keyPrefix: `${input.workspaceId}/generated-images`,
  })
  const blocks   = collectText(uploaded)

  let image: ImageBlock | undefined
  let done: DoneBlock | undefined
  for await (const block of blocks) {
    if (block.type === 'image') image = block
    if (block.type === 'done') done = block
  }

  if (!image) throw new Error('No image data received from Gemini')

  const cost = this.getCost(aiModelId)
  span?.addTags({
    cost,
    width: image.width,
    height: image.height,
  })

  return {
    uploadedMedia: {
      url: image.url,
      s3Key: image.s3Key,
      s3Bucket: image.s3Bucket,
      contentType: image.contentType,
      contentLength: image.contentLength,
    },
    width: image.width,
    height: image.height,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    providerRequest: {
      model: modelVersion,
      config: { responseModalities: ['Image'], imageConfig },
      contents: { inlineDataParts, textPart },
      tokenUsage: done?.usage,
      finishReason: done?.finishReason,
    },
    format: image.format,
    cost,
  }
}

masklessEdit()

@AISpan('ai.image.edit.maskless')
async masklessEdit(
  user: User,
  input: Omit<MasklessEditInput, 'operation' | 'requiredCredits'>,
): Promise<AIGeneratedImage> {
  const aiModelId = input.editParams.model as GeminiImageModelId
  const imageModelId = GEMINI_IMAGE_MODELS[aiModelId]

  const deployment = await aiDeployments.getClient(aiModelId, {
    featureFlags: {
      'google-vertex-regions': await this.featureFlagService.getFlagValue(
        'google-vertex-regions',
        user,
      ),
    },
  })

  const span = tracer.getActiveSpan()
  span?.setAttributes({
    model: aiModelId,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    ...input,
    url: input.url.startsWith('data:') ? '[data URI]' : input.url,
  })

  // Download and resize the source image for editing
  const resizeParameters =
    this.imageDownloadService.resizeParametersForImageEditing(input.url)
  const imageData =
    await this.imageDownloadService.downloadAsBase64WithDimensions(
      input.url,
      resizeParameters,
    )

  const contents: Part[] = [
    { text: input.editParams.editPrompt },
    {
      inlineData: {
        mimeType: formatToImageMimeType(resizeParameters.format),
        data: imageData.base64,
      },
    },
  ]

  // Wire up adapter + streaming client
  const adapter = new GoogleGenAIApiAdapter(deployment.client)
  const streamingClient = new GeminiStreamingClient(adapter)

  const response = await streamingClient.fetch({
    modelVersion: imageModelId,
    contents,
    config: { responseModalities: ['Text', 'Image'] },
    signal: AbortSignal.timeout(
      getModelTimeoutMs(
        input.editParams.model as Parameters<typeof getModelTimeoutMs>[0],
      ),
    ),
  })

  // Pipeline: same transforms, different destination
  const sse      = parseSseStream(response.body)
  const events   = parseGeminiEvents(sse)
  const uploaded = uploadImageChunks(events, {
    s3Client: this.s3Client,
    bucket: this.cdnBucket,
    keyPrefix: `${input.workspaceId}/edited-images`,
  })
  const blocks   = collectText(uploaded)

  let image: ImageBlock | undefined
  let done: DoneBlock | undefined
  for await (const block of blocks) {
    if (block.type === 'image') image = block
    if (block.type === 'done') done = block
  }

  if (!image) throw new Error('No image data received from Gemini')

  span?.setAttributes({
    cost: IMAGE_COST,
  })

  return {
    uploadedMedia: {
      url: image.url,
      s3Key: image.s3Key,
      s3Bucket: image.s3Bucket,
      contentType: image.contentType,
      contentLength: image.contentLength,
    },
    width: image.width,
    height: image.height,
    provider: 'google',
    providerRegion: deployment.providerRegion,
    providerRequest: {
      editPrompt: input.editParams.editPrompt,
      image: input.url.startsWith('data:') ? '[data URI]' : input.url,
      tokenUsage: done?.usage,
      finishReason: done?.finishReason,
    },
    format: image.format,
    cost: IMAGE_COST,
  }
}

What stays in gemini.model.ts

  • generate(), masklessEdit() — business logic, span tagging, cost calculation, mapping to AIGeneratedImage
  • buildGenerateContents() — style reference image download + prompt assembly
  • enhancePromptWithStyleReferenceImages() — AI JSX prompt rendering
  • getSupportedAspectRatios(), getInputDimensions(), getCost() — static config
  • Model/aspect ratio/size constant maps

What moves out

  • SSE parsing → src/common/sse-stream.ts
  • Gemini event parsing + cumul/frag dedup → gemini-stream-events.ts
  • Raw fetch + request body serialization → gemini-streaming-client.ts + gemini-api-adapter.ts
  • Image upload orchestration → gemini-upload-transform.ts
  • Text accumulation → gemini-text-transform.ts
  • Base64 decode transform → src/common/base64-decode.ts
  • Dimension probing → src/media/dimension-probe.ts
  • writeBase64Chunk backpressure handling → gone (AsyncGenerator backpressure replaces it)
  • toGeminiStreamError → absorbed into gemini-streaming-client.ts error handling

Out of scope (for now)

  • Migrating design-image-helpers.ts (callGeminiImageEdit) — uses SDK's generateContent (non-streaming). Can adopt later by using the same pipeline without uploadImageChunks.
  • Migrating google.provider.ts — uses @ai-sdk/google, different SDK entirely.
  • Style reference LRU cache (separate concern, already in Paul's PR).

Unresolved questions

  • Is collectText worth having as a separate transform, or should we just inline let text = '' in the for-await loop at the call site?
  • uploadImageChunks takes s3Client directly — should it take an abstract upload function for easier testing?
  • base64Decode in common/ since it's generic — or colocate with google since it's the only consumer right now?