Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e12fada

Browse files
finish with gossip and token ranges
1 parent 49d8543 commit e12fada

File tree

7 files changed

+112
-218
lines changed

7 files changed

+112
-218
lines changed

‎mutexes/distributed-db/app/app.go‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ func New() (*App, error) {
2424
return nil, fmt.Errorf("need at least 1 node to talk to")
2525
}
2626

27-
2827
addr := fmt.Sprintf("localhost:%d", *port)
2928
nodes.CurrentNode = addr
30-
tokens := models.NewTokens(nodes, 5)
29+
tokens := models.NewTokens(nodes, 256)
3130
cacheRepo := repositories.NewCache()
3231
httpClient := clients.NewHTTP(addr)
3332
svc := services.NewCache(cacheRepo, httpClient, tokens)

‎mutexes/distributed-db/clients/http.go‎

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,29 @@ func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
4646
return cacheItem[0], nil
4747
}
4848

49-
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) error {
49+
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) ([]string, error) {
5050
body := models.GossipRequest{
5151
Nodes: nodes,
5252
CreatedAt: time.Now().UTC(),
5353
TokensChecksum: tokensChecksum,
5454
}
5555
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)
5656
if err != nil {
57-
return err
57+
return []string{}, err
5858
}
5959

60-
_, err = c.httpClient.Do(req)
60+
res, err := c.httpClient.Do(req)
61+
if err != nil {
62+
return []string{}, err
63+
}
64+
65+
var gossipRes models.GossipResponse
66+
err = json.NewDecoder(res.Body).Decode(&gossipRes)
6167
if err != nil {
62-
return err
68+
return []string{}, err
6369
}
6470

65-
return nil
71+
return gossipRes.Nodes, nil
6672
}
6773

6874
func (c *HTTPClient) Tokens(node string) (models.TokenMappings, error) {

‎mutexes/distributed-db/controllers/tokens.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type tokensGetter interface {
12-
GetTokens() map[uint64]string
12+
GetTokens() map[int]string
1313
}
1414

1515
func tokens(svc tokensGetter) http.HandlerFunc {

‎mutexes/distributed-db/models/flags.go‎

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type Nodes struct {
8-
Map map[string]struct{}
8+
Map map[string]struct{}
99
CurrentNode string
1010
}
1111

@@ -18,11 +18,21 @@ func (n Nodes) String() string {
1818
return strings.Join(n.List(len(n.Map)), ",")
1919
}
2020

21-
func (n Nodes) Add(node string) {
22-
if node == n.CurrentNode {
23-
return
21+
func (n Nodes) Add(nodes ...string) {
22+
for _, node := range nodes {
23+
if node == n.CurrentNode {
24+
return
25+
}
26+
n.Map[node] = struct{}{}
27+
}
28+
}
29+
30+
func (n Nodes) WithCurrentNode() map[string]struct{} {
31+
nodes := map[string]struct{}{n.CurrentNode: {}}
32+
for s := range n.Map {
33+
nodes[s] = struct{}{}
2434
}
25-
n.Map[node] =struct{}{}
35+
returnnodes
2636
}
2737

2838
func (n Nodes) List(a int) []string {

‎mutexes/distributed-db/models/tokens.go‎

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,50 @@ import (
66
"fmt"
77
"hash/fnv"
88
"log"
9+
"math"
910
"math/rand"
11+
"reflect"
1012
"sort"
1113
"time"
1214
)
1315

14-
type TokenMappings map[uint64]string
16+
type TokenMappings map[int]string
1517

1618
type Tokens struct {
1719
Mappings TokenMappings
1820
Nodes Nodes
19-
ranges []uint64
21+
ranges []int
2022
numberOfTokenRanges int
2123
}
2224

23-
func NewTokens(nodes Nodes, numberOfTokenRanges int) Tokens {
24-
numberOfNodes := len(nodes.Map)
25-
tokenRange := uint64(200 / numberOfNodes / numberOfTokenRanges)
26-
ranges := make([]uint64, 0, numberOfNodes*numberOfTokenRanges)
27-
for i := 0; i < numberOfNodes; i++ {
25+
func NewTokens(nodes Nodes, numberOfTokenRanges int) *Tokens {
26+
nodeList := append([]string{nodes.CurrentNode}, nodes.List(len(nodes.Map))...)
27+
tokenRange := math.MaxInt / len(nodeList) / numberOfTokenRanges
28+
ranges := make([]int, 0, len(nodeList)*numberOfTokenRanges)
29+
for i := 0; i < len(nodeList); i++ {
2830
for j := numberOfTokenRanges * i; j < numberOfTokenRanges*(i+1); j++ {
29-
r := tokenRange * uint64(j+1)
31+
r := tokenRange * (j+1)
3032
// produces a sorted ranges slice => needed for searching
3133
ranges = append(ranges, r)
3234
}
3335
}
3436

35-
randomRanges := append([]uint64{}, ranges...)
37+
randomRanges := append([]int{}, ranges...)
3638
rand.Seed(time.Now().UnixNano())
3739
rand.Shuffle(len(randomRanges), func(i, j int) {
3840
randomRanges[i], randomRanges[j] = randomRanges[j], randomRanges[i]
3941
})
4042

41-
i, mappings := 0, map[uint64]string{}
42-
nodeList := nodes.List(len(nodes.Map))
43+
i, mappings := 0, map[int]string{}
4344
for _, r := range randomRanges {
4445
mappings[r] = nodeList[i]
4546
i++
46-
if i == numberOfNodes {
47+
if i == len(nodeList) {
4748
i = 0
4849
}
4950
}
5051

51-
tokens := Tokens{
52+
tokens := &Tokens{
5253
Mappings: mappings,
5354
Nodes: nodes,
5455
ranges: ranges,
@@ -57,38 +58,64 @@ func NewTokens(nodes Nodes, numberOfTokenRanges int) Tokens {
5758
return tokens
5859
}
5960

60-
func (t *Tokens) GetNode(token uint64) string {
61-
idx := sort.Search(len(t.ranges)-1, func(i int) bool {
62-
return t.ranges[i] >= token
63-
})
61+
func (t *Tokens) GetNode(token int) string {
62+
idx := sort.SearchInts(t.ranges, token)
6463
node := t.Mappings[t.ranges[idx]]
6564
return node
6665
}
6766

68-
func (t *Tokens) AddNode(node string) {
69-
_, ok := t.Nodes.Map[node]
70-
if ok || node == t.Nodes.CurrentNode {
67+
func (t *Tokens) Merge(mappings map[int]string) {
68+
newMappings := map[int]string{}
69+
nodes := t.Nodes.WithCurrentNode()
70+
ranges := t.ranges
71+
m1, m2 := mappings, t.Mappings
72+
if len(mappings) < len(t.Mappings) {
73+
m1 = t.Mappings
74+
m2 = mappings
75+
}
76+
77+
newNodes := map[string]struct{}{}
78+
newRanges := make([]int, 0)
79+
for _, s := range mappings {
80+
newNodes[s] = struct{}{}
81+
}
82+
for r := range m1 {
83+
newRanges = append(newRanges, r)
84+
}
85+
sort.Ints(newRanges)
86+
ranges = newRanges
87+
nodes = newNodes
88+
89+
if reflect.DeepEqual(nodes, t.Nodes.Map) {
7190
return
7291
}
73-
tokenRange := uint64(200 / len(t.Nodes.Map) / t.numberOfTokenRanges)
74-
newRanges := make([]uint64, 0, t.numberOfTokenRanges)
75-
for i := 0; i < len(t.ranges)+t.numberOfTokenRanges; i++ {
76-
if i < len(t.ranges) {
77-
r := t.ranges[i]
78-
decrement := r - tokenRange*(uint64(i+1))
79-
newRange := r - decrement
80-
srv := t.Mappings[r]
8192

82-
delete(t.Mappings, r)
83-
t.Mappings[newRange] = srv
84-
t.ranges[i] = newRange
85-
} else {
86-
newRange := tokenRange * uint64(i+1)
87-
t.Mappings[newRange] = node
88-
newRanges = append(newRanges, newRange)
93+
for s := range nodes {
94+
_, ok := t.Nodes.WithCurrentNode()[s]
95+
if ok {
96+
delete(nodes, s)
8997
}
9098
}
91-
t.ranges = append(t.ranges, newRanges...)
99+
100+
i := 0
101+
numberOfNodes := len(nodes) + len(t.Nodes.WithCurrentNode())
102+
tokenRange := math.MaxInt / numberOfNodes / t.numberOfTokenRanges
103+
m1Nodes := map[string]struct{}{}
104+
for r, s := range m1 {
105+
m1Nodes[s] = struct{}{}
106+
factor := sort.SearchInts(ranges, r) + 1
107+
newMappings[factor*tokenRange] = s
108+
i++
109+
}
110+
for _, s := range m2 {
111+
_, ok := m1Nodes[s]
112+
if ok {
113+
continue
114+
}
115+
i++
116+
newMappings[(i)*tokenRange] = s
117+
}
118+
t.Mappings = newMappings
92119
}
93120

94121
func (t *Tokens) Checksum() string {

‎mutexes/distributed-db/playground/tokens/main.go‎

Lines changed: 0 additions & 141 deletions
This file was deleted.

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /