Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit d0853ee

Browse files
Merge pull request go-kit#504 from martinbaillie/master
Add Eureka service discovery implementation
2 parents 5d93a23 + 443f6ea commit d0853ee

File tree

9 files changed

+702
-0
lines changed

9 files changed

+702
-0
lines changed

‎circle.yml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ test:
2424
ETCD_ADDR: http://localhost:2379
2525
CONSUL_ADDR: localhost:8500
2626
ZK_ADDR: localhost:2181
27+
EUREKA_ADDR: http://localhost:8761/eureka

‎docker-compose-integration.yml‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,9 @@ services:
1414
image: zookeeper
1515
ports:
1616
- "2181:2181"
17+
eureka:
18+
image: springcloud/eureka
19+
environment:
20+
eureka.server.responseCacheUpdateIntervalMs: 1000
21+
ports:
22+
- "8761:8761"

‎sd/eureka/doc.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka
2+
package eureka

‎sd/eureka/integration_test.go‎

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// +build integration
2+
3+
package eureka
4+
5+
import (
6+
"io"
7+
"os"
8+
"testing"
9+
"time"
10+
11+
"github.com/hudl/fargo"
12+
13+
"github.com/go-kit/kit/endpoint"
14+
"github.com/go-kit/kit/log"
15+
)
16+
17+
// Package sd/eureka provides a wrapper around the Netflix Eureka service
18+
// registry by way of the Fargo library. This test assumes the user has an
19+
// instance of Eureka available at the address in the environment variable.
20+
// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka
21+
//
22+
// NOTE: when starting a Eureka server for integration testing, ensure
23+
// the response cache interval is reduced to one second. This can be
24+
// achieved with the following Java argument:
25+
// `-Deureka.server.responseCacheUpdateIntervalMs=1000`
26+
func TestIntegration(t *testing.T) {
27+
eurekaAddr := os.Getenv("EUREKA_ADDR")
28+
if eurekaAddr == "" {
29+
t.Skip("EUREKA_ADDR is not set")
30+
}
31+
32+
logger := log.NewLogfmtLogger(os.Stderr)
33+
logger = log.With(logger, "ts", log.DefaultTimestamp)
34+
35+
var fargoConfig fargo.Config
36+
// Target Eureka server(s).
37+
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr}
38+
// How often the subscriber should poll for updates.
39+
fargoConfig.Eureka.PollIntervalSeconds = 1
40+
41+
// Create a Fargo connection and a Eureka registrar.
42+
fargoConnection := fargo.NewConnFromConfig(fargoConfig)
43+
registrar1 := NewRegistrar(&fargoConnection, instanceTest1, log.With(logger, "component", "registrar1"))
44+
45+
// Register one instance.
46+
registrar1.Register()
47+
defer registrar1.Deregister()
48+
49+
// This should be enough time for the Eureka server response cache to update.
50+
time.Sleep(time.Second)
51+
52+
// Build a Eureka subscriber.
53+
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
54+
t.Logf("factory invoked for %q", instance)
55+
return endpoint.Nop, nil, nil
56+
}
57+
s := NewSubscriber(
58+
&fargoConnection,
59+
appNameTest,
60+
factory,
61+
log.With(logger, "component", "subscriber"),
62+
)
63+
defer s.Stop()
64+
65+
// We should have one endpoint immediately after subscriber instantiation.
66+
endpoints, err := s.Endpoints()
67+
if err != nil {
68+
t.Error(err)
69+
}
70+
if want, have := 1, len(endpoints); want != have {
71+
t.Errorf("want %d, have %d", want, have)
72+
}
73+
74+
// Register a second instance
75+
registrar2 := NewRegistrar(&fargoConnection, instanceTest2, log.With(logger, "component", "registrar2"))
76+
registrar2.Register()
77+
defer registrar2.Deregister() // In case of exceptional circumstances.
78+
79+
// This should be enough time for a scheduled update assuming Eureka is
80+
// configured with the properties mentioned in the function comments.
81+
time.Sleep(2 * time.Second)
82+
83+
// Now we should have two endpoints.
84+
endpoints, err = s.Endpoints()
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
if want, have := 2, len(endpoints); want != have {
89+
t.Errorf("want %d, have %d", want, have)
90+
}
91+
92+
// Deregister the second instance.
93+
registrar2.Deregister()
94+
95+
// Wait for another scheduled update.
96+
time.Sleep(2 * time.Second)
97+
98+
// And then there was one.
99+
endpoints, err = s.Endpoints()
100+
if err != nil {
101+
t.Error(err)
102+
}
103+
if want, have := 1, len(endpoints); want != have {
104+
t.Errorf("want %d, have %d", want, have)
105+
}
106+
}

‎sd/eureka/registrar.go‎

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package eureka
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
"sync"
7+
"time"
8+
9+
"github.com/hudl/fargo"
10+
11+
"github.com/go-kit/kit/log"
12+
"github.com/go-kit/kit/sd"
13+
)
14+
15+
// Matches official Netflix Java client default.
16+
const defaultRenewalInterval = 30 * time.Second
17+
18+
// The methods of fargo.Connection used in this package.
19+
type fargoConnection interface {
20+
RegisterInstance(instance *fargo.Instance) error
21+
DeregisterInstance(instance *fargo.Instance) error
22+
ReregisterInstance(instance *fargo.Instance) error
23+
HeartBeatInstance(instance *fargo.Instance) error
24+
ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25+
GetApp(name string) (*fargo.Application, error)
26+
}
27+
28+
type fargoUnsuccessfulHTTPResponse struct {
29+
statusCode int
30+
messagePrefix string
31+
}
32+
33+
// Registrar maintains service instance liveness information in Eureka.
34+
type Registrar struct {
35+
conn fargoConnection
36+
instance *fargo.Instance
37+
logger log.Logger
38+
quitc chan chan struct{}
39+
sync.Mutex
40+
}
41+
42+
var _ sd.Registrar = (*Registrar)(nil)
43+
44+
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
45+
// Fargo connection and instance. See the integration test for usage examples.
46+
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
47+
return &Registrar{
48+
conn: conn,
49+
instance: instance,
50+
logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
51+
}
52+
}
53+
54+
// Register implements sd.Registrar.
55+
func (r *Registrar) Register() {
56+
r.Lock()
57+
defer r.Unlock()
58+
59+
if r.quitc != nil {
60+
return // Already in the registration loop.
61+
}
62+
63+
if err := r.conn.RegisterInstance(r.instance); err != nil {
64+
r.logger.Log("during", "Register", "err", err)
65+
}
66+
67+
r.quitc = make(chan chan struct{})
68+
go r.loop()
69+
}
70+
71+
// Deregister implements sd.Registrar.
72+
func (r *Registrar) Deregister() {
73+
r.Lock()
74+
defer r.Unlock()
75+
76+
if r.quitc == nil {
77+
return // Already deregistered.
78+
}
79+
80+
q := make(chan struct{})
81+
r.quitc <- q
82+
<-q
83+
r.quitc = nil
84+
}
85+
86+
func (r *Registrar) loop() {
87+
var renewalInterval time.Duration
88+
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
89+
renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
90+
} else {
91+
renewalInterval = defaultRenewalInterval
92+
}
93+
ticker := time.NewTicker(renewalInterval)
94+
defer ticker.Stop()
95+
96+
for {
97+
select {
98+
case <-ticker.C:
99+
if err := r.heartbeat(); err != nil {
100+
r.logger.Log("during", "heartbeat", "err", err)
101+
}
102+
103+
case q := <-r.quitc:
104+
if err := r.conn.DeregisterInstance(r.instance); err != nil {
105+
r.logger.Log("during", "Deregister", "err", err)
106+
}
107+
close(q)
108+
return
109+
}
110+
}
111+
}
112+
113+
func (r *Registrar) heartbeat() error {
114+
err := r.conn.HeartBeatInstance(r.instance)
115+
if err != nil {
116+
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok && u.statusCode == http.StatusNotFound {
117+
// Instance expired (e.g. network partition). Re-register.
118+
r.logger.Log("during", "heartbeat", err.Error())
119+
return r.conn.ReregisterInstance(r.instance)
120+
}
121+
}
122+
return err
123+
}
124+
125+
func (u *fargoUnsuccessfulHTTPResponse) Error() string {
126+
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
127+
}

‎sd/eureka/registrar_test.go‎

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package eureka
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestRegistrar(t *testing.T) {
9+
connection := &testConnection{
10+
errHeartbeat: errTest,
11+
}
12+
13+
registrar1 := NewRegistrar(connection, instanceTest1, loggerTest)
14+
registrar2 := NewRegistrar(connection, instanceTest2, loggerTest)
15+
16+
// Not registered.
17+
registrar1.Deregister()
18+
if want, have := 0, len(connection.instances); want != have {
19+
t.Errorf("want %d, have %d", want, have)
20+
}
21+
22+
// Register.
23+
registrar1.Register()
24+
if want, have := 1, len(connection.instances); want != have {
25+
t.Errorf("want %d, have %d", want, have)
26+
}
27+
28+
registrar2.Register()
29+
if want, have := 2, len(connection.instances); want != have {
30+
t.Errorf("want %d, have %d", want, have)
31+
}
32+
33+
// Deregister.
34+
registrar1.Deregister()
35+
if want, have := 1, len(connection.instances); want != have {
36+
t.Errorf("want %d, have %d", want, have)
37+
}
38+
39+
// Already registered.
40+
registrar1.Register()
41+
if want, have := 2, len(connection.instances); want != have {
42+
t.Errorf("want %d, have %d", want, have)
43+
}
44+
registrar1.Register()
45+
if want, have := 2, len(connection.instances); want != have {
46+
t.Errorf("want %d, have %d", want, have)
47+
}
48+
49+
// Wait for a heartbeat failure.
50+
time.Sleep(1010 * time.Millisecond)
51+
if want, have := 2, len(connection.instances); want != have {
52+
t.Errorf("want %d, have %d", want, have)
53+
}
54+
registrar1.Deregister()
55+
if want, have := 1, len(connection.instances); want != have {
56+
t.Errorf("want %d, have %d", want, have)
57+
}
58+
}
59+
60+
func TestBadRegister(t *testing.T) {
61+
connection := &testConnection{
62+
errRegister: errTest,
63+
}
64+
65+
registrar := NewRegistrar(connection, instanceTest1, loggerTest)
66+
registrar.Register()
67+
if want, have := 0, len(connection.instances); want != have {
68+
t.Errorf("want %d, have %d", want, have)
69+
}
70+
}
71+
72+
func TestBadDeregister(t *testing.T) {
73+
connection := &testConnection{
74+
errDeregister: errTest,
75+
}
76+
77+
registrar := NewRegistrar(connection, instanceTest1, loggerTest)
78+
registrar.Register()
79+
if want, have := 1, len(connection.instances); want != have {
80+
t.Errorf("want %d, have %d", want, have)
81+
}
82+
registrar.Deregister()
83+
if want, have := 1, len(connection.instances); want != have {
84+
t.Errorf("want %d, have %d", want, have)
85+
}
86+
}
87+
88+
func TestExpiredInstance(t *testing.T) {
89+
connection := &testConnection{
90+
errHeartbeat: errNotFound,
91+
}
92+
93+
registrar := NewRegistrar(connection, instanceTest1, loggerTest)
94+
registrar.Register()
95+
96+
// Wait for a heartbeat failure.
97+
time.Sleep(1010 * time.Millisecond)
98+
99+
if want, have := 1, len(connection.instances); want != have {
100+
t.Errorf("want %d, have %d", want, have)
101+
}
102+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /