Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit cb03da6

Browse files
Add Eureka service discovery implementation
This commit adds a service discovery implementation for the Eureka registry, a component in Netflix's OSS suite. Eureka is a popular choice in JVM-based microservice architectures, particularly when used in conjunction with the Spring Cloud ecosystem. This implementation delegates integration to Fargo: the de facto Golang Eureka client. It allows the user to employ a Fargo connection as the foundational configuration item for Registrars and Subscribers. This should offer the user the most control of Fargo within the constraints of the Go-kit service discovery abstractions.
1 parent cdd47ca commit cb03da6

File tree

10 files changed

+678
-0
lines changed

10 files changed

+678
-0
lines changed

‎circle.yml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ test:
2424
ETCD_ADDR: http://localhost:2379
2525
CONSUL_ADDR: localhost:8500
2626
ZK_ADDR: localhost:2181
27+
EUREKA_ADDR: http://localhost:8761/eureka

‎docker-compose-integration.yml‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,9 @@ services:
1414
image: zookeeper
1515
ports:
1616
- "2181:2181"
17+
eureka:
18+
image: springcloud/eureka
19+
environment:
20+
eureka.server.responseCacheUpdateIntervalMs: 1000
21+
ports:
22+
- "8761:8761"

‎sd/eureka/client.go‎

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package eureka
2+
3+
import (
4+
stdeureka "github.com/hudl/fargo"
5+
stdeurekalogging "github.com/op/go-logging"
6+
)
7+
8+
func init() {
9+
// Quieten Fargo's own logging
10+
stdeurekalogging.SetLevel(stdeurekalogging.ERROR, "fargo")
11+
}
12+
13+
// Client is a wrapper around the Eureka API.
14+
type Client interface {
15+
// Register an instance with Eureka.
16+
Register(i *stdeureka.Instance) error
17+
18+
// Deregister an instance from Eureka.
19+
Deregister(i *stdeureka.Instance) error
20+
21+
// Send an instance heartbeat to Eureka.
22+
Heartbeat(i *stdeureka.Instance) error
23+
24+
// Get all instances for an app in Eureka.
25+
Instances(app string) ([]*stdeureka.Instance, error)
26+
27+
// Receive scheduled updates about an app's instances in Eureka.
28+
ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate
29+
}
30+
31+
type client struct {
32+
connection *stdeureka.EurekaConnection
33+
}
34+
35+
// NewClient returns an implementation of the Client interface, wrapping a
36+
// concrete connection to Eureka using the Fargo library.
37+
// Taking in Fargo's own connection abstraction gives the user maximum
38+
// freedom in regards to how that connection is configured.
39+
func NewClient(ec *stdeureka.EurekaConnection) Client {
40+
return &client{connection: ec}
41+
}
42+
43+
func (c *client) Register(i *stdeureka.Instance) error {
44+
if c.instanceRegistered(i) {
45+
// Already registered. Send a heartbeat instead.
46+
return c.Heartbeat(i)
47+
}
48+
return c.connection.RegisterInstance(i)
49+
}
50+
51+
func (c *client) Deregister(i *stdeureka.Instance) error {
52+
return c.connection.DeregisterInstance(i)
53+
}
54+
55+
func (c *client) Heartbeat(i *stdeureka.Instance) (err error) {
56+
if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) {
57+
// Instance not registered. Register first before sending heartbeats.
58+
return c.Register(i)
59+
}
60+
return err
61+
}
62+
63+
func (c *client) Instances(app string) ([]*stdeureka.Instance, error) {
64+
stdApp, err := c.connection.GetApp(app)
65+
if err != nil {
66+
return nil, err
67+
}
68+
return stdApp.Instances, nil
69+
}
70+
71+
func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan stdeureka.AppUpdate {
72+
return c.connection.ScheduleAppUpdates(app, false, quitc)
73+
}
74+
75+
func (c *client) instanceRegistered(i *stdeureka.Instance) bool {
76+
_, err := c.connection.GetInstance(i.App, i.Id())
77+
return err == nil
78+
}
79+
80+
func (c *client) instanceNotFoundErr(err error) bool {
81+
code, ok := stdeureka.HTTPResponseStatusCode(err)
82+
return ok && code == 404
83+
}

‎sd/eureka/client_test.go‎

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package eureka
2+
3+
import (
4+
"errors"
5+
"reflect"
6+
7+
"github.com/go-kit/kit/log"
8+
stdeureka "github.com/hudl/fargo"
9+
)
10+
11+
var (
12+
errTest = errors.New("kaboom")
13+
loggerTest = log.NewNopLogger()
14+
instanceTest1 = &stdeureka.Instance{
15+
HostName: "server1.acme.org",
16+
Port: 8080,
17+
App: "go-kit",
18+
IPAddr: "192.168.0.1",
19+
VipAddress: "192.168.0.1",
20+
SecureVipAddress: "192.168.0.1",
21+
HealthCheckUrl: "http://server1.acme.org:8080/healthz",
22+
StatusPageUrl: "http://server1.acme.org:8080/status",
23+
HomePageUrl: "http://server1.acme.org:8080/",
24+
Status: stdeureka.UP,
25+
DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn},
26+
LeaseInfo: stdeureka.LeaseInfo{RenewalIntervalInSecs: 1},
27+
}
28+
instanceTest2 = &stdeureka.Instance{
29+
HostName: "server2.acme.org",
30+
Port: 8080,
31+
App: "go-kit",
32+
IPAddr: "192.168.0.2",
33+
VipAddress: "192.168.0.2",
34+
SecureVipAddress: "192.168.0.2",
35+
HealthCheckUrl: "http://server2.acme.org:8080/healthz",
36+
StatusPageUrl: "http://server2.acme.org:8080/status",
37+
HomePageUrl: "http://server2.acme.org:8080/",
38+
Status: stdeureka.UP,
39+
DataCenterInfo: stdeureka.DataCenterInfo{Name: stdeureka.MyOwn},
40+
}
41+
applicationTest = &stdeureka.Application{
42+
Name: "go-kit",
43+
Instances: []*stdeureka.Instance{instanceTest1, instanceTest2},
44+
}
45+
)
46+
47+
type testClient struct {
48+
instances []*stdeureka.Instance
49+
application *stdeureka.Application
50+
errInstances error
51+
errApplication error
52+
errHeartbeat error
53+
}
54+
55+
func (c *testClient) Register(i *stdeureka.Instance) error {
56+
for _, instance := range c.instances {
57+
if reflect.DeepEqual(*instance, *i) {
58+
return errors.New("already registered")
59+
}
60+
}
61+
62+
c.instances = append(c.instances, i)
63+
return nil
64+
}
65+
66+
func (c *testClient) Deregister(i *stdeureka.Instance) error {
67+
var newInstances []*stdeureka.Instance
68+
for _, instance := range c.instances {
69+
if reflect.DeepEqual(*instance, *i) {
70+
continue
71+
}
72+
newInstances = append(newInstances, instance)
73+
}
74+
if len(newInstances) == len(c.instances) {
75+
return errors.New("not registered")
76+
}
77+
78+
c.instances = newInstances
79+
return nil
80+
}
81+
82+
func (c *testClient) Heartbeat(i *stdeureka.Instance) (err error) {
83+
return c.errHeartbeat
84+
}
85+
86+
func (c *testClient) Instances(app string) ([]*stdeureka.Instance, error) {
87+
return c.instances, c.errInstances
88+
}
89+
90+
func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan stdeureka.AppUpdate {
91+
updatec := make(chan stdeureka.AppUpdate, 1)
92+
updatec <- stdeureka.AppUpdate{App: c.application, Err: c.errApplication}
93+
return updatec
94+
}

‎sd/eureka/doc.go‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka
2+
package eureka

‎sd/eureka/integration_test.go‎

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// +build integration
2+
3+
package eureka
4+
5+
import (
6+
"io"
7+
"os"
8+
"testing"
9+
"time"
10+
11+
"github.com/go-kit/kit/endpoint"
12+
"github.com/go-kit/kit/log"
13+
stdeureka "github.com/hudl/fargo"
14+
)
15+
16+
// Package sd/eureka provides a wrapper around the Netflix Eureka service
17+
// registry by way of the Fargo library. This test assumes the user has an
18+
// instance of Eureka available at the address in the environment variable.
19+
// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka
20+
//
21+
// NOTE: when starting a Eureka server for integration testing, ensure
22+
// the response cache interval is reduced to one second. This can be
23+
// achieved with the following Java argument:
24+
// `-Deureka.server.responseCacheUpdateIntervalMs=1000`
25+
func TestIntegration(t *testing.T) {
26+
eurekaAddr := os.Getenv("EUREKA_ADDR")
27+
if eurekaAddr == "" {
28+
t.Skip("EUREKA_ADDR is not set")
29+
}
30+
31+
var client Client
32+
{
33+
var stdConfig stdeureka.Config
34+
stdConfig.Eureka.ServiceUrls = []string{eurekaAddr}
35+
stdConfig.Eureka.PollIntervalSeconds = 1
36+
37+
stdConnection := stdeureka.NewConnFromConfig(stdConfig)
38+
client = NewClient(&stdConnection)
39+
}
40+
41+
logger := log.NewLogfmtLogger(os.Stderr)
42+
logger = log.With(logger, "ts", log.DefaultTimestamp)
43+
44+
// Register one instance.
45+
registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1"))
46+
registrar1.Register()
47+
defer registrar1.Deregister()
48+
49+
// This should be enough time for the Eureka server response cache to update.
50+
time.Sleep(time.Second)
51+
52+
// Build a subscriber.
53+
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
54+
t.Logf("factory invoked for %q", instance)
55+
return endpoint.Nop, nil, nil
56+
}
57+
s := NewSubscriber(
58+
client,
59+
factory,
60+
log.With(logger, "component", "subscriber"),
61+
instanceTest1.App,
62+
)
63+
defer s.Stop()
64+
65+
// We should have one endpoint immediately after subscriber instantiation.
66+
endpoints, err := s.Endpoints()
67+
if err != nil {
68+
t.Error(err)
69+
}
70+
if want, have := 1, len(endpoints); want != have {
71+
t.Errorf("want %d, have %d", want, have)
72+
}
73+
74+
// Register a second instance
75+
registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2"))
76+
registrar2.Register()
77+
defer registrar2.Deregister() // In case of exceptional circumstances.
78+
79+
// This should be enough time for a scheduled update assuming Eureka is
80+
// configured with the properties mentioned in the function comments.
81+
time.Sleep(2 * time.Second)
82+
83+
// Now we should have two endpoints.
84+
endpoints, err = s.Endpoints()
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
if want, have := 2, len(endpoints); want != have {
89+
t.Errorf("want %d, have %d", want, have)
90+
}
91+
92+
// Deregister the second instance.
93+
registrar2.Deregister()
94+
95+
// Wait for another scheduled update.
96+
time.Sleep(2 * time.Second)
97+
98+
// And then there was one.
99+
endpoints, err = s.Endpoints()
100+
if err != nil {
101+
t.Error(err)
102+
}
103+
if want, have := 1, len(endpoints); want != have {
104+
t.Errorf("want %d, have %d", want, have)
105+
}
106+
}

‎sd/eureka/registrar.go‎

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package eureka
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
stdeureka "github.com/hudl/fargo"
9+
10+
"github.com/go-kit/kit/log"
11+
)
12+
13+
// Registrar maintains service instance liveness information in Eureka.
14+
type Registrar struct {
15+
client Client
16+
instance *stdeureka.Instance
17+
logger log.Logger
18+
19+
quitmtx sync.Mutex
20+
quit chan bool
21+
}
22+
23+
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
24+
// Fargo instance.
25+
func NewRegistrar(client Client, i *stdeureka.Instance, l log.Logger) *Registrar {
26+
return &Registrar{
27+
client: client,
28+
instance: i,
29+
logger: log.With(l, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)),
30+
}
31+
}
32+
33+
// Register implements sd.Registrar interface.
34+
func (r *Registrar) Register() {
35+
if err := r.client.Register(r.instance); err != nil {
36+
r.logger.Log("err", err)
37+
} else {
38+
r.logger.Log("action", "register")
39+
}
40+
41+
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
42+
// User has opted for heartbeat functionality in Eureka.
43+
go r.loop()
44+
}
45+
}
46+
47+
// Deregister implements sd.Registrar interface.
48+
func (r *Registrar) Deregister() {
49+
if err := r.client.Deregister(r.instance); err != nil {
50+
r.logger.Log("err", err)
51+
} else {
52+
r.logger.Log("action", "deregister")
53+
}
54+
55+
r.quitmtx.Lock()
56+
defer r.quitmtx.Unlock()
57+
if r.quit != nil {
58+
r.quit <- true
59+
}
60+
}
61+
62+
func (r *Registrar) loop() {
63+
r.quitmtx.Lock()
64+
if r.quit != nil {
65+
defer r.quitmtx.Unlock()
66+
return // Already running.
67+
}
68+
r.quit = make(chan bool)
69+
r.quitmtx.Unlock()
70+
71+
tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second)
72+
defer tick.Stop()
73+
for {
74+
select {
75+
case <-tick.C:
76+
if err := r.client.Heartbeat(r.instance); err != nil {
77+
r.logger.Log("err", err)
78+
}
79+
case <-r.quit:
80+
r.quitmtx.Lock()
81+
defer r.quitmtx.Unlock()
82+
83+
close(r.quit)
84+
r.quit = nil
85+
86+
return
87+
}
88+
}
89+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /