Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f139daa

Browse files
add token ranges in the distribute-db project
1 parent b9517de commit f139daa

File tree

18 files changed

+326
-211
lines changed

18 files changed

+326
-211
lines changed

‎mutexes/distributed-db/README.md‎

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,25 @@
1111
- Set and Get operations don't have any kind of Data Consistency control (READ/WRITE acknowledgements)
1212
- The Set operation does not have an option to control replication factor (number of copies on each peer)
1313
- There's no partitioning strategy, like how the data is paged/stored on the server for fast WRITE and READ access
14+
15+
```text
16+
GOSSIP ONLY spreads information about the nodes
17+
DATA REPLICATION is done through REPLICATION FACTOR and CONSISTENCY LEVEL (WRITE/READ CONSISTENCY LEVEL)
18+
19+
For a really efficient database like Cassandra
20+
we need to implement COMPACTION and SSTables and have an INDEX and MemTable
21+
COMPACTION (runs in the background):
22+
Merge multiple SSTables into bigger SSTables -> update the index
23+
SSTables:
24+
SSTables are sorted string tables stored on Disk once the MemTable is flushed.
25+
When MemTable is flushed -> create/update the INDEX
26+
MemTable:
27+
Represents In-Memory data of the database
28+
INDEX:
29+
Represents a map[Key:SSTable Offset] (kept on disk)
30+
The INDEX is kept on disk because its size can be big
31+
SUMMARY
32+
Represents an index for the INDEX or a map[N Keys Range (Bucket) : INDEX FILE] => map[0:index1], map[100:index2], map[200:index3]
33+
COMMIT LOG
34+
Append only file in case the MemTable gets lost. It is destroyed after the MemTable is flushed
35+
```

‎mutexes/distributed-db/app/app.go‎

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"log"
77
"net/http"
8-
"strconv"
98

109
"distributed-db/clients"
1110
"distributed-db/controllers"
@@ -16,19 +15,22 @@ import (
1615
)
1716

1817
func New() (*App, error) {
19-
peers := models.Peers{}
18+
nodes := models.Nodes{Map: map[string]struct{}{}}
2019
port := flag.Int("port", 8080, "the port of the running server")
21-
flag.Var(&peers, "peer", "the list of peer servers to gossip to")
20+
flag.Var(&nodes, "node", "the list of nodes to talk to")
2221

2322
flag.Parse()
24-
if len(peers) < models.MinimumPeers {
25-
return nil, fmt.Errorf("need at least %d peer servers", models.MinimumPeers)
23+
if len(nodes.Map) < 1 {
24+
return nil, fmt.Errorf("need at least 1 node to talk to")
2625
}
2726

28-
addr := ":" + strconv.Itoa(*port)
27+
28+
addr := fmt.Sprintf("localhost:%d", *port)
29+
nodes.CurrentNode = addr
30+
tokens := models.NewTokens(nodes, 5)
2931
cacheRepo := repositories.NewCache()
30-
httpClient := clients.NewHTTP("localhost"+addr)
31-
svc := services.NewCache(cacheRepo, httpClient, peers)
32+
httpClient := clients.NewHTTP(addr)
33+
svc := services.NewCache(cacheRepo, httpClient, tokens)
3234
router := controllers.NewRouter(svc)
3335
srv := &http.Server{
3436
Addr: addr,

‎mutexes/distributed-db/clients/http.go‎

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"net/http"
77
"net/url"
8+
"time"
89

910
"distributed-db/models"
1011
)
@@ -22,11 +23,11 @@ type HTTPClient struct {
2223
httpClient *http.Client
2324
}
2425

25-
func (c *HTTPClient) Get(peer string, key string) (models.CacheItem, error) {
26+
func (c *HTTPClient) Get(node string, key string) (models.CacheItem, error) {
2627
body := models.GetRequest{
2728
Keys: []string{key},
2829
}
29-
req, err := c.makeRequest(http.MethodGet, c.url(peer, "get"), body)
30+
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
3031
if err != nil {
3132
return models.CacheItem{}, err
3233
}
@@ -45,9 +46,13 @@ func (c *HTTPClient) Get(peer string, key string) (models.CacheItem, error) {
4546
return cacheItem[0], nil
4647
}
4748

48-
func (c *HTTPClient) Gossip(peer string, summary models.Summary) error {
49-
body := models.GossipMessage{}
50-
req, err := c.makeRequest(http.MethodPost, c.url(peer, "gossip"), body)
49+
func (c *HTTPClient) Gossip(node string, nodes []string, tokensChecksum string) error {
50+
body := models.GossipRequest{
51+
Nodes: nodes,
52+
CreatedAt: time.Now().UTC(),
53+
TokensChecksum: tokensChecksum,
54+
}
55+
req, err := c.makeRequest(http.MethodPost, c.url(node, "gossip"), body)
5156
if err != nil {
5257
return err
5358
}
@@ -60,10 +65,30 @@ func (c *HTTPClient) Gossip(peer string, summary models.Summary) error {
6065
return nil
6166
}
6267

63-
func (c *HTTPClient) url(peer, path string) string {
68+
func (c *HTTPClient) Tokens(node string) (models.TokenMappings, error) {
69+
req, err := c.makeRequest(http.MethodGet, c.url(node, "tokens"), nil)
70+
if err != nil {
71+
return models.TokenMappings{}, err
72+
}
73+
74+
res, err := c.httpClient.Do(req)
75+
if err != nil {
76+
return models.TokenMappings{}, err
77+
}
78+
79+
var tokensRes models.TokensResponse
80+
err = json.NewDecoder(res.Body).Decode(&tokensRes)
81+
if err != nil {
82+
return models.TokenMappings{}, err
83+
}
84+
85+
return tokensRes.Tokens, nil
86+
}
87+
88+
func (c *HTTPClient) url(node, path string) string {
6489
u := url.URL{
6590
Scheme: "http",
66-
Host: peer,
91+
Host: node,
6792
Path: path,
6893
}
6994
return u.String()

‎mutexes/distributed-db/controllers/gossip.go‎

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,33 @@ import (
88
"distributed-db/models"
99
)
1010

11-
type cacheSummaryResolver interface {
12-
ResolveSummary(peer string, summary models.Summary)
11+
type tokensUpdater interface {
12+
UpdateTokens(node string, newNodes []string, tokensChecksumstring) ([]string, error)
1313
}
1414

15-
func gossip(svc cacheSummaryResolver) http.HandlerFunc {
15+
func gossip(svc tokensUpdater) http.HandlerFunc {
1616
return func(w http.ResponseWriter, r *http.Request) {
17-
var req models.GossipMessage
17+
var req models.GossipRequest
1818
err := json.NewDecoder(r.Body).Decode(&req)
1919
if err != nil {
2020
log.Printf("could not decode gossip request: %v", err)
2121
w.WriteHeader(http.StatusInternalServerError)
2222
return
2323
}
2424

25-
svc.ResolveSummary(r.Host, models.Summary{})
25+
oldNodes, err := svc.UpdateTokens(r.Host, req.Nodes, req.TokensChecksum)
26+
if err != nil {
27+
log.Printf("could not update tokens: %v", err)
28+
w.WriteHeader(http.StatusInternalServerError)
29+
return
30+
}
31+
32+
w.Header().Set("Content-Type", "application/json")
33+
res := models.GossipResponse{Nodes: oldNodes}
34+
err = json.NewEncoder(w).Encode(res)
35+
if err != nil {
36+
log.Printf("could not encode gossip response: %v", err)
37+
w.WriteHeader(http.StatusInternalServerError)
38+
}
2639
}
2740
}

‎mutexes/distributed-db/controllers/router.go‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ import (
77
type CacheService interface {
88
cacheGetter
99
cacheSetter
10-
cacheSummaryResolver
10+
tokensGetter
11+
tokensUpdater
1112
}
1213

1314
func NewRouter(svc CacheService) http.Handler {
1415
mux := http.NewServeMux()
1516
mux.HandleFunc("/set", set(svc))
1617
mux.HandleFunc("/get", get(svc))
1718
mux.HandleFunc("/gossip", gossip(svc))
19+
mux.HandleFunc("/tokens", tokens(svc))
1820

1921
return mux
2022
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
8+
"distributed-db/models"
9+
)
10+
11+
type tokensGetter interface {
12+
GetTokens() map[uint64]string
13+
}
14+
15+
func tokens(svc tokensGetter) http.HandlerFunc {
16+
return func(w http.ResponseWriter, r *http.Request) {
17+
res := models.TokensResponse{
18+
Tokens: svc.GetTokens(),
19+
}
20+
21+
w.Header().Set("Content-Type", "application/json")
22+
err := json.NewEncoder(w).Encode(res)
23+
if err != nil {
24+
log.Printf("could not encode gossip response: %v", err)
25+
w.WriteHeader(http.StatusInternalServerError)
26+
}
27+
}
28+
}

‎mutexes/distributed-db/models/cache.go‎

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,4 @@ type CacheItem struct {
88
Key string `json:"key"`
99
Value string `json:"value"`
1010
UpdatedAt time.Time `json:"updated_at"`
11-
BucketID int `json:"-"`
1211
}
13-
14-
type Summary map[string]time.Time

‎mutexes/distributed-db/models/flag.go‎

Lines changed: 0 additions & 35 deletions
This file was deleted.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package models
2+
3+
import (
4+
"strings"
5+
)
6+
7+
type Nodes struct {
8+
Map map[string]struct{}
9+
CurrentNode string
10+
}
11+
12+
func (n Nodes) Set(s string) error {
13+
n.Add(s)
14+
return nil
15+
}
16+
17+
func (n Nodes) String() string {
18+
return strings.Join(n.List(len(n.Map)), ",")
19+
}
20+
21+
func (n Nodes) Add(node string) {
22+
if node == n.CurrentNode {
23+
return
24+
}
25+
n.Map[node] = struct{}{}
26+
}
27+
28+
func (n Nodes) List(a int) []string {
29+
i, keys := 0, make([]string, 0, len(n.Map))
30+
for k := range n.Map {
31+
if i == a {
32+
break
33+
}
34+
keys = append(keys, k)
35+
i++
36+
}
37+
return keys
38+
}

‎mutexes/distributed-db/models/gossip.go‎

Lines changed: 0 additions & 10 deletions
This file was deleted.

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /