Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 1f0acfe

Browse files
big time changes, not sure if the app works at this point
1 parent 898d064 commit 1f0acfe

File tree

12 files changed

+460
-73
lines changed

12 files changed

+460
-73
lines changed

‎mutexes/distributed-db/app/app.go‎

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ func New() (*App, error) {
2323

2424
flag.Parse()
2525

26-
if len(nodesMap) < 1 {
27-
return nil, fmt.Errorf("need at least 1 node to talk to")
28-
}
29-
3026
addr := fmt.Sprintf("localhost:%d", *port)
3127
if *dataDir == "" {
3228
*dataDir = fmt.Sprintf(".data/%s", addr)
3329
}
3430

31+
delete(nodesMap, addr)
32+
if len(nodesMap) < 1 {
33+
return nil, fmt.Errorf("need at least 1 node to talk to")
34+
}
35+
3536
nodes := models.NewNodes(addr, nodesMap)
3637
tokens := models.NewTokens(nodes, 256)
3738
cacheRepo := repositories.NewCache(*dataDir)
@@ -42,11 +43,13 @@ func New() (*App, error) {
4243
Addr: addr,
4344
Handler: router,
4445
}
45-
w := workers.NewGossip(svc)
46+
gossipWorker := workers.NewGossip(svc)
47+
streamerWorker := workers.NewStreamer(svc)
4648
a := &App{
47-
Server: srv,
48-
Worker: w,
49-
cacheRepo: cacheRepo,
49+
Server: srv,
50+
GossipWorker: gossipWorker,
51+
StreamerWorker: streamerWorker,
52+
cacheRepo: cacheRepo,
5053
}
5154

5255
return a, nil
@@ -57,13 +60,15 @@ type snapshotter interface {
5760
}
5861

5962
type App struct {
60-
Server *http.Server
61-
Worker workers.Gossip
62-
cacheRepo snapshotter
63+
Server *http.Server
64+
GossipWorker workers.Gossip
65+
StreamerWorker workers.Streamer
66+
cacheRepo snapshotter
6367
}
6468

6569
func (a App) Start(ctx context.Context) error {
66-
go a.Worker.Start(ctx)
70+
go a.GossipWorker.Start(ctx)
71+
go a.StreamerWorker.Start(ctx)
6772

6873
log.Println("server started on address", a.Server.Addr)
6974
err := a.Server.ListenAndServe()

‎mutexes/distributed-db/clients/http.go‎

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,27 @@ type HTTPClient struct {
2222
httpClient *http.Client
2323
}
2424

25+
func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error) {
26+
body := models.GetRequest{Keys: keys}
27+
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
28+
if err != nil {
29+
return []models.CacheItem{}, err
30+
}
31+
32+
res, err := c.httpClient.Do(req)
33+
if err != nil {
34+
return []models.CacheItem{}, err
35+
}
36+
37+
var cacheItems []models.CacheItem
38+
err = json.NewDecoder(res.Body).Decode(&cacheItems)
39+
if err != nil {
40+
return []models.CacheItem{}, err
41+
}
42+
43+
return cacheItems, nil
44+
}
45+
2546
func (c *HTTPClient) Set(node string, key, value string) (models.CacheItem, error) {
2647
body := models.SetRequest{
2748
Key: key,
@@ -47,9 +68,9 @@ func (c *HTTPClient) Set(node string, key, value string) (models.CacheItem, erro
4768
return item, nil
4869
}
4970

50-
func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error) {
51-
body := models.GetRequest{Keys: keys}
52-
req, err := c.makeRequest(http.MethodGet, c.url(node, "get"), body)
71+
func (c *HTTPClient) SetBatch(node string, itemsmap[int]models.CacheItem) ([]models.CacheItem, error) {
72+
body := models.SetBatchRequest{Items: items}
73+
req, err := c.makeRequest(http.MethodPost, c.url(node, "set/batch"), body)
5374
if err != nil {
5475
return []models.CacheItem{}, err
5576
}
@@ -60,7 +81,7 @@ func (c *HTTPClient) Get(node string, keys []string) ([]models.CacheItem, error)
6081
}
6182

6283
var cacheItems []models.CacheItem
63-
err = json.NewDecoder(res.Body).Decode(&cacheItems)
84+
err = json.NewDecoder(res.Body).Decode(&items)
6485
if err != nil {
6586
return []models.CacheItem{}, err
6687
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
8+
"distributed-db/models"
9+
)
10+
11+
type cacheRemover interface {
12+
Delete(keys []string)
13+
}
14+
15+
func remove(svc cacheRemover) http.HandlerFunc {
16+
return func(w http.ResponseWriter, r *http.Request) {
17+
var req models.DeleteRequest
18+
err := json.NewDecoder(r.Body).Decode(&req)
19+
if err != nil {
20+
log.Printf("could not decode delete request: %v", err)
21+
w.WriteHeader(http.StatusInternalServerError)
22+
return
23+
}
24+
25+
svc.Delete(req.Keys)
26+
w.WriteHeader(http.StatusOK)
27+
}
28+
}

‎mutexes/distributed-db/controllers/get.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ func get(svc cacheGetter) http.HandlerFunc {
2222
return
2323
}
2424

25-
values := svc.Get(req.Keys)
25+
items := svc.Get(req.Keys)
2626

2727
w.Header().Set("Content-Type", "application/json")
28-
err = json.NewEncoder(w).Encode(values)
28+
err = json.NewEncoder(w).Encode(items)
2929
if err != nil {
3030
log.Printf("could not encode json: %v", err)
3131
w.WriteHeader(http.StatusInternalServerError)

‎mutexes/distributed-db/controllers/router.go‎

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,20 @@ import (
77
type CacheService interface {
88
cacheGetter
99
cacheSetter
10+
cacheBatchSetter
11+
cacheRemover
1012
tokensGetter
1113
tokensUpdater
1214
}
1315

1416
func NewRouter(svc CacheService) http.Handler {
1517
mux := http.NewServeMux()
16-
mux.HandleFunc("/set", set(svc))
1718
mux.HandleFunc("/get", get(svc))
19+
mux.HandleFunc("/set", set(svc))
20+
mux.HandleFunc("/delete", remove(svc))
21+
// add a middleware that does not allow
22+
// requests that do not come from other nodes
23+
mux.HandleFunc("/set/batch", setBatch(svc))
1824
mux.HandleFunc("/gossip", gossip(svc))
1925
mux.HandleFunc("/tokens", tokens(svc))
2026

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
8+
"distributed-db/models"
9+
)
10+
11+
type cacheBatchSetter interface {
12+
SetBatch(items map[int]models.CacheItem) []models.CacheItem
13+
}
14+
15+
func setBatch(svc cacheBatchSetter) http.HandlerFunc {
16+
return func(w http.ResponseWriter, r *http.Request) {
17+
var req models.SetBatchRequest
18+
err := json.NewDecoder(r.Body).Decode(&req)
19+
if err != nil {
20+
log.Printf("could not decode set batch request: %v", err)
21+
w.WriteHeader(http.StatusInternalServerError)
22+
return
23+
}
24+
25+
items := svc.SetBatch(req.Items)
26+
log.Printf("successfully stored batch")
27+
28+
w.Header().Set("Content-Type", "application/json")
29+
err = json.NewEncoder(w).Encode(items)
30+
if err != nil {
31+
log.Printf("could not encode set batch response: %v", err)
32+
}
33+
}
34+
}

‎mutexes/distributed-db/models/cache.go‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ import (
77
type CacheItem struct {
88
Key string `json:"key"`
99
Value string `json:"value"`
10-
UpdatedAt time.Time `json:"updated_at"`
10+
UpdatedAt time.Time `json:"updated_at,omitempty"`
1111
Node string `json:"node,omitempty"`
1212
}

‎mutexes/distributed-db/models/requests.go‎

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,37 @@
11
package models
22

3+
import (
4+
"time"
5+
)
6+
37
type GetRequest struct {
48
Keys []string `json:"keys"`
5-
// how many reads before returning
9+
// how many reads before returning (replication factor > 1) => TO BE IMPLEMENTED
10+
ConsistencyLevel int `json:"-"`
11+
}
12+
13+
type DeleteRequest struct {
14+
Keys []string `json:"keys"`
15+
// how many deletes before returning (replication factor > 1) => TO BE IMPLEMENTED
616
ConsistencyLevel int `json:"-"`
717
}
818

919
type SetRequest struct {
1020
Key string `json:"key"`
1121
Value string `json:"value"`
12-
// how many copies for this cache item
22+
// how many copies for this cache item [TO BE IMPLEMENTED]
23+
ReplicationFactor int `json:"-"`
24+
// how many writes before returning (replication factor > 1) => TO BE IMPLEMENTED
25+
ConsistencyLevel int `json:"-"`
26+
// for short-lived records [TO BE IMPLEMENTED]
27+
TTL time.Duration `json:"-"`
28+
}
29+
30+
type SetBatchRequest struct {
31+
Items map[int]CacheItem
32+
// how many copies for this cache item [TO BE IMPLEMENTED]
1333
ReplicationFactor int `json:"-"`
14-
// how many writes before returning
34+
// how many writes before returning (replication factor > 1) => TO BE IMPLEMENTED
1535
ConsistencyLevel int `json:"-"`
1636
}
1737

‎mutexes/distributed-db/models/tokens.go‎

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@ import (
1313
"time"
1414
)
1515

16-
type TokenMappings map[int]string
17-
18-
type Tokens struct {
19-
Mappings TokenMappings
20-
Nodes *Nodes
21-
ranges []int
22-
numberOfTokenRanges int
23-
}
24-
2516
func NewTokens(nodes *Nodes, numberOfTokenRanges int) *Tokens {
2617
nodeList := append([]string{nodes.Current()}, nodes.ListAll()...)
2718
tokenRange := math.MaxInt / len(nodeList) / numberOfTokenRanges
@@ -58,13 +49,32 @@ func NewTokens(nodes *Nodes, numberOfTokenRanges int) *Tokens {
5849
return tokens
5950
}
6051

61-
func (t *Tokens) GetNode(key string) string {
62-
token := HashKey(key)
63-
idx := sort.SearchInts(t.ranges, int(token))
52+
type TokenMappings map[int]string
53+
54+
type Tokens struct {
55+
Mappings TokenMappings
56+
ForeignTokens TokenMappings
57+
Nodes *Nodes
58+
ranges []int
59+
numberOfTokenRanges int
60+
}
61+
62+
func (t *Tokens) GetNode(token int) string {
63+
idx := sort.SearchInts(t.ranges, token)
6464
node := t.Mappings[t.ranges[idx]]
6565
return node
6666
}
6767

68+
func (t *Tokens) SetForeignTokens(items map[int]CacheItem, node string) {
69+
for token, _ := range items {
70+
t.ForeignTokens[token] = node
71+
}
72+
}
73+
74+
func (t *Tokens) DeleteForeignToken(token int) {
75+
delete(t.ForeignTokens, token)
76+
}
77+
6878
func (t *Tokens) Merge(mappings map[int]string) {
6979
newMappings := map[int]string{}
7080
nodes := t.Nodes.Map()

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /