Skip to content

Conversation

marcog83
Copy link
Contributor

@marcog83 marcog83 commented Aug 5, 2025

Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure.

Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays.

#9065

BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.

Summary by CodeRabbit

  • New Features

    • Streamed queries now use a reducer-based API to accumulate streamed chunks into final results.
    • Optional initialValue lets you seed accumulated data before the first chunk; required when providing a custom reducer.
    • Default reducer preserves prior append behavior for array results when no custom reducer is supplied.
  • Documentation

    • Docs updated to describe reducer and initialValue usage and removed references to the deprecated maxChunks option.

Replace maxChunks parameter with flexible reducer function that delegates
data aggregation to consumer code. This provides full control over how
streamed chunks are combined into the final data structure.

Add support for custom placeholderData that works seamlessly with the
reducer function, allowing initialization of complex data types beyond
simple arrays.

TanStack#9065

BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery.
Use a custom reducer function to control data aggregation behavior instead.
@github-actions github-actions bot added documentation Improvements or additions to documentation package: query-core labels Aug 5, 2025
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first time reducer is invoked, acc is an empty array, because placeholderData is not specified.
it seems a bit strange to me. but i'm not sure how to solve it.
i can apply the same logic as in native reduce...

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce#initialvalue.

Copy link

nx-cloud bot commented Aug 11, 2025

View your CI Pipeline Execution ↗ for commit d627594

Command Status Duration Result
nx affected --targets=test:sherif,test:knip,tes... ✅ Succeeded 2m 42s View ↗
nx run-many --target=build --exclude=examples/*... ✅ Succeeded 1m 21s View ↗

☁️ Nx Cloud last updated this comment at 2025-09-03 12:41:10 UTC

autofix-ci bot and others added 4 commits August 11, 2025 12:31
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
Copy link

coderabbitai bot commented Aug 27, 2025

Walkthrough

Replaces streamedQuery's chunk-count caching API with a reducer-driven accumulation model: adds reducer and optional initialValue, removes maxChunks, updates types, cache updates, tests, and docs so streamed chunks are reduced into a final TData accumulator.

Changes

Cohort / File(s) Summary
Docs: streamedQuery API
docs/reference/streamedQuery.md
Removed maxChunks; added reducer?: (accumulator: TData, chunk: TQueryFnData) => TData and initialValue?: TData; updated narrative to describe reducer-based accumulation and default append behavior when TData is an array.
Core Implementation: reducer-based streaming
packages/query-core/src/streamedQuery.ts
Reworked streamedQuery signature to generics <TQueryFnData, TData> with reducer and initialValue (defaults to array append/empty array); eliminated maxChunks and trimming logic; changed return type to TData; updated in-memory and cache updates to use reducer semantics; added internal union param types.
Tests: updated for new API
packages/query-core/src/__tests__/streamedQuery.test.tsx
Replaced maxChunks tests with reducer-based tests using reducer and initialValue; generator yields reduced from 3→2 chunks; expectations updated to accumulator/object shapes (e.g., keyed booleans); removed/refactored previous refetch-append scenarios; adjusted timing.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant SQ as streamedQuery(params)
  participant QF as queryFn (async generator)
  participant Red as reducer(acc, chunk)
  participant Cache as Query Cache

  Caller->>SQ: construct QueryFunction { queryFn, reducer?, initialValue? }
  SQ->>Cache: seed acc = initialValue (default [])
  SQ->>QF: start generator
  loop for each yielded chunk
    QF-->>SQ: yield chunk
    SQ->>Red: reducer(acc, chunk)
    Red-->>SQ: newAcc
    SQ->>Cache: setQueryData(prev => reducer(prev ?? initialValue, chunk))
    note right of Cache: cached accumulator updated per reducer
  end
  QF-->>SQ: complete
  SQ->>Cache: finalize accumulated TData
  SQ-->>Caller: return final TData
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I hop through streams where chunks once grew,
I fold each yield and stitch them new,
With reducer nib and seeded start,
I gather pieces, play my part.
Hooray for accumulators! 🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
packages/query-core/src/streamedQuery.ts (1)

42-49: Type-safety hole: callers can specify TData without a reducer. Add overloads to prevent mismatches.

As written, a caller can do streamedQuery<number, Record<string, boolean>>({ queryFn }) with no reducer. Types accept it, but runtime returns an Array while types claim a Record. Add overloads so TData is only specifiable when reducer is provided; otherwise TData is fixed to Array.

+// Overloads to prevent specifying a custom TData without a reducer
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TQueryKey extends QueryKey = QueryKey,
+>(
+  params: SimpleStreamedQueryParams<TQueryFnData, TQueryKey>
+): QueryFunction<Array<TQueryFnData>, TQueryKey>
+
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TData = Array<TQueryFnData>,
+  TQueryKey extends QueryKey = QueryKey,
+>(
+  params: ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
+): QueryFunction<TData, TQueryKey>
+
 export function streamedQuery<
   TQueryFnData = unknown,
   TData = Array<TQueryFnData>,
   TQueryKey extends QueryKey = QueryKey,
 >({
   queryFn,
   refetchMode = 'reset',
   reducer = (items, chunk) => addToEnd(items as Array<TQueryFnData>, chunk) as TData,
   initialValue = [] as TData,
-}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
-  TData,
-  TQueryKey
-> {
+}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {

Optional: move the type aliases next to this for discoverability.

docs/reference/streamedQuery.md (1)

36-44: Docs/code mismatch: use “initialValue” instead of “placeholderData”, and document requirement when a reducer is provided.

Code uses initialValue and makes it mandatory with a reducer. Update docs accordingly.

-- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
-  - Optional
-  - A function to reduce the streamed chunks into the final data.
-  - Defaults to a function that appends chunks to the end of the array.
-- `placeholderData?: TData = TQueryFnData`
-  - Optional
-  - Defines the initial data to be used while the first chunk is being fetched.
-  - Defaults to an empty array.
+- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
+  - Optional
+  - Reduces streamed chunks into the accumulator.
+  - Defaults to appending chunks to the end of an Array<TQueryFnData>.
+- `initialValue?: TData`
+  - Optional without a reducer; required when a reducer is provided.
+  - The initial accumulator used before the first chunk arrives.
+  - Defaults to an empty array when no reducer is provided.
🧹 Nitpick comments (6)
.vscode/query.code-workspace (1)

28-29: Nit: remove stray blank lines to keep the workspace file minimal.

No functional impact, just trims noise in diffs.

-		
 		"window.zoomLevel": 2,
 		"typescript.format.enable": false,
@@
-		
 	}
 }

Also applies to: 39-41

packages/query-core/src/streamedQuery.ts (2)

26-35: JSDoc still implies Array-only data; clarify that TData can be any accumulator.

Update wording so it doesn’t promise “Data will be an Array…”. With custom reducers, TData is arbitrary.

- * Data will be an Array of all the chunks received.
+ * Data will be accumulated into TData. By default, chunks are appended to an Array<TQueryFnData>.

79-81: Aborted reset-refetch before first chunk can return undefined despite non-null assertion.

If refetchMode==='reset' and the stream is aborted before yielding a chunk, getQueryData(...) is undefined but asserted with !. Either (a) widen return to TData | undefined, or (b) explicitly return query?.state.data to reflect cache state (still undefined), or (c) guard with an invariant/error.

Minimal change:

-    return context.client.getQueryData(context.queryKey)!
+    return context.client.getQueryData(context.queryKey) as TData

Or, if you can accept a type change:

-}: StreamedQueryParams<...>): QueryFunction<TData, TQueryKey> {
+}: StreamedQueryParams<...>): QueryFunction<TData | undefined, TQueryKey> {
...
-    return context.client.getQueryData(context.queryKey)!
+    return context.client.getQueryData(context.queryKey)

Please confirm expected behavior for “reset + aborted before first chunk”.

Also applies to: 91-91

packages/query-core/src/__tests__/streamedQuery.test.tsx (1)

131-181: Consider adding a test for “replace + abort” to lock in cache semantics.

Edge case: refetchMode 'replace' + abort mid-stream should retain old data and never write partials. A focused test would prevent regressions.

I can add a vitest like:

test('should keep old data on replace when aborted before completion', async () => {
  // arrange: stream 2 items, then abort before the 2nd
  // expect: data stays at old [0,1], never becomes [100] mid-flight, ends with [0,1]
})

Also applies to: 240-296

docs/reference/streamedQuery.md (2)

25-29: Typing in docs: queryFn should return AsyncIterable, not TData.

Align with implementation and types.

-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`

29-35: Typo: missing closing quote in refetchMode union.

Small formatting fix.

-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d25c0da and 42b9908.

📒 Files selected for processing (4)
  • .vscode/query.code-workspace (1 hunks)
  • docs/reference/streamedQuery.md (1 hunks)
  • packages/query-core/src/__tests__/streamedQuery.test.tsx (3 hunks)
  • packages/query-core/src/streamedQuery.ts (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
  • streamedQuery (40-93)
packages/query-core/src/streamedQuery.ts (3)
packages/query-core/src/types.ts (3)
  • QueryKey (53-61)
  • QueryFunctionContext (138-165)
  • QueryFunction (96-100)
packages/query-core/src/index.ts (1)
  • streamedQuery (37-37)
packages/query-core/src/utils.ts (1)
  • addToEnd (392-395)
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...

(QB_NEW_EN)

🔇 Additional comments (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)

353-385: LGTM: reducer mode is exercised with a non-array accumulator.

Good coverage of custom reducer semantics and final shape.


390-429: LGTM: initialValue path works and merges correctly.

Nice example of seeding the accumulator.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
docs/reference/streamedQuery.md (4)

25-28: Fix chunk type in queryFn signature

The async iterable yields chunk type, not the accumulator type. Use TQueryFnData here to avoid type confusion with TData.

- - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+ - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`

29-35: Close the missing quote in refetchMode options

Small typo: missing closing quote around 'replace'.

-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`

40-45: Correct initialValue typing and requirements

  • “initialValue?: TData = TQueryFnData” reads like a type default on a value; remove “= TQueryFnData”.
  • Clarify when initialValue is required and what the default is.
  • Grammar: add “a” before “custom reducer”.
-`initialValue?: TData = TQueryFnData`
-  - Optional
-  - Defines the initial data to be used while the first chunk is being fetched. 
-  - It is mandatory when custom `reducer` is provided.
-  - Defaults to an empty array.
+`initialValue?: TData`
+  - Optional
+  - Initial accumulator value used while the first chunk is being fetched.
+  - Required when a custom `reducer` is provided or when `TData` is not an array.
+  - Default: `[]` when `TData` is an array; otherwise no default.

6-6: Update overview to reflect reducer-based accumulation (not always an array)

This sentence contradicts the new reducer-based API. By default it’s an array, but with a custom reducer it can be any shape.

-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends.
+`streamedQuery` helps create a query function that streams data from an AsyncIterable. By default, data accumulates into an array of all received chunks; with a custom `reducer` (and `initialValue`), you can accumulate into any shape. The query will be `pending` until the first chunk arrives, then `success` while fetchStatus remains `fetching` until the stream ends.
🧹 Nitpick comments (2)
docs/reference/streamedQuery.md (2)

36-45: Tighten wording and fix minor grammar nits

  • “the streamed chunks into the final data” → fine, but remove extra spaces.
  • “It is mandatory when custom reducer is provided.” → add article “a”.
-  - A function to reduce the streamed chunks into the final data.
+  - A function that reduces streamed chunks into the final data.
@@
-  - It is mandatory when custom `reducer` is provided.
+  - It is mandatory when a custom `reducer` is provided.

45-45: Consider adding a minimal custom reducer example

A short example will help users migrate from maxChunks and understand TQueryFnData vs TData.

+#### Example: Aggregating into a non-array accumulator
+```ts
+import { experimental_streamedQuery as streamedQuery } from '@tanstack/react-query'
+
+type Chunk = { id: string; value: number }
+type Acc = Map<string, number>
+
+const query = queryOptions({
+  queryKey: ['stats'],
+  queryFn: streamedQuery<Chunk, Acc>({
+    queryFn: fetchChunkStream, // () => Promise<AsyncIterable<Chunk>>
+    initialValue: new Map(),
+    reducer: (acc, chunk) => acc.set(chunk.id, (acc.get(chunk.id) ?? 0) + chunk.value),
+    refetchMode: 'replace',
+  }),
+})
+```

Would you like me to add a migration note showing how to replicate the old maxChunks behavior with a reducer?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 42b9908 and 5f68a62.

📒 Files selected for processing (1)
  • docs/reference/streamedQuery.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...

(QB_NEW_EN)


[grammar] ~43-~43: There might be a mistake here.
Context: ... is being fetched. - It is mandatory when custom reducer is provided. - Defau...

(QB_NEW_EN)


[grammar] ~43-~43: There might be a mistake here.
Context: ...atory when custom reducer is provided. - Defaults to an empty array.

(QB_NEW_EN)

gobbimar added 2 commits September 3, 2025 11:53
…l-reducer-to-streamed-query

# Conflicts:
#	packages/query-core/src/__tests__/streamedQuery.test.tsx
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
docs/reference/streamedQuery.md (2)

25-25: Correct the chunk type in queryFn signature

The stream yields TQueryFnData, not TData.

-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`

6-6: Update the intro to reflect reducer-driven data shapes

The current sentence says data “will be an Array,” which is no longer always true.

-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends.
+`streamedQuery` is a helper to stream data from an AsyncIterable. By default, data is an array of received chunks; if you provide a `reducer`, the data takes whatever shape your reducer returns. The query is `pending` until the first chunk, then `success`, and remains in fetchStatus `fetching` until the stream ends.
♻️ Duplicate comments (1)
docs/reference/streamedQuery.md (1)

36-40: Clarification on default reducer looks good

The notes now correctly state the default only applies to array TData and that non-array TData requires a custom reducer.

🧹 Nitpick comments (3)
docs/reference/streamedQuery.md (1)

29-35: Close the missing quote in refetchMode union

Minor typo:

-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)

417-426: Assert mid-stream accumulator to prove incremental reduction

Add an intermediate assertion to verify reducer updates between chunks:

-    await vi.advanceTimersByTimeAsync(100)
+    await vi.advanceTimersByTimeAsync(50)
+
+    expect(observer.getCurrentResult()).toMatchObject({
+      status: 'success',
+      fetchStatus: 'fetching',
+      data: { 0: true },
+    })
+
+    await vi.advanceTimersByTimeAsync(50)

431-445: Rename test to emphasize seed semantics (optional)

The term “initialValue (seed)” helps readers connect this to reducer semantics.

-  test('should support custom reducer with initialValue', async () => {
+  test('should support custom reducer with initialValue (seed)', async () => {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5f68a62 and da93cc7.

📒 Files selected for processing (2)
  • docs/reference/streamedQuery.md (1 hunks)
  • packages/query-core/src/__tests__/streamedQuery.test.tsx (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
  • streamedQuery (40-93)
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~44-~44: There might be a mistake here.
Context: ... is being fetched. - It is mandatory when custom reducer is provided. - Defau...

(QB_NEW_EN)


[grammar] ~44-~44: There might be a mistake here.
Context: ...atory when custom reducer is provided. - Defaults to an empty array.

(QB_NEW_EN)

🔇 Additional comments (3)
docs/reference/streamedQuery.md (1)

41-45: Fix initialValue signature and semantics in docs

  • Remove the invalid default (= TQueryFnData) and clarify that initialValue is the reduction seed, not a cache placeholder.
  • Confirm docs match code: no runtime enforcement of “mandatory when custom reducer is provided.”

Apply:

- `initialValue?: TData = TQueryFnData`
+ `initialValue?: TData`
   - Optional
-  - Defines the initial data to be used while the first chunk is being fetched. 
-  - It is mandatory when custom `reducer` is provided.
-  - Defaults to an empty array.
+  - Seed for the accumulator passed to `reducer` before any chunks arrive (not written to the cache before the first chunk).
+  - When `TData` is not an array or your reducer needs a non-empty seed, provide `initialValue` explicitly.
+  - Default: `[] as TData` (used by the default array-append reducer). For non-array `TData`, pass a suitable `initialValue`.
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)

399-406: LGTM: solid example reducer + explicit seed

Using initialValue: {} avoids accidental array seeds and makes the runtime shape unambiguous.


456-467: Great coverage of merge behavior

Verifies that seeded keys persist and incoming chunks are merged into the accumulator.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
docs/reference/streamedQuery.md (3)

6-6: Intro still claims data is always an Array — update to reflect reducer mode

With a custom reducer, data is not guaranteed to be an Array.

Apply:

-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends.
+`streamedQuery` is a helper to create a query function that streams data from an AsyncIterable. By default, data is an Array of received chunks; with a custom `reducer`, data is whatever accumulator shape (`TData`) your reducer returns. The query will be `pending` until the first chunk is received and `success` afterwards. It stays in fetchStatus `fetching` until the stream ends.

25-29: Fix queryFn signature and a typo in refetchMode

  • queryFn streams TQueryFnData, not TData.
  • Support both AsyncIterable and Promise<AsyncIterable>.
  • Close the stray quote in 'replace'.
-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+- `queryFn: (context: QueryFunctionContext) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>`
   - **Required**
   - The function that returns a Promise of an AsyncIterable of data to stream in.
   - Receives a [QueryFunctionContext](../../framework/react/guides/query-functions.md#queryfunctioncontext)
-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`

36-46: Clarify reducer/initialValue semantics and fix grammar

  • Make initialValue just TData, not “= TQueryFnData”.
  • Say “Required when a custom reducer is provided.”
  • State the default applies only for array TData.
 - `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
   - Optional
   - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`).
   - Default: appends each chunk to the end of the accumulator when `TData` is an array.
   - If `TData` is not an array, you must provide a custom `reducer`.
-`initialValue?: TData = TQueryFnData`
+`initialValue?: TData`
   - Optional
-  - Defines the initial data to be used while the first chunk is being fetched.
-  - It is mandatory when custom `reducer` is provided.
-  - Defaults to an empty array.
+  - Seed value for the reducer (the accumulator’s initial state). It is not shown as placeholder data.
+  - Required when a custom `reducer` is provided.
+  - Default (no custom reducer): `[]` when `TData` is an array.
packages/query-core/src/streamedQuery.ts (1)

97-98: Return a defined value when the stream yields no chunks

On empty streams (or early abort before the first chunk), getQueryData may be undefined. Fall back to result (the seeded accumulator).

-    return context.client.getQueryData(context.queryKey)!
+    const cached = context.client.getQueryData<TData>(context.queryKey)
+    return cached ?? result
🧹 Nitpick comments (3)
packages/query-core/src/streamedQuery.ts (2)

42-45: Update JSDoc to match reducer/initialValue semantics

Reflect accumulator semantics and the array-only default.

- * @param reducer - A function to reduce the streamed chunks into the final data.
- * Defaults to a function that appends chunks to the end of the array.
- * @param initialValue - Initial value to be used while the first chunk is being fetched.
+ * @param reducer - Reduces streamed chunks (`TQueryFnData`) into the accumulator (`TData`).
+ * Defaults to appending each chunk when `TData` is an array.
+ * @param initialValue - Seed value for the reducer (not shown as placeholder data).

85-90: Avoid double reduction per chunk and keep result in lockstep with the cache

You currently compute the reduction twice (once in setQueryData, once for result). Merge them to reduce work and prevent divergence.

-        context.client.setQueryData<TData>(context.queryKey, (prev) =>
-          reducer(prev ?? initialValue, chunk),
-        )
-      }
-      result = reducer(result, chunk)
+        context.client.setQueryData<TData>(context.queryKey, (prev) => {
+          const next = reducer((prev ?? initialValue) as TData, chunk)
+          result = next
+          return next
+        })
+      } else {
+        result = reducer(result, chunk)
+      }
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)

431-471: Seeded accumulator test LGTM; consider adding empty-stream and replace+reducer cases

  • Add a test where the stream yields zero chunks to lock in the fallback result.
  • Add a “refetchMode: 'replace' + custom reducer” test to assert no intermediate cache writes.

Example additions:

test('returns seeded accumulator on empty stream', async () => {
  const key = queryKey()
  const observer = new QueryObserver(queryClient, {
    queryKey: key,
    queryFn: streamedQuery<number, Record<number, boolean>>({
      queryFn: async function* () {}, // no chunks
      reducer: (acc, chunk) => ({ ...acc, [chunk]: true }),
      initialValue: { 42: true },
    }),
  })
  observer.subscribe(vi.fn())
  await vi.advanceTimersByTimeAsync(10)
  expect(observer.getCurrentResult()).toMatchObject({
    status: 'success',
    fetchStatus: 'idle',
    data: { 42: true },
  })
})

test('replace refetch with custom reducer writes only at the end', async () => {
  const key = queryKey()
  let offset = 0
  const observer = new QueryObserver(queryClient, {
    queryKey: key,
    queryFn: streamedQuery<number, Record<number, boolean>>({
      queryFn: () => createAsyncNumberGenerator(2, offset),
      refetchMode: 'replace',
      reducer: (acc, chunk) => ({ ...acc, [chunk]: true }),
      initialValue: {},
    }),
  })
  observer.subscribe(vi.fn())
  await vi.advanceTimersByTimeAsync(100)
  expect(observer.getCurrentResult().data).toEqual({ 0: true, 1: true })
  offset = 100
  void observer.refetch()
  await vi.advanceTimersByTimeAsync(90)
  // still old data until stream ends
  expect(observer.getCurrentResult().data).toEqual({ 0: true, 1: true })
  await vi.advanceTimersByTimeAsync(10)
  expect(observer.getCurrentResult().data).toEqual({ 100: true, 101: true })
})
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between da93cc7 and e3a54ea.

📒 Files selected for processing (3)
  • docs/reference/streamedQuery.md (1 hunks)
  • packages/query-core/src/__tests__/streamedQuery.test.tsx (3 hunks)
  • packages/query-core/src/streamedQuery.ts (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
packages/query-core/src/index.ts (1)
  • streamedQuery (37-37)
packages/query-core/src/streamedQuery.ts (1)
  • streamedQuery (46-99)
packages/query-core/src/streamedQuery.ts (3)
packages/query-core/src/types.ts (3)
  • QueryKey (53-61)
  • QueryFunctionContext (138-165)
  • QueryFunction (96-100)
packages/query-core/src/index.ts (1)
  • streamedQuery (37-37)
packages/query-core/src/utils.ts (1)
  • addToEnd (399-402)
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~44-~44: There might be a mistake here.
Context: ...k is being fetched. - It is mandatory when custom reducer is provided. - Defau...

(QB_NEW_EN)


[grammar] ~44-~44: There might be a mistake here.
Context: ...atory when custom reducer is provided. - Defaults to an empty array.

(QB_NEW_EN)

🔇 Additional comments (2)
packages/query-core/src/streamedQuery.ts (1)

53-56: Default reducer/seed rely on array semantics — safe once overloads land

Current casts (as Array<TQueryFnData>, [] as TData) are acceptable if the simple path is constrained to arrays via overloads; otherwise they’re unsafe.

Do you want me to push the overloads and adjust these casts accordingly?

packages/query-core/src/__tests__/streamedQuery.test.tsx (1)

394-429: Custom reducer test looks good

Covers non-array TData accumulation with correct timing and final shape.

Copy link

pkg-pr-new bot commented Sep 3, 2025

More templates

@tanstack/angular-query-devtools-experimental

npm i https://pkg.pr.new/@tanstack/angular-query-devtools-experimental@9532

@tanstack/angular-query-experimental

npm i https://pkg.pr.new/@tanstack/angular-query-experimental@9532

@tanstack/eslint-plugin-query

npm i https://pkg.pr.new/@tanstack/eslint-plugin-query@9532

@tanstack/query-async-storage-persister

npm i https://pkg.pr.new/@tanstack/query-async-storage-persister@9532

@tanstack/query-broadcast-client-experimental

npm i https://pkg.pr.new/@tanstack/query-broadcast-client-experimental@9532

@tanstack/query-core

npm i https://pkg.pr.new/@tanstack/query-core@9532

@tanstack/query-devtools

npm i https://pkg.pr.new/@tanstack/query-devtools@9532

@tanstack/query-persist-client-core

npm i https://pkg.pr.new/@tanstack/query-persist-client-core@9532

@tanstack/query-sync-storage-persister

npm i https://pkg.pr.new/@tanstack/query-sync-storage-persister@9532

@tanstack/react-query

npm i https://pkg.pr.new/@tanstack/react-query@9532

@tanstack/react-query-devtools

npm i https://pkg.pr.new/@tanstack/react-query-devtools@9532

@tanstack/react-query-next-experimental

npm i https://pkg.pr.new/@tanstack/react-query-next-experimental@9532

@tanstack/react-query-persist-client

npm i https://pkg.pr.new/@tanstack/react-query-persist-client@9532

@tanstack/solid-query

npm i https://pkg.pr.new/@tanstack/solid-query@9532

@tanstack/solid-query-devtools

npm i https://pkg.pr.new/@tanstack/solid-query-devtools@9532

@tanstack/solid-query-persist-client

npm i https://pkg.pr.new/@tanstack/solid-query-persist-client@9532

@tanstack/svelte-query

npm i https://pkg.pr.new/@tanstack/svelte-query@9532

@tanstack/svelte-query-devtools

npm i https://pkg.pr.new/@tanstack/svelte-query-devtools@9532

@tanstack/svelte-query-persist-client

npm i https://pkg.pr.new/@tanstack/svelte-query-persist-client@9532

@tanstack/vue-query

npm i https://pkg.pr.new/@tanstack/vue-query@9532

@tanstack/vue-query-devtools

npm i https://pkg.pr.new/@tanstack/vue-query-devtools@9532

commit: d627594

Copy link

codecov bot commented Sep 3, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 59.37%. Comparing base (43049c5) to head (d627594).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff             @@
##             main    #9532       +/-   ##
===========================================
+ Coverage   45.26%   59.37%   +14.10%     
===========================================
  Files         208      137       -71     
  Lines        8340     5580     -2760     
  Branches     1894     1504      -390     
===========================================
- Hits         3775     3313      -462     
+ Misses       4118     1963     -2155     
+ Partials      447      304      -143     
Components Coverage Δ
@tanstack/angular-query-devtools-experimental ∅ <ø> (∅)
@tanstack/angular-query-experimental 87.00% <ø> (ø)
@tanstack/eslint-plugin-query ∅ <ø> (∅)
@tanstack/query-async-storage-persister 43.85% <ø> (ø)
@tanstack/query-broadcast-client-experimental 24.39% <ø> (ø)
@tanstack/query-codemods ∅ <ø> (∅)
@tanstack/query-core 97.43% <100.00%> (+<0.01%) ⬆️
@tanstack/query-devtools 3.48% <ø> (ø)
@tanstack/query-persist-client-core 79.47% <ø> (ø)
@tanstack/query-sync-storage-persister 84.61% <ø> (ø)
@tanstack/query-test-utils ∅ <ø> (∅)
@tanstack/react-query 96.00% <ø> (ø)
@tanstack/react-query-devtools 10.00% <ø> (ø)
@tanstack/react-query-next-experimental ∅ <ø> (∅)
@tanstack/react-query-persist-client 100.00% <ø> (ø)
@tanstack/solid-query 78.13% <ø> (ø)
@tanstack/solid-query-devtools ∅ <ø> (∅)
@tanstack/solid-query-persist-client 100.00% <ø> (ø)
@tanstack/svelte-query 87.58% <ø> (ø)
@tanstack/svelte-query-devtools ∅ <ø> (∅)
@tanstack/svelte-query-persist-client 100.00% <ø> (ø)
@tanstack/vue-query 71.10% <ø> (ø)
@tanstack/vue-query-devtools ∅ <ø> (∅)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
queryFn: streamedQuery<number, Record<number, boolean>>({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let’s not pass types this way - as it won’t test if type inference works. Instead, let’s type the initialValue:

initialValue: {} as Record<number, boolean>

queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
refetchMode: 'append',
queryFn: streamedQuery<number, Record<number, boolean>>({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here please:

Suggested change
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: streamedQuery({

gobbimar added 2 commits September 3, 2025 14:05
…o-streamed-query' into feature/added-optional-reducer-to-streamed-query

# Conflicts:
#	packages/query-core/src/__tests__/streamedQuery.test.tsx
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/query-core/src/streamedQuery.ts (1)

32-45: Fix JSDoc: not always an array; clarify replace semantics and initialValue requirement.

The doc still states data is always an array and doesn’t reflect that replace gating applies only on refetch.

- * Data will be an Array of all the chunks received.
+ * By default, data is an array that appends each chunk. With a custom reducer + initialValue,
+ * data is the accumulator type TData that you control.
@@
- * Set to `'replace'` to write all data to the cache once the stream ends.
+ * Set to `'replace'` to defer cache writes until the stream ends on refetches.
+ * (On initial fetches, cache is still updated per chunk.)
@@
- * @param reducer - A function to reduce the streamed chunks into the final data.
- * Defaults to a function that appends chunks to the end of the array.
- * @param initialValue - Initial value to be used while the first chunk is being fetched.
+ * @param reducer - Function to reduce chunks into the accumulator (TData).
+ * Defaults to appending to the end of an array.
+ * @param initialValue - Required when a custom reducer is provided; seeds the accumulator.
♻️ Duplicate comments (2)
packages/query-core/src/streamedQuery.ts (2)

85-87: Good fix: undefined check prevents “empty” collisions.

Using prev === undefined ? initialValue : prev correctly preserves null as a valid value.


11-31: Enforce array TData when no reducer (add overloads).

Current signature lets callers pick non-array TData in the “simple” path, while the default reducer assumes arrays. Add overloads to couple API shape to return type and close the type hole.

+// Overloads to bind API shape to TData
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TQueryKey extends QueryKey = QueryKey,
+>(params: SimpleStreamedQueryParams<TQueryFnData, TQueryKey>): QueryFunction<
+  Array<TQueryFnData>,
+  TQueryKey
+>
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TData = unknown,
+  TQueryKey extends QueryKey = QueryKey,
+>(params: ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
+  TData,
+  TQueryKey
+>
🧹 Nitpick comments (3)
packages/query-core/src/streamedQuery.ts (3)

46-56: Prefer resolving defaults inside the function to improve narrowing and readability.

Move defaults for reducer/initialValue out of the parameter list and resolve them locally. Keeps params in sync with the union types and avoids “phantom” defaults in the reducible path.

 export function streamedQuery<
   TQueryFnData = unknown,
   TData = Array<TQueryFnData>,
   TQueryKey extends QueryKey = QueryKey,
 >({
   queryFn,
   refetchMode = 'reset',
-  reducer = (items, chunk) =>
-    addToEnd(items as Array<TQueryFnData>, chunk) as TData,
-  initialValue = [] as TData,
+  reducer,
+  initialValue,
 }: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
   TData,
   TQueryKey
 > {
-  return async (context) => {
+  return async (context) => {
+    const resolvedReducer =
+      (reducer ??
+        ((items: TData, chunk: TQueryFnData) =>
+          addToEnd(items as unknown as Array<TQueryFnData>, chunk) as TData))
+    const resolvedInitial = (initialValue ??
+      ([] as unknown as TData)) as TData

And then replace uses of reducer/initialValue below with resolvedReducer/resolvedInitial.


89-90: Nit: avoid reducing twice when not needed.

result is only read for replace-refetch finalization. Skip updating it otherwise to save work on hot paths.

-      result = reducer(result, chunk)
+      if (isRefetch && refetchMode === 'replace') {
+        result = reducer(result, chunk)
+      }

If you apply the “resolved*” refactor above, use resolvedReducer.


32-42: Heads-up: memory behavior is now fully consumer-defined.

With maxChunks removed, long-running streams can grow unbounded unless the reducer implements windowing/truncation. Call out this guidance prominently in docs with examples.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e3a54ea and d627594.

📒 Files selected for processing (2)
  • packages/query-core/src/__tests__/streamedQuery.test.tsx (3 hunks)
  • packages/query-core/src/streamedQuery.ts (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/query-core/src/tests/streamedQuery.test.tsx
🧰 Additional context used
🧬 Code graph analysis (1)
packages/query-core/src/streamedQuery.ts (2)
packages/query-core/src/types.ts (3)
  • QueryKey (53-61)
  • QueryFunctionContext (138-165)
  • QueryFunction (96-100)
packages/query-core/src/utils.ts (1)
  • addToEnd (399-402)
🔇 Additional comments (2)
packages/query-core/src/streamedQuery.ts (2)

4-26: API shape looks solid; names are clear and local. Consider exporting types later if docs need them.

No action required now.


32-36: streamedQuery: verify abort/no-chunk behavior
If a stream aborts before the first yield (or produces no chunks), the function currently returns undefined (no cache write) and ends in “success.” Confirm whether abort-before-emit should instead be treated as cancellation (e.g. throw CancelledError by tracking a hasEmitted flag) and update streamedQuery accordingly. Add tests for empty-stream and early-abort scenarios.

@marcog83 marcog83 requested a review from TkDodo September 3, 2025 17:52
@TkDodo TkDodo merged commit 8f24580 into TanStack:main Sep 4, 2025
8 checks passed
@marcog83
Copy link
Contributor Author

marcog83 commented Sep 4, 2025

Thanks a lot for letting me collaborate on react-query! It’s been an awesome experience.

@marcog83 marcog83 deleted the feature/added-optional-reducer-to-streamed-query branch September 4, 2025 19:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation package: query-core
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants