Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ed643d2

Browse files
adjust set and get operations in the distributed database example
1 parent e12fada commit ed643d2

File tree

8 files changed

+121
-37
lines changed

8 files changed

+121
-37
lines changed

‎mutexes/distributed-db/clients/http.go‎

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"net/http"
77
"net/url"
8-
"time"
98

109
"distributed-db/models"
1110
)
@@ -23,11 +22,12 @@ type HTTPClient struct {
2322
httpClient *http.Client
2423
}
2524

26-
func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
27-
body := models.GetRequest{
28-
Keys: []string{key},
25+
func (c *HTTPClient) Set(node string, key, value string) (models.CacheItem, error) {
26+
body := models.SetRequest{
27+
Key: key,
28+
Value: value,
2929
}
30-
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
30+
req, err := c.makeRequest(http.MethodPost, c.url(node, "set"), body)
3131
if err != nil {
3232
return models.CacheItem{}, err
3333
}
@@ -37,19 +37,40 @@ func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
3737
return models.CacheItem{}, err
3838
}
3939

40-
var cacheItem []models.CacheItem
41-
err = json.NewDecoder(res.Body).Decode(&cacheItem)
40+
var itemmodels.CacheItem
41+
err = json.NewDecoder(res.Body).Decode(&item)
4242
if err != nil {
4343
return models.CacheItem{}, err
4444
}
45+
item.Node = node
46+
47+
return item, nil
48+
}
49+
50+
func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error) {
51+
body := models.GetRequest{Keys: keys}
52+
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
53+
if err != nil {
54+
return []models.CacheItem{}, err
55+
}
56+
57+
res, err := c.httpClient.Do(req)
58+
if err != nil {
59+
return []models.CacheItem{}, err
60+
}
61+
62+
var cacheItems []models.CacheItem
63+
err = json.NewDecoder(res.Body).Decode(&cacheItems)
64+
if err != nil {
65+
return []models.CacheItem{}, err
66+
}
4567

46-
return cacheItem[0], nil
68+
return cacheItems, nil
4769
}
4870

4971
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) ([]string, error) {
5072
body := models.GossipRequest{
5173
Nodes: nodes,
52-
CreatedAt: time.Now().UTC(),
5374
TokensChecksum: tokensChecksum,
5475
}
5576
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)

‎mutexes/distributed-db/controllers/set.go‎

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
type cacheSetter interface {
12-
Set(key, value string)
12+
Set(key, value string) (models.CacheItem, error)
1313
}
1414

1515
func set(svc cacheSetter) http.HandlerFunc {
@@ -22,9 +22,18 @@ func set(svc cacheSetter) http.HandlerFunc {
2222
return
2323
}
2424

25-
svc.Set(req.Key, req.Value)
25+
item, err := svc.Set(req.Key, req.Value)
26+
if err != nil {
27+
log.Printf("could not store cache item: %v", err)
28+
w.WriteHeader(http.StatusInternalServerError)
29+
return
30+
}
2631

27-
log.Printf("successfully stored record with key: %s", req.Key)
28-
w.WriteHeader(http.StatusOK)
32+
log.Printf("successfully stored record with key: %s on: %s", item.Key, item.Node)
33+
w.Header().Set("Content-Type", "application/json")
34+
err = json.NewEncoder(w).Encode(item)
35+
if err != nil {
36+
log.Printf("could not encode set response: %v", err)
37+
}
2938
}
3039
}

‎mutexes/distributed-db/models/cache.go‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ type CacheItem struct {
88
Key string `json:"key"`
99
Value string `json:"value"`
1010
UpdatedAt time.Time `json:"updated_at"`
11+
Node string `json:"node,omitempty"`
1112
}

‎mutexes/distributed-db/models/flags.go‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ func (n Nodes) String() string {
2121
func (n Nodes) Add(nodes ...string) {
2222
for _, node := range nodes {
2323
if node == n.CurrentNode {
24-
return
24+
continue
2525
}
26+
// update some kind of UpdateAt field for nodes health check
27+
// change from map[string]struct{} to map[string]int => states: Up/Down
28+
// a node is Down if it hasn't sent a gossip request in 10 seconds
2629
n.Map[node] = struct{}{}
2730
}
2831
}

‎mutexes/distributed-db/models/requests.go‎

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package models
22

3-
import (
4-
"time"
5-
)
6-
73
type GetRequest struct {
84
Keys []string `json:"keys"`
95
// how many reads before returning
@@ -20,7 +16,6 @@ type SetRequest struct {
2016
}
2117

2218
type GossipRequest struct {
23-
CreatedAt time.Time `json:"created_at"`
2419
Nodes []string `json:"nodes"`
2520
TokensChecksum string `json:"tokens_checksum"`
2621
}

‎mutexes/distributed-db/models/tokens.go‎

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ func NewTokens(nodes Nodes, numberOfTokenRanges int) *Tokens {
5858
return tokens
5959
}
6060

61-
func (t *Tokens) GetNode(token int) string {
62-
idx := sort.SearchInts(t.ranges, token)
61+
func (t *Tokens) GetNode(key string) string {
62+
token := HashKey(key)
63+
idx := sort.SearchInts(t.ranges, int(token))
6364
node := t.Mappings[t.ranges[idx]]
6465
return node
6566
}
@@ -128,7 +129,7 @@ func (t *Tokens) Checksum() string {
128129
return fmt.Sprintf("%x", sum)
129130
}
130131

131-
func hash(s string) uint64 {
132+
func HashKey(s string) uint64 {
132133
h := fnv.New64a()
133134
_, _ = h.Write([]byte(s))
134135
return h.Sum64()
Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package repositories
22

33
import (
4+
"fmt"
45
"sync"
56
"time"
67

@@ -18,41 +19,44 @@ type Cache struct {
1819
data map[string]models.CacheItem
1920
}
2021

21-
func (c *Cache) Set(key, value string) {
22+
func (c *Cache) Set(key, value string) models.CacheItem{
2223
c.mu.Lock()
2324
defer c.mu.Unlock()
2425

25-
cacheValue := models.CacheItem{
26+
item := models.CacheItem{
2627
Key: key,
2728
Value: value,
2829
UpdatedAt: time.Now().UTC(),
2930
}
30-
c.data[key] = cacheValue
31+
32+
sum := fmt.Sprintf("%d", models.HashKey(key))
33+
c.data[sum] = item
34+
return item
3135
}
3236

3337
func (c *Cache) Get(key string) *models.CacheItem {
3438
c.mu.RLock()
3539
defer c.mu.RUnlock()
3640

37-
val, ok := c.data[key]
41+
item, ok := c.data[key]
3842
if !ok {
3943
return nil
4044
}
4145

42-
return &val
46+
return &item
4347
}
4448

4549
func (c *Cache) GetMany(keys []string) []models.CacheItem {
4650
c.mu.RLock()
4751
defer c.mu.RUnlock()
4852

49-
values := make([]models.CacheItem, 0)
50-
for _, k := range keys {
51-
v, ok := c.data[k]
53+
items := make([]models.CacheItem, 0)
54+
for _, key := range keys {
55+
item, ok := c.data[key]
5256
if ok {
53-
values = append(values, v)
57+
items = append(items, item)
5458
}
5559
}
5660

57-
return values
61+
return items
5862
}

‎mutexes/distributed-db/services/cache.go‎

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package services
22

33
import (
4+
"fmt"
45
"log"
56
"strings"
67

@@ -10,11 +11,12 @@ import (
1011
type CacheRepository interface {
1112
Get(key string) *models.CacheItem
1213
GetMany(keys []string) []models.CacheItem
13-
Set(key, value string)
14+
Set(key, value string) models.CacheItem
1415
}
1516

1617
type HTTPClient interface {
17-
Get(node string, key string) (models.CacheItem, error)
18+
Set(node, key, value string) (models.CacheItem, error)
19+
Get(node string, keys []string) ([]models.CacheItem, error)
1820
Gossip(node string, nodes []string, tokensChecksum string) (oldNodes []string, err error)
1921
Tokens(node string) (models.TokenMappings, error)
2022
}
@@ -34,11 +36,59 @@ type CacheSvc struct {
3436
}
3537

3638
func (svc CacheSvc) Get(keys []string) []models.CacheItem {
37-
return svc.cacheRepo.GetMany(keys)
39+
keyToNode := map[string]string{}
40+
sumToNode := map[string]string{}
41+
for _, key := range keys {
42+
sum := fmt.Sprintf("%d", models.HashKey(key))
43+
node := svc.tokens.GetNode(key)
44+
sumToNode[sum] = node
45+
keyToNode[key] = node
46+
}
47+
48+
nodeToSums := map[string][]string{}
49+
for sum, node := range sumToNode {
50+
nodeToSums[node] = append(nodeToSums[node], sum)
51+
}
52+
nodeToKeys := map[string][]string{}
53+
for key, node := range keyToNode {
54+
nodeToKeys[node] = append(nodeToKeys[node], key)
55+
}
56+
57+
cacheItems := make([]models.CacheItem, 0)
58+
for node, sums := range nodeToSums {
59+
if node == svc.tokens.Nodes.CurrentNode {
60+
items := svc.cacheRepo.GetMany(sums)
61+
for _, item := range items {
62+
item.Node = node
63+
cacheItems = append(cacheItems, item)
64+
}
65+
continue
66+
}
67+
68+
nodeKeys := nodeToKeys[node]
69+
items, err := svc.httpClient.Get(node, nodeKeys)
70+
if err != nil {
71+
log.Printf("could not get cache items from node: %s, %v", node, err)
72+
}
73+
74+
for _, item := range items {
75+
item.Node = node
76+
cacheItems = append(cacheItems, item)
77+
}
78+
}
79+
80+
return cacheItems
3881
}
3982

40-
func (svc CacheSvc) Set(key, value string) {
41-
svc.cacheRepo.Set(key, value)
83+
func (svc CacheSvc) Set(key, value string) (models.CacheItem, error) {
84+
node := svc.tokens.GetNode(key)
85+
if node == svc.tokens.Nodes.CurrentNode {
86+
item := svc.cacheRepo.Set(key, value)
87+
item.Node = node
88+
return item, nil
89+
}
90+
91+
return svc.httpClient.Set(node, key, value)
4292
}
4393

4494
func (svc CacheSvc) Gossip() {

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /