Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 9c27421

Browse files
feat(client): blocking LineSenderPool (#53)
Breaking change of an experimental API The pool now blocks if there are already too many senders in use. Pooled senders are released implicitly, via the Close call.
1 parent 53a2d35 commit 9c27421

File tree

9 files changed

+334
-110
lines changed

9 files changed

+334
-110
lines changed

‎README.md‎

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Golang client for QuestDB's [Influx Line Protocol](https://questdb.io/docs/refer
99
The library requires Go 1.19 or newer.
1010

1111
Features:
12-
* Context-aware API.
12+
* [Context](https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go)-aware API.
1313
* Optimized for batch writes.
1414
* Supports TLS encryption and ILP authentication.
1515
* Automatic write retries and connection reuse for ILP over HTTP.
@@ -43,23 +43,40 @@ func main() {
4343
}
4444
// Make sure to close the sender on exit to release resources.
4545
defer sender.Close(ctx)
46+
4647
// Send a few ILP messages.
48+
tradedTs, err := time.Parse(time.RFC3339, "2022年08月06日T15:04:05.123456Z")
49+
if err != nil {
50+
log.Fatal(err)
51+
}
4752
err = sender.
48-
Table("trades").
49-
Symbol("name", "test_ilp1").
50-
Float64Column("value", 12.4).
51-
AtNow(ctx)
53+
Table("trades_go").
54+
Symbol("pair", "USDGBP").
55+
Symbol("type", "buy").
56+
Float64Column("traded_price", 0.83).
57+
Float64Column("limit_price", 0.84).
58+
Int64Column("qty", 100).
59+
At(ctx, tradedTs)
60+
if err != nil {
61+
log.Fatal(err)
62+
}
63+
64+
tradedTs, err = time.Parse(time.RFC3339, "2022年08月06日T15:04:06.987654Z")
5265
if err != nil {
5366
log.Fatal(err)
5467
}
5568
err = sender.
56-
Table("trades").
57-
Symbol("name", "test_ilp2").
58-
Float64Column("value", 11.4).
59-
At(ctx, time.Now().UnixNano())
69+
Table("trades_go").
70+
Symbol("pair", "GBPJPY").
71+
Symbol("type", "sell").
72+
Float64Column("traded_price", 135.97).
73+
Float64Column("limit_price", 0.84).
74+
Int64Column("qty", 400).
75+
At(ctx, tradedTs)
6076
if err != nil {
6177
log.Fatal(err)
6278
}
79+
6380
// Make sure that the messages are sent over the network.
6481
err = sender.Flush(ctx)
6582
if err != nil {
@@ -80,15 +97,15 @@ To connect via TCP, set the configuration string to:
8097
**Warning: Experimental feature designed for use with HTTP senders ONLY**
8198

8299
Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
83-
to cache previously-used `LineSender`s in memory so they can be reused without
84-
having to allocate and instantiate new senders.
100+
to pool previously-used `LineSender`s so they can be reused without having
101+
to allocate and instantiate new senders.
85102

86-
A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
103+
A LineSenderPool is thread-safe and can be used to concurrently obtain senders
87104
across multiple goroutines.
88105

89106
Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
90107
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
91-
execution block to Release the sender at the end of the goroutine.
108+
execution block to Close the sender at the end of the goroutine.
92109

93110
Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:
94111

@@ -112,7 +129,7 @@ func main() {
112129
}
113130
}()
114131

115-
sender, err := pool.Acquire(ctx)
132+
sender, err := pool.Sender(ctx)
116133
if err != nil {
117134
panic(err)
118135
}
@@ -122,7 +139,8 @@ func main() {
122139
Float64Column("price", 123.45).
123140
AtNow(ctx)
124141

125-
if err := pool.Release(ctx, sender); err != nil {
142+
// Close call returns the sender back to the pool
143+
if err := sender.Close(ctx); err != nil {
126144
panic(err)
127145
}
128146
}

‎export_test.go‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func Messages(s LineSender) string {
6464
}
6565

6666
func MsgCount(s LineSender) int {
67+
if ps, ok := s.(*pooledSender); ok {
68+
hs, _ := ps.wrapped.(*httpLineSender)
69+
return hs.MsgCount()
70+
}
6771
if hs, ok := s.(*httpLineSender); ok {
6872
return hs.MsgCount()
6973
}

‎http_sender.go‎

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"context"
3030
"crypto/tls"
3131
"encoding/json"
32-
"errors"
3332
"fmt"
3433
"io"
3534
"math/big"
@@ -176,7 +175,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
176175
)
177176

178177
if s.closed {
179-
return errors.New("cannot flush a closed LineSender")
178+
return errClosedSenderFlush
180179
}
181180

182181
err := s.buf.LastErr()
@@ -187,7 +186,7 @@ func (s *httpLineSender) flush0(ctx context.Context, closing bool) error {
187186
}
188187
if s.buf.HasTable() {
189188
s.buf.DiscardPendingMsg()
190-
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
189+
return errFlushWithPendingMessage
191190
}
192191

193192
if s.buf.msgCount == 0 {
@@ -285,7 +284,7 @@ func (s *httpLineSender) BoolColumn(name string, val bool) LineSender {
285284

286285
func (s *httpLineSender) Close(ctx context.Context) error {
287286
if s.closed {
288-
return nil
287+
return errDoubleSenderClose
289288
}
290289

291290
var err error
@@ -309,7 +308,7 @@ func (s *httpLineSender) AtNow(ctx context.Context) error {
309308

310309
func (s *httpLineSender) At(ctx context.Context, ts time.Time) error {
311310
if s.closed {
312-
return errors.New("cannot queue new messages on a closed LineSender")
311+
return errClosedSenderAt
313312
}
314313

315314
sendTs := true

‎http_sender_test.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ func TestSenderDoubleClose(t *testing.T) {
601601
assert.NoError(t, err)
602602

603603
err = sender.Close(ctx)
604-
assert.NoError(t, err)
604+
assert.Error(t, err)
605605
}
606606

607607
func TestErrorOnFlushWhenSenderIsClosed(t *testing.T) {

‎sender.go‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ import (
3535
"time"
3636
)
3737

38+
var (
39+
errClosedSenderFlush = errors.New("cannot flush a closed LineSender")
40+
errFlushWithPendingMessage = errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
41+
errClosedSenderAt = errors.New("cannot queue new messages on a closed LineSender")
42+
errDoubleSenderClose = errors.New("double sender close")
43+
)
44+
3845
// LineSender allows you to insert rows into QuestDB by sending ILP
3946
// messages over HTTP or TCP protocol.
4047
//

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /