Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 883f407

Browse files
add token range example in the distributed-db playground
1 parent 4e4aa18 commit 883f407

File tree

11 files changed

+134
-72
lines changed

11 files changed

+134
-72
lines changed

‎mutexes/distributed-db/app/app.go‎

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package app
22

33
import (
44
"flag"
5+
"fmt"
56
"log"
67
"net/http"
78
"strconv"
@@ -15,20 +16,19 @@ import (
1516
)
1617

1718
func New() (*App, error) {
18-
varpeers models.StringList
19+
peers :=models.Peers{}
1920
port := flag.Int("port", 8080, "the port of the running server")
2021
flag.Var(&peers, "peer", "the list of peer servers to gossip to")
2122

2223
flag.Parse()
24+
if len(peers) < models.MinimumPeers {
25+
return nil, fmt.Errorf("need at least %d peer servers", models.MinimumPeers)
26+
}
2327

2428
addr := ":" + strconv.Itoa(*port)
25-
peersRepo, err := repositories.NewPeers(peers)
26-
if err != nil {
27-
return nil, err
28-
}
2929
cacheRepo := repositories.NewCache()
3030
httpClient := clients.NewHTTP("localhost" + addr)
31-
svc := services.NewCache(cacheRepo, peersRepo, httpClient)
31+
svc := services.NewCache(cacheRepo, httpClient, peers)
3232
router := controllers.NewRouter(svc)
3333
srv := &http.Server{
3434
Addr: addr,

‎mutexes/distributed-db/clients/http.go‎

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ func (c *HTTPClient) Get(peer string, key string) (models.CacheItem, error) {
4646
}
4747

4848
func (c *HTTPClient) Gossip(peer string, summary models.Summary) error {
49-
body := models.GossipRequest{
50-
Summary: summary,
51-
}
49+
body := models.GossipMessage{}
5250
req, err := c.makeRequest(http.MethodPost, c.url(peer, "gossip"), body)
5351
if err != nil {
5452
return err

‎mutexes/distributed-db/controllers/gossip.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ type cacheSummaryResolver interface {
1414

1515
func gossip(svc cacheSummaryResolver) http.HandlerFunc {
1616
return func(w http.ResponseWriter, r *http.Request) {
17-
var req models.GossipRequest
17+
var req models.GossipMessage
1818
err := json.NewDecoder(r.Body).Decode(&req)
1919
if err != nil {
2020
log.Printf("could not decode gossip request: %v", err)
2121
w.WriteHeader(http.StatusInternalServerError)
2222
return
2323
}
2424

25-
svc.ResolveSummary(r.Host, req.Summary)
25+
svc.ResolveSummary(r.Host, models.Summary{})
2626
}
2727
}

‎mutexes/distributed-db/controllers/set.go‎

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,17 @@ import (
44
"encoding/json"
55
"log"
66
"net/http"
7+
8+
"distributed-db/models"
79
)
810

911
type cacheSetter interface {
1012
Set(key, value string)
1113
}
1214

13-
type setRequest struct {
14-
Key string `json:"key"`
15-
Value string `json:"value"`
16-
}
17-
1815
func set(svc cacheSetter) http.HandlerFunc {
1916
return func(w http.ResponseWriter, r *http.Request) {
20-
var req setRequest
17+
var req models.SetRequest
2118
err := json.NewDecoder(r.Body).Decode(&req)
2219
if err != nil {
2320
log.Printf("could not decode set request: %v", err)

‎mutexes/distributed-db/models/flag.go‎

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,32 @@ import (
44
"strings"
55
)
66

7-
typeStringList []string
7+
constMinimumPeers=2
88

9-
func (l *StringList) Set(s string) error {
10-
*l = append(*l, s)
9+
// rename to Nodes
10+
type Peers map[string]struct{}
11+
12+
func (p Peers) Set(s string) error {
13+
p.Add(s)
1114
return nil
1215
}
1316

14-
func (l StringList) String() string {
15-
return strings.Join(l, ",")
17+
func (p Peers) String() string {
18+
return strings.Join(p.List(len(p)), ",")
19+
}
20+
21+
func (p Peers) Add(s string) {
22+
p[s] = struct{}{}
23+
}
24+
25+
func (p Peers) List(n int) []string {
26+
i, keys := 0, make([]string, 0, len(p))
27+
for k := range p {
28+
if i == n {
29+
break
30+
}
31+
keys = append(keys, k)
32+
i++
33+
}
34+
return keys
1635
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package models
2+
3+
import (
4+
"time"
5+
)
6+
7+
type GossipMessage struct {
8+
CreatedAt time.Time `json:"created_at"`
9+
Peers Peers `json:"peers"`
10+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package models
2+
3+
// partitioner that has a Hash function for determining on which node to store the data
4+
5+
type Partitioner struct {
6+
}
7+
8+
type Ring struct {
9+
}

‎mutexes/distributed-db/models/requests.go‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ type GetRequest struct {
44
Keys []string `json:"keys"`
55
}
66

7-
type GossipRequest struct {
8-
Summary Summary `json:"summary"`
7+
type SetRequest struct {
8+
Key string `json:"key"`
9+
Value string `json:"value"`
910
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"hash/fnv"
6+
"math"
7+
"math/rand"
8+
"sort"
9+
"time"
10+
)
11+
12+
func main() {
13+
tokens := NewTokens(5, 256)
14+
for i := 0; i < 50; i++ {
15+
key := fmt.Sprintf("k%d", i+1)
16+
sum := hash(key)
17+
fmt.Println(key, sum, tokens.Get(sum))
18+
}
19+
}
20+
21+
func NewTokens(numberOfNodes, numberOfTokenRanges int) Tokens {
22+
servers := make([]string, numberOfNodes)
23+
tokenRange := math.MaxInt / numberOfNodes / numberOfTokenRanges
24+
ranges := make([]uint64, 0, numberOfNodes*numberOfTokenRanges)
25+
for i := 0; i < numberOfNodes; i++ {
26+
servers[i] = fmt.Sprintf("server%d", i+1)
27+
for j := numberOfTokenRanges * i; j < numberOfTokenRanges*(i+1); j++ {
28+
r := tokenRange * (j + 1)
29+
ranges = append(ranges, uint64(r))
30+
}
31+
}
32+
33+
randomRanges := append([]uint64{}, ranges...)
34+
rand.Seed(time.Now().UnixNano())
35+
rand.Shuffle(len(randomRanges), func(i, j int) {
36+
randomRanges[i], randomRanges[j] = randomRanges[j], randomRanges[i]
37+
})
38+
39+
i, mappings := 0, map[uint64]string{}
40+
for _, r := range randomRanges {
41+
mappings[r] = servers[i]
42+
i++
43+
if i == numberOfNodes {
44+
i = 0
45+
}
46+
}
47+
48+
tokens := Tokens{
49+
Ranges: ranges,
50+
Mappings: mappings,
51+
}
52+
return tokens
53+
}
54+
55+
type Tokens struct {
56+
Ranges []uint64
57+
Mappings map[uint64]string
58+
}
59+
60+
func (t *Tokens) Get(n uint64) string {
61+
idx := sort.Search(len(t.Ranges)-1, func(i int) bool {
62+
return t.Ranges[i] >= n
63+
})
64+
return t.Mappings[t.Ranges[idx]]
65+
}
66+
67+
func hash(s string) uint64 {
68+
h := fnv.New64a()
69+
_, _ = h.Write([]byte(s))
70+
return h.Sum64()
71+
}

‎mutexes/distributed-db/repositories/peers.go‎

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /