Skip to content

Commit 3054593

Browse files
authored
feat(bigtable): first_response_latencies and connectivity_error_count metrics (#10616)
* feat(bigtable): Adding first_response_latencies and connectivity_error_count metrics * test(bigtable): Fixing tests
1 parent 233a62b commit 3054593

File tree

4 files changed

+277
-30
lines changed

4 files changed

+277
-30
lines changed

bigtable/bigtable.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,6 +1019,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10191019
return errNegativeRowLimit
10201020
}
10211021

1022+
firstResponseRecorded := false
10221023
err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
10231024
if rowLimitSet && numRowsRead >= intialRowLimit {
10241025
return nil
@@ -1071,6 +1072,10 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
10711072
for {
10721073
proto.Reset(res)
10731074
err := stream.RecvMsg(res)
1075+
if !firstResponseRecorded && (err == nil || err == io.EOF) {
1076+
firstResponseRecorded = true
1077+
mt.currOp.setFirstRespTime(time.Now())
1078+
}
10741079
if err == io.EOF {
10751080
*trailerMD = stream.Trailer()
10761081
break

bigtable/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,7 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
10081008
metricNameOperationLatencies,
10091009
metricNameAttemptLatencies,
10101010
metricNameServerLatencies,
1011+
metricNameFirstRespLatencies,
10111012
}
10121013

10131014
// Try for 5m with 10s sleep between retries

bigtable/metrics.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,9 @@ type opTracer struct {
423423

424424
startTime time.Time
425425

426+
// Only for ReadRows. Time when the response headers are received in a streaming RPC.
427+
firstRespTime time.Time
428+
426429
// gRPC status code of last completed attempt
427430
status string
428431

@@ -435,6 +438,10 @@ func (o *opTracer) setStartTime(t time.Time) {
435438
o.startTime = t
436439
}
437440

441+
func (o *opTracer) setFirstRespTime(t time.Time) {
442+
o.firstRespTime = t
443+
}
444+
438445
func (o *opTracer) setStatus(status string) {
439446
o.status = status
440447
}
@@ -666,6 +673,13 @@ func (mt *builtinMetricsTracer) recordOperationCompletion() {
666673
opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies)
667674
mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributeSet(opLatAttrs))
668675

676+
// Record first_reponse_latencies
677+
firstRespLatAttrs, _ := mt.toOtelMetricAttrs(metricNameFirstRespLatencies)
678+
if mt.method == metricMethodPrefix+methodNameReadRows {
679+
elapsedTimeMs = convertToMs(mt.currOp.firstRespTime.Sub(mt.currOp.startTime))
680+
mt.instrumentFirstRespLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributeSet(firstRespLatAttrs))
681+
}
682+
669683
// Record retry_count
670684
retryCntAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount)
671685
if mt.currOp.attemptCount > 1 {

0 commit comments

Comments
 (0)