Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
R
replds
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
2
Issues
2
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ai3
tools
replds
Commits
d052f131
Commit
d052f131
authored
Aug 15, 2019
by
ale
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Use pointers wherever necessary
Make the garbage collector do its job!
parent
ec12b1e3
Pipeline
#4074
passed with stages
in 1 minute and 27 seconds
Changes
5
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
50 additions
and
43 deletions
+50
-43
cmd/replds/client.go
cmd/replds/client.go
+13
-15
fs.go
fs.go
+6
-6
http.go
http.go
+1
-1
server.go
server.go
+24
-15
server_test.go
server_test.go
+6
-6
No files found.
cmd/replds/client.go
View file @
d052f131
...
...
@@ -14,31 +14,29 @@ import (
"git.autistici.org/ai3/tools/replds"
)
func
nodeFromFile
(
path
,
rootDir
string
)
(
replds
.
Node
,
error
)
{
var
n
replds
.
Node
func
nodeFromFile
(
path
,
rootDir
string
)
(
*
replds
.
Node
,
error
)
{
stat
,
err
:=
os
.
Stat
(
path
)
if
err
!=
nil
{
return
n
,
err
return
n
il
,
err
}
data
,
err
:=
ioutil
.
ReadFile
(
path
)
if
err
!=
nil
{
return
n
,
err
return
n
il
,
err
}
rel
:=
path
if
rootDir
!=
""
{
rel
,
err
=
filepath
.
Rel
(
rootDir
,
path
)
if
err
!=
nil
{
return
n
,
err
return
n
il
,
err
}
}
n
=
replds
.
Node
{
return
&
replds
.
Node
{
Path
:
rel
,
Value
:
data
,
Timestamp
:
stat
.
ModTime
(),
}
return
n
,
nil
},
nil
}
type
putCmd
struct
{
...
...
@@ -108,9 +106,7 @@ func (c *putCmd) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{})
return
subcommands
.
ExitSuccess
}
type
rmCmd
struct
{
root
string
}
type
rmCmd
struct
{}
func
(
*
rmCmd
)
Name
()
string
{
return
"rm"
}
func
(
*
rmCmd
)
Synopsis
()
string
{
return
"Delete one or more files."
}
...
...
@@ -144,7 +140,7 @@ func (c *rmCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) su
now
:=
time
.
Now
()
var
req
replds
.
SetNodesRequest
for
_
,
arg
:=
range
f
.
Args
()
{
req
.
Nodes
=
append
(
req
.
Nodes
,
replds
.
Node
{
req
.
Nodes
=
append
(
req
.
Nodes
,
&
replds
.
Node
{
Path
:
arg
,
Deleted
:
true
,
Timestamp
:
now
,
...
...
@@ -184,7 +180,7 @@ func (c *syncCmd) SetFlags(f *flag.FlagSet) {
func
(
c
*
syncCmd
)
scanPath
(
base
string
)
map
[
string
]
time
.
Time
{
m
:=
make
(
map
[
string
]
time
.
Time
)
filepath
.
Walk
(
base
,
func
(
path
string
,
info
os
.
FileInfo
,
err
error
)
error
{
if
err
:=
filepath
.
Walk
(
base
,
func
(
path
string
,
info
os
.
FileInfo
,
err
error
)
error
{
if
err
!=
nil
||
!
info
.
Mode
()
.
IsRegular
()
{
return
nil
}
...
...
@@ -192,7 +188,9 @@ func (c *syncCmd) scanPath(base string) map[string]time.Time {
m
[
rel
]
=
info
.
ModTime
()
}
return
nil
})
});
err
!=
nil
{
log
.
Printf
(
"error: %v"
,
err
)
}
return
m
}
...
...
@@ -245,7 +243,7 @@ func (c *syncCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{})
if
c
.
del
{
now
:=
time
.
Now
()
for
_
,
path
:=
range
removed
{
req
.
Nodes
=
append
(
req
.
Nodes
,
replds
.
Node
{
req
.
Nodes
=
append
(
req
.
Nodes
,
&
replds
.
Node
{
Path
:
path
,
Deleted
:
true
,
Timestamp
:
now
,
...
...
fs.go
View file @
d052f131
...
...
@@ -20,9 +20,9 @@ func newFS(dir string) *FS {
// getAllNodes scans the root directory and returns a list of
// Nodes. All errors are ignored.
func
(
f
*
FS
)
getAllNodes
()
[]
Node
{
var
nodes
[]
Node
dir
Len
:=
len
(
f
.
dir
)
func
(
f
*
FS
)
getAllNodes
()
[]
*
Node
{
var
nodes
[]
*
Node
dir
Offset
:=
len
(
f
.
dir
)
+
1
if
err
:=
filepath
.
Walk
(
f
.
dir
,
func
(
path
string
,
info
os
.
FileInfo
,
err
error
)
error
{
// Ignore errors.
if
err
!=
nil
{
...
...
@@ -32,8 +32,8 @@ func (f *FS) getAllNodes() []Node {
if
!
info
.
Mode
()
.
IsRegular
()
{
return
nil
}
nodes
=
append
(
nodes
,
Node
{
Path
:
path
[
dir
Len
+
1
:
],
nodes
=
append
(
nodes
,
&
Node
{
Path
:
path
[
dir
Offset
:
],
Timestamp
:
info
.
ModTime
(),
})
return
nil
...
...
@@ -44,7 +44,7 @@ func (f *FS) getAllNodes() []Node {
}
// setNode applies a Node to the underlying filesystem.
func
(
f
*
FS
)
setNode
(
node
Node
)
error
{
func
(
f
*
FS
)
setNode
(
node
*
Node
)
error
{
path
:=
filepath
.
Join
(
f
.
dir
,
node
.
Path
)
// Tombstone? Then delete the file (ignore errors).
...
...
http.go
View file @
d052f131
...
...
@@ -84,7 +84,7 @@ func (s *HTTPServer) Handler() http.Handler {
return
h
}
func
nodes2str
(
nodes
[]
Node
)
string
{
func
nodes2str
(
nodes
[]
*
Node
)
string
{
var
tmp
[]
string
for
_
,
node
:=
range
nodes
{
tmp
=
append
(
tmp
,
fmt
.
Sprintf
(
"%s@%d"
,
node
.
Path
,
node
.
Timestamp
.
Unix
()))
...
...
server.go
View file @
d052f131
...
...
@@ -34,6 +34,15 @@ type Node struct {
Deleted
bool
`json:"deleted,omitempty"`
}
func
(
n
*
Node
)
Copy
()
*
Node
{
return
&
Node
{
Path
:
n
.
Path
,
Value
:
n
.
Value
,
Timestamp
:
n
.
Timestamp
,
Deleted
:
n
.
Deleted
,
}
}
func
(
n
*
Node
)
withoutValue
()
*
Node
{
return
&
Node
{
Path
:
n
.
Path
,
...
...
@@ -42,29 +51,29 @@ func (n *Node) withoutValue() *Node {
}
}
func
(
n
*
Node
)
metadataOnly
()
Node
{
return
Node
{
func
(
n
*
Node
)
metadataOnly
()
*
Node
{
return
&
Node
{
Path
:
n
.
Path
,
Timestamp
:
n
.
Timestamp
,
}
}
type
internalGetNodesRequest
struct
{
Nodes
[]
Node
`json:"nodes"`
Nodes
[]
*
Node
`json:"nodes"`
}
type
internalGetNodesResponse
struct
{
Nodes
[]
Node
`json:"nodes"`
Partial
bool
`json:"partial,omitempty"`
Nodes
[]
*
Node
`json:"nodes"`
Partial
bool
`json:"partial,omitempty"`
}
type
internalUpdateNodesRequest
struct
{
Nodes
[]
Node
`json:"nodes"`
Nodes
[]
*
Node
`json:"nodes"`
}
// SetNodesRequest is the request type for the SetNodes method.
type
SetNodesRequest
struct
{
Nodes
[]
Node
`json:"nodes"`
Nodes
[]
*
Node
`json:"nodes"`
}
// SetNodesResponse is the response returned by the SetNodes method.
...
...
@@ -76,9 +85,9 @@ type SetNodesResponse struct {
var
errTooOld
=
errors
.
New
(
"a more recent value exists for this key"
)
type
storage
interface
{
getAllNodes
()
[]
Node
getAllNodes
()
[]
*
Node
getNodeValue
(
string
)
([]
byte
,
error
)
setNode
(
Node
)
error
setNode
(
*
Node
)
error
}
// Server for the replicated filesync.
...
...
@@ -120,7 +129,7 @@ func NewServer(peers []string, dir string, tlsConfig *clientutil.TLSClientConfig
}
for
_
,
node
:=
range
s
.
storage
.
getAllNodes
()
{
s
.
nodes
[
node
.
Path
]
=
&
node
s
.
nodes
[
node
.
Path
]
=
node
}
if
len
(
s
.
nodes
)
>
0
{
log
.
Printf
(
"found %d entries in %s"
,
len
(
s
.
nodes
),
dir
)
...
...
@@ -156,7 +165,7 @@ func (s *Server) log(fmt string, args ...interface{}) {
// Update one or more nodes as a "transaction": if one update fails,
// the rest are aborted (but previous commits are not reverted, yet).
// Will ping peers with the data when updatePeers is true.
func
(
s
*
Server
)
doSetNodes
(
ctx
context
.
Context
,
nodes
[]
Node
,
updatePeers
bool
)
(
*
SetNodesResponse
,
error
)
{
func
(
s
*
Server
)
doSetNodes
(
ctx
context
.
Context
,
nodes
[]
*
Node
,
updatePeers
bool
)
(
*
SetNodesResponse
,
error
)
{
// Update local state.
for
_
,
node
:=
range
nodes
{
cur
,
ok
:=
s
.
nodes
[
node
.
Path
]
...
...
@@ -216,13 +225,13 @@ func (s *Server) internalUpdateNodes(ctx context.Context, req *internalUpdateNod
return
err
}
func
(
s
*
Server
)
getNodeWithValue
(
node
*
Node
)
(
out
Node
,
err
error
)
{
out
=
*
node
func
(
s
*
Server
)
getNodeWithValue
(
node
*
Node
)
(
out
*
Node
,
err
error
)
{
out
=
node
.
Copy
()
out
.
Value
,
err
=
s
.
storage
.
getNodeValue
(
node
.
Path
)
return
}
func
nodeDiff
(
reqNodes
[]
Node
,
myNodes
map
[
string
]
*
Node
,
adder
func
(
*
Node
)
bool
)
([]
string
,
bool
)
{
func
nodeDiff
(
reqNodes
[]
*
Node
,
myNodes
map
[
string
]
*
Node
,
adder
func
(
*
Node
)
bool
)
([]
string
,
bool
)
{
var
missing
[]
string
tmp
:=
make
(
map
[
string
]
struct
{})
for
_
,
reqNode
:=
range
reqNodes
{
...
...
@@ -353,7 +362,7 @@ var (
maxTSValue
int64
)
func
updateMaxTimestamp
(
nodes
[]
Node
)
{
func
updateMaxTimestamp
(
nodes
[]
*
Node
)
{
var
max
int64
for
_
,
node
:=
range
nodes
{
t
:=
node
.
Timestamp
.
Unix
()
...
...
server_test.go
View file @
d052f131
...
...
@@ -61,7 +61,7 @@ func newTestCluster(t testing.TB, n int, baseDir string) *testCluster {
// Create root directories.
for
i
:=
0
;
i
<
n
;
i
++
{
dir
:=
filepath
.
Join
(
baseDir
,
fmt
.
Sprintf
(
"fs%d"
,
i
))
os
.
Mkdir
(
dir
,
0755
)
os
.
Mkdir
(
dir
,
0755
)
// nolint
c
.
dirs
=
append
(
c
.
dirs
,
dir
)
}
...
...
@@ -128,8 +128,8 @@ func TestCluster_SetNodes(t *testing.T) {
c
:=
s
.
client
(
t
,
0
)
resp
,
err
:=
c
.
SetNodes
(
context
.
Background
(),
&
SetNodesRequest
{
Nodes
:
[]
Node
{
{
Nodes
:
[]
*
Node
{
&
Node
{
Path
:
"test1"
,
Value
:
[]
byte
(
"test content"
),
Timestamp
:
time
.
Now
(),
...
...
@@ -159,8 +159,8 @@ func TestCluster_SetNodes_1Broken(t *testing.T) {
c
:=
s
.
client
(
t
,
0
)
resp
,
err
:=
c
.
SetNodes
(
context
.
Background
(),
&
SetNodesRequest
{
Nodes
:
[]
Node
{
{
Nodes
:
[]
*
Node
{
&
Node
{
Path
:
"test1"
,
Value
:
[]
byte
(
"test content"
),
Timestamp
:
time
.
Now
(),
...
...
@@ -207,7 +207,7 @@ func TestCluster_PartialSync(t *testing.T) {
}
var
req
SetNodesRequest
for
i
:=
0
;
i
<
count
;
i
++
{
req
.
Nodes
=
append
(
req
.
Nodes
,
Node
{
req
.
Nodes
=
append
(
req
.
Nodes
,
&
Node
{
Path
:
fmt
.
Sprintf
(
"test%02d"
,
i
),
Value
:
testVal
,
Timestamp
:
time
.
Now
(),
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment