-
-
Notifications
You must be signed in to change notification settings - Fork 3.4k
feat(query-core): add custom reducer support to streamedQuery #9532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(query-core): add custom reducer support to streamedQuery #9532
Conversation
Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure. Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays. TanStack#9065 BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.
queryFn: streamedQuery<number, Record<number, boolean>>({ | ||
queryFn: () => createAsyncNumberGenerator(2), | ||
reducer: (acc, chunk) => ({ | ||
...acc, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the first time reducer
is invoked, acc
is an empty array, because placeholderData
is not specified.
it seems a bit strange to me. but i'm not sure how to solve it.
i can apply the same logic as in native reduce
...
View your CI Pipeline Execution ↗ for commit d627594
☁️ Nx Cloud last updated this comment at |
…treamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided.
…treamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided.
…treamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided.
WalkthroughReplaces streamedQuery's chunk-count caching API with a reducer-driven accumulation model: adds Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Caller
participant SQ as streamedQuery(params)
participant QF as queryFn (async generator)
participant Red as reducer(acc, chunk)
participant Cache as Query Cache
Caller->>SQ: construct QueryFunction { queryFn, reducer?, initialValue? }
SQ->>Cache: seed acc = initialValue (default [])
SQ->>QF: start generator
loop for each yielded chunk
QF-->>SQ: yield chunk
SQ->>Red: reducer(acc, chunk)
Red-->>SQ: newAcc
SQ->>Cache: setQueryData(prev => reducer(prev ?? initialValue, chunk))
note right of Cache: cached accumulator updated per reducer
end
QF-->>SQ: complete
SQ->>Cache: finalize accumulated TData
SQ-->>Caller: return final TData
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
✨ Finishing Touches
🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
packages/query-core/src/streamedQuery.ts (1)
42-49
: Type-safety hole: callers can specify TData without a reducer. Add overloads to prevent mismatches.As written, a caller can do streamedQuery<number, Record<string, boolean>>({ queryFn }) with no reducer. Types accept it, but runtime returns an Array while types claim a Record. Add overloads so TData is only specifiable when reducer is provided; otherwise TData is fixed to Array.
+// Overloads to prevent specifying a custom TData without a reducer +export function streamedQuery< + TQueryFnData = unknown, + TQueryKey extends QueryKey = QueryKey, +>( + params: SimpleStreamedQueryParams<TQueryFnData, TQueryKey> +): QueryFunction<Array<TQueryFnData>, TQueryKey> + +export function streamedQuery< + TQueryFnData = unknown, + TData = Array<TQueryFnData>, + TQueryKey extends QueryKey = QueryKey, +>( + params: ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey> +): QueryFunction<TData, TQueryKey> + export function streamedQuery< TQueryFnData = unknown, TData = Array<TQueryFnData>, TQueryKey extends QueryKey = QueryKey, >({ queryFn, refetchMode = 'reset', reducer = (items, chunk) => addToEnd(items as Array<TQueryFnData>, chunk) as TData, initialValue = [] as TData, -}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction< - TData, - TQueryKey -> { +}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {Optional: move the type aliases next to this for discoverability.
docs/reference/streamedQuery.md (1)
36-44
: Docs/code mismatch: use “initialValue” instead of “placeholderData”, and document requirement when a reducer is provided.Code uses initialValue and makes it mandatory with a reducer. Update docs accordingly.
-- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData` - - Optional - - A function to reduce the streamed chunks into the final data. - - Defaults to a function that appends chunks to the end of the array. -- `placeholderData?: TData = TQueryFnData` - - Optional - - Defines the initial data to be used while the first chunk is being fetched. - - Defaults to an empty array. +- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData` + - Optional + - Reduces streamed chunks into the accumulator. + - Defaults to appending chunks to the end of an Array<TQueryFnData>. +- `initialValue?: TData` + - Optional without a reducer; required when a reducer is provided. + - The initial accumulator used before the first chunk arrives. + - Defaults to an empty array when no reducer is provided.
🧹 Nitpick comments (6)
.vscode/query.code-workspace (1)
28-29
: Nit: remove stray blank lines to keep the workspace file minimal.No functional impact, just trims noise in diffs.
- "window.zoomLevel": 2, "typescript.format.enable": false, @@ - } }Also applies to: 39-41
packages/query-core/src/streamedQuery.ts (2)
26-35
: JSDoc still implies Array-only data; clarify that TData can be any accumulator.Update wording so it doesn’t promise “Data will be an Array…”. With custom reducers, TData is arbitrary.
- * Data will be an Array of all the chunks received. + * Data will be accumulated into TData. By default, chunks are appended to an Array<TQueryFnData>.
79-81
: Aborted reset-refetch before first chunk can return undefined despite non-null assertion.If refetchMode==='reset' and the stream is aborted before yielding a chunk, getQueryData(...) is undefined but asserted with !. Either (a) widen return to TData | undefined, or (b) explicitly return query?.state.data to reflect cache state (still undefined), or (c) guard with an invariant/error.
Minimal change:
- return context.client.getQueryData(context.queryKey)! + return context.client.getQueryData(context.queryKey) as TDataOr, if you can accept a type change:
-}: StreamedQueryParams<...>): QueryFunction<TData, TQueryKey> { +}: StreamedQueryParams<...>): QueryFunction<TData | undefined, TQueryKey> { ... - return context.client.getQueryData(context.queryKey)! + return context.client.getQueryData(context.queryKey)Please confirm expected behavior for “reset + aborted before first chunk”.
Also applies to: 91-91
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
131-181
: Consider adding a test for “replace + abort” to lock in cache semantics.Edge case: refetchMode 'replace' + abort mid-stream should retain old data and never write partials. A focused test would prevent regressions.
I can add a vitest like:
test('should keep old data on replace when aborted before completion', async () => { // arrange: stream 2 items, then abort before the 2nd // expect: data stays at old [0,1], never becomes [100] mid-flight, ends with [0,1] })Also applies to: 240-296
docs/reference/streamedQuery.md (2)
25-29
: Typing in docs: queryFn should return AsyncIterable, not TData.Align with implementation and types.
-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>` +- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`
29-35
: Typo: missing closing quote in refetchMode union.Small formatting fix.
-- `refetchMode?: 'append' | 'reset' | 'replace` +- `refetchMode?: 'append' | 'reset' | 'replace'`
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
.vscode/query.code-workspace
(1 hunks)docs/reference/streamedQuery.md
(1 hunks)packages/query-core/src/__tests__/streamedQuery.test.tsx
(3 hunks)packages/query-core/src/streamedQuery.ts
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
streamedQuery
(40-93)
packages/query-core/src/streamedQuery.ts (3)
packages/query-core/src/types.ts (3)
QueryKey
(53-61)QueryFunctionContext
(138-165)QueryFunction
(96-100)packages/query-core/src/index.ts (1)
streamedQuery
(37-37)packages/query-core/src/utils.ts (1)
addToEnd
(392-395)
🪛 LanguageTool
docs/reference/streamedQuery.md
[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...
(QB_NEW_EN)
🔇 Additional comments (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
353-385
: LGTM: reducer mode is exercised with a non-array accumulator.Good coverage of custom reducer semantics and final shape.
390-429
: LGTM: initialValue path works and merges correctly.Nice example of seeding the accumulator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
docs/reference/streamedQuery.md (4)
25-28
: Fix chunk type in queryFn signatureThe async iterable yields chunk type, not the accumulator type. Use TQueryFnData here to avoid type confusion with TData.
- - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>` + - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`
29-35
: Close the missing quote in refetchMode optionsSmall typo: missing closing quote around 'replace'.
-- `refetchMode?: 'append' | 'reset' | 'replace` +- `refetchMode?: 'append' | 'reset' | 'replace'`
40-45
: Correct initialValue typing and requirements
- “initialValue?: TData = TQueryFnData” reads like a type default on a value; remove “= TQueryFnData”.
- Clarify when initialValue is required and what the default is.
- Grammar: add “a” before “custom reducer”.
-`initialValue?: TData = TQueryFnData` - - Optional - - Defines the initial data to be used while the first chunk is being fetched. - - It is mandatory when custom `reducer` is provided. - - Defaults to an empty array. +`initialValue?: TData` + - Optional + - Initial accumulator value used while the first chunk is being fetched. + - Required when a custom `reducer` is provided or when `TData` is not an array. + - Default: `[]` when `TData` is an array; otherwise no default.
6-6
: Update overview to reflect reducer-based accumulation (not always an array)This sentence contradicts the new reducer-based API. By default it’s an array, but with a custom reducer it can be any shape.
-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends. +`streamedQuery` helps create a query function that streams data from an AsyncIterable. By default, data accumulates into an array of all received chunks; with a custom `reducer` (and `initialValue`), you can accumulate into any shape. The query will be `pending` until the first chunk arrives, then `success` while fetchStatus remains `fetching` until the stream ends.
🧹 Nitpick comments (2)
docs/reference/streamedQuery.md (2)
36-45
: Tighten wording and fix minor grammar nits
- “the streamed chunks into the final data” → fine, but remove extra spaces.
- “It is mandatory when custom
reducer
is provided.” → add article “a”.- - A function to reduce the streamed chunks into the final data. + - A function that reduces streamed chunks into the final data. @@ - - It is mandatory when custom `reducer` is provided. + - It is mandatory when a custom `reducer` is provided.
45-45
: Consider adding a minimal custom reducer exampleA short example will help users migrate from
maxChunks
and understandTQueryFnData
vsTData
.+#### Example: Aggregating into a non-array accumulator +```ts +import { experimental_streamedQuery as streamedQuery } from '@tanstack/react-query' + +type Chunk = { id: string; value: number } +type Acc = Map<string, number> + +const query = queryOptions({ + queryKey: ['stats'], + queryFn: streamedQuery<Chunk, Acc>({ + queryFn: fetchChunkStream, // () => Promise<AsyncIterable<Chunk>> + initialValue: new Map(), + reducer: (acc, chunk) => acc.set(chunk.id, (acc.get(chunk.id) ?? 0) + chunk.value), + refetchMode: 'replace', + }), +}) +```Would you like me to add a migration note showing how to replicate the old
maxChunks
behavior with a reducer?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
docs/reference/streamedQuery.md
(1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/reference/streamedQuery.md
[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...
(QB_NEW_EN)
[grammar] ~43-~43: There might be a mistake here.
Context: ... is being fetched. - It is mandatory when custom reducer
is provided. - Defau...
(QB_NEW_EN)
[grammar] ~43-~43: There might be a mistake here.
Context: ...atory when custom reducer
is provided. - Defaults to an empty array.
(QB_NEW_EN)
…l-reducer-to-streamed-query # Conflicts: # packages/query-core/src/__tests__/streamedQuery.test.tsx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
docs/reference/streamedQuery.md (2)
25-25
: Correct the chunk type in queryFn signatureThe stream yields
TQueryFnData
, notTData
.-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>` +- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`
6-6
: Update the intro to reflect reducer-driven data shapesThe current sentence says data “will be an Array,” which is no longer always true.
-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends. +`streamedQuery` is a helper to stream data from an AsyncIterable. By default, data is an array of received chunks; if you provide a `reducer`, the data takes whatever shape your reducer returns. The query is `pending` until the first chunk, then `success`, and remains in fetchStatus `fetching` until the stream ends.
♻️ Duplicate comments (1)
docs/reference/streamedQuery.md (1)
36-40
: Clarification on default reducer looks goodThe notes now correctly state the default only applies to array
TData
and that non-arrayTData
requires a custom reducer.
🧹 Nitpick comments (3)
docs/reference/streamedQuery.md (1)
29-35
: Close the missing quote in refetchMode unionMinor typo:
-- `refetchMode?: 'append' | 'reset' | 'replace` +- `refetchMode?: 'append' | 'reset' | 'replace'`packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
417-426
: Assert mid-stream accumulator to prove incremental reductionAdd an intermediate assertion to verify reducer updates between chunks:
- await vi.advanceTimersByTimeAsync(100) + await vi.advanceTimersByTimeAsync(50) + + expect(observer.getCurrentResult()).toMatchObject({ + status: 'success', + fetchStatus: 'fetching', + data: { 0: true }, + }) + + await vi.advanceTimersByTimeAsync(50)
431-445
: Rename test to emphasize seed semantics (optional)The term “initialValue (seed)” helps readers connect this to reducer semantics.
- test('should support custom reducer with initialValue', async () => { + test('should support custom reducer with initialValue (seed)', async () => {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
docs/reference/streamedQuery.md
(1 hunks)packages/query-core/src/__tests__/streamedQuery.test.tsx
(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
streamedQuery
(40-93)
🪛 LanguageTool
docs/reference/streamedQuery.md
[grammar] ~44-~44: There might be a mistake here.
Context: ... is being fetched. - It is mandatory when custom reducer
is provided. - Defau...
(QB_NEW_EN)
[grammar] ~44-~44: There might be a mistake here.
Context: ...atory when custom reducer
is provided. - Defaults to an empty array.
(QB_NEW_EN)
🔇 Additional comments (3)
docs/reference/streamedQuery.md (1)
41-45
: FixinitialValue
signature and semantics in docs
- Remove the invalid default (
= TQueryFnData
) and clarify thatinitialValue
is the reduction seed, not a cache placeholder.- Confirm docs match code: no runtime enforcement of “mandatory when custom
reducer
is provided.”Apply:
- `initialValue?: TData = TQueryFnData` + `initialValue?: TData` - Optional - - Defines the initial data to be used while the first chunk is being fetched. - - It is mandatory when custom `reducer` is provided. - - Defaults to an empty array. + - Seed for the accumulator passed to `reducer` before any chunks arrive (not written to the cache before the first chunk). + - When `TData` is not an array or your reducer needs a non-empty seed, provide `initialValue` explicitly. + - Default: `[] as TData` (used by the default array-append reducer). For non-array `TData`, pass a suitable `initialValue`.packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
399-406
: LGTM: solid example reducer + explicit seedUsing
initialValue: {}
avoids accidental array seeds and makes the runtime shape unambiguous.
456-467
: Great coverage of merge behaviorVerifies that seeded keys persist and incoming chunks are merged into the accumulator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
docs/reference/streamedQuery.md (3)
6-6
: Intro still claims data is always an Array — update to reflect reducer modeWith a custom reducer, data is not guaranteed to be an Array.
Apply:
-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends. +`streamedQuery` is a helper to create a query function that streams data from an AsyncIterable. By default, data is an Array of received chunks; with a custom `reducer`, data is whatever accumulator shape (`TData`) your reducer returns. The query will be `pending` until the first chunk is received and `success` afterwards. It stays in fetchStatus `fetching` until the stream ends.
25-29
: Fix queryFn signature and a typo in refetchMode
queryFn
streamsTQueryFnData
, notTData
.- Support both
AsyncIterable
andPromise<AsyncIterable>
.- Close the stray quote in
'replace'
.-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>` +- `queryFn: (context: QueryFunctionContext) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>` - **Required** - The function that returns a Promise of an AsyncIterable of data to stream in. - Receives a [QueryFunctionContext](../../framework/react/guides/query-functions.md#queryfunctioncontext) -- `refetchMode?: 'append' | 'reset' | 'replace` +- `refetchMode?: 'append' | 'reset' | 'replace'`
36-46
: Clarify reducer/initialValue semantics and fix grammar
- Make
initialValue
justTData
, not “= TQueryFnData”.- Say “Required when a custom reducer is provided.”
- State the default applies only for array
TData
.- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData` - Optional - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`). - Default: appends each chunk to the end of the accumulator when `TData` is an array. - If `TData` is not an array, you must provide a custom `reducer`. -`initialValue?: TData = TQueryFnData` +`initialValue?: TData` - Optional - - Defines the initial data to be used while the first chunk is being fetched. - - It is mandatory when custom `reducer` is provided. - - Defaults to an empty array. + - Seed value for the reducer (the accumulator’s initial state). It is not shown as placeholder data. + - Required when a custom `reducer` is provided. + - Default (no custom reducer): `[]` when `TData` is an array.packages/query-core/src/streamedQuery.ts (1)
97-98
: Return a defined value when the stream yields no chunksOn empty streams (or early abort before the first chunk),
getQueryData
may beundefined
. Fall back toresult
(the seeded accumulator).- return context.client.getQueryData(context.queryKey)! + const cached = context.client.getQueryData<TData>(context.queryKey) + return cached ?? result
🧹 Nitpick comments (3)
packages/query-core/src/streamedQuery.ts (2)
42-45
: Update JSDoc to match reducer/initialValue semanticsReflect accumulator semantics and the array-only default.
- * @param reducer - A function to reduce the streamed chunks into the final data. - * Defaults to a function that appends chunks to the end of the array. - * @param initialValue - Initial value to be used while the first chunk is being fetched. + * @param reducer - Reduces streamed chunks (`TQueryFnData`) into the accumulator (`TData`). + * Defaults to appending each chunk when `TData` is an array. + * @param initialValue - Seed value for the reducer (not shown as placeholder data).
85-90
: Avoid double reduction per chunk and keepresult
in lockstep with the cacheYou currently compute the reduction twice (once in
setQueryData
, once forresult
). Merge them to reduce work and prevent divergence.- context.client.setQueryData<TData>(context.queryKey, (prev) => - reducer(prev ?? initialValue, chunk), - ) - } - result = reducer(result, chunk) + context.client.setQueryData<TData>(context.queryKey, (prev) => { + const next = reducer((prev ?? initialValue) as TData, chunk) + result = next + return next + }) + } else { + result = reducer(result, chunk) + }packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
431-471
: Seeded accumulator test LGTM; consider adding empty-stream and replace+reducer cases
- Add a test where the stream yields zero chunks to lock in the fallback result.
- Add a “refetchMode: 'replace' + custom reducer” test to assert no intermediate cache writes.
Example additions:
test('returns seeded accumulator on empty stream', async () => { const key = queryKey() const observer = new QueryObserver(queryClient, { queryKey: key, queryFn: streamedQuery<number, Record<number, boolean>>({ queryFn: async function* () {}, // no chunks reducer: (acc, chunk) => ({ ...acc, [chunk]: true }), initialValue: { 42: true }, }), }) observer.subscribe(vi.fn()) await vi.advanceTimersByTimeAsync(10) expect(observer.getCurrentResult()).toMatchObject({ status: 'success', fetchStatus: 'idle', data: { 42: true }, }) }) test('replace refetch with custom reducer writes only at the end', async () => { const key = queryKey() let offset = 0 const observer = new QueryObserver(queryClient, { queryKey: key, queryFn: streamedQuery<number, Record<number, boolean>>({ queryFn: () => createAsyncNumberGenerator(2, offset), refetchMode: 'replace', reducer: (acc, chunk) => ({ ...acc, [chunk]: true }), initialValue: {}, }), }) observer.subscribe(vi.fn()) await vi.advanceTimersByTimeAsync(100) expect(observer.getCurrentResult().data).toEqual({ 0: true, 1: true }) offset = 100 void observer.refetch() await vi.advanceTimersByTimeAsync(90) // still old data until stream ends expect(observer.getCurrentResult().data).toEqual({ 0: true, 1: true }) await vi.advanceTimersByTimeAsync(10) expect(observer.getCurrentResult().data).toEqual({ 100: true, 101: true }) })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
docs/reference/streamedQuery.md
(1 hunks)packages/query-core/src/__tests__/streamedQuery.test.tsx
(3 hunks)packages/query-core/src/streamedQuery.ts
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
packages/query-core/src/index.ts (1)
streamedQuery
(37-37)packages/query-core/src/streamedQuery.ts (1)
streamedQuery
(46-99)
packages/query-core/src/streamedQuery.ts (3)
packages/query-core/src/types.ts (3)
QueryKey
(53-61)QueryFunctionContext
(138-165)QueryFunction
(96-100)packages/query-core/src/index.ts (1)
streamedQuery
(37-37)packages/query-core/src/utils.ts (1)
addToEnd
(399-402)
🪛 LanguageTool
docs/reference/streamedQuery.md
[grammar] ~44-~44: There might be a mistake here.
Context: ...k is being fetched. - It is mandatory when custom reducer
is provided. - Defau...
(QB_NEW_EN)
[grammar] ~44-~44: There might be a mistake here.
Context: ...atory when custom reducer
is provided. - Defaults to an empty array.
(QB_NEW_EN)
🔇 Additional comments (2)
packages/query-core/src/streamedQuery.ts (1)
53-56
: Default reducer/seed rely on array semantics — safe once overloads landCurrent casts (
as Array<TQueryFnData>
,[] as TData
) are acceptable if the simple path is constrained to arrays via overloads; otherwise they’re unsafe.Do you want me to push the overloads and adjust these casts accordingly?
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
394-429
: Custom reducer test looks goodCovers non-array
TData
accumulation with correct timing and final shape.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #9532 +/- ##
===========================================
+ Coverage 45.26% 59.37% +14.10%
===========================================
Files 208 137 -71
Lines 8340 5580 -2760
Branches 1894 1504 -390
===========================================
- Hits 3775 3313 -462
+ Misses 4118 1963 -2155
+ Partials 447 304 -143 🚀 New features to boost your workflow:
|
queryFn: streamedQuery({ | ||
queryFn: () => createAsyncNumberGenerator(3), | ||
maxChunks: 2, | ||
queryFn: streamedQuery<number, Record<number, boolean>>({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let’s not pass types this way - as it won’t test if type inference works. Instead, let’s type the initialValue:
initialValue: {} as Record<number, boolean>
queryFn: () => createAsyncNumberGenerator(3), | ||
maxChunks: 2, | ||
refetchMode: 'append', | ||
queryFn: streamedQuery<number, Record<number, boolean>>({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here please:
queryFn: streamedQuery<number, Record<number, boolean>>({ | |
queryFn: streamedQuery({ |
…initialValue type
…o-streamed-query' into feature/added-optional-reducer-to-streamed-query # Conflicts: # packages/query-core/src/__tests__/streamedQuery.test.tsx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/query-core/src/streamedQuery.ts (1)
32-45
: Fix JSDoc: not always an array; clarify replace semantics and initialValue requirement.The doc still states data is always an array and doesn’t reflect that replace gating applies only on refetch.
- * Data will be an Array of all the chunks received. + * By default, data is an array that appends each chunk. With a custom reducer + initialValue, + * data is the accumulator type TData that you control. @@ - * Set to `'replace'` to write all data to the cache once the stream ends. + * Set to `'replace'` to defer cache writes until the stream ends on refetches. + * (On initial fetches, cache is still updated per chunk.) @@ - * @param reducer - A function to reduce the streamed chunks into the final data. - * Defaults to a function that appends chunks to the end of the array. - * @param initialValue - Initial value to be used while the first chunk is being fetched. + * @param reducer - Function to reduce chunks into the accumulator (TData). + * Defaults to appending to the end of an array. + * @param initialValue - Required when a custom reducer is provided; seeds the accumulator.
♻️ Duplicate comments (2)
packages/query-core/src/streamedQuery.ts (2)
85-87
: Good fix: undefined check prevents “empty” collisions.Using
prev === undefined ? initialValue : prev
correctly preserves null as a valid value.
11-31
: Enforce array TData when no reducer (add overloads).Current signature lets callers pick non-array TData in the “simple” path, while the default reducer assumes arrays. Add overloads to couple API shape to return type and close the type hole.
+// Overloads to bind API shape to TData +export function streamedQuery< + TQueryFnData = unknown, + TQueryKey extends QueryKey = QueryKey, +>(params: SimpleStreamedQueryParams<TQueryFnData, TQueryKey>): QueryFunction< + Array<TQueryFnData>, + TQueryKey +> +export function streamedQuery< + TQueryFnData = unknown, + TData = unknown, + TQueryKey extends QueryKey = QueryKey, +>(params: ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction< + TData, + TQueryKey +>
🧹 Nitpick comments (3)
packages/query-core/src/streamedQuery.ts (3)
46-56
: Prefer resolving defaults inside the function to improve narrowing and readability.Move defaults for reducer/initialValue out of the parameter list and resolve them locally. Keeps params in sync with the union types and avoids “phantom” defaults in the reducible path.
export function streamedQuery< TQueryFnData = unknown, TData = Array<TQueryFnData>, TQueryKey extends QueryKey = QueryKey, >({ queryFn, refetchMode = 'reset', - reducer = (items, chunk) => - addToEnd(items as Array<TQueryFnData>, chunk) as TData, - initialValue = [] as TData, + reducer, + initialValue, }: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction< TData, TQueryKey > { - return async (context) => { + return async (context) => { + const resolvedReducer = + (reducer ?? + ((items: TData, chunk: TQueryFnData) => + addToEnd(items as unknown as Array<TQueryFnData>, chunk) as TData)) + const resolvedInitial = (initialValue ?? + ([] as unknown as TData)) as TDataAnd then replace uses of
reducer
/initialValue
below withresolvedReducer
/resolvedInitial
.
89-90
: Nit: avoid reducing twice when not needed.
result
is only read for replace-refetch finalization. Skip updating it otherwise to save work on hot paths.- result = reducer(result, chunk) + if (isRefetch && refetchMode === 'replace') { + result = reducer(result, chunk) + }If you apply the “resolved*” refactor above, use
resolvedReducer
.
32-42
: Heads-up: memory behavior is now fully consumer-defined.With maxChunks removed, long-running streams can grow unbounded unless the reducer implements windowing/truncation. Call out this guidance prominently in docs with examples.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx
(3 hunks)packages/query-core/src/streamedQuery.ts
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/query-core/src/tests/streamedQuery.test.tsx
🧰 Additional context used
🧬 Code graph analysis (1)
packages/query-core/src/streamedQuery.ts (2)
packages/query-core/src/types.ts (3)
QueryKey
(53-61)QueryFunctionContext
(138-165)QueryFunction
(96-100)packages/query-core/src/utils.ts (1)
addToEnd
(399-402)
🔇 Additional comments (2)
packages/query-core/src/streamedQuery.ts (2)
4-26
: API shape looks solid; names are clear and local. Consider exporting types later if docs need them.No action required now.
32-36
: streamedQuery: verify abort/no-chunk behavior
If a stream aborts before the first yield (or produces no chunks), the function currently returnsundefined
(no cache write) and ends in “success.” Confirm whether abort-before-emit should instead be treated as cancellation (e.g. throwCancelledError
by tracking ahasEmitted
flag) and updatestreamedQuery
accordingly. Add tests for empty-stream and early-abort scenarios.
Thanks a lot for letting me collaborate on react-query! It’s been an awesome experience. |
Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure.
Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays.
#9065
BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.
Summary by CodeRabbit
New Features
Documentation