-
Notifications
You must be signed in to change notification settings - Fork 110
feat(event): add customer id #3246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughAdds customerId support across API, client schemas, streaming, ClickHouse queries, and meterevent processing; introduces a FilterIDExact schema and converters; wires customer.Service into meterevent adapter/service; refactors query builders and tests to support customer-based filtering and customer_id selection. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
openmeter/meterevent/service.go (1)
124-211
: Fix cursor generation for ingested_at‐based paginationThe code always uses
Event.Cursor()
(openmeter/meterevent/service.go:72) to buildNextCursor
, which embedse.Time
. But whenListEventsV2Params.IngestedAt
is set, queries in event_query_v2.go order byingested_at DESC
. Emitting a cursor based on the event timestamp will lead to mispage when the client uses that cursor against theingested_at
column.Points to address:
- Service:
Event.Cursor()
(service.go:72) only returnsTime: e.Time
.- Pagination:
pkg/pagination/v2.NewResult
always callslastItem.Cursor()
, never usinge.IngestedAt
.- Adapter/Handler: in
adapter.ListEventsV2
(adapter/event.go:93) andconvertListEventsV2Response
(httphandler/mapping.go:77–89),NextCursor
flows straight through.Suggested fix:
- Add a new constructor on
Event
, e.g.
func (e Event) IngestedCursor() pagination.Cursor { return pagination.Cursor{Time: e.IngestedAt, ID: e.ID} }
- In the adapter (or HTTP handler), detect when
params.IngestedAt
is non-nil and overrideResult.NextCursor
to useIngestedCursor()
instead of the default.- Alternatively extend the pagination library to accept a custom time-extractor for cursor generation.
openmeter/streaming/clickhouse/meter_query.go (1)
229-245
: Avoid referencing the SELECT alias in WHERE for customer_idFiltering with
column = "customer_id"
uses the alias, which is not reliable in WHERE. Use the physical column instead.Apply:
- // Customer ID is a special case - if groupByKey == "customer_id" { - column = "customer_id" - } + // Customer ID is a special case: use the physical column, not the SELECT alias + if groupByKey == "customer_id" { + column = getColumn("customer_id") + }
🧹 Nitpick comments (18)
openmeter/streaming/clickhouse/event_query_v2.go (1)
100-136
: Count query unchanged (OK), but remember to include customer filters if/when added to v2No change needed now. If v2 later gains customer filters, consider echoing a coarse customer constraint into the count to improve ETA accuracy.
openmeter/meterevent/service.go (2)
37-38
: Nit: comment should be plural to match the field (CustomerIDs)Doc reads “The event customer ID.” but the field holds multiple IDs.
Apply this diff:
- // The event customer ID. - CustomerIDs *[]string + // The customer IDs to filter by. + CustomerIDs *[]string
79-122
: Consider basic validation for CustomerIDs to avoid empty IN lists and bad inputsCurrently CustomerIDs is unchecked. Two practical guards:
- Reject an explicitly provided empty list to avoid generating “IN ()”.
- Reject empty-string IDs early; optionally validate ULID format if that’s the canonical ID.
Keep nil meaning “no filter”.
Here’s a minimal, dependency-free validation you can add:
func (i ListEventsParams) Validate() error { var errs []error @@ if i.ClientID != nil && *i.ClientID == "" { errs = append(errs, errors.New("client id cannot be empty")) } + + if i.CustomerIDs != nil { + if len(*i.CustomerIDs) == 0 { + errs = append(errs, errors.New("customerIds must not be empty when provided")) + } else { + for idx, id := range *i.CustomerIDs { + if id == "" { + errs = append(errs, fmt.Errorf("customerIds[%d] cannot be empty", idx)) + } + } + } + }If you already have a ULID validation helper, you can tighten this further; happy to wire it in.
api/spec/src/events.tsp (1)
308-312
: Adding customerId to IngestedEvent is aligned; consider including it in the example, tooHelps SDK/gen consumers see it in docs quickly.
You can extend the existing IngestedEvent example near Line 289 like so (outside this hunk):
@example(#{ event: #{ id: "5c10fade-1c9e-4d6c-8275-c52c36731d3c", source: "service-name", specversion: "1.0", type: "prompt", subject: "customer-id", time: DateTime.fromISO("2023-01-01T01:01:01.001Z"), }, customerId: "01HZXABCD3M6E7K8J9PQRS1TUV", ingestedAt: DateTime.fromISO("2023-01-01T01:01:01.001Z"), storedAt: DateTime.fromISO("2023-01-01T01:01:02.001Z"), })openmeter/meterevent/adapter/event.go (4)
74-80
: N+1 customer lookups during enrichment; consider batching unique subjects.The per-event GetCustomerByUsageAttribution call can become costly. You already left a FIXME; recommend collecting unique subject keys first and introducing a bulk lookup in customerService to cut DB round-trips.
Happy to sketch an interface and a batching implementation if you want to pursue this in a follow-up.
127-152
: Nit: avoid unnecessary allocation for ValidationErrors in mapped events.You can omit initializing ValidationErrors to an empty slice; the zero value (nil) is sufficient and avoids per-event allocations until you actually add an error.
- ValidationErrors: make([]error, 0), + // ValidationErrors left nil until validation populates it
155-195
: Reduce validation complexity by indexing meters by event type.Current validation is O(#events × #meters). Pre-index meters by EventType and validate only relevant ones.
func (a *adapter) validateEvents(ctx context.Context, namespace string, events []meterevent.Event) ([]meterevent.Event, error) { // Get all meters meterList, err := a.meterService.ListMeters(ctx, meter.ListMetersParams{ Namespace: namespace, }) if err != nil { return nil, fmt.Errorf("get meters: %w", err) } - // Validate events against meters + // Index meters by event type + metersByType := make(map[string][]meter.Meter, len(meterList.Items)) + for _, m := range meterList.Items { + metersByType[m.EventType] = append(metersByType[m.EventType], m) + } + + // Validate events against meters validatedEvents := make([]meterevent.Event, 0, len(events)) for _, event := range events { - meterMatch := false - validationErrors := []error{} - - // Check if the event type matches a meter - for _, m := range meterList.Items { - // Check if the event type matches the meter event type - if event.Type == m.EventType { - meterMatch = true - - // Validate the event data against the meter - _, err = meter.ParseEventString(m, event.Data) - if err != nil { - validationErrors = append(validationErrors, err) - } - } - } + meterMatch := false + var validationErrors []error + if ms, ok := metersByType[event.Type]; ok { + meterMatch = true + for _, m := range ms { + if _, err = meter.ParseEventString(m, event.Data); err != nil { + validationErrors = append(validationErrors, err) + } + } + } // If no meter matches the event type, add an error to the event if !meterMatch { validationErrors = append(validationErrors, fmt.Errorf("no meter found for event type: %s", event.Type)) } event.ValidationErrors = validationErrors validatedEvents = append(validatedEvents, event) } return validatedEvents, nil }
197-240
: Avoid shadowing the imported customer package with a local variable.Using the identifier “customer” for the local result from GetCustomerByUsageAttribution shadows the imported package name, hurting readability.
- customer, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ + cust, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ Namespace: namespace, SubjectKey: event.Subject, }) if err != nil { if models.IsGenericNotFoundError(err) { eventsWithCustomerID = append(eventsWithCustomerID, event) continue } return nil, fmt.Errorf("get customer by usage attribution: %w", err) } // Add the customer ID to the cache - cache[event.Subject] = customer.ID + cache[event.Subject] = cust.ID // Add the event to the list - event.CustomerID = &customer.ID + event.CustomerID = &cust.IDopenmeter/streaming/eventparams.go (1)
36-61
: Add limit validation to mirror v2 and fail fast on bad inputs.ListEventsParams.Validate should reject non-positive limits (the handler typically sets a sane default, but catching programmer error here helps).
func (i ListEventsParams) Validate() error { var errs []error ... if i.IngestedAtFrom != nil && i.IngestedAtTo != nil && i.IngestedAtTo.Before(*i.IngestedAtFrom) { errs = append(errs, fmt.Errorf("ingestedAtTo date is before ingestedAtFrom date: %s < %s", i.IngestedAtTo.Format(time.RFC3339), i.IngestedAtFrom.Format(time.RFC3339))) } + if i.Limit < 1 { + errs = append(errs, errors.New("limit must be greater than 0")) + } + return errors.Join(errs...) }openmeter/meterevent/adapter/adapter.go (1)
24-28
: New field added to adapter; consider nil-safety expectations.If customerService is required for correctness, consider validating non-nil in New and documenting the requirement. Otherwise, ensure downstream code defensively handles a nil customerService.
Here’s a minimal guard if you want it:
func New( streamingConnector streaming.Connector, customerService customer.Service, meterService meter.Service, ) meterevent.Service { + if customerService == nil { + // Option A: panic fast if this must be provided + // panic("customerService must not be nil") + // Option B: leave as-is but log for visibility (requires a logger in scope) + } return &adapter{ streamingConnector: streamingConnector, customerService: customerService, meterService: meterService, } }api/client/javascript/src/zod/index.ts (1)
8566-8568
: Simplify ULID regex & extract shared constantThe current pattern is correct (26 chars, Crockford base32 without I/L/O/U, first char 0–7). For readability you can switch to a RegExp literal with
/i
and, optionally, extract it into a shared constant:-export const listEventsQueryCustomerIdItemRegExp = new RegExp( - '^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$' -) +export const listEventsQueryCustomerIdItemRegExp = + /^[0-7][0-9A-HJKMNP-TV-Z]{25}$/iExample shared constant (e.g.
zod/ulid.ts
):export const ULID_REGEX = /^[0-7][0-9A-HJKMNP-TV-Z]{25}$/ithen
import {ULID_REGEX} from './ulid' … .regex(ULID_REGEX)Other occurrences of the same pattern you may want to align:
- tools/migrate/entitlement_test.go
- api/spec/src/types.tsp
- api/openapi.yaml (multiple schema entries)
openmeter/streaming/testutils/streaming.go (1)
82-88
: Mock signature parity with streaming.Connector looks goodThe mock implements the updated interface correctly. It’s fine to return empty slices here.
If you want stronger tests around customer filtering later, consider capturing incoming params (including Customers/Subject) for assertions or providing a way to pre-seed RawEvents with CustomerID for integration-style tests.
api/client/javascript/src/client/schemas.ts (1)
17612-17613
: Update description to reflect array semantics and repeated query-parameter encodingVerified in
openapi.yaml
that no explicitstyle
/explode
is set, so arrays use the defaultform
style withexplode=true
(i.e.?customerId=a&customerId=b
).• File:
api/client/javascript/src/client/schemas.ts
(around lines 17612–17613)
• Apply this doc tweak (ideally in the OpenAPI source so the generator emits it):- /** @description The event customer ID. */ + /** @description Filter by customer IDs (ULID). Accepts multiple values (repeated query parameters). */openmeter/server/server_test.go (3)
604-618
: Mock signature update LGTM; consider asserting interface complianceThe shift to ListEvents(ctx, namespace, params streaming.ListEventsParams) matches the new interface. To keep the mock future-proof, add a compile-time assertion to ensure it stays in sync with streaming.Connector.
Add outside the method (anywhere after type declaration):
var _ streaming.Connector = (*MockStreamingConnector)(nil)
604-618
: Optional: Populate customer_id in mocked events to increase coverageGiven the PR adds customerId to event list APIs, having the mock set CustomerID can help exercise response serialization paths earlier.
func (c *MockStreamingConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]streaming.RawEvent, error) { events := []streaming.RawEvent{ { ID: mockEvent.ID(), Type: mockEvent.Type(), Source: mockEvent.Source(), Subject: mockEvent.Subject(), Time: mockEvent.Time(), Data: string(mockEvent.Data()), IngestedAt: time.Time{}, StoredAt: time.Time{}, + CustomerID: "customer-1", }, } return events, nil }
620-634
: V2 mock signature LGTM; mirror the V1 mock’s event shapeSignature aligns with streaming.ListEventsV2Params. Consider mirroring the V1 suggestion to set CustomerID for parity.
func (c *MockStreamingConnector) ListEventsV2(ctx context.Context, params streaming.ListEventsV2Params) ([]streaming.RawEvent, error) { events := []streaming.RawEvent{ { ID: mockEvent.ID(), Type: mockEvent.Type(), Source: mockEvent.Source(), Subject: mockEvent.Subject(), Time: mockEvent.Time(), Data: string(mockEvent.Data()), IngestedAt: time.Time{}, StoredAt: time.Time{}, + CustomerID: "customer-1", }, } return events, nil }
openmeter/streaming/clickhouse/queryhelper.go (1)
35-41
: Consider NULL instead of empty string for unknown customer_idReturning '' can conflate unknown vs. intentionally empty values. Using NULL may be semantically clearer for downstream consumers.
- caseBuilder.WriteString("ELSE '' END AS customer_id") + caseBuilder.WriteString("ELSE NULL END AS customer_id")If you change this, confirm downstream JSON marshaling keeps the intended shape (omitempty).
api/openapi.cloud.yaml (1)
16422-16426
: Prefer referencing CustomerId schema; consider marking readOnly if server-enrichedAvoid duplicating the ULID pattern by reusing the shared schema. If this field is only present in responses (enriched/derived), add readOnly: true.
Apply this diff:
- customerId: - type: string - pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ - description: The customer ID if the event is associated with a customer. - example: 01G65Z755AFWAKHE12NY0CQ9FH + customerId: + allOf: + - $ref: '#/components/schemas/CustomerId' + description: The customer ID if the event is associated with a customer. + example: 01G65Z755AFWAKHE12NY0CQ9FH + # Uncomment if this is set by the system and returned in responses only: + # readOnly: trueNote: If other parts of the API allow ULIDOrExternalKey for customer identification, confirm whether this property should accept that union as well for consistency.
Would you like me to scan the spec and flag any places where customer identifiers allow ULIDOrExternalKey so we can align this property?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (23)
api/client/javascript/src/client/schemas.ts
(2 hunks)api/client/javascript/src/zod/index.ts
(2 hunks)api/openapi.cloud.yaml
(2 hunks)api/openapi.yaml
(2 hunks)api/spec/src/events.tsp
(2 hunks)app/common/meterevent.go
(2 hunks)cmd/server/wire_gen.go
(1 hunks)openmeter/meterevent/adapter/adapter.go
(2 hunks)openmeter/meterevent/adapter/event.go
(4 hunks)openmeter/meterevent/httphandler/event.go
(1 hunks)openmeter/meterevent/httphandler/mapping.go
(1 hunks)openmeter/meterevent/service.go
(2 hunks)openmeter/server/server_test.go
(3 hunks)openmeter/streaming/clickhouse/connector.go
(5 hunks)openmeter/streaming/clickhouse/event_query.go
(4 hunks)openmeter/streaming/clickhouse/event_query_test.go
(1 hunks)openmeter/streaming/clickhouse/event_query_v2.go
(2 hunks)openmeter/streaming/clickhouse/event_query_v2_test.go
(11 hunks)openmeter/streaming/clickhouse/meter_query.go
(2 hunks)openmeter/streaming/clickhouse/queryhelper.go
(1 hunks)openmeter/streaming/connector.go
(1 hunks)openmeter/streaming/eventparams.go
(1 hunks)openmeter/streaming/testutils/streaming.go
(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
PR: openmeterio/openmeter#2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.
Applied to files:
openmeter/streaming/connector.go
openmeter/server/server_test.go
🧬 Code Graph Analysis (15)
openmeter/meterevent/httphandler/event.go (3)
api/client/javascript/src/client/schemas.ts (1)
CustomerId
(10860-10860)api/api.gen.go (1)
CustomerId
(2288-2291)api/client/go/client.gen.go (1)
CustomerId
(2118-2121)
app/common/meterevent.go (2)
openmeter/meterevent/service.go (1)
Service
(18-21)openmeter/meterevent/adapter/adapter.go (1)
New
(10-20)
openmeter/streaming/clickhouse/event_query_v2_test.go (2)
openmeter/meterevent/service.go (1)
ListEventsV2Params
(125-146)openmeter/streaming/eventparams.go (1)
ListEventsV2Params
(64-85)
openmeter/streaming/clickhouse/event_query_v2.go (3)
openmeter/meterevent/service.go (1)
ListEventsV2Params
(125-146)openmeter/streaming/eventparams.go (1)
ListEventsV2Params
(64-85)openmeter/meterevent/httphandler/event_v2.go (1)
ListEventsV2Params
(15-15)
openmeter/streaming/testutils/streaming.go (2)
openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/streaming/connector.go (1)
RawEvent
(24-36)
openmeter/streaming/clickhouse/connector.go (2)
openmeter/streaming/connector.go (2)
Connector
(38-48)RawEvent
(24-36)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)
cmd/server/wire_gen.go (1)
app/common/meterevent.go (1)
NewMeterEventService
(17-23)
openmeter/meterevent/adapter/event.go (6)
openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)Event
(48-69)openmeter/streaming/eventparams.go (1)
ListEventsParams
(13-34)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
openmeter/streaming/eventparams.go (3)
api/api.gen.go (6)
Subject
(6309-6334)Customer
(2165-2215)ListEventsParams
(7763-7803)FilterString
(3135-3177)FilterTime
(3180-3198)ListEventsV2Params
(8344-8375)openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)ListEventsV2Params
(125-146)pkg/pagination/v2/cursor.go (1)
Cursor
(11-14)
openmeter/meterevent/adapter/adapter.go (2)
openmeter/streaming/connector.go (1)
Connector
(38-48)openmeter/meterevent/service.go (1)
Service
(18-21)
openmeter/meterevent/httphandler/mapping.go (3)
api/client/javascript/src/client/schemas.ts (1)
CustomerId
(10860-10860)api/api.gen.go (1)
CustomerId
(2288-2291)api/client/go/client.gen.go (1)
CustomerId
(2118-2121)
openmeter/streaming/clickhouse/event_query.go (2)
api/client/javascript/src/client/schemas.ts (2)
Customer
(10851-10851)Subject
(11200-11200)api/api.gen.go (2)
Customer
(2165-2215)Subject
(6309-6334)
openmeter/streaming/clickhouse/queryhelper.go (1)
api/api.gen.go (1)
Customer
(2165-2215)
openmeter/streaming/connector.go (3)
api/api.gen.go (2)
ListEventsParams
(7763-7803)ListEventsV2Params
(8344-8375)openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)ListEventsV2Params
(125-146)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)
openmeter/server/server_test.go (4)
openmeter/meterevent/adapter/adapter.go (1)
New
(10-20)openmeter/streaming/testutils/streaming.go (1)
MockStreamingConnector
(32-35)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/streaming/connector.go (1)
RawEvent
(24-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Artifacts / Container image
- GitHub Check: Artifacts / Benthos Collector Container image
- GitHub Check: Test
- GitHub Check: Code Generators
- GitHub Check: Lint
- GitHub Check: Migration Checks
- GitHub Check: Build
- GitHub Check: Analyze (go)
🔇 Additional comments (36)
openmeter/streaming/clickhouse/event_query_v2.go (3)
9-10
: Import switch to streaming params looks goodUsing streaming.ListEventsV2Params aligns with the broader refactor away from meterevent and keeps this unit self-contained within streaming.
18-19
: Param type migration LGTMStruct now depends on streaming.ListEventsV2Params. No behavioral changes here; compile-time safe swap.
25-27
: Missing customer_id in SELECT prevents returning customerId in v2 responsesWith IngestedEvent now exposing customerId, this query does not project customer_id. Downstream scanners/mappers can’t populate the field for v2 lists.
Add the column to the projection. If you rely on positional scans, ensure the receiver code is updated accordingly (ideally scan by column name or keep ordering consistent).
Apply this diff:
- query.Select("id", "type", "subject", "source", "time", "data", "ingested_at", "stored_at", "store_row_id") + query.Select("id", "type", "subject", "source", "time", "data", "customer_id", "ingested_at", "stored_at", "store_row_id")Follow-up: verify and adjust any row scanning/mapping that depends on column order.
Run this to find likely scanners using the same column set:
openmeter/meterevent/service.go (1)
61-63
: Field addition aligns with API: Event now carries CustomerIDThis enables surfacing customerId on responses (v1 and v2). Looks good.
api/spec/src/events.tsp (1)
140-188
: Confirm intent: no customer filter in v2 filter object (OK if by design)V2 filter omits customerId. The PR states only v1 gets customer filtering; just confirming this is intentional to avoid confusion for API consumers who see customerId on IngestedEvent but can’t filter by it in v2 yet.
api/openapi.yaml (2)
5005-5017
: Clarify plurality, cap array size, and centralize ULID schemaThe
customerId
query parameter is defined as an array of ULIDs but named/described singularly and repeats the pattern inline. To improve consistency and avoid schema drift:
- Rename to
customerIds
(unless your convention is to keep singular for arrays).- Update the description to “Filter events by customer ID(s).”
- Add a
maxItems
bound (e.g., 50) to guard against excessively large queries.- Reference the shared
#/components/schemas/CustomerId
component instead of inlining the ULID pattern.
• Verify that this component is indeed a string schema with the correct ULID pattern (not an object).
• If it isn’t, consider defining a dedicatedUlid
schema for reuse.Location:
- api/openapi.yaml, lines ~5005–5017
Proposed diff:
- - name: customerId + - name: customerIds in: query required: false - description: The event customer ID. + description: Filter events by customer ID(s). schema: - type: array - items: - type: string - pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ - description: ULID (Universally Unique Lexicographically Sortable Identifier). - example: 01G65Z755AFWAKHE12NY0CQ9FH + type: array + maxItems: 50 + items: + $ref: '#/components/schemas/CustomerId' explode: false style: formVerification checklist:
- Confirm
#/components/schemas/CustomerId
defines a ULID string pattern.- Ensure consistency with other multi-value query filters (singular vs. plural naming).
16782-16786
: Reuse shared ULID schema for customerId
It looks like#/components/schemas/CustomerId
is defined as an object with anid
property, not a plain string, so referencing it directly would change the type. To avoid duplicating the ULID regex, either:
- Reference the nested
id
schema:customerId:type: string
pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$
description: The customer ID if the event is associated with a customer.
example: 01G65Z755AFWAKHE12NY0CQ9FH
allOf:
- $ref: '#/components/schemas/CustomerId/properties/id'
description: The customer ID if the event is associated with a customer.
example: 01G65Z755AFWAKHE12NY0CQ9FH
- Or extract the ULID pattern into a standalone string schema (e.g.
UlidString
) under#/components/schemas
and reference that instead.Please verify the existing
CustomerId
component and ensure your$ref
points to a string schema.openmeter/meterevent/adapter/event.go (4)
35-57
: Customer filtering wiring looks good.Resolving provided CustomerIDs to concrete customers and attaching them to streaming params is correct. Returning an empty set when none are found avoids an unnecessary ClickHouse round-trip.
51-54
: Good UX: short-circuit when no customers found.Early return with an empty slice is the right behavior for a positive filter that matches no entities.
65-67
: Mapping raw → meter events is straightforward and correct.All relevant fields are carried over; CustomerID is picked up later if present.
69-72
: Validation flow LGTM.Validating after mapping keeps concerns separated and allows enrichment to retain validation errors.
openmeter/streaming/eventparams.go (2)
12-34
: Params shape looks right for v1 streaming, including multi-customer filtering.The addition of Customers *[]Customer matches the new filter capability.
87-146
: V2 validation is comprehensive and consistent.Good coverage for cursor, per-field filter validation, mutual exclusion of time vs ingested_at, and limit constraints.
openmeter/streaming/clickhouse/event_query_v2_test.go (2)
9-9
: Import path switch to streaming types is correct.Tests remain behaviorally unchanged; only the param type origin moved. LGTM.
31-36
: Tests continue to assert SQL/args stability across filters and pagination.Coverage across basic, ID/subject/type filters, time/ingested_at, cursor and limit is solid.
Also applies to: 43-52, 58-67, 74-82, 88-99, 105-114, 140-146, 152-161, 167-176, 182-191
openmeter/meterevent/httphandler/event.go (1)
37-48
: Verify APICustomerId
parameter typeI wasn’t able to find the declaration of
params.CustomerId
in the generated API (e.g.api/api.gen.go
). In
openmeter/meterevent/httphandler/event.go:46 you have:CustomerIDs: params.CustomerId,but the service expects
CustomerIDs *[]string
. Please manually confirm that the API layer definesCustomerId
as*[]string
(or[]string
). If it’s actually a slice of structs (e.g.*[]api.CustomerID
), you’ll need to map it to a string slice, for example:- CustomerIDs: params.CustomerId, + CustomerIDs: func() *[]string { + if params.CustomerId == nil { + return nil + } + ids := make([]string, len(*params.CustomerId)) + for i, c := range *params.CustomerId { + ids[i] = c.Id + } + return &ids + }(),Please verify and adjust accordingly.
openmeter/meterevent/httphandler/mapping.go (1)
68-75
: No conversion needed for CustomerIdThe
api.IngestedEvent.CustomerId
field is defined as*string
, matchinge.CustomerID
’s type (*string
). Assigning it directly is correct—no wrapper conversion is required.Likely an incorrect or invalid review comment.
openmeter/meterevent/adapter/adapter.go (2)
3-8
: New dependency import looks correct and scoped.Importing customer.Service here aligns with the constructor and struct changes. No issues spotted.
10-20
: All constructor call sites updated to three-argument signature
No stale usages found—everyNewMeterEventService
invocation and the underlyingadapter.New
call now consistently use the(streamingConnector, customerService, meterService)
signature.openmeter/streaming/clickhouse/event_query_test.go (2)
66-66
: Subject predicate fully-qualified in expected SQL — OK.The expected fragment now uses (om_events.subject = ?), consistent with qualified column emission from the new helpers.
35-69
: Add coverage for customer-based filtering.Given the PR adds Customers-based filtering via subjectWhere helpers, this test suite should include:
- A case with Customers set (single and multiple IDs).
- A case combining Subject and Customers to validate correct parenthesization and AND/OR semantics.
- A case ensuring empty Customers does not alter the predicate.
This guards the newly introduced query-building paths against regressions.
Would you like me to draft these test cases targeting queryEventsTable with Customers populated?
cmd/server/wire_gen.go (1)
453-453
: Generated wiring updated to pass customerService — in sync with the new signature.Looks correct: common.NewMeterEventService(connector, customerService, service).
Since this file is generated, please ensure the generator output is committed and reproducible:
- Run: go generate ./cmd/server
- Confirm no diff is produced afterward.
api/client/javascript/src/zod/index.ts (1)
8579-8589
: Optional: Enforce non-emptycustomerId
array & clarify description– In
api/client/javascript/src/zod/index.ts
(around lines 8579–8589), update the schema forcustomerId
:customerId: zod .array( zod.coerce .string() .regex(listEventsQueryCustomerIdItemRegExp) .describe( 'ULID (Universally Unique Lexicographically Sortable Identifier).' ) ) + .nonempty('Provide at least one customerId or omit this parameter.') .optional() - .describe('The event customer ID.'), + .describe('Filter events by customer ID(s). Use comma-separated values or repeated parameters per the OpenAPI spec.'),• Verify that the
GET /api/v1/events
parameter forcustomerId
is defined as an array in your OpenAPI spec (with the correctschema.type
,style
, andexplode
) so that clients send multiple values consistently.
• If the server requires uppercase ULIDs, consider normalizing before validation:zod.coerce .string() .transform((s) => s.toUpperCase()) .regex(listEventsQueryCustomerIdItemRegExp)app/common/meterevent.go (2)
6-6
: Importing customer service is correct and expectedThe new import aligns with the adapter and wiring changes across the PR.
17-23
: DI Wiring for NewMeterEventService VerifiedAll call sites use the new three-parameter signature, and the MeterEvent wire set is declared correctly. No further action required.
openmeter/streaming/clickhouse/meter_query.go (1)
276-282
: Subject/customer filtering centralized – good simplificationDelegating to subjectWhere(...) reduces duplication and keeps the mapping logic consistent with other queries.
openmeter/streaming/clickhouse/connector.go (3)
95-111
: ListEvents migrated to streaming params – OKSignature and usage align with the new types; namespace validation and error handling are preserved.
112-127
: ListEventsV2 migration – OKUsing streaming.ListEventsV2Params with validation here is consistent with the interface changes.
309-324
: SELECT includes customer_id when customers filter is applied
The event query builder appends theCASE … AS customer_id
expression viaselectCustomerIdColumns
wheneverCustomers
is non-nil, andRawEvent
defines aCustomerID string
field taggedch:"customer_id"
, sorows.ScanStruct(&RawEvent)
will populate it correctly.
- clickhouse/event_query.go (lines ~120–123): appends
selectCustomerIdColumns(...)
whend.Customers != nil
- clickhouse/queryhelper.go (
selectCustomerIdColumns
): buildsCASE … END AS customer_id
- streaming/connector.go (
RawEvent
struct): includesCustomerID
with matchingch
tagopenmeter/streaming/connector.go (2)
35-36
: RawEvent gains CustomerID – aligns with API goalsField/tag look good. Upstream mapping to API schema (customerId) should handle casing as needed.
41-43
: Connector interface updated – consistent across implsThe interface reflects the new params for v1/v2 and will be enforced by the existing mock/type assertions.
api/client/javascript/src/client/schemas.ts (1)
5464-5468
: IngestedEvent.customerId correctly optional without nullThe OpenAPI spec (
openapi.yaml
) and TSP definitions (events.tsp
) do not markcustomerId
as nullable, socustomerId?: string
(without| null
) aligns with the server schema.openmeter/server/server_test.go (1)
456-456
: Constructor wiring LGTM; customer service injection is correctCreating NoopCustomerService up-front and passing it into metereventadapter.New aligns with the updated constructor signature. Good catch removing the duplicate late init.
Also applies to: 461-461
openmeter/streaming/clickhouse/event_query.go (2)
97-101
: Unified filtering via subjectWhere is a solid improvementReplacing ad-hoc subject filtering with subjectWhere centralizes logic and keeps count/data queries consistent.
142-145
: ID LIKE filter may cause full scans; confirm intended behaviorUsing LIKE with a leading wildcard prevents index usage in many engines. If ClickHouse’s primary access predicates (namespace, time) sufficiently constrain the data, this is acceptable; otherwise consider exact match or anchored prefix search when feasible.
Would you like me to propose an alternative that supports both exact and substring matching via a parameterized filter strategy?
api/openapi.cloud.yaml (1)
5004-5016
: TheCustomerId
schema is present, but I couldn’t confirm two things from the script output:• I didn’t see a
customerId
query parameter under /api/v2/events as described in the PR.
• The survey of other array query params failed (regex multiline issue), so I can’t verify that they all usestyle: form, explode: false
.Could you please manually check that:
- /api/v2/events includes the new
customerId
query parameter.- All existing array-typed query parameters in the spec consistently use
style: form
andexplode: false
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
openmeter/server/server_test.go (2)
605-619
: Updated ListEvents signature: capture params in mock to assert customerId filter wiring.Now that v1 supports a customerId filter, it’s valuable to assert that the query param is passed through to streaming.ListEventsParams in tests. Capture the last params on the mock to enable targeted assertions.
Apply this diff to record the incoming params:
-type MockStreamingConnector struct{} +type MockStreamingConnector struct{ + LastListEventsParams streaming.ListEventsParams + LastListEventsV2Params streaming.ListEventsV2Params +}func (c *MockStreamingConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]streaming.RawEvent, error) { + c.LastListEventsParams = params events := []streaming.RawEvent{ { ID: mockEvent.ID(), Type: mockEvent.Type(), Source: mockEvent.Source(), Subject: mockEvent.Subject(), Time: mockEvent.Time(), Data: string(mockEvent.Data()), IngestedAt: time.Time{}, StoredAt: time.Time{}, }, } return events, nil }
Follow-up: add a test (e.g., GET /api/v1/events?customerId=foo) and assert c.LastListEventsParams.CustomerID (or equivalent field) is set to "foo". I can draft that if you want.
621-635
: ListEventsV2 signature updated: consider adding a v2 route test to assert CustomerId in responses.Per PR scope, v2 exposes CustomerId on responses (even if filter is only on v1). Add a simple /api/v2/events test to assert the presence/shape of CustomerId in the returned payload.
I can add a minimal test case mirroring the v1 “query events” case for v2.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
openmeter/server/server_test.go
(4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
PR: openmeterio/openmeter#2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.
Applied to files:
openmeter/server/server_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Artifacts / Container image
- GitHub Check: Artifacts / Benthos Collector Container image
- GitHub Check: Code Generators
- GitHub Check: Test
- GitHub Check: Lint
- GitHub Check: Build
- GitHub Check: Migration Checks
- GitHub Check: Analyze (go)
🔇 Additional comments (3)
openmeter/server/server_test.go (3)
456-458
: LGTM: NoopCustomerService initialization added and injected.Early initialization and wiring through the server config is consistent and removes duplication.
462-462
: Constructor arguments verified: correct order
The callmeterEventService := metereventadapter.New( mockStreamingConnector, // streaming.Connector customerService, // customer.Service meterManageService, // meter.Service )matches the adapter signature:
func New( streamingConnector streaming.Connector, customerService customer.Service, meterService meter.Service, ) ...No changes required.
178-182
: No change needed for CustomerId in this testThe generated API type declares
CustomerId *string `json:"customerId,omitempty"`so an empty‐string pointer is a valid, explicit value (the test stub intentionally uses
lo.ToPtr("")
). Since this test is only exercising the HTTP wiring (it injects a stubbedapi.IngestedEvent
), you can safely leaveCustomerId: lo.ToPtr("")
as-is.
f49615d
to
fe6d887
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
openmeter/meterevent/adapter/event.go (1)
24-33
: Pass through IngestedAt filters to streaming (regression).You’re reconstructing streaming.ListEventsParams but omitting IngestedAtFrom/IngestedAtTo. This drops API-level ingestedAt filters for v1.
Apply this diff:
listParams := streaming.ListEventsParams{ Namespace: params.Namespace, ClientID: params.ClientID, + IngestedAtFrom: params.IngestedAtFrom, + IngestedAtTo: params.IngestedAtTo, From: params.From, To: params.To, ID: params.ID, Subject: params.Subject, Limit: params.Limit, }
🧹 Nitpick comments (4)
openmeter/meterevent/adapter/event.go (3)
35-57
: Confirm semantics: empty CustomerIDs resolution returns no events.If CustomerIDs is provided but resolve to 0 customers, you early-return an empty event list. Confirm this is the intended UX (vs. 4xx on invalid IDs, or querying events with an always-false filter).
195-237
: Avoid shadowing the importedcustomer
package name.The local variable named
customer
shadows the imported packagecustomer
, which is easy to trip over in future edits. Rename the local to improve clarity.Apply this diff:
- customer, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ + cust, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ Namespace: namespace, SubjectKey: event.Subject, }) if err != nil { if models.IsGenericNotFoundError(err) { eventsWithCustomerID = append(eventsWithCustomerID, event) continue } return nil, fmt.Errorf("get customer by usage attribution: %w", err) } // Add the customer ID to the cache - cache[event.Subject] = customer.ID + cache[event.Subject] = cust.ID // Add the event to the list - event.CustomerID = &customer.ID + event.CustomerID = &cust.ID eventsWithCustomerID = append(eventsWithCustomerID, event)
213-219
: Optional: batch attribution lookups.You already cache by subject to avoid duplicate lookups, which is good. If this hot path becomes heavy, consider adding a customer service method to batch-resolve usage attribution for multiple subject keys in one call to reduce DB round trips.
openmeter/server/server_test.go (1)
138-186
: Consider adding tests to pin new behaviors (customer filter + ingestedAt pass-through).
- Add a server test that supplies ?customerId=... and asserts the connector received non-nil Customers in ListEvents params (e.g., by extending MockStreamingConnector to capture last params).
- Add a server test for ingestedAtFrom/ingestedAtTo to ensure v1 path forwards them to streaming. This would have caught the current omission in adapter.ListEvents.
If helpful, I can sketch the mock capture and assertions.
Also applies to: 606-621
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
openmeter/meterevent/adapter/event.go
(4 hunks)openmeter/server/server_test.go
(5 hunks)openmeter/streaming/connector.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- openmeter/streaming/connector.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
PR: openmeterio/openmeter#2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.
Applied to files:
openmeter/server/server_test.go
🧬 Code Graph Analysis (2)
openmeter/server/server_test.go (4)
api/api.gen.go (3)
CustomerId
(2288-2291)ListEventsParams
(7763-7803)ListEventsV2Params
(8344-8375)openmeter/meterevent/adapter/adapter.go (1)
New
(10-20)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/streaming/connector.go (1)
RawEvent
(24-36)
openmeter/meterevent/adapter/event.go (7)
openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)Event
(48-69)openmeter/streaming/eventparams.go (1)
ListEventsParams
(13-34)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)pkg/pagination/v2/pagination.go (1)
Result
(13-19)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Quickstart
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
openmeter/meterevent/adapter/event.go (3)
65-81
: Pipeline LGTM (map -> validate -> enrich).The flow is clean and side-effect free; enrichment properly runs only after validation. No concerns.
92-104
: V2 param mapping looks correct.All relevant fields are mapped; v2 does not include the customer filter (per PR scope). LGTM.
127-149
: Solid mapper.mapEventsToMeterEvents mirrors RawEvent fields correctly, preserves nullable CustomerID, and initializes ValidationErrors. LGTM.
openmeter/server/server_test.go (4)
168-186
: Test update for v1 including CustomerId is appropriate.Asserting CustomerId presence (pointer to empty string due to Noop service) validates the enrichment path. LGTM.
457-465
: Constructor wiring LGTM.Injecting NoopCustomerService and using the new meterevent adapter constructor matches the updated dependencies.
606-621
: Mock signature change to streaming.ListEventsParams LGTM.The mock aligns with the production connector signature changes.
622-636
: Mock signature change to streaming.ListEventsV2Params LGTM.Consistent with v2 connector API. No issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
openmeter/meterevent/adapter/event.go (1)
24-35
: Restored ingestedAt filters pass-through in v1 list — regression fixedAdding IngestedAtFrom/IngestedAtTo to the streaming params resolves the earlier regression. LGTM.
🧹 Nitpick comments (1)
openmeter/meterevent/adapter/event.go (1)
154-194
: Optional: break after first meter match to avoid redundant validationsAssuming event type uniquely identifies a meter, short-circuit the inner loop to reduce work. If multiple meters can legitimately share an event type, ignore this.
Apply this diff:
- for _, m := range meterList.Items { - // Check if the event type matches the meter event type - if event.Type == m.EventType { - meterMatch = true - - // Validate the event data against the meter - _, err = meter.ParseEventString(m, event.Data) - if err != nil { - validationErrors = append(validationErrors, err) - } - } - } + for _, m := range meterList.Items { + if event.Type == m.EventType { + meterMatch = true + if _, e := meter.ParseEventString(m, event.Data); e != nil { + validationErrors = append(validationErrors, e) + } + break + } + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
openmeter/meterevent/adapter/event.go
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
openmeter/meterevent/adapter/event.go (6)
openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)Event
(48-69)openmeter/streaming/eventparams.go (1)
ListEventsParams
(13-34)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Artifacts / Container image
- GitHub Check: Build
- GitHub Check: Lint
- GitHub Check: Migration Checks
- GitHub Check: Test
- GitHub Check: Code Generators
- GitHub Check: Analyze (go)
🔇 Additional comments (4)
openmeter/meterevent/adapter/event.go (4)
67-83
: LGTM: map → validate → enrich flow in v1The staged pipeline and error handling are clean and consistent.
94-106
: LGTM: V2 parameter pass-throughThe pass-through of v2 filters matches the spec; enrichment is handled consistently post-validation.
129-151
: LGTM: correct mapping from RawEvent to meterevent.EventCustomerID is preserved and ValidationErrors is initialized per event.
47-51
: No action needed—value receiver is used
Thecustomer.Customer
type implementsstreaming.Customer
via a value receiver (func (c Customer) GetUsageAttribution()
), so appending the loop variablec
directly is correct and poses no “address of range variable” issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
openmeter/streaming/clickhouse/queryhelper.go (2)
36-43
: Good fix: avoids invalid CASE when there are no mappingsReturning a constant projection when no WHENs were added prevents constructing an invalid “CASE … ELSE … END” without any WHENs. This addresses the earlier concern.
61-70
: Avoid calling Or() with zero args when no subject keys are collectedIf customers exist but none has SubjectKeys, query.Or(...) receives zero args. Depending on go-sqlbuilder behavior this can yield an empty/invalid predicate. Guard with a length check.
if len(customers) > 0 { var customerSubjects []string for _, customer := range customers { customerSubjects = append(customerSubjects, customer.GetUsageAttribution().SubjectKeys...) } - query = query.Where(query.Or(slicesx.Map(customerSubjects, mapFunc)...)) + if len(customerSubjects) > 0 { + query = query.Where(query.Or(slicesx.Map(customerSubjects, mapFunc)...)) + } }
🧹 Nitpick comments (4)
openmeter/streaming/clickhouse/queryhelper.go (3)
13-14
: Docstring mismatch with function signature (nit)The comment says “returns the select columns” (plural), but the function returns a single string. Please align the comment for clarity.
-// customerIdSelect returns the select columns for the customer ID. +// selectCustomerIdColumns returns a single SELECT projection for customer_id.
22-34
: Guard against duplicate subject keys across customersIf a subject key appears under multiple customers, this will emit multiple WHEN clauses for the same condition with different THEN values; CASE will take the first match, leading to silent, order-dependent selection. If upstream guarantees global uniqueness of subject keys across customers, we’re good—otherwise, dedupe here to avoid ambiguity and reduce query size.
Example dedup inline (keeps first occurrence):
- // Add the case statements for each subject to customer ID mapping + // Add the case statements for each subject to customer ID mapping + seen := make(map[string]struct{}) for _, customer := range customers { for _, subjectKey := range customer.GetUsageAttribution().SubjectKeys { + if _, dup := seen[subjectKey]; dup { + continue + } + seen[subjectKey] = struct{}{} str := fmt.Sprintf( "WHEN %s = '%s' THEN '%s' ", subjectColumn, sqlbuilder.Escape(subjectKey), sqlbuilder.Escape(customer.GetUsageAttribution().ID), ) caseBuilder.WriteString(str) added = true } }
72-77
: Optional: use IN() instead of large OR chains for better readability/plan sizeFor long subject lists, IN(...) is typically more compact than many OR-ed equality predicates. If go-sqlbuilder supports it for ClickHouse, consider switching to query.In(subjectColumn, sqlbuilder.List(...)).
openmeter/streaming/clickhouse/event_query.go (1)
146-161
: DRY opportunity: factor customers/subjects extraction used in both toCountRowSQL and toSQLBoth methods replicate the logic to materialize customers and subjects before calling subjectWhere. Consider extracting a small helper (e.g., func (d queryEventsTable) materializeFilters() (custs []streaming.Customer, subjects []string)) to reduce duplication and avoid drift.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
api/spec/src/events.tsp
(2 hunks)openmeter/streaming/clickhouse/event_query.go
(4 hunks)openmeter/streaming/clickhouse/meter_query.go
(2 hunks)openmeter/streaming/clickhouse/queryhelper.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- api/spec/src/events.tsp
- openmeter/streaming/clickhouse/meter_query.go
🧰 Additional context used
🧬 Code Graph Analysis (2)
openmeter/streaming/clickhouse/queryhelper.go (1)
api/api.gen.go (1)
Customer
(2165-2215)
openmeter/streaming/clickhouse/event_query.go (1)
api/api.gen.go (2)
Customer
(2165-2215)Subject
(6309-6334)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Artifacts / Container image
- GitHub Check: Artifacts / Benthos Collector Container image
- GitHub Check: Migration Checks
- GitHub Check: Code Generators
- GitHub Check: Lint
- GitHub Check: Build
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (3)
openmeter/streaming/clickhouse/event_query.go (3)
61-62
: Adding Customers to queryEventsTable looks goodField addition is consistent with the new filtering and selection helpers.
83-98
: Count path: unified filtering via subjectWhereUsing subjectWhere for the count query keeps semantics consistent with the data query. Looks good.
107-125
: Verify whether customer_id should be projected unconditionally for API stabilityCurrently, the customer_id column is only selected when d.Customers is non-nil and non-empty. If the API contract for v1/v2 expects customerId to always be present (empty string when not attributable), consider always projecting it. The helper already returns a constant when there are no mappings, so you can safely remove the len(...) > 0 guard (or compute the slice locally and pass it regardless).
Option to always include customer_id (keeps schema stable):
// Select customer_id column if customer filter is provided - if d.Customers != nil && len(*d.Customers) > 0 { - selectColumns = append(selectColumns, selectCustomerIdColumns(d.EventsTableName, *d.Customers)) - } + var custs []streaming.Customer + if d.Customers != nil { + custs = *d.Customers + } + selectColumns = append(selectColumns, selectCustomerIdColumns(d.EventsTableName, custs))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
openmeter/meterevent/adapter/event.go (1)
24-35
: Fixed: v1 ingestedAt filter pass-through is now correctly wired.Including IngestedAtFrom/IngestedAtTo in streaming.ListEventsParams resolves the earlier regression in v1 list. LGTM.
🧹 Nitpick comments (3)
openmeter/meterevent/adapter/event.go (3)
37-59
: Honor explicit empty CustomerIDs filter by short-circuiting to no results.If CustomerIDs is provided but empty, the current code treats it as “no filter,” returning all events. For explicit empty filters, it’s safer to return zero events to avoid unexpectedly broad queries and high load. Also keep the current path for non-empty lists.
Suggested change:
Please confirm desired API semantics: should an explicit empty filter return zero results?
- // Resolve customer IDs to customers if provided - if params.CustomerIDs != nil && len(*params.CustomerIDs) > 0 { + // Resolve customer IDs to customers if provided + if params.CustomerIDs != nil { + // Explicitly provided empty filter => return no events + if len(*params.CustomerIDs) == 0 { + return []meterevent.Event{}, nil + } customerList, err := a.customerService.ListCustomers(ctx, customer.ListCustomersInput{ Namespace: params.Namespace, CustomerIDs: *params.CustomerIDs, }) if err != nil { return nil, fmt.Errorf("list customers: %w", err) } - customers := make([]streaming.Customer, 0, len(customerList.Items)) - - for _, c := range customerList.Items { - customers = append(customers, c) - } + customers := make([]streaming.Customer, 0, len(customerList.Items)) + for _, c := range customerList.Items { + customers = append(customers, c) + } // If no customers are found, return an empty list if len(customers) == 0 { return []meterevent.Event{}, nil } listParams.Customers = &customers }
158-203
: Reduce O(events × meters) validation to O(events) with an index by event type.Current loop scans all meters per event. If meters or events are large, this will be costly. Pre-index meters by EventType once, then validate per event only against matching meters.
func (a *adapter) validateEvents(ctx context.Context, namespace string, events []meterevent.Event) ([]meterevent.Event, error) { // Get all meters meterList, err := a.meterService.ListMeters(ctx, meter.ListMetersParams{ Namespace: namespace, }) if err != nil { return nil, fmt.Errorf("get meters: %w", err) } + // Index meters by event type to avoid scanning for every event + metersByType := make(map[string][]meter.Meter, len(meterList.Items)) + for _, m := range meterList.Items { + metersByType[m.EventType] = append(metersByType[m.EventType], m) + } + // Validate events against meters validatedEvents := make([]meterevent.Event, 0, len(events)) for _, event := range events { - meterMatch := false - validationErrors := []error{} - - // Check if the event type matches a meter - for _, m := range meterList.Items { - // Check if the event type matches the meter event type - if event.Type == m.EventType { - meterMatch = true - - // Validate the event data against the meter - _, err = meter.ParseEventString(m, event.Data) - if err != nil { - validationErrors = append(validationErrors, err) - } - } - } + validationErrors := []error{} + ms := metersByType[event.Type] + meterMatch := len(ms) > 0 + for _, m := range ms { + if _, perr := meter.ParseEventString(m, event.Data); perr != nil { + validationErrors = append(validationErrors, perr) + } + } // If no meter matches the event type, add an error to the event if !meterMatch { validationErrors = append(validationErrors, fmt.Errorf("no meter found for event type: %s", event.Type)) } // If the event does not have a customer ID, add an error to the event if event.CustomerID == nil { validationErrors = append(validationErrors, fmt.Errorf("no customer found for event subject: %s", event.Subject)) } event.ValidationErrors = validationErrors validatedEvents = append(validatedEvents, event) } return validatedEvents, nil }Also, if EventType is guaranteed to be unique across meters, you can break after the first match to further reduce work.
205-251
: Add negative caching to avoid repeated lookups on “not found”; consider batching later.Nice touch with per-call positive cache. However, when a subject has no associated customer, we re-query on every occurrence. Add a small negative cache to skip redundant lookups. The FIXME about batching is valid; batching by unique subjects would further reduce DB load.
func (a *adapter) enrichEventsWithCustomerID(ctx context.Context, namespace string, events []meterevent.Event) ([]meterevent.Event, error) { eventsWithCustomerID := make([]meterevent.Event, 0, len(events)) - cache := make(map[string]string) + cache := make(map[string]string) // positive cache: subject -> customerID + notFound := make(map[string]struct{}) // negative cache: subject with no customer for _, event := range events { // If the event already has a customer ID, add it to the list if event.CustomerID != nil { eventsWithCustomerID = append(eventsWithCustomerID, event) continue } // Check if the customer ID for the subject is in the cache if customerID, ok := cache[event.Subject]; ok { // Create a stable copy to take address of id := customerID event.CustomerID = &id eventsWithCustomerID = append(eventsWithCustomerID, event) continue } + // Skip lookup if we already know there is no customer for this subject + if _, nf := notFound[event.Subject]; nf { + eventsWithCustomerID = append(eventsWithCustomerID, event) + continue + } + // FIXME: do this in a batches to avoid hitting the database for each event // Get the customer by usage attribution subject key cust, err := a.customerService.GetCustomerByUsageAttribution(ctx, customer.GetCustomerByUsageAttributionInput{ Namespace: namespace, SubjectKey: event.Subject, }) if err != nil { if models.IsGenericNotFoundError(err) { + notFound[event.Subject] = struct{}{} eventsWithCustomerID = append(eventsWithCustomerID, event) continue } return nil, fmt.Errorf("get customer by usage attribution: %w", err) } // Add the customer ID to the cache cache[event.Subject] = cust.ID // Add the event to the list (use stable copy for pointer) customerID := cust.ID event.CustomerID = &customerID eventsWithCustomerID = append(eventsWithCustomerID, event) } return eventsWithCustomerID, nil }If you want, I can sketch a batched resolver that groups by unique subject and calls a future ListByUsageAttribution API once per request.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
openmeter/meterevent/adapter/event.go
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
openmeter/meterevent/adapter/event.go (7)
openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)Event
(48-69)openmeter/streaming/eventparams.go (1)
ListEventsParams
(13-34)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)pkg/pagination/v2/pagination.go (2)
Result
(13-19)NewResult
(23-35)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Artifacts / Container image
- GitHub Check: Lint
- GitHub Check: Migration Checks
- GitHub Check: Build
- GitHub Check: Test
- GitHub Check: Code Generators
- GitHub Check: Analyze (go)
🔇 Additional comments (5)
openmeter/meterevent/adapter/event.go (5)
62-74
: Good: centralizing post-processing (map → enrich → validate) for v1 list.The split into eventPostProcess improves readability and reuse. Error wrapping is consistent.
86-97
: V2 params mapping looks correct and consistent with the streaming layer.ID/Source/Subject/Type/Time/IngestedAt/Cursor/Limit are passed through as expected. No filter by customer for v2 (in line with the PR objective).
111-131
: Pipeline ordering (enrich before validate) makes sense.Enriching CustomerID before validation reduces false positives in validation results. Good separation of concerns.
133-155
: Mapping RawEvent → meterevent.Event LGTM.Field parity looks correct, and carrying through RawEvent.CustomerID preserves upstream attribution when available.
193-196
: Confirm whether missing CustomerID should always be a validation error.The PR notes “optional customer association,” but this flags every event lacking CustomerID as invalid. If some event types legitimately have no attribution, this may be too strict. Consider gating this on meters that require usage attribution or downgrading to a warning.
Would you like me to add a check against meter configuration (e.g., only enforce when m.GetUsageAttribution() is required)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
e2e/e2e_test.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
e2e/e2e_test.go (3)
api/client/javascript/src/client/schemas.ts (1)
ValidationError
(11248-11248)api/api.gen.go (1)
ValidationError
(6970-6982)api/client/go/client.gen.go (1)
ValidationError
(6386-6398)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Artifacts / Container image
- GitHub Check: Migration Checks
- GitHub Check: Lint
- GitHub Check: Code Generators
- GitHub Check: Test
- GitHub Check: Build
- GitHub Check: Analyze (go)
d705c1b
to
3c56819
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
openmeter/streaming/clickhouse/connector.go (2)
95-110
: Run params.Validate() for v1 to catch bad inputs earlyV1 path doesn’t validate
ListEventsParams
, while V2 does. This can lead to invalid time ranges or negative limits slipping through.Apply this diff:
func (c *Connector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]streaming.RawEvent, error) { if namespace == "" { return nil, fmt.Errorf("namespace is required") } + if err := params.Validate(); err != nil { + return nil, fmt.Errorf("validate params: %w", err) + } + + // Optional: prevent namespace mismatches if callers set params.Namespace. + if params.Namespace != "" && params.Namespace != namespace { + return nil, fmt.Errorf("namespace mismatch: params=%q arg=%q", params.Namespace, namespace) + } + events, err := c.queryEventsTable(ctx, namespace, params)
370-419
: Map ClickHouse “code: 60” to NamespaceNotFoundError in V2 (parity with V1)V1 translates CH error 60 to
NamespaceNotFoundError
, but V2 returns a generic error. This diverges API behavior and can break callers relying on typed errors.Apply this diff:
rows, err := c.config.ClickHouse.Query(ctx, sql, args...) if err != nil { - return nil, fmt.Errorf("query events table query: %w", err) + if strings.Contains(err.Error(), "code: 60") { + return nil, models.NewNamespaceNotFoundError(params.Namespace) + } + return nil, fmt.Errorf("query events table query: %w", err) }
♻️ Duplicate comments (4)
openmeter/meterevent/adapter/event.go (2)
25-36
: Regression fixed: ingestedAt filters are now passed through.Good catch adding IngestedAtFrom/IngestedAtTo to the streaming params. This addresses the earlier v1 regression where ingestedAt filters were ignored.
38-60
: Bug: explicit empty CustomerIDs broadens the v1 query to all events. Short-circuit to empty result.If the client supplies an explicit empty array (customerId=[]), the current check skips filtering entirely and returns all events. That’s surprising and can be expensive. Treat an explicit empty filter as “no results.”
Apply this diff:
- // Resolve customer IDs to customers if provided - if params.CustomerIDs != nil && len(*params.CustomerIDs) > 0 { + // Resolve customer IDs to customers if provided + if params.CustomerIDs != nil { + // Explicitly provided empty filter => no events + if len(*params.CustomerIDs) == 0 { + return []meterevent.Event{}, nil + } customerList, err := a.customerService.ListCustomers(ctx, customer.ListCustomersInput{ Namespace: params.Namespace, CustomerIDs: *params.CustomerIDs, }) if err != nil { return nil, fmt.Errorf("list customers: %w", err) } customers := make([]streaming.Customer, 0, len(customerList.Items)) for _, c := range customerList.Items { customers = append(customers, c) } // If no customers are found, return an empty list if len(customers) == 0 { return []meterevent.Event{}, nil } listParams.Customers = &customers }openmeter/meterevent/service.go (1)
71-77
: Cursor implementation added on Event.This satisfies pagination requirements and fixes the earlier interface gap.
openmeter/streaming/clickhouse/event_query.go (1)
120-123
: Guard against empty customer mappings — thank you for adding thisThe additional len(*d.Customers) > 0 guard ensures we only project customer_id when mappings exist, addressing the earlier review note.
🧹 Nitpick comments (27)
openmeter/meterevent/service.go (1)
37-45
: Public API: ListEventsParams gains CustomerIDs. Consider input semantics for empty slice.Adding CustomerIDs is fine, but an explicitly empty slice meaning “no results” vs “no filter” should be clarified and enforced consistently across layers. Current adapter v1 path needs a short-circuit (see adapter comment).
pkg/filter/filter_test.go (1)
470-559
: Add a test for empty In = [] to lock in semantics.We should define whether an explicit empty list is treated as empty filter or “no results.” Recommend asserting “no results” behavior at higher layers, but at the filter level it’s worth deciding if SelectWhereExpr should be empty or generate a safe-false predicate. Add a unit test to prevent regressions.
Would you like me to open a follow-up to add:
- Validate() behavior for empty In (either error or treat as empty)
- SelectWhereExpr behavior for empty In (emit no expr or a false condition)?
I can draft both alternatives with benchmarks.
openmeter/streaming/clickhouse/event_query_v2_test.go (1)
68-82
: Great addition: subject IN filter coverage (customerId mapping)
- Solid test for IN semantics with a slice argument.
- Consider adding parity tests:
- Subject.Nin (NOT IN).
- Guard/behavior for empty lists (should be rejected upstream; see notes on FilterStringExact below).
api/spec/src/filter.tsp (1)
91-109
: New FilterStringExact type is a good fit for exact/in-only filtersThe constrained surface (only $eq and $in) is appropriate for identifiers like customerId.
- Suggest documenting the mutual exclusivity explicitly in the schema comments to match backend validation.
- Optional: enforce min/max items on $in here (uncomment @minItems/@Maxitems) since this model intentionally excludes nullable types.
Apply this diff to clarify exclusivity:
@friendlyName("FilterStringExact") model FilterStringExact { + /** + * Note: $eq and $in are mutually exclusive. Only one may be set. + */ /** * The field must be equal to the provided value. */ $eq?: string | null;Optionally enforce list bounds:
- // @minItems(1) - // @maxItems(100) + @minItems(1) + @maxItems(100) $in?: string[] | null;api/spec/src/events.tsp (3)
68-75
: Remove redundant @query decoratorYou have both @query and @query(#{ explode: true }) on the same parameter. One is enough; keep the exploded form.
- @query - @query(#{ explode: true }) + @query(#{ explode: true }) @example(#["01HZXABCD3M6E7K8J9PQRS1TUV", "01HZXABCDE3M6E7K8J9PQRSTUV"]) customerId?: ULID[],
181-188
: Include customerId in the filter example for discoverabilityAdding customerId to the example helps clients discover the new capability quickly.
@example(#{ id: #{ $eq: "my-event-id" }, source: #{ $eq: "my-event-source" }, subject: #{ $eq: "my-event-subject" }, + customerId: #{ $in: #["01HZXABCD3M6E7K8J9PQRS1TUV", "01HZXABCDE3M6E7K8J9PQRSTUV"] }, type: #{ $eq: "my-event-type" }, time: #{
310-314
: Add customerId to the IngestedEvent exampleSince the field is new and user-facing, show it in the example payload.
@example(#{ event: #{ id: "5c10fade-1c9e-4d6c-8275-c52c36731d3c", source: "service-name", specversion: "1.0", type: "prompt", subject: "customer-id", time: DateTime.fromISO("2023-01-01T01:01:01.001Z"), }, + customerId: "01HZXABCD3M6E7K8J9PQRS1TUV", ingestedAt: DateTime.fromISO("2023-01-01T01:01:01.001Z"), storedAt: DateTime.fromISO("2023-01-01T01:01:02.001Z"), })
pkg/filter/filter.go (2)
189-198
: Defensive guard in SelectWhereExpr for empty $inEven with validation, keeping SelectWhereExpr resilient is cheap. Emit no predicate if $in is empty.
func (f FilterStringExact) SelectWhereExpr(field string, q *sqlbuilder.SelectBuilder) string { switch { case f.Eq != nil: return q.EQ(field, *f.Eq) - case f.In != nil: + case f.In != nil && len(*f.In) > 0: return q.In(field, *f.In) default: return "" } }
160-166
: Add compile-time assertion that FilterStringExact implements FilterMatches the pattern used for other filters and helps catch regressions.
Add near the existing assertions (around Lines 23-29):
var ( // ... _ Filter = (*FilterStringExact)(nil) )api/client/javascript/src/client/schemas.ts (2)
5285-5291
: Clarify FilterStringExact semantics; consider stricter schema if exclusivity is desiredThe new schema looks good. Small doc nit: the description can be clearer about how
$eq
and$in
interact. If both can be provided, say so. If they’re mutually exclusive, consider encoding that in OpenAPI (oneOf) so clients generate a discriminated union.Apply this minimal doc tweak here to avoid ambiguity:
- /** @description A filter for a string field allowing only equality or inclusion. */ + /** @description A filter for a string field allowing exact match ($eq) and/or set membership ($in). */If you want exclusivity, update the OpenAPI instead (generator-friendly), e.g.:
components: schemas: FilterStringExact: oneOf: - type: object properties: $eq: type: string required: [$eq] - type: object properties: $in: type: array items: { type: string } required: [$in]
5471-5475
: Mention ULID in the customerId documentation for IngestedEventThe field is a ULID per the spec elsewhere; reflecting that here helps client-side consumers.
Apply this doc-only diff:
- /** - * @description The customer ID if the event is associated with a customer. - * @example 01G65Z755AFWAKHE12NY0CQ9FH - */ + /** + * @description The customer ID (ULID) if the event is associated with a customer. + * @example 01G65Z755AFWAKHE12NY0CQ9FH + */Note: If this file is generated, please make this change in the OpenAPI source so it propagates here.
api/openapi.yaml (2)
5005-5017
: Tighten query param semantics for array input and examples.Good addition. Recommend minor spec hardening for better client generation and validation.
Apply this diff to clarify CSV semantics, enforce non-empty/unique lists, and provide a concrete multi-value example:
- name: customerId in: query required: false - description: The event customer ID. + description: The event customer ID(s). With style=form and explode=false, provide a comma-separated list (CSV). schema: type: array + minItems: 1 + uniqueItems: true items: type: string pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: ULID (Universally Unique Lexicographically Sortable Identifier). example: 01G65Z755AFWAKHE12NY0CQ9FH explode: false style: form + example: 01G65Z755AFWAKHE12NY0CQ9FH,01H0F2K5B1ABCDEF234567890ATwo nits to consider:
- Parameter name is singular but accepts an array. If this diverges from your conventions, consider renaming to customerIds before release to avoid future breaking changes.
- If the backend imposes an upper bound, add maxItems (e.g., 100) to prevent abuse.
16798-16802
: Event ingestion payload: clarify nullability and reuse constraints.If customerId can be absent (no mapping), mark it nullable to disambiguate “omitted vs. empty”, and tighten the ULID constraints. Also consider reusing a shared ULID format across the spec to avoid pattern duplication.
Apply this diff for minimal hardening:
customerId: type: string + nullable: true + minLength: 26 + maxLength: 26 + format: ulid pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: The customer ID if the event is associated with a customer. example: 01G65Z755AFWAKHE12NY0CQ9FHFollow-up checks:
- If the core Event schema already contains customerId, avoid duplicating it at the wrapper level unless this field represents a post-validation/enrichment result. In that case, clarify it in the description (e.g., “Resolved via usage-attribution”).
- Consider introducing components/schemas/Ulid and referencing it (and an array variant) wherever ULIDs appear.
app/common/meterevent.go (1)
17-21
: All NewMeterEventService call sites updated
Verification shows that every invocation of NewMeterEventService now passes three arguments—connector, customerService, service
—and the generated Wire file (cmd/server/wire_gen.go:453
) has been updated accordingly. There are no remaining two-argument call sites in the codebase, and Wire providers correctly include a concrete customer.Service.Optional: for clarity, consider adding the suggested doc comment above the constructor:
+// NewMeterEventService constructs a meterevent.Service wired with streaming, customer, and meter services. +// Note: customerService is required to enable customer attribution in event listing APIs. func NewMeterEventService( streamingConnector streaming.Connector, customerService customer.Service, meterService meter.Service, ) meterevent.Service {openmeter/streaming/clickhouse/event_query.go (3)
61-63
: Clarify presence vs. intent semantics for Customers pointerUsing a pointer to []streaming.Customer is good to distinguish “unset” vs “set but empty”. Consider adding a short comment on the struct field specifying:
- nil: no customer-based filtering or projection
- empty slice: same as nil (no filter/projection)
This prevents future regressions in helpers that may treat empty as “match nothing.”
146-161
: Duplicate “derive customers/subjects then subjectWhere” logic — consider a tiny helperThe blocks that derive customers/subjects and then call subjectWhere are duplicated in both toCountRowSQL and toSQL. Extracting into a small private helper will reduce drift risk.
Example signature:
- func (d queryEventsTable) applySubjectFilters(q *sqlbuilder.SelectBuilder) *sqlbuilder.SelectBuilder
121-123
:append
usage is correct; helper returns a single string
The functionfunc selectCustomerIdColumns(eventsTableName string, customers []streaming.Customer) stringreturns a single
string
, so- selectColumns = append(selectColumns, selectCustomerIdColumns(d.EventsTableName, *d.Customers)) + // no change needed: append accepts a string element selectColumns = append(selectColumns, selectCustomerIdColumns(d.EventsTableName, *d.Customers))is already correct.
Nit (optional): consider renaming the helper to a singular form—e.g.
selectCustomerIdColumn
—to better reflect that it returns one string rather than a slice.api/openapi.cloud.yaml (4)
5004-5016
: Clarify serialization, add array constraints, and provide a parameter-level example for customerId query param.With
style: form
andexplode: false
, clients must send a comma-separated list (e.g.,?customerId=01G...,01H...
). If that’s the intended contract, let’s:
- make this explicit in the description,
- add
minItems
anduniqueItems
to the array schema,- include a parameter-level example that matches the chosen serialization.
If, instead, you want repeated keys (e.g.,
?customerId=01G...&customerId=01H...
), switchexplode
totrue
. Please confirm the expected client behavior across SDKs before merging.Proposed in-place diff:
- name: customerId in: query required: false - description: The event customer ID. + description: Filter events by customer ID(s). With style=form and explode=false, provide a comma-separated list (e.g., customerId=01G65Z755AFWAKHE12NY0CQ9FH,01H8F6W6Z6V0V1NX5X3S9B1ABC). schema: type: array + minItems: 1 + uniqueItems: true items: type: string pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: ULID (Universally Unique Lexicographically Sortable Identifier). example: 01G65Z755AFWAKHE12NY0CQ9FH explode: false style: form + example: 01G65Z755AFWAKHE12NY0CQ9FH,01H8F6W6Z6V0V1NX5X3S9B1ABCTo reduce duplication of the ULID pattern across the spec, consider introducing a reusable schema and referencing it here (see below in a separate block).
5004-5016
: DRY the ULID pattern via a reusable component.You repeat the ULID regex here and elsewhere. Define a single
ULID
schema undercomponents/schemas
and reference it to avoid drift.Add this component once (outside this hunk):
components: schemas: ULID: type: string pattern: '^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$' description: ULID (Universally Unique Lexicographically Sortable Identifier). example: 01G65Z755AFWAKHE12NY0CQ9FHThen replace the inline
items
schema here with:- items: - type: string - pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ - description: ULID (Universally Unique Lexicographically Sortable Identifier). - example: 01G65Z755AFWAKHE12NY0CQ9FH + items: + $ref: '#/components/schemas/ULID'
11316-11317
: Optional: Attach a description without breaking $ref (wrap via allOf).OpenAPI tooling often ignores siblings next to
$ref
. If you want a property-level description here, wrap the reference to keep compatibility.Proposed diff:
- customerId: - $ref: '#/components/schemas/FilterStringExact' + customerId: + allOf: + - $ref: '#/components/schemas/FilterStringExact' + description: Filter by exact customerId using $eq or by a set using $in.
16438-16442
: Propose reusing ULID schema for customerId and marking it read-onlyAfter verifying, the
Event
schema in bothapi/openapi.cloud.yaml
(line 15832+) andapi/openapi.yaml
(line 16098+) does not definecustomerId
, so there’s no duplication to worry about. You can safely refactor the standalonecustomerId
field to reference the shared ULID component and mark it as read-only.• File:
api/openapi.cloud.yaml
(lines 16438–16442)- customerId: - type: string - pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ - description: The customer ID if the event is associated with a customer. - example: 01G65Z755AFWAKHE12NY0CQ9FH + customerId: + allOf: + - $ref: '#/components/schemas/ULID' + readOnly: true + description: The customer ID if the event is associated with a customer. + example: 01G65Z755AFWAKHE12NY0CQ9FHopenmeter/streaming/eventparams.go (2)
12-33
: Namespace duplication in v1 params vs. method signature causes ambiguity
ListEventsParams
includesNamespace string
, butstreaming.Connector.ListEvents
already takesnamespace string
separately and the ClickHouse code uses the method arg (not the struct field). This invites mismatches.
- Option A (preferred): remove Namespace from
ListEventsParams
(breaking change).- Option B: keep field but enforce equality in the connector and/or mark as deprecated to prevent misuse.
If you keep it, consider marking it as deprecated in the comment:
type ListEventsParams struct { - // The namespace. - Namespace string + // DEPRECATED: Ignored by streaming.Connector.ListEvents. Use the method's `namespace` argument. + // Kept for compatibility with higher layers. + Namespace stringAnd see connector.go suggestion to assert equality at runtime.
36-61
: Add safe guard for negative limit in v1 paramsV1
Limit
is an int with no validation. If zero is meaningful (e.g., “use default”), that’s fine; but negative should be rejected.Apply this diff:
func (i ListEventsParams) Validate() error { var errs []error @@ if i.IngestedAtFrom != nil && i.IngestedAtTo != nil && i.IngestedAtTo.Before(*i.IngestedAtFrom) { errs = append(errs, fmt.Errorf("ingestedAtTo date is before ingestedAtFrom date: %s < %s", i.IngestedAtTo.Format(time.RFC3339), i.IngestedAtFrom.Format(time.RFC3339))) } + if i.Limit < 0 { + errs = append(errs, errors.New("limit cannot be negative")) + } + return errors.Join(errs...) }If zero must be rejected too, change the guard to
< 1
and ensure callers pass an explicit default.openmeter/streaming/clickhouse/connector.go (1)
309-324
: Confirm Customers param type matches the ClickHouse query builderYou thread
params.Customers
intoqueryEventsTable{ Customers: params.Customers }
. In streaming params this is*[]Customer
, while meterevent/service.go exposes*[]string
(ULIDs). Ensure the builder expects the same element type to avoid compile-time or logical mismatches.Would you like me to align v1 to
CustomerIDs *[]string
for consistency with the service layer and API? I can propose a full refactor diff if desired.openmeter/server/server_test.go (3)
168-186
: Confirm API semantics for missing customer ID (empty string vs null/omitted)Test expects
CustomerId: ""
when not set. If API intends “not available” rather than “empty value,” prefer null (pointer nil) or omit the field to reduce ambiguity and wire compatibility.If the agreed shape is null, update expected to
CustomerId: nil
and ensure the mapping returns nil whenRawEvent.CustomerID
is nil.
606-620
: Enhance mock to include CustomerID for at least one event to assert mappingRight now, mocks never set
CustomerID
, so we only test the “missing customer” path. Add a second event with a non-empty ULID to validate end-to-end propagation in v1.Apply this diff:
func (c *MockStreamingConnector) ListEvents(ctx context.Context, namespace string, params streaming.ListEventsParams) ([]streaming.RawEvent, error) { - events := []streaming.RawEvent{ - { + events := []streaming.RawEvent{ + { ID: mockEvent.ID(), Type: mockEvent.Type(), Source: mockEvent.Source(), Subject: mockEvent.Subject(), Time: mockEvent.Time(), Data: string(mockEvent.Data()), IngestedAt: time.Time{}, StoredAt: time.Time{}, }, + { + ID: "evt-2", + Type: mockEvent.Type(), + Source: mockEvent.Source(), + Subject: mockEvent.Subject(), + Time: mockEvent.Time(), + Data: string(mockEvent.Data()), + CustomerID: lo.ToPtr("01ARZ3NDEKTSV4RRFFQ69G5FAV"), // sample ULID + }, } return events, nil }And add a companion assertion in the “list events v1” test for the second item’s
CustomerId
.
622-636
: Add a V2 list-events test case to cover the new customerId filter and response mappingGiven the V2 path now validates and (after fixes) will support
CustomerID
, add a test like “list events v2 with customer filter” to ensure both filtering and payload mapping work.I can draft the test case and adjust the mock to assert that
params.CustomerID
is honored by returning only matching events. Let me know, and I’ll provide a concrete diff.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (29)
api/client/javascript/src/client/schemas.ts
(4 hunks)api/client/javascript/src/zod/index.ts
(2 hunks)api/openapi.cloud.yaml
(4 hunks)api/openapi.yaml
(4 hunks)api/spec/src/events.tsp
(3 hunks)api/spec/src/filter.tsp
(1 hunks)app/common/meterevent.go
(2 hunks)cmd/server/wire_gen.go
(1 hunks)e2e/e2e_test.go
(1 hunks)openmeter/apiconverter/filter.gen.go
(1 hunks)openmeter/apiconverter/filter.go
(1 hunks)openmeter/meterevent/adapter/adapter.go
(2 hunks)openmeter/meterevent/adapter/event.go
(4 hunks)openmeter/meterevent/httphandler/event.go
(1 hunks)openmeter/meterevent/httphandler/mapping.go
(2 hunks)openmeter/meterevent/service.go
(4 hunks)openmeter/server/server_test.go
(5 hunks)openmeter/streaming/clickhouse/connector.go
(5 hunks)openmeter/streaming/clickhouse/event_query.go
(4 hunks)openmeter/streaming/clickhouse/event_query_test.go
(1 hunks)openmeter/streaming/clickhouse/event_query_v2.go
(2 hunks)openmeter/streaming/clickhouse/event_query_v2_test.go
(10 hunks)openmeter/streaming/clickhouse/meter_query.go
(2 hunks)openmeter/streaming/clickhouse/queryhelper.go
(1 hunks)openmeter/streaming/connector.go
(1 hunks)openmeter/streaming/eventparams.go
(1 hunks)openmeter/streaming/testutils/streaming.go
(1 hunks)pkg/filter/filter.go
(1 hunks)pkg/filter/filter_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (12)
- openmeter/streaming/clickhouse/event_query_v2.go
- e2e/e2e_test.go
- cmd/server/wire_gen.go
- openmeter/streaming/clickhouse/meter_query.go
- openmeter/meterevent/adapter/adapter.go
- openmeter/meterevent/httphandler/event.go
- openmeter/streaming/testutils/streaming.go
- openmeter/streaming/clickhouse/event_query_test.go
- openmeter/streaming/connector.go
- openmeter/meterevent/httphandler/mapping.go
- openmeter/streaming/clickhouse/queryhelper.go
- api/client/javascript/src/zod/index.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
PR: openmeterio/openmeter#2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.
Applied to files:
openmeter/server/server_test.go
🧬 Code graph analysis (13)
openmeter/apiconverter/filter.go (2)
pkg/filter/filter.go (2)
FilterString
(32-47)FilterStringExact
(161-164)api/api.gen.go (2)
FilterString
(3135-3177)FilterStringExact
(3180-3186)
pkg/filter/filter.go (3)
api/client/javascript/src/client/schemas.ts (1)
FilterStringExact
(10931-10931)api/api.gen.go (1)
FilterStringExact
(3180-3186)api/client/go/client.gen.go (1)
FilterStringExact
(2889-2895)
app/common/meterevent.go (2)
openmeter/meterevent/service.go (1)
Service
(18-21)openmeter/meterevent/adapter/adapter.go (1)
New
(10-20)
openmeter/streaming/clickhouse/event_query_v2_test.go (3)
openmeter/meterevent/service.go (1)
ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (1)
ListEventsV2Params
(64-85)pkg/filter/filter.go (1)
FilterString
(32-47)
pkg/filter/filter_test.go (1)
pkg/filter/filter.go (1)
FilterStringExact
(161-164)
openmeter/meterevent/adapter/event.go (6)
openmeter/meterevent/service.go (3)
ListEventsParams
(24-45)Event
(48-69)ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)pkg/filter/filter.go (1)
FilterString
(32-47)openmeter/streaming/connector.go (1)
RawEvent
(24-36)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
openmeter/streaming/clickhouse/event_query.go (3)
api/client/javascript/src/client/schemas.ts (2)
Customer
(10858-10858)Subject
(11208-11208)api/api.gen.go (2)
Customer
(2165-2215)Subject
(6318-6343)api/client/go/client.gen.go (2)
Customer
(1995-2045)Subject
(5792-5817)
openmeter/apiconverter/filter.gen.go (3)
openmeter/apiconverter/filter.go (2)
ConvertStringExact
(15-15)ConvertStringExactPtr
(16-16)pkg/filter/filter.go (1)
FilterStringExact
(161-164)api/api.gen.go (1)
FilterStringExact
(3180-3186)
openmeter/streaming/clickhouse/connector.go (3)
openmeter/streaming/connector.go (2)
Connector
(38-48)RawEvent
(24-36)openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)
openmeter/streaming/eventparams.go (3)
api/api.gen.go (6)
Subject
(6318-6343)Customer
(2165-2215)ListEventsParams
(7772-7812)FilterString
(3135-3177)FilterTime
(3189-3207)ListEventsV2Params
(8353-8387)openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)ListEventsV2Params
(125-148)pkg/pagination/v2/cursor.go (1)
Cursor
(11-14)
api/client/javascript/src/client/schemas.ts (3)
pkg/filter/filter.go (1)
FilterStringExact
(161-164)api/api.gen.go (1)
FilterStringExact
(3180-3186)api/client/go/client.gen.go (1)
FilterStringExact
(2889-2895)
openmeter/meterevent/service.go (4)
api/client/javascript/src/client/schemas.ts (1)
FilterStringExact
(10931-10931)pkg/filter/filter.go (1)
FilterStringExact
(161-164)api/api.gen.go (1)
FilterStringExact
(3180-3186)api/client/go/client.gen.go (1)
FilterStringExact
(2889-2895)
openmeter/server/server_test.go (3)
openmeter/meterevent/adapter/adapter.go (1)
New
(10-20)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/streaming/connector.go (1)
RawEvent
(24-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: Artifacts / Container image
- GitHub Check: Artifacts / Benthos Collector Container image
- GitHub Check: Migration Checks
- GitHub Check: Lint
- GitHub Check: Build
- GitHub Check: Code Generators
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (21)
openmeter/meterevent/adapter/event.go (2)
144-164
: Post-processing pipeline ordering is sensible.Mapping → enrichment → validation ensures customer mapping errors surface in ValidationErrors. Looks good.
239-284
: Enrichment correctness: stable pointer copies and cache usage look good.
- Good fix using stable local copies for CustomerID pointers.
- The per-subject cache avoids redundant lookups; the FIXME about batching is noted.
openmeter/apiconverter/filter.go (1)
13-24
: Converters for FilterStringExact added coherently.The new ConvertStringExact and ConvertStringExactPtr align with existing converter patterns and internal filter types. No issues.
openmeter/apiconverter/filter.gen.go (1)
137-150
: Generated mapping for FilterStringExact is correct.Eq/In are mapped 1:1; pointer wrapper handles nil safely. This mirrors other generated converters.
openmeter/meterevent/service.go (2)
61-69
: Event now exposes CustomerID; downstream mapping compatibility looks good.Matches streaming.RawEvent.CustomerID and handler mapping needs. No concerns.
140-148
: V2 adds CustomerID filter with validation.Using FilterStringExact is appropriate here. Validation via ValidateWithComplexity(1) is consistent with other fields.
Also applies to: 182-186
pkg/filter/filter_test.go (2)
360-559
: Solid coverage for FilterStringExact Validate and SelectWhereExpr.Covers nil/eq/in and multiple-operator error paths. Nice.
527-559
: IsEmpty tests cover both Eq and In cases.Good to see parity with other filter types.
openmeter/streaming/clickhouse/event_query_v2_test.go (2)
9-9
: Import switch to streaming package is correctUsing streaming.ListEventsV2Params here aligns tests with the refactor away from meterevent types. No issues spotted.
112-114
: Verify epoch‐second consistency in all time filtersBefore merging, please confirm that the
time
andingested_at
filters in your v2 query builder consistently bind Unix‐seconds, just as the cursor predicates do viaCursor.Time.Unix()
. In particular:
- In
queryEventsTableV2.toSQL()
, the cursor clause clearly uses
q.Params.Cursor.Time.Unix()
.
Ensure thatq.Params.Time.SelectWhereExpr("time", …)
and
q.Params.IngestedAt.SelectWhereExpr("ingested_at", …)
likewise pass epoch‐second values (not rawtime.Time
), so the driver binds anInt64
matching your ClickHouse schema.- If
SelectWhereExpr
doesn’t currently invoke.Unix()
, update its implementation (or wrap it) to converttime.Time → int64
before binding.- Add a brief comment in your table‐creation code (e.g. in
createEventsTable.toSQL()
) noting that bothtime
andingested_at
columns are stored as epoch seconds, to prevent future confusion.pkg/filter/filter.go (1)
189-198
: Confirm ClickHouse driver slice expansion for IN placeholdersOur unit tests for
FilterStringExact.SelectWhereExpr
currently assert:
- SQL:
SELECT * FROM table WHERE test_field IN (?)- Args:
[]interface{}{[]string{"test1", "test2"}}However, not all SQL drivers automatically expand a single placeholder with a slice argument into multiple placeholders (e.g.
IN (?, ?)
). Please verify that the ClickHouse Go driver you’re using correctly expands slice arguments in this scenario, or adjust the code to emit repeated placeholders and flatten the slice into individual args.Affected locations:
pkg/filter/filter.go
–SelectWhereExpr
(lines 189–198)pkg/filter/filter_test.go
–TestFilterStringExact_SelectWhereExpr
(around line 439)Suggested actions:
- Consult your ClickHouse driver’s documentation or add an integration test to confirm its behavior when binding a slice to a single
?
.- If the driver does not expand the slice, modify
SelectWhereExpr
to build anIN (?, ?, …)
clause manually, for example:vals := *f.In placeholders := strings.TrimRight(strings.Repeat("?,", len(vals)), ",") expr := fmt.Sprintf("%s IN (%s)", field, placeholders) args := make([]interface{}, len(vals)) for i, v := range vals { args[i] = v } q = q.Args(args...) return expr- Alternatively, use or implement a helper that both repeats placeholders and flattens the slice.
api/client/javascript/src/client/schemas.ts (2)
10931-10931
: LGTM: Public alias for FilterStringExact is correctly exposedThis keeps parity with the Go client and API types shown in the PR. No further action needed.
17620-17621
: Tighten JSDoc for customerId and confirm repeated‐key serializationThe JSDoc on line 17620 currently reads
/** @description The event customer ID. */ customerId?: string[]To clarify its array nature and filtering behavior, please update it to:
- /** @description The event customer ID. */ + /** @description Filter by event customer ULIDs. Multiple values behave as an IN filter (e.g. ?customerId=…&customerId=…). */• We’ve verified that the generated client’s default query serializer is configured with
explode: true
(client/index.ts
), so array parameters are emitted as repeated keys (customerId=…&customerId=…) as intended.
• OpenAPI relies on its defaultstyle: form
andexplode: true
semantics for array query parameters, which aligns with the client’s serialization.api/openapi.yaml (2)
11317-11320
: Filter operator scope and precedence with the new query param.Using FilterStringExact for customerId is appropriate. However, the endpoint now supports both the plain query param (customerId[]) and the JSON filter (filters.customerId). Please confirm and document how they combine (AND vs OR, and precedence on duplicates). If it’s an AND, state it in the parameter or endpoint description to avoid client confusion.
Would you like me to add a one-liner to the endpoint description clarifying: “If both customerId and filters.customerId are provided, both must match (logical AND)”?
5005-5017
: Review update: v2 Events customerId usage differs by designThe v2
/api/v2/events
operations no longer expose a standalonecustomerId
query parameter—instead they use a singlefilter
parameter whose JSON schema includescustomerId
viaFilterStringExact
. Bothapi/openapi.yaml
andapi/openapi.cloud.yaml
are consistent in this approach, and there is exactly one definition ofFilterStringExact
in each spec.Key points:
- The direct
customerId
array parameter exists only on v1 endpoints (and matches identically in both YAMLs).- On v2, filtering—including by
customerId
—is handled via thefilter
query parameter (FilterStringExact
), and both specs align on that.- There are no duplicate or conflicting
FilterStringExact
definitions.- No component parameter named
customerId
is defined undercomponents/parameters
(aside from its inclusion in v1 via usage in generatedBillingProfileListCustomerOverridesParams
).You can safely ignore the original suggestion to add a standalone
customerId
param on v2 endpoints.Likely an incorrect or invalid review comment.
app/common/meterevent.go (2)
6-6
: Import addition looks correct and necessaryThe new customer import is used in the constructor signature; no unused-import issues.
22-22
: Constructor wiring: argument order matches adapter.New signatureadapter.New(streamingConnector, customerService, meterService) aligns with the updated adapter constructor.
openmeter/streaming/clickhouse/event_query.go (2)
83-101
: Count-path now applies subject/customer filters via subjectWhere — good; ensure no-ops on empty inputsThe unified use of subjectWhere in toCountRowSQL keeps estimated row counts consistent with the data query path. Verify subjectWhere is a no-op when both slices are empty to avoid unnecessary CASE/WHERE fragments.
162-165
: Double-check Desc() chaining order in go-sqlbuilderEnsure sqlbuilder formats as ORDER BY time DESC with query.Desc().OrderBy("time"). Some builders expect OrderBy(...).Desc(). If formatting differs, swap the call order.
Consider adding/adjusting a unit test that asserts the rendered ORDER BY clause.
api/openapi.cloud.yaml (1)
11316-11317
: Confirm semantics when both $eq and $in are provided for customerId filter.You reference
FilterStringExact
forcustomerId
. Please confirm whether providing both$eq
and$in
is allowed and how it’s combined (AND vs OR). If you want to allow either but not necessarily both, encode that in the schema (see proposed constraints added toFilterStringExact
in a later comment).openmeter/server/server_test.go (1)
457-465
: LGTM: proper wiring of customerService into meterevent adapterThis matches the new constructor signature and keeps the server assembly coherent.
Params: streaming.ListEventsV2Params{ | ||
Namespace: "my_namespace", | ||
}, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Consistent migration to streaming.ListEventsV2Params
All test cases now construct query params via streaming.ListEventsV2Params. Looks consistent and keeps the test expectations unchanged except for the import/type path.
Run to spot any stragglers still using the old type:
Also applies to: 43-49, 58-64, 88-94, 103-111, 120-126, 155-158, 167-173, 182-188, 197-203, 212-218
🏁 Script executed:
#!/bin/bash
# Find lingering references to the old path/type.
rg -nP 'openmeter/openmeter/meterevent|meterevent\.ListEventsV2Params'
Length of output: 1984
Incomplete migration: remaining references to meterevent.ListEventsV2Params
Several parts of the codebase are still importing and using the old meterevent.ListEventsV2Params
type instead of streaming.ListEventsV2Params
. These must be updated to complete the migration:
• openmeter/server/server_test.go (line 38)
• openmeter/server/router/router.go (lines 37–38)
• cmd/server/wire_gen.go (line 21)
• cmd/server/wire.go (line 24)
• openmeter/meterevent/httphandler/mapping.go (lines 13, 17, 20, 23)
• openmeter/meterevent/httphandler/event.go (line 12)
• openmeter/meterevent/httphandler/event_v2.go (lines 8, 16)
• openmeter/meterevent/httphandler/handler.go (line 8)
• openmeter/meterevent/adapter/event.go (lines 9, 78)
• openmeter/meterevent/adapter/adapter.go (line 6)
• app/common/meterevent.go (lines 8–9)
Please replace all imports of
"github.com/openmeterio/openmeter/openmeter/meterevent"
and any meterevent.ListEventsV2Params
usages with
"github.com/openmeterio/openmeter/streaming"
and
streaming.ListEventsV2Params
respectively, to ensure consistency across the codebase.
🤖 Prompt for AI Agents
In openmeter/streaming/clickhouse/event_query_v2_test.go around lines 31–34, the
test still references the old meterevent.ListEventsV2Params type and import;
replace the import path "github.com/openmeterio/openmeter/openmeter/meterevent"
with "github.com/openmeterio/openmeter/streaming" and change any
meterevent.ListEventsV2Params usages to streaming.ListEventsV2Params; apply the
same replacement across the other listed files (openmeter/server/server_test.go,
openmeter/server/router/router.go, cmd/server/wire_gen.go, cmd/server/wire.go,
openmeter/meterevent/httphandler/*.go, openmeter/meterevent/adapter/*.go,
app/common/meterevent.go) so all imports and type references consistently use
the streaming package and streaming.ListEventsV2Params.
3c56819
to
2f23e03
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (3)
openmeter/meterevent/adapter/event.go (3)
99-127
: v2 customerId filter: guard empties and AND-combine with existing Subject instead of setting a second operator.
- If CustomerID.Eq/In resolves to an empty set, we should return no events rather than broadening the query.
- Setting Subject.In when Subject already has an operator risks multi-operator misuse; combine via AND with the existing Subject filter.
Apply this diff:
// Resolve customer IDs to customers if provided if params.CustomerID != nil { customerIDs := make([]string, 0) if params.CustomerID.Eq != nil { customerIDs = append(customerIDs, *params.CustomerID.Eq) } if params.CustomerID.In != nil { customerIDs = append(customerIDs, *params.CustomerID.In...) } + // Explicit empty filter => no events + if len(customerIDs) == 0 { + return pagination.NewResult([]meterevent.Event{}), nil + } + customerList, err := a.customerService.ListCustomers(ctx, customer.ListCustomersInput{ Namespace: params.Namespace, CustomerIDs: customerIDs, }) if err != nil { return pagination.Result[meterevent.Event]{}, fmt.Errorf("list customers: %w", err) } var subjectKeys []string for _, c := range customerList.Items { subjectKeys = append(subjectKeys, c.GetUsageAttribution().SubjectKeys...) } - if listParams.Subject == nil { - listParams.Subject = &filter.FilterString{} - } - - listParams.Subject.In = &subjectKeys + // Mapping produced no subjects => no events + if len(subjectKeys) == 0 { + return pagination.NewResult([]meterevent.Event{}), nil + } + + // Combine with existing Subject using AND to preserve single-operator semantics + if listParams.Subject == nil { + listParams.Subject = &filter.FilterString{In: &subjectKeys} + } else { + existing := *listParams.Subject + and := []filter.FilterString{ + existing, + {In: &subjectKeys}, + } + listParams.Subject = &filter.FilterString{And: &and} + } }
38-60
: Treat explicitly empty CustomerIDs as “no results” (avoid broad v1 queries).If CustomerIDs is provided but empty, the current code skips filtering and returns all events. That’s surprising and potentially expensive; explicit empty filters should yield an empty result set.
Apply this diff:
- // Resolve customer IDs to customers if provided - if params.CustomerIDs != nil && len(*params.CustomerIDs) > 0 { + // Resolve customer IDs to customers if provided + if params.CustomerIDs != nil { + // Explicitly provided empty filter => return no events + if len(*params.CustomerIDs) == 0 { + return []meterevent.Event{}, nil + } customerList, err := a.customerService.ListCustomers(ctx, customer.ListCustomersInput{ Namespace: params.Namespace, CustomerIDs: *params.CustomerIDs, }) if err != nil { return nil, fmt.Errorf("list customers: %w", err) } customers := make([]streaming.Customer, 0, len(customerList.Items)) for _, c := range customerList.Items { customers = append(customers, c) } // If no customers are found, return an empty list if len(customers) == 0 { return []meterevent.Event{}, nil } listParams.Customers = &customers }
141-142
: meterevent.Event must implement pagination.Item (Cursor method missing)The call to
pagination.NewResult(meterEvents)
requires thatmeterevent.Event
satisfy theItem
interface (defined inpkg/pagination/v2/pagination.go
), which mandates aCursor() Cursor
method. Currently, theEvent
struct has no such method, so the code will not compile.Please add a
Cursor
implementation on theEvent
type—e.g., inopenmeter/meterevent/service.go
—so that it returns the appropriatepagination.Cursor
. For example:diff --git a/openmeter/meterevent/service.go b/openmeter/meterevent/service.go index abc1234..def5678 100644 --- a/openmeter/meterevent/service.go +++ b/openmeter/meterevent/service.go @@ type Event struct { ID string Timestamp time.Time // other fields... } + +// Cursor implements pagination.Item for Event. +func (e Event) Cursor() pagination.Cursor { + return pagination.Cursor{ + Time: e.Timestamp, + ID: e.ID, + } +}• File requiring change:
openmeter/meterevent/service.go
(addCursor() pagination.Cursor
)
• Why:pagination.NewResult[T Item]
requiresT
to implementCursor() Cursor
pkg/pagination/v2/pagination.go- Without this method,
meterevent.Event
does not satisfy theItem
interface and the call on lines 141–142 ofopenmeter/meterevent/adapter/event.go
will fail.
🧹 Nitpick comments (9)
openmeter/apiconverter/filter.gen.go (1)
63-76
: Add focused tests for the new convertersPlease add unit tests that assert:
- Eq only → Eq set, In nil
- In only → In set, Eq nil
- Both set → both propagated
- Ptr nil → returns nil
This guards against accidental generator/config changes.
Example test outline (outside this file):
func TestConvertIDExact(t *testing.T) { eq := "01J8..." // ULID sample in := []string{"01J8...", "01J9..."} got := apiconverter.ConvertIDExact(api.FilterIDExact{Eq: &eq, In: &in}) if got.Eq != &eq { t.Fatalf("Eq mismatch") } if got.In != &in { t.Fatalf("In mismatch") } } func TestConvertIDExactPtr_Nil(t *testing.T) { if apiconverter.ConvertIDExactPtr(nil) != nil { t.Fatalf("expected nil") } }openmeter/apiconverter/filter.go (1)
13-17
: Confirm intent of// goverter:ignoreMissing
for IDExact; consider documenting it inlineWith
ignoreMissing
, ifFilterIDExact
gains new fields later (e.g.,$ne
,$nin
), they’ll be silently dropped in the conversion, which may hide capability drift. If the intent is to strictly allow only$eq
/$in
for IDs, it’s fine—recommend an explicit comment to make this future-proof for readers.Apply this small doc tweak:
- // goverter:ignoreMissing + // goverter:ignoreMissing + // NOTE: FilterIDExact intentionally supports only $eq and $in for ID fields. + // If the API adds more operators later, they are ignored by design here. ConvertIDExact func(api.FilterIDExact) filter.FilterString ConvertIDExactPtr func(*api.FilterIDExact) *filter.FilterStringapi/openapi.yaml (2)
5005-5017
: Tighten query param validation and clarify CSV semantics forcustomerId
.Good addition. Recommend hardening and clarifying usage for clients.
Apply this diff:
- - name: customerId + - name: customerId in: query required: false - description: The event customer ID. + description: Filter by customer ULIDs. When multiple values are provided, they must be comma-separated (style=form, explode=false). schema: type: array + minItems: 1 + uniqueItems: true items: type: string pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: ULID (Universally Unique Lexicographically Sortable Identifier). example: 01G65Z755AFWAKHE12NY0CQ9FH explode: false style: form + example: 01G65Z755AFWAKHE12NY0CQ9FH,01H0SZ755AFWAKHE12NY0CQ9FF
16803-16807
: Event schema does not includecustomerId
; marking asreadOnly
is safe and recommendedI’ve verified that under
components.schemas.Event
(lines ~16098–16104) there is no existingcustomerId
property—so this definition isn’t a duplicate. However, ifcustomerId
on this response model is derived rather than client-supplied, it’s best practice to mark it as read-only.• Apply this diff to clearly indicate that clients should not send this field:
customerId: type: string pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: The customer ID if the event is associated with a customer. example: 01G65Z755AFWAKHE12NY0CQ9FH + readOnly: true
api/client/javascript/src/client/schemas.ts (2)
5254-5263
: Remove nullable from optional filter schema propertiesThe
?
modifier already indicates that a property may be omitted, so including| null
introduces an extra “null” state and inconsistent encoding. Please drop| null
from all optional filter properties inapi/client/javascript/src/client/schemas.ts
.• FilterIDExact (lines 5258–5263)
• FilterString (lines 5265–5293)
• FilterTime (lines 5299–5320)Apply within each filter block, for example:
- /** @description A filter for an ID (ULID) field allowing only equality or inclusion. */ + /** @description A filter for an ID (ULID) field allowing only equality or inclusion. */ @@ - $eq?: string | null + $eq?: string @@ - $in?: string[] | null + $in?: string[]And similarly remove
| null
from all of:• FilterString:
$eq
,$ne
,$in
,$nin
,$like
,$nlike
,$ilike
,$nilike
,$gt
,$gte
,$lt
,$lte
,$and
,$or
• FilterTime:$gt
,$gte
,$lt
,$lte
,$and
,$or
Ensure consistency across the file by running:
rg -nP '\$[A-Za-z]+\?: [^|\n]+\| null' api/client/javascript/src/client/schemas.ts
and updating each match to drop the
| null
union.
10933-10933
: Normalize “ID” capitalization in exported type aliasesWe currently have several exported aliases whose names don’t exactly match the schema’s casing:
FilterIdExact
→components['schemas']['FilterIDExact']
IdResource
→components['schemas']['IDResource']
UlidOrExternalKey
→components['schemas']['ULIDOrExternalKey']
To eliminate this drift, pick one of the following patterns:
Option A: Align aliases to the exact schema names (no deprecated variants):
-export type FilterIdExact = components['schemas']['FilterIDExact'] +export type FilterIDExact = components['schemas']['FilterIDExact'] -export type IdResource = components['schemas']['IDResource'] +export type IDResource = components['schemas']['IDResource'] -export type UlidOrExternalKey = components['schemas']['ULIDOrExternalKey'] +export type ULIDOrExternalKey = components['schemas']['ULIDOrExternalKey']Option B: Retain the existing
Id
/Ulid
forms for backward-compatibility, but mark them deprecated:-export type FilterIdExact = components['schemas']['FilterIDExact'] +export type FilterIDExact = components['schemas']['FilterIDExact'] +/** @deprecated Use FilterIDExact */ +export type FilterIdExact = FilterIDExact -export type IdResource = components['schemas']['IDResource'] +export type IDResource = components['schemas']['IDResource'] +/** @deprecated Use IDResource */ +export type IdResource = IDResource -export type UlidOrExternalKey = components['schemas']['ULIDOrExternalKey'] +export type ULIDOrExternalKey = components['schemas']['ULIDOrExternalKey'] +/** @deprecated Use ULIDOrExternalKey */ +export type UlidOrExternalKey = ULIDOrExternalKeyPlease apply the chosen convention consistently across this file—and any others exporting schema aliases—to reduce surprises and ensure a uniform developer experience.
api/openapi.cloud.yaml (1)
5004-5016
: Tighten v1 customerId array param: require at least one item and uniqueness; add a CSV example.Prevents empty/duplicate ULIDs and clarifies CSV serialization for clients.
- name: customerId in: query required: false description: The event customer ID. schema: type: array + minItems: 1 + uniqueItems: true items: type: string pattern: ^[0-7][0-9A-HJKMNP-TV-Za-hjkmnp-tv-z]{25}$ description: ULID (Universally Unique Lexicographically Sortable Identifier). example: 01G65Z755AFWAKHE12NY0CQ9FH explode: false style: form + example: 01G65Z755AFWAKHE12NY0CQ9FH,01H8F6W6Z6V0V1NX5X3S9B1ABCopenmeter/meterevent/adapter/event.go (2)
191-236
: Validation is clear; minor perf consideration if EventType is unique.If EventType is guaranteed unique per namespace, you can break after the first match to avoid extra Parse calls. If not guaranteed, keep as is.
239-284
: Good: stable-pointer fix and subject cache; consider batching lookups.
- Using a stable local copy for CustomerID avoids pointer aliasing bugs. Nice.
- Optional: pre-deduplicate subjects and perform a batched lookup to reduce N calls; you already noted the FIXME.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (11)
api/client/javascript/src/client/schemas.ts
(4 hunks)api/openapi.cloud.yaml
(4 hunks)api/openapi.yaml
(4 hunks)api/spec/src/events.tsp
(3 hunks)api/spec/src/filter.tsp
(1 hunks)openmeter/apiconverter/filter.gen.go
(1 hunks)openmeter/apiconverter/filter.go
(1 hunks)openmeter/meterevent/adapter/event.go
(4 hunks)openmeter/meterevent/httphandler/mapping.go
(2 hunks)openmeter/meterevent/service.go
(4 hunks)openmeter/streaming/clickhouse/event_query_v2_test.go
(10 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api/spec/src/events.tsp
- openmeter/meterevent/httphandler/mapping.go
- openmeter/streaming/clickhouse/event_query_v2_test.go
- openmeter/meterevent/service.go
- api/spec/src/filter.tsp
🧰 Additional context used
🧬 Code graph analysis (3)
openmeter/apiconverter/filter.gen.go (3)
openmeter/apiconverter/filter.go (2)
ConvertIDExact
(16-16)ConvertIDExactPtr
(17-17)api/api.gen.go (2)
FilterIDExact
(3108-3114)FilterString
(3144-3186)pkg/filter/filter.go (1)
FilterString
(32-47)
openmeter/meterevent/adapter/event.go (8)
openmeter/meterevent/service.go (3)
ListEventsParams
(24-45)Event
(48-69)ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-85)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)pkg/pagination/v2/pagination.go (2)
Result
(13-19)NewResult
(23-35)pkg/filter/filter.go (1)
FilterString
(32-47)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)pkg/models/errors.go (1)
IsGenericNotFoundError
(57-65)
openmeter/apiconverter/filter.go (2)
api/api.gen.go (2)
FilterString
(3144-3186)FilterIDExact
(3108-3114)pkg/filter/filter.go (1)
FilterString
(32-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Artifacts / Container image
- GitHub Check: Build
- GitHub Check: Migration Checks
- GitHub Check: Lint
- GitHub Check: Code Generators
- GitHub Check: Test
- GitHub Check: Analyze (go)
🔇 Additional comments (9)
openmeter/apiconverter/filter.gen.go (1)
63-76
: IDExact converters map Eq/In correctly and are nil-safe — LGTMThe generated
ConvertIDExact
/ConvertIDExactPtr
mirror the pattern used by other converters, correctly wiringEq
andIn
and preserving pointer semantics. No functional or performance concerns here.openmeter/apiconverter/filter.go (1)
13-17
: ConvertIDExactPtr correctly wired for CustomerId filter mappingThe HTTP handler in
openmeter/meterevent/httphandler/mapping.go
already invokes the new converter forcustomerId
:
- openmeter/meterevent/httphandler/mapping.go:34 –
p.CustomerID = apiconverter.ConvertIDExactPtr(params.Filter.CustomerId)
No further changes are required.
api/openapi.yaml (1)
11317-11318
: LGTM:customerId
filter is correctly wired to a dedicated exact/inclusion schema.api/client/javascript/src/client/schemas.ts (1)
5474-5478
: Next Steps: VerifyEvent.customerId
DefinitionBefore updating docs on
IngestedEvent.customerId
, let’s confirm whethercomponents['schemas']['Event']
also declares acustomerId
field and where it lives. Please run:# Show around the suspected `Event` definition (near line 3843) sed -n '3800,3860p' api/client/javascript/src/client/schemas.ts | sed -n '1,20p'Once we know if—and in what context—
Event
exposescustomerId
, we can decide:
- If both
Event
andIngestedEvent
havecustomerId
but with different semantics, call out both in the doc (e.g. “Event’scustomerId
comes directly from payload; IngestedEvent’scustomerId
is derived server-side”).- If only
IngestedEvent
has it, update the doc as suggested.- If the fields are identical, note that and leave a single unified description.
Please share the snippet around the
Event
interface so we can finalize the doc tweak.api/openapi.cloud.yaml (2)
11316-11317
: LGTM: v2 filter wires customerId to a dedicated ULID-exact schema.Referencing
FilterIDExact
forcustomerId
in v2 keeps format/semantics consistent and reusable across endpoints.
16443-16447
: No duplicatecustomerId
inEvent
schema – no action needed.I verified that
components.schemas.Event
in api/openapi.cloud.yaml does not define acustomerId
property, so there’s no redundancy or conflict to resolve. You can safely disregard the original suggestion.Likely an incorrect or invalid review comment.
openmeter/meterevent/adapter/event.go (3)
25-36
: Good: v1 now passes through IngestedAt filters (regression fixed).The addition of IngestedAtFrom/IngestedAtTo to streaming.ListEventsParams restores v1 ingestedAt filtering. Looks correct.
86-97
: v2 params mapping looks consistent with streaming schema.Namespace, ClientID, Cursor, Limit, ID/Source/Subject/Type/Time/IngestedAt are correctly forwarded.
166-188
: Mapping RawEvent → Event looks correct and preserves nullable customerId.Field-by-field copy, including CustomerID pointer and timestamps, is sound.
2f23e03
to
542ac04
Compare
542ac04
to
66a16da
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
♻️ Duplicate comments (1)
openmeter/streaming/clickhouse/event_query_v2_test.go (1)
33-36
: Migration to streaming.ListEventsV2Params looks consistent hereAll test cases now construct params via streaming.ListEventsV2Params. Good alignment with the refactor.
Also applies to: 45-51, 60-66, 75-81, 90-97, 107-113, 178-181, 190-196, 205-210, 220-226, 235-251
🧹 Nitpick comments (11)
api/spec/src/events.tsp (2)
309-313
: Include customerId in IngestedEvent example to showcase the new fieldYou introduced
customerId?: ULID;
but it’s missing from the@example
above. Adding it helps users see the end-to-end mapping result.Suggested example tweak (outside this hunk):
@@ @example(#{ event: #{ id: "5c10fade-1c9e-4d6c-8275-c52c36731d3c", source: "service-name", specversion: "1.0", type: "prompt", subject: "customer-id", time: DateTime.fromISO("2023-01-01T01:01:01.001Z"), }, + customerId: "01HZXABCD3M6E7K8J9PQRS1TVW", ingestedAt: DateTime.fromISO("2023-01-01T01:01:01.001Z"), storedAt: DateTime.fromISO("2023-01-01T01:01:02.001Z"), })
Optional: In the field doc, you might clarify when it’s omitted (e.g., “omitted if no customer mapping found”).
183-187
: AddcustomerId
to the filter example and clarify default AND semanticsTo help clients adopt the new V2 filters, please mirror the
customerId
field in the sample and call out how top-level fields are combined:• In
api/spec/src/events.tsp
, update the@example
block to include a valid ULID forcustomerId
.
• Add a brief note above the example stating that top-level filter properties are AND-combined by default, while$and
/$or
operators allow grouping for more complex logic.Example diff:
@@ @example(#{ subject: #{ $eq: "my-event-subject" }, + customerId: #{ $eq: "01HZXABCD3M6E7K8J9PQRS1TVW" }, type: #{ $eq: "my-event-type" }, @@
No changes are needed in
api/spec/src/filter.tsp
: theFilterIDExact
doc (around line 93) already specifies support for equality ($eq
) and inclusion ($in
).openmeter/streaming/clickhouse/queryhelper.go (1)
83-88
: Nit: minor allocation reductionPassing subjects directly is fine, but if this runs hot, consider deduping subjects to shrink IN lists when both Customers and Subjects are applied upstream (two INs). Low priority.
openmeter/streaming/eventparams.go (2)
63-87
: Consider exposing CustomerID filter at the streaming layer (parity with API v2)Today, ListEventsV2Params accepts Customers (materialized set) rather than the API’s CustomerId filter. It works because the adapter resolves IDs → Customers, but it couples two layers. Consider adding CustomerID (exact) here and optionally keeping Customers for pre-resolved cases.
89-148
: Optional: mirror v1 ClientID validation in v2v1 Validate rejects empty ClientID when provided; v2 currently doesn’t. Aligning behavior avoids surprises.
if p.Cursor != nil { if err := p.Cursor.Validate(); err != nil { errs = append(errs, fmt.Errorf("cursor: %w", err)) } } +if p.ClientID != nil && *p.ClientID == "" { + errs = append(errs, errors.New("client id cannot be empty")) +}openmeter/meterevent/adapter/event.go (3)
40-53
: Treat explicitly empty v1 CustomerIDs as “no results”If CustomerIDs is set but empty, the current code skips setting a filter and returns all events. Consider short-circuiting to return an empty list to match user intent and prevent heavy scans.
- // Resolve customer IDs to customers if provided - if params.CustomerIDs != nil && len(*params.CustomerIDs) > 0 { + // Resolve customer IDs to customers if provided + if params.CustomerIDs != nil { + if len(*params.CustomerIDs) == 0 { + return []meterevent.Event{}, nil + } customers, err := a.listCustomers(ctx, params.Namespace, *params.CustomerIDs) if err != nil { return nil, fmt.Errorf("list customers: %w", err) } // If no customers are found, return an empty list if len(customers) == 0 { return []meterevent.Event{}, nil } listParams.Customers = &customers - } + }
256-302
: Batch attribution lookups to reduce N+1 callsenrichEventsWithCustomerID issues one GetCustomerByUsageAttribution per event. Consider batching by unique subjects (or adding a ListCustomersBySubjects API) to cut latency and load. Cache helps, but batching first occurrence of each subject would still be a big win.
I can draft a batched path that collects unique subjects, queries once, populates the cache, then assigns CustomerID.
122-160
: listCustomers: tighten “not found” detection and avoid O(n²)Length check is unnecessary; you already scan inputs to find misses. Build a set of returned IDs and compute differences in O(n).
- if len(customerList.Items) != len(customerIDs) { - var notFoundCustomerIDs []string - for _, c := range customerIDs { - _, found := lo.Find(customerList.Items, func(item customer.Customer) bool { - return item.ID == c - }) - if !found { - notFoundCustomerIDs = append(notFoundCustomerIDs, c) - } - } - if len(notFoundCustomerIDs) > 0 { - return nil, models.NewGenericValidationError( - fmt.Errorf("customers not found in namespace %s: %v", namespace, strings.Join(notFoundCustomerIDs, ", ")), - ) - } - } + // Verify all requested IDs were found + if len(customerList.Items) > 0 { + found := make(map[string]struct{}, len(customerList.Items)) + for _, it := range customerList.Items { + found[it.ID] = struct{}{} + } + var missing []string + for _, id := range customerIDs { + if _, ok := found[id]; !ok { + missing = append(missing, id) + } + } + if len(missing) > 0 { + return nil, models.NewGenericValidationError( + fmt.Errorf("customers not found in namespace %s: %s", namespace, strings.Join(missing, ", ")), + ) + } + }openmeter/streaming/clickhouse/meter_query_test.go (1)
28-48
: Add tests for empty-mapping edges
- Customer with no SubjectKeys → expect WHERE 1 = 0 (or no rows).
- Customers slice provided but empty → expect no rows (adapter-level contract).
- Duplicate subject keys across customers → ensure CASE and IN don’t duplicate args or break grouping.
I can add table-driven cases mirroring these scenarios.
Also applies to: 236-256
openmeter/streaming/clickhouse/event_query_v2.go (1)
101-111
: Cursor predicate: prefer the canonical form for readabilityCurrent form is correct but reads oddly. Canonical “seek” predicate for DESC(time), DESC(id) is:
time < cursorTime OR (time = cursorTime AND id < cursorID)- query.Where( - query.LessEqualThan(timeColumn, q.Params.Cursor.Time.Unix()), - query.Or( - query.LessThan(timeColumn, q.Params.Cursor.Time.Unix()), - query.LessThan("id", q.Params.Cursor.ID), - ), - ) + query.Where( + query.Or( + query.LessThan(timeColumn, q.Params.Cursor.Time.Unix()), + query.And( + query.Equal(timeColumn, q.Params.Cursor.Time.Unix()), + query.LessThan("id", q.Params.Cursor.ID), + ), + ), + )openmeter/streaming/clickhouse/event_query_v2_test.go (1)
90-101
: Cursor time arg uses Unix seconds while other time filters pass time.TimeHere the cursor binds cursorTime.Unix(), elsewhere time filters bind time.Time. This can be fine if the builder consistently casts on the SQL side, but it’s easy to drift. Consider normalizing the bound type (either always Unix or always time.Time) to reduce surprises.
Double-check toSQL(): if it expects int64 for cursor comparisons, keep it consistent and document it; otherwise bind time.Time for cursor as well.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (16)
api/client/javascript/src/client/schemas.ts
(4 hunks)api/openapi.cloud.yaml
(4 hunks)api/openapi.yaml
(4 hunks)api/spec/src/events.tsp
(3 hunks)api/spec/src/filter.tsp
(1 hunks)openmeter/apiconverter/filter.gen.go
(1 hunks)openmeter/apiconverter/filter.go
(1 hunks)openmeter/meterevent/adapter/event.go
(4 hunks)openmeter/meterevent/httphandler/mapping.go
(2 hunks)openmeter/meterevent/service.go
(4 hunks)openmeter/streaming/clickhouse/event_query_test.go
(1 hunks)openmeter/streaming/clickhouse/event_query_v2.go
(4 hunks)openmeter/streaming/clickhouse/event_query_v2_test.go
(13 hunks)openmeter/streaming/clickhouse/meter_query_test.go
(7 hunks)openmeter/streaming/clickhouse/queryhelper.go
(1 hunks)openmeter/streaming/eventparams.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (9)
- openmeter/streaming/clickhouse/event_query_test.go
- openmeter/apiconverter/filter.gen.go
- openmeter/meterevent/httphandler/mapping.go
- openmeter/meterevent/service.go
- openmeter/apiconverter/filter.go
- api/openapi.yaml
- api/openapi.cloud.yaml
- api/spec/src/filter.tsp
- api/client/javascript/src/client/schemas.ts
🧰 Additional context used
🧬 Code graph analysis (5)
openmeter/streaming/eventparams.go (3)
api/api.gen.go (6)
Subject
(6315-6340)Customer
(2165-2215)ListEventsParams
(7769-7809)FilterString
(3141-3183)FilterTime
(3186-3204)ListEventsV2Params
(8350-8384)openmeter/meterevent/service.go (2)
ListEventsParams
(24-45)ListEventsV2Params
(125-148)pkg/pagination/v2/cursor.go (1)
Cursor
(11-14)
openmeter/meterevent/adapter/event.go (6)
openmeter/meterevent/service.go (3)
ListEventsParams
(24-45)Event
(48-69)ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (2)
ListEventsParams
(13-34)ListEventsV2Params
(64-87)openmeter/customer/customer.go (2)
ListCustomersInput
(181-198)GetCustomerByUsageAttributionInput
(163-166)pkg/models/errors.go (2)
NewGenericValidationError
(138-140)IsGenericNotFoundError
(57-65)openmeter/streaming/connector.go (1)
RawEvent
(24-36)openmeter/meter/parse.go (1)
ParseEventString
(68-70)
openmeter/streaming/clickhouse/event_query_v2.go (2)
openmeter/meterevent/service.go (1)
ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (1)
ListEventsV2Params
(64-87)
openmeter/streaming/clickhouse/queryhelper.go (3)
api/client/javascript/src/client/schemas.ts (1)
Customer
(10856-10856)api/api.gen.go (1)
Customer
(2165-2215)api/client/go/client.gen.go (1)
Customer
(1995-2045)
openmeter/streaming/clickhouse/event_query_v2_test.go (4)
openmeter/meterevent/service.go (1)
ListEventsV2Params
(125-148)openmeter/streaming/eventparams.go (1)
ListEventsV2Params
(64-87)api/api.gen.go (3)
ListEventsV2Params
(8350-8384)Customer
(2165-2215)CustomerUsageAttribution
(2350-2353)pkg/models/model.go (2)
ManagedResource
(23-31)NamespacedModel
(204-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (9)
openmeter/streaming/clickhouse/queryhelper.go (1)
13-43
: Good: CASE builder is safe and robust
- Empty-mapping guard avoids invalid SQL.
- Proper escaping via sqlbuilder.Escape prevents injection.
- Returning a single projection keeps integration simple with callers.
openmeter/streaming/eventparams.go (1)
12-34
: LGTM: v1 params cover new customer supportIncluding Customers in ListEventsParams aligns v1 path with ClickHouse helpers and adapter logic.
openmeter/streaming/clickhouse/meter_query_test.go (4)
49-51
: Nice: subject filter moved to IN with slice argumentMatches new helper semantics and simplifies arg handling.
212-214
: LGTM: single-subject still uses INKeeps query shape consistent irrespective of cardinality.
294-296
: CASE mapping for customer_id matches expected semanticsWHEN branches for subject-to-customer mapping look correct, with a sane ELSE ''.
363-365
: Good coverage of combined customer + subject filtersTwo IN predicates composed with AND verifies intersection semantics.
openmeter/streaming/clickhouse/event_query_v2.go (1)
38-41
: Selecting customer_id only when Customers are present is correctKeeps SELECT lean when customer mapping isn’t requested.
openmeter/streaming/clickhouse/event_query_v2_test.go (2)
150-151
: IN (?) with slice arg — confirm driver expansionThe SQL uses IN (?) with a []string arg. Some ClickHouse drivers expand slices, others expect array syntax. If the query builder/driver doesn’t expand this, it will fail at runtime.
Run (or confirm in code) that your ClickHouse driver expands []string for IN (?) or that you explicitly bind as an Array parameter. If not, consider emitting IN (? , ? , ?) or use a helper that expands placeholders for slices.
117-152
: SQL qualification consistency in customer filter testIn openmeter/streaming/clickhouse/event_query_v2_test.go (lines 117–152), the
wantSQL
mixes qualification styles:
- The
CASE WHEN
expressions refer toom_events.subject
(unqualified).- The
WHERE
clause usesopenmeter.om_events.subject
(schema-qualified).For clarity and consistency, pick one style. For example, you could fully qualify both occurrences:
- CASE WHEN om_events.subject = 'subject1' THEN … + CASE WHEN openmeter.om_events.subject = 'subject1' THEN …or drop the schema prefix everywhere:
- … FROM openmeter.om_events WHERE namespace = ? AND openmeter.om_events.subject IN (?) + … FROM om_events WHERE namespace = ? AND subject IN (?)Likely an incorrect or invalid review comment.
customerId
to event list v1 and v2 APIscustomerId: []ULID
filter to event list v1 APIno customer found for event subject: customer-13
)Summary by CodeRabbit
New Features
Documentation