Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4e4aa18

Browse files
add distributed database example barebones
1 parent babfc90 commit 4e4aa18

File tree

17 files changed

+634
-0
lines changed

17 files changed

+634
-0
lines changed

‎mutexes/README.md‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ the database consistency which otherwise is not possible for a non-serial schedu
124124
- [Schedule - Wiki](https://en.wikipedia.org/wiki/Schedule_(computer_science))
125125
- [Recoverability - Wiki](https://en.wikipedia.org/wiki/Schedule_(computer_science)#Recoverable)
126126
- [2PL - Wiki](https://en.wikipedia.org/wiki/Two-phase_locking)
127+
- [2PC - Wiki](https://en.wikipedia.org/wiki/Two-phase_commit_protocol)
128+
- [Gossip Protocol - Wiki](https://en.wikipedia.org/wiki/Gossip_protocol)
127129
- [Transaction Processing - Wiki](https://en.wikipedia.org/wiki/Transaction_processing)
128130
- [Pessimistic vs Optimistic Locking - StackOverflow](https://stackoverflow.com/questions/129329/optimistic-vs-pessimistic-locking)
129131
- [Pessimistic vs Optimistic Locking - StackOverflow Explanation](https://stackoverflow.com/a/58952004)

‎mutexes/distributed-db/README.md‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Distributed Cache Database
2+
3+
### Drawbacks
4+
5+
- HTTP is not the most efficient protocol for communication and data transfer
6+
- Strings (JSON) are not the most efficient way for data transfer
7+
- Updates get lost if any peer servers are down
8+
- Updates get lost if the host becomes unavailable for the peer server resolving the summary
9+
- Snapshot-ing can cause data loss if the process is interrupted or killed (NOT DURABLE)
10+
- Keeping all the data in memory is inefficient and dangerous
11+
- Set and Get operations don't have any kind of Data Consistency control (READ/WRITE acknowledgements)
12+
- The Set operation does not have an option to control replication factor (number of copies on each peer)
13+
- There's no partitioning strategy, like how the data is paged/stored on the server for fast WRITE and READ access

‎mutexes/distributed-db/app/app.go‎

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package app
2+
3+
import (
4+
"flag"
5+
"log"
6+
"net/http"
7+
"strconv"
8+
9+
"distributed-db/clients"
10+
"distributed-db/controllers"
11+
"distributed-db/models"
12+
"distributed-db/repositories"
13+
"distributed-db/services"
14+
"distributed-db/workers"
15+
)
16+
17+
func New() (*App, error) {
18+
var peers models.StringList
19+
port := flag.Int("port", 8080, "the port of the running server")
20+
flag.Var(&peers, "peer", "the list of peer servers to gossip to")
21+
22+
flag.Parse()
23+
24+
addr := ":" + strconv.Itoa(*port)
25+
peersRepo, err := repositories.NewPeers(peers)
26+
if err != nil {
27+
return nil, err
28+
}
29+
cacheRepo := repositories.NewCache()
30+
httpClient := clients.NewHTTP("localhost" + addr)
31+
svc := services.NewCache(cacheRepo, peersRepo, httpClient)
32+
router := controllers.NewRouter(svc)
33+
srv := &http.Server{
34+
Addr: addr,
35+
Handler: router,
36+
}
37+
w := workers.NewGossip(svc)
38+
a := &App{
39+
Server: srv,
40+
Worker: w,
41+
}
42+
43+
return a, nil
44+
}
45+
46+
type App struct {
47+
Server *http.Server
48+
Worker workers.Gossip
49+
}
50+
51+
func (a App) Start() error {
52+
go a.Worker.Start()
53+
54+
log.Println("server started on address", a.Server.Addr)
55+
err := a.Server.ListenAndServe()
56+
if err != nil {
57+
return err
58+
}
59+
60+
return nil
61+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package clients
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"net/http"
7+
"net/url"
8+
9+
"distributed-db/models"
10+
)
11+
12+
func NewHTTP(host string) *HTTPClient {
13+
client := HTTPClient{
14+
host: host,
15+
httpClient: &http.Client{},
16+
}
17+
return &client
18+
}
19+
20+
type HTTPClient struct {
21+
host string
22+
httpClient *http.Client
23+
}
24+
25+
func (c *HTTPClient) Get(peer string, key string) (models.CacheItem, error) {
26+
body := models.GetRequest{
27+
Keys: []string{key},
28+
}
29+
req, err := c.makeRequest(http.MethodGet, c.url(peer, "get"), body)
30+
if err != nil {
31+
return models.CacheItem{}, err
32+
}
33+
34+
res, err := c.httpClient.Do(req)
35+
if err != nil {
36+
return models.CacheItem{}, err
37+
}
38+
39+
var cacheItem []models.CacheItem
40+
err = json.NewDecoder(res.Body).Decode(&cacheItem)
41+
if err != nil {
42+
return models.CacheItem{}, err
43+
}
44+
45+
return cacheItem[0], nil
46+
}
47+
48+
func (c *HTTPClient) Gossip(peer string, summary models.Summary) error {
49+
body := models.GossipRequest{
50+
Summary: summary,
51+
}
52+
req, err := c.makeRequest(http.MethodPost, c.url(peer, "gossip"), body)
53+
if err != nil {
54+
return err
55+
}
56+
57+
_, err = c.httpClient.Do(req)
58+
if err != nil {
59+
return err
60+
}
61+
62+
return nil
63+
}
64+
65+
func (c *HTTPClient) url(peer, path string) string {
66+
u := url.URL{
67+
Scheme: "http",
68+
Host: peer,
69+
Path: path,
70+
}
71+
return u.String()
72+
}
73+
74+
func (c *HTTPClient) makeRequest(method, url string, body interface{}) (*http.Request, error) {
75+
bs, err := json.Marshal(body)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
req, err := http.NewRequest(method, url, bytes.NewReader(bs))
81+
if err != nil {
82+
return nil, err
83+
}
84+
req.Host = c.host
85+
86+
return req, nil
87+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
8+
"distributed-db/models"
9+
)
10+
11+
type cacheGetter interface {
12+
Get(keys []string) []models.CacheItem
13+
}
14+
15+
func get(svc cacheGetter) http.HandlerFunc {
16+
return func(w http.ResponseWriter, r *http.Request) {
17+
var req models.GetRequest
18+
err := json.NewDecoder(r.Body).Decode(&req)
19+
if err != nil {
20+
log.Printf("could not decode get request: %v", err)
21+
w.WriteHeader(http.StatusInternalServerError)
22+
return
23+
}
24+
25+
values := svc.Get(req.Keys)
26+
27+
w.Header().Set("Content-Type", "application/json")
28+
err = json.NewEncoder(w).Encode(values)
29+
if err != nil {
30+
log.Printf("could not encode json: %v", err)
31+
w.WriteHeader(http.StatusInternalServerError)
32+
}
33+
}
34+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
8+
"distributed-db/models"
9+
)
10+
11+
type cacheSummaryResolver interface {
12+
ResolveSummary(peer string, summary models.Summary)
13+
}
14+
15+
func gossip(svc cacheSummaryResolver) http.HandlerFunc {
16+
return func(w http.ResponseWriter, r *http.Request) {
17+
var req models.GossipRequest
18+
err := json.NewDecoder(r.Body).Decode(&req)
19+
if err != nil {
20+
log.Printf("could not decode gossip request: %v", err)
21+
w.WriteHeader(http.StatusInternalServerError)
22+
return
23+
}
24+
25+
svc.ResolveSummary(r.Host, req.Summary)
26+
}
27+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package controllers
2+
3+
import (
4+
"net/http"
5+
)
6+
7+
type CacheService interface {
8+
cacheGetter
9+
cacheSetter
10+
cacheSummaryResolver
11+
}
12+
13+
func NewRouter(svc CacheService) http.Handler {
14+
mux := http.NewServeMux()
15+
mux.HandleFunc("/set", set(svc))
16+
mux.HandleFunc("/get", get(svc))
17+
mux.HandleFunc("/gossip", gossip(svc))
18+
19+
return mux
20+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package controllers
2+
3+
import (
4+
"encoding/json"
5+
"log"
6+
"net/http"
7+
)
8+
9+
type cacheSetter interface {
10+
Set(key, value string)
11+
}
12+
13+
type setRequest struct {
14+
Key string `json:"key"`
15+
Value string `json:"value"`
16+
}
17+
18+
func set(svc cacheSetter) http.HandlerFunc {
19+
return func(w http.ResponseWriter, r *http.Request) {
20+
var req setRequest
21+
err := json.NewDecoder(r.Body).Decode(&req)
22+
if err != nil {
23+
log.Printf("could not decode set request: %v", err)
24+
w.WriteHeader(http.StatusInternalServerError)
25+
return
26+
}
27+
28+
svc.Set(req.Key, req.Value)
29+
30+
log.Printf("successfully stored record with key: %s", req.Key)
31+
w.WriteHeader(http.StatusOK)
32+
}
33+
}

‎mutexes/distributed-db/go.mod‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module distributed-db
2+
3+
go 1.17

‎mutexes/distributed-db/main.go‎

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"distributed-db/app"
7+
)
8+
9+
func main() {
10+
a, err := app.New()
11+
if err != nil {
12+
log.Fatalf("could not create the app: %v", err)
13+
}
14+
15+
err = a.Start()
16+
if err != nil {
17+
log.Fatalf("could not start the app: %v", err)
18+
}
19+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /