Reactive Extensions for Go with full Reactive Streams 1.0.4 compliance.
- Type-safe generics - Full Go generics support
- Reactive Streams 1.0.4 - Complete specification compliance
- Backpressure strategies - Buffer, Drop, Latest, Error
- Push and pull models - Observable API (push) and Reactive Streams (pull with backpressure)
- Retry and backoff - Fixed, Linear, Exponential backoff with configurable retry limits
- Context cancellation - Graceful shutdown
- Thread-safe - All APIs are safe for concurrent access with proper synchronization
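To make the retry and backoff item above concrete, here is a minimal standard-library sketch of how Fixed, Linear, and Exponential delays can differ. It does not use RxGo's own retry API; the names `Strategy`, `Delay`, `Retry`, and `ErrTemporary` are hypothetical and exist only for this illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Strategy is a hypothetical type for this sketch, not RxGo's API.
type Strategy int

const (
	Fixed Strategy = iota
	Linear
	Exponential
)

// ErrTemporary is a hypothetical error used to simulate a flaky operation.
var ErrTemporary = errors.New("temporary failure")

// Delay returns the wait before retry number attempt (1-based).
func Delay(s Strategy, base time.Duration, attempt int) time.Duration {
	switch s {
	case Linear:
		return base * time.Duration(attempt) // base, 2*base, 3*base, ...
	case Exponential:
		return base << (attempt - 1) // base, 2*base, 4*base, ...
	default:
		return base // Fixed: always the same delay
	}
}

// Retry calls op once, then retries up to maxRetries times on failure.
// It returns the number of calls made and the last error (nil on success).
func Retry(op func() error, s Strategy, base time.Duration, maxRetries int) (int, error) {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(Delay(s, base, attempt))
		}
		if err = op(); err == nil {
			return attempt + 1, nil
		}
	}
	return maxRetries + 1, err
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return ErrTemporary // fail the first two calls
		}
		return nil
	}
	attempts, err := Retry(flaky, Exponential, time.Millisecond, 5)
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

The retry limit bounds the total number of calls at `maxRetries + 1`, so a permanently failing operation still terminates.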
Install the library:
go get github.com/droxer/RxGo@latest
Simple and intuitive API for basic reactive programming:
package main

import (
	"context"
	"fmt"

	"github.com/droxer/RxGo/pkg/observable"
)

func main() {
	// Basic usage
	obs := observable.Just(1, 2, 3, 4, 5)
	obs.Subscribe(context.Background(), observable.NewSubscriber(
		func(v int) { fmt.Printf("Got %d\n", v) },
		func() { fmt.Println("Done") },
		func(err error) { fmt.Printf("Error: %v\n", err) },
	))

	// Using new operators
	numbers := observable.Range(1, 10)
	firstFive := observable.Take(numbers, 5)
	fmt.Println("\nFirst five numbers:")
	firstFive.Subscribe(context.Background(), observable.NewSubscriber(
		func(v int) { fmt.Printf("%d ", v) },
		func() { fmt.Println("\nCompleted") },
		func(err error) { fmt.Printf("Error: %v\n", err) },
	))
}
Output:
Got 1
Got 2
Got 3
Got 4
Got 5
Done
First five numbers:
1 2 3 4 5
Completed
Full Reactive Streams 1.0.4 compliance with backpressure support:
package main

import (
	"context"
	"fmt"

	"github.com/droxer/RxGo/pkg/streams"
)

func main() {
	publisher := streams.NewCompliantRangePublisher(1, 5)
	publisher.Subscribe(context.Background(), streams.NewSubscriber(
		func(v int) { fmt.Printf("Received: %d\n", v) },
		func(err error) { fmt.Printf("Error: %v\n", err) },
		func() { fmt.Println("Completed") },
	))

	// Using new processors
	fmt.Println("\nUsing TakeProcessor:")
	numbers := streams.NewCompliantRangePublisher(1, 10)
	takeProcessor := streams.NewTakeProcessor[int](5)
	numbers.Subscribe(context.Background(), takeProcessor)
	takeProcessor.Subscribe(context.Background(), streams.NewSubscriber(
		func(v int) { fmt.Printf("%d ", v) },
		func(err error) { fmt.Printf("Error: %v\n", err) },
		func() { fmt.Println("\nTake completed") },
	))
}
Output:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Completed
Using TakeProcessor:
1 2 3 4 5
Take completed
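The pull model behind Reactive Streams rests on the subscriber signalling demand: the source emits no more items than were requested. The following standard-library sketch illustrates that idea only; `RangeSource` and its `Request` method are hypothetical and are not part of the streams package.

```go
package main

import "fmt"

// RangeSource is a hypothetical pull-based source over the integers next..end.
type RangeSource struct {
	next, end int
}

// Request delivers at most n items to onNext and reports whether
// the source is exhausted afterwards.
func (s *RangeSource) Request(n int, onNext func(int)) (done bool) {
	for ; n > 0 && s.next <= s.end; n-- {
		onNext(s.next)
		s.next++
	}
	return s.next > s.end
}

func main() {
	src := &RangeSource{next: 1, end: 5}
	// The consumer controls the pace by asking for two items at a time.
	for batch := 1; ; batch++ {
		done := src.Request(2, func(v int) {
			fmt.Printf("batch %d: %d\n", batch, v)
		})
		if done {
			break
		}
	}
}
```

Because the consumer decides how many items to request, a slow consumer simply requests less often instead of being flooded.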
Handle producer/consumer speed mismatches with four strategies:
import "github.com/droxer/RxGo/pkg/streams"

// Buffer - keep all items in a bounded buffer
bufferPub := streams.NewBufferedPublisher[int](
	streams.WithBufferStrategy(streams.Buffer),
	streams.WithBufferSize(100),
)

// Drop - discard new items when the buffer is full
dropPub := streams.NewBufferedPublisher[int](
	streams.WithBufferStrategy(streams.Drop),
	streams.WithBufferSize(50),
)

// Latest - keep only the latest item
latestPub := streams.NewBufferedPublisher[int](
	streams.WithBufferStrategy(streams.Latest),
	streams.WithBufferSize(1),
)

// Error - signal an error on overflow
errorPub := streams.NewBufferedPublisher[int](
	streams.WithBufferStrategy(streams.Error),
	streams.WithBufferSize(10),
)
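To show what the Drop, Latest, and Error strategies mean when a buffer is full, here is a rough sketch built on plain buffered channels. It is not how the streams package is implemented; `OfferDrop`, `OfferLatest`, `OfferError`, `Drain`, and `ErrOverflow` are hypothetical names, and the sketch assumes a single producer goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrOverflow is a hypothetical error for the Error strategy in this sketch.
var ErrOverflow = errors.New("buffer overflow")

// OfferDrop discards the new item when the buffer is full.
// It reports whether the item was accepted.
func OfferDrop[T any](buf chan T, v T) bool {
	select {
	case buf <- v:
		return true
	default:
		return false
	}
}

// OfferLatest evicts the oldest buffered item until the new one fits.
// Assumes a single producer; with capacity 1 only the newest item is kept.
func OfferLatest[T any](buf chan T, v T) {
	for {
		select {
		case buf <- v:
			return
		default:
			select {
			case <-buf: // make room by dropping the oldest item
			default:
			}
		}
	}
}

// OfferError reports ErrOverflow instead of silently dropping.
func OfferError[T any](buf chan T, v T) error {
	if !OfferDrop(buf, v) {
		return ErrOverflow
	}
	return nil
}

// Drain reads everything currently buffered without blocking.
func Drain[T any](buf chan T) []T {
	var out []T
	for {
		select {
		case v := <-buf:
			out = append(out, v)
		default:
			return out
		}
	}
}

func main() {
	dropBuf := make(chan int, 2)
	latestBuf := make(chan int, 1)
	for i := 1; i <= 5; i++ {
		OfferDrop(dropBuf, i)
		OfferLatest(latestBuf, i)
	}
	fmt.Println(Drain(dropBuf))   // prints "[1 2]"
	fmt.Println(Drain(latestBuf)) // prints "[5]"

	errBuf := make(chan int, 1)
	OfferError(errBuf, 1)
	fmt.Println(OfferError(errBuf, 2)) // prints "buffer overflow"
}
```

Drop favors older data, Latest favors fresh data, and Error surfaces the mismatch to the caller; which trade-off fits depends on whether stale or missing items are more harmful to the consumer.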
You can convert between the Observable and Publisher APIs using adapters. This is useful when you need to combine the simplicity of the observable package with the backpressure support of the streams package.
import (
	"github.com/droxer/RxGo/pkg/adapters"
	"github.com/droxer/RxGo/pkg/observable"
	"github.com/droxer/RxGo/pkg/streams"
)

// Convert an Observable to a Publisher
obs := observable.Just(1, 2, 3)
publisher := adapters.ObservablePublisherAdapter(obs)

// Convert a Publisher to an Observable
// (named fromPub so the variable does not shadow the observable package)
pub := streams.NewCompliantRangePublisher(1, 5)
fromPub := adapters.PublisherToObservableAdapter(pub)
- Architecture - Package structure and design decisions
- Observable API - Simple Observable API examples
- Reactive Streams - Full Reactive Streams 1.0.4 compliance
- Backpressure - Handle producer/consumer speed mismatches
- Push vs Pull Models - Understanding push and pull models with backpressure
- Retry and Backoff - Configurable retry with backoff strategies
- Transformations - Transform and process data streams with both Reactive Streams and Observable API
- Context Cancellation - Graceful cancellation using Go context
- Schedulers - Execution context control
We welcome contributions! Please see CONTRIBUTING.md for guidelines.