Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 443f6ea

Browse files
Refactoring without client intermediary
1 parent 2a823da commit 443f6ea

File tree

8 files changed

+356
-320
lines changed

8 files changed

+356
-320
lines changed

‎sd/eureka/client.go‎

Lines changed: 0 additions & 77 deletions
This file was deleted.

‎sd/eureka/client_test.go‎

Lines changed: 0 additions & 95 deletions
This file was deleted.

‎sd/eureka/integration_test.go‎

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,37 +29,36 @@ func TestIntegration(t *testing.T) {
2929
t.Skip("EUREKA_ADDR is not set")
3030
}
3131

32-
var client Client
33-
{
34-
var fargoConfig fargo.Config
35-
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
36-
fargoConfig.Eureka.PollIntervalSeconds = 1
37-
38-
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
39-
client = NewClient(&fargoConnection)
40-
}
41-
4232
logger := log.NewLogfmtLogger(os.Stderr)
4333
logger = log.With(logger, "ts", log.DefaultTimestamp)
4434

35+
var fargoConfig fargo.Config
36+
// Target Eureka server(s).
37+
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
38+
// How often the subscriber should poll for updates.
39+
fargoConfig.Eureka.PollIntervalSeconds = 1
40+
41+
// Create a Fargo connection and a Eureka registrar.
42+
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
43+
registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1"))
44+
4545
// Register one instance.
46-
registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1"))
4746
registrar1.Register()
4847
defer registrar1.Deregister()
4948

5049
// This should be enough time for the Eureka server response cache to update.
5150
time.Sleep(time.Second)
5251

53-
// Build a subscriber.
52+
// Build a Eureka subscriber.
5453
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
5554
t.Logf("factory invoked for %q", instance)
5655
return endpoint.Nop, nil, nil
5756
}
5857
s := NewSubscriber(
59-
client,
58+
&fargoConnection,
59+
appNameTest,
6060
factory,
6161
log.With(logger, "component", "subscriber"),
62-
instanceTest1.App,
6362
)
6463
defer s.Stop()
6564

@@ -73,7 +72,7 @@ func TestIntegration(t *testing.T) {
7372
}
7473

7574
// Register a second instance
76-
registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2"))
75+
registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2"))
7776
registrar2.Register()
7877
defer registrar2.Deregister() // In case of exceptional circumstances.
7978

‎sd/eureka/registrar.go‎

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,79 +2,126 @@ package eureka
22

33
import (
44
"fmt"
5+
"net/http"
56
"sync"
67
"time"
78

89
"github.com/hudl/fargo"
910

1011
"github.com/go-kit/kit/log"
12+
"github.com/go-kit/kit/sd"
1113
)
1214

15+
// Matches official Netflix Java client default.
16+
const defaultRenewalInterval = 30 * time.Second
17+
18+
// The methods of fargo.Connection used in this package.
19+
type fargoConnection interface {
20+
RegisterInstance(instance *fargo.Instance) error
21+
DeregisterInstance(instance *fargo.Instance) error
22+
ReregisterInstance(instance *fargo.Instance) error
23+
HeartBeatInstance(instance *fargo.Instance) error
24+
ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25+
GetApp(name string) (*fargo.Application, error)
26+
}
27+
28+
type fargoUnsuccessfulHTTPResponse struct {
29+
statusCode int
30+
messagePrefix string
31+
}
32+
1333
// Registrar maintains service instance liveness information in Eureka.
1434
type Registrar struct {
15-
client Client
35+
conn fargoConnection
1636
instance *fargo.Instance
1737
logger log.Logger
18-
quit chan struct{}
19-
wgsync.WaitGroup
38+
quitc chan chan struct{}
39+
sync.Mutex
2040
}
2141

42+
var _ sd.Registrar = (*Registrar)(nil)
43+
2244
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
23-
// Fargo instance.
24-
func NewRegistrar(clientClient, i *fargo.Instance, logger log.Logger) *Registrar {
45+
// Fargo connection and instance. See the integration test for usage examples.
46+
func NewRegistrar(connfargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
2547
return &Registrar{
26-
client: client,
27-
instance: i,
28-
logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)),
48+
conn: conn,
49+
instance: instance,
50+
logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
2951
}
3052
}
3153

32-
// Register implements sd.Registrar interface.
54+
// Register implements sd.Registrar.
3355
func (r *Registrar) Register() {
34-
if err := r.client.Register(r.instance); err != nil {
35-
r.logger.Log("err", err)
36-
} else {
37-
r.logger.Log("action", "register")
56+
r.Lock()
57+
defer r.Unlock()
58+
59+
if r.quitc != nil {
60+
return // Already in the registration loop.
3861
}
3962

40-
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
41-
// User has opted for heartbeat functionality in Eureka.
42-
if r.quit == nil {
43-
r.quit = make(chan struct{})
44-
r.wg.Add(1)
45-
go r.loop()
46-
}
63+
if err := r.conn.RegisterInstance(r.instance); err != nil {
64+
r.logger.Log("during", "Register", "err", err)
4765
}
66+
67+
r.quitc = make(chan chan struct{})
68+
go r.loop()
4869
}
4970

50-
// Deregister implements sd.Registrar interface.
71+
// Deregister implements sd.Registrar.
5172
func (r *Registrar) Deregister() {
52-
if err := r.client.Deregister(r.instance); err != nil {
53-
r.logger.Log("err", err)
54-
} else {
55-
r.logger.Log("action", "deregister")
56-
}
73+
r.Lock()
74+
defer r.Unlock()
5775

58-
if r.quit != nil {
59-
close(r.quit)
60-
r.wg.Wait()
61-
r.quit = nil
76+
if r.quitc == nil {
77+
return // Already deregistered.
6278
}
79+
80+
q := make(chan struct{})
81+
r.quitc <- q
82+
<-q
83+
r.quitc = nil
6384
}
6485

6586
func (r *Registrar) loop() {
66-
tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second)
67-
defer tick.Stop()
68-
defer r.wg.Done()
87+
var renewalInterval time.Duration
88+
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
89+
renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
90+
} else {
91+
renewalInterval = defaultRenewalInterval
92+
}
93+
ticker := time.NewTicker(renewalInterval)
94+
defer ticker.Stop()
6995

7096
for {
7197
select {
72-
case <-tick.C:
73-
if err := r.client.Heartbeat(r.instance); err != nil {
74-
r.logger.Log("err", err)
98+
case <-ticker.C:
99+
if err := r.heartbeat(); err != nil {
100+
r.logger.Log("during", "heartbeat", "err", err)
101+
}
102+
103+
case q := <-r.quitc:
104+
if err := r.conn.DeregisterInstance(r.instance); err != nil {
105+
r.logger.Log("during", "Deregister", "err", err)
75106
}
76-
case<-r.quit:
107+
close(q)
77108
return
78109
}
79110
}
80111
}
112+
113+
func (r *Registrar) heartbeat() error {
114+
err := r.conn.HeartBeatInstance(r.instance)
115+
if err != nil {
116+
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
117+
// Instance expired (e.g. network partition). Re-register.
118+
r.logger.Log("during", "heartbeat", err.Error())
119+
return r.conn.ReregisterInstance(r.instance)
120+
}
121+
}
122+
return err
123+
}
124+
125+
func (u *fargoUnsuccessfulHTTPResponse) Error() string {
126+
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
127+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /