Commit 97221e85 authored by ale

Update vendored dependencies

parent 64eb5fb2
......@@ -11,5 +11,6 @@
Damian Gryski <dgryski@gmail.com>
Google Inc.
Jan Mercl <0xjnml@gmail.com>
Klaus Post <klauspost@gmail.com>
Rodolfo Carvalho <rhcarvalho@gmail.com>
Sebastien Binet <seb.binet@gmail.com>
......@@ -29,6 +29,7 @@
Damian Gryski <dgryski@gmail.com>
Jan Mercl <0xjnml@gmail.com>
Kai Backman <kaib@golang.org>
Klaus Post <klauspost@gmail.com>
Marc-Antoine Ruel <maruel@chromium.org>
Nigel Tao <nigeltao@golang.org>
Rob Pike <r@golang.org>
......
......@@ -52,6 +52,8 @@ const (
// Otherwise, a newly allocated slice will be returned.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// Decode handles the Snappy block format, not the Snappy stream format.
func Decode(dst, src []byte) ([]byte, error) {
dLen, s, err := decodedLen(src)
if err != nil {
......@@ -83,6 +85,8 @@ func NewReader(r io.Reader) *Reader {
}
// Reader is an io.Reader that can read Snappy-compressed bytes.
//
// Reader handles the Snappy stream format, not the Snappy block format.
type Reader struct {
r io.Reader
err error
......
......@@ -85,14 +85,28 @@ func decode(dst, src []byte) int {
if offset <= 0 || d < offset || length > len(dst)-d {
return decodeErrCodeCorrupt
}
// Copy from an earlier sub-slice of dst to a later sub-slice. Unlike
// the built-in copy function, this byte-by-byte copy always runs
// Copy from an earlier sub-slice of dst to a later sub-slice.
// If no overlap, use the built-in copy:
if offset >= length {
copy(dst[d:d+length], dst[d-offset:])
d += length
continue
}
// Unlike the built-in copy function, this byte-by-byte copy always runs
// forwards, even if the slices overlap. Conceptually, this is:
//
// d += forwardCopy(dst[d:d+length], dst[d-offset:])
for end := d + length; d != end; d++ {
dst[d] = dst[d-offset]
//
// We align the slices into a and b and show the compiler they are the same size.
// This allows the loop to run without bounds checks.
a := dst[d : d+length]
b := dst[d-offset:]
b = b[:len(a)]
for i := range a {
a[i] = b[i]
}
d += length
}
if d != len(dst) {
return decodeErrCodeCorrupt
......
......@@ -15,6 +15,8 @@ import (
// Otherwise, a newly allocated slice will be returned.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// Encode handles the Snappy block format, not the Snappy stream format.
func Encode(dst, src []byte) []byte {
if n := MaxEncodedLen(len(src)); n < 0 {
panic(ErrTooLarge)
......@@ -139,6 +141,8 @@ func NewBufferedWriter(w io.Writer) *Writer {
}
// Writer is an io.Writer that can write Snappy-compressed bytes.
//
// Writer handles the Snappy stream format, not the Snappy block format.
type Writer struct {
w io.Writer
err error
......
module github.com/golang/snappy
......@@ -238,6 +238,11 @@ func newBatch() interface{} {
return &Batch{}
}
// MakeBatch creates an empty Batch whose data buffer is preallocated with
// capacity for n bytes.
func MakeBatch(n int) *Batch {
	buf := make([]byte, 0, n)
	return &Batch{data: buf}
}
func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
var index batchIndex
for i, o := 0, 0; o < len(data); i++ {
......
......@@ -38,6 +38,12 @@ type DB struct {
inWritePaused int32 // The indicator whether write operation is paused by compaction
aliveSnaps, aliveIters int32
// Compaction statistic
memComp uint32 // The cumulative number of memory compaction
level0Comp uint32 // The cumulative number of level0 compaction
nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
seekComp uint32 // The cumulative number of seek compaction
// Session.
s *session
......@@ -468,7 +474,7 @@ func recoverTable(s *session, o *opt.Options) error {
}
// Commit.
return s.commit(rec)
return s.commit(rec, false)
}
func (db *DB) recoverJournal() error {
......@@ -538,7 +544,7 @@ func (db *DB) recoverJournal() error {
rec.setJournalNum(fd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
if err := db.s.commit(rec, false); err != nil {
fr.Close()
return err
}
......@@ -617,7 +623,7 @@ func (db *DB) recoverJournal() error {
// Commit.
rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
if err := db.s.commit(rec, false); err != nil {
// Close journal on error.
if db.journal != nil {
db.journal.Close()
......@@ -872,6 +878,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// WARNING: The content of any slice returned by the iterator (e.g. a slice
// returned by calling the Iterator.Key() or Iterator.Value() methods) should
// not be modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
......@@ -953,15 +963,29 @@ func (db *DB) GetProperty(name string) (value string, err error) {
value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n"
var totalTables int
var totalSize, totalRead, totalWrite int64
var totalDuration time.Duration
for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
totalTables += len(tables)
totalSize += tables.size()
totalRead += read
totalWrite += write
totalDuration += duration
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
float64(read)/1048576.0, float64(write)/1048576.0)
}
value += "-------+------------+---------------+---------------+---------------+---------------\n"
value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
case p == "compcount":
value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
case p == "iostats":
value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
float64(db.s.stor.reads())/1048576.0,
......@@ -1013,11 +1037,16 @@ type DBStats struct {
BlockCacheSize int
OpenedTablesCount int
LevelSizes []int64
LevelSizes Sizes
LevelTablesCounts []int
LevelRead []int64
LevelWrite []int64
LevelRead Sizes
LevelWrite Sizes
LevelDurations []time.Duration
MemComp uint32
Level0Comp uint32
NonLevel0Comp uint32
SeekComp uint32
}
// Stats populates s with database statistics.
......@@ -1054,16 +1083,17 @@ func (db *DB) Stats(s *DBStats) error {
for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
s.LevelDurations = append(s.LevelDurations, duration)
s.LevelRead = append(s.LevelRead, read)
s.LevelWrite = append(s.LevelWrite, write)
s.LevelSizes = append(s.LevelSizes, tables.size())
s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
}
s.MemComp = atomic.LoadUint32(&db.memComp)
s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
s.SeekComp = atomic.LoadUint32(&db.seekComp)
return nil
}
......
......@@ -8,6 +8,7 @@ package leveldb
import (
"sync"
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/errors"
......@@ -260,7 +261,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) {
db.compCommitLk.Lock()
defer db.compCommitLk.Unlock() // Defer is necessary.
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
return db.s.commit(rec)
return db.s.commit(rec, true)
}, nil)
}
......@@ -324,10 +325,12 @@ func (db *DB) memCompaction() {
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
// Save compaction stats
for _, r := range rec.addedTables {
stats.write += r.size
}
db.compStats.addStat(flushLevel, stats)
atomic.AddUint32(&db.memComp, 1)
// Drop frozen memdb.
db.dropFrozenMem()
......@@ -588,6 +591,14 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
for i := range stats {
db.compStats.addStat(c.sourceLevel+1, &stats[i])
}
switch c.typ {
case level0Compaction:
atomic.AddUint32(&db.level0Comp, 1)
case nonLevel0Compaction:
atomic.AddUint32(&db.nonLevel0Comp, 1)
case seekCompaction:
atomic.AddUint32(&db.seekComp, 1)
}
}
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
......
......@@ -78,13 +78,17 @@ func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Rang
}
rawIter := db.newRawIterator(auxm, auxt, islice, ro)
iter := &dbIter{
db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
key: make([]byte, 0),
value: make([]byte, 0),
db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0,
key: make([]byte, 0),
value: make([]byte, 0),
}
if !iter.disableSampling {
iter.samplingGap = db.iterSamplingRate()
}
atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
......@@ -107,13 +111,14 @@ const (
// dbIter represents the state of an iterator over a database session.
type dbIter struct {
db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
strict bool
smaplingGap int
db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
strict bool
disableSampling bool
samplingGap int
dir dir
key []byte
value []byte
......@@ -122,10 +127,14 @@ type dbIter struct {
}
func (i *dbIter) sampleSeek() {
if i.disableSampling {
return
}
ikey := i.iter.Key()
i.smaplingGap -= len(ikey) + len(i.iter.Value())
for i.smaplingGap < 0 {
i.smaplingGap += i.db.iterSamplingRate()
i.samplingGap -= len(ikey) + len(i.iter.Value())
for i.samplingGap < 0 {
i.samplingGap += i.db.iterSamplingRate()
i.db.sampleSeek(ikey)
}
}
......
......@@ -142,6 +142,10 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// WARNING: The content of any slice returned by the iterator (e.g. a slice
// returned by calling the Iterator.Key() or Iterator.Value() methods) should
// not be modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
// Releasing the snapshot doesn't mean releasing the iterator too, the
// iterator would be still valid until released.
......
......@@ -69,6 +69,13 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// The returned iterator has locks on its own resources, so it can live beyond
// the lifetime of the transaction that created it.
//
// WARNING: The content of any slice returned by the iterator (e.g. a slice
// returned by calling the Iterator.Key() or Iterator.Value() methods) should
// not be modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
......@@ -205,7 +212,7 @@ func (tr *Transaction) Commit() error {
tr.stats.startTimer()
var cerr error
for retry := 0; retry < 3; retry++ {
cerr = tr.db.s.commit(&tr.rec)
cerr = tr.db.s.commit(&tr.rec, false)
if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
......@@ -248,13 +255,14 @@ func (tr *Transaction) discard() {
// Discard transaction.
for _, t := range tr.tables {
tr.db.logf("transaction@discard @%d", t.fd.Num)
if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
tr.db.s.reuseFileNum(t.fd.Num)
}
// Iterator may still use the table, so we use tOps.remove here.
tr.db.s.tops.remove(t.fd)
}
}
// Discard discards the transaction.
// This method is a no-op if the transaction is already closed (either committed
// or discarded).
//
// Other methods should not be called after transaction has been discarded.
func (tr *Transaction) Discard() {
......@@ -278,8 +286,10 @@ func (db *DB) waitCompaction() error {
// until in-flight transaction is committed or discarded.
// The returned transaction handle is safe for concurrent use.
//
// Transaction is expensive and can overwhelm compaction, especially if
// Transaction is very expensive and can overwhelm compaction, especially if
// transaction size is small. Use with caution.
// As a rule of thumb, use a transaction only if you need to merge at least
// `Options.WriteBuffer` worth of data; otherwise don't.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction.
......
......@@ -16,7 +16,7 @@ func bloomHash(key []byte) uint32 {
type bloomFilter int
// The bloom filter serializes its parameters and is backward compatible
// Name: The bloom filter serializes its parameters and is backward compatible
// with respect to them. Therefore, its parameters are not added to its
// name.
func (bloomFilter) Name() string {
......
......@@ -397,6 +397,10 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// WARNING: The content of any slice returned by the iterator (e.g. a slice
// returned by calling the Iterator.Key() or Iterator.Value() methods) should
// not be modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
......
......@@ -278,6 +278,14 @@ type Options struct {
// The default is false.
DisableLargeBatchTransaction bool
// DisableSeeksCompaction allows disabling 'seeks triggered compaction'.
// The purpose of 'seeks triggered compaction' is to optimize database so
// that 'level seeks' can be minimized; however, this might generate many
// small compactions, which may not be preferable.
//
// The default is false.
DisableSeeksCompaction bool
// ErrorIfExist defines whether an error should be returned if the DB already
// exists.
//
......@@ -309,6 +317,8 @@ type Options struct {
// IteratorSamplingRate defines approximate gap (in bytes) between read
// sampling of an iterator. The samples will be used to determine when
// compaction should be triggered.
// Use negative value to disable iterator sampling.
// The iterator sampling is disabled if DisableSeeksCompaction is true.
//
// The default is 1MiB.
IteratorSamplingRate int
......@@ -526,6 +536,13 @@ func (o *Options) GetDisableLargeBatchTransaction() bool {
return o.DisableLargeBatchTransaction
}
// GetDisableSeeksCompaction returns the DisableSeeksCompaction setting, or
// false when o is nil.
func (o *Options) GetDisableSeeksCompaction() bool {
	return o != nil && o.DisableSeeksCompaction
}
func (o *Options) GetErrorIfExist() bool {
if o == nil {
return false
......@@ -548,8 +565,10 @@ func (o *Options) GetFilter() filter.Filter {
}
func (o *Options) GetIteratorSamplingRate() int {
if o == nil || o.IteratorSamplingRate <= 0 {
if o == nil || o.IteratorSamplingRate == 0 {
return DefaultIteratorSamplingRate
} else if o.IteratorSamplingRate < 0 {
return 0
}
return o.IteratorSamplingRate
}
......
......@@ -47,15 +47,24 @@ type session struct {
o *cachedOptions
icmp *iComparer
tops *tOps
fileRef map[int64]int
manifest *journal.Writer
manifestWriter storage.Writer
manifestFd storage.FileDesc
stCompPtrs []internalKey // compaction pointers; need external synchronization
stVersion *version // current version
vmu sync.Mutex
stCompPtrs []internalKey // compaction pointers; need external synchronization
stVersion *version // current version
ntVersionId int64 // next version id to assign
refCh chan *vTask
relCh chan *vTask
deltaCh chan *vDelta
abandon chan int64
closeC chan struct{}
closeW sync.WaitGroup
vmu sync.Mutex
// Testing fields
fileRefCh chan chan map[int64]int // channel used to pass current reference stat
}
// Creates new initialized session instance.
......@@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
return
}
s = &session{
stor: newIStorage(stor),
storLock: storLock,
fileRef: make(map[int64]int),
stor: newIStorage(stor),
storLock: storLock,
refCh: make(chan *vTask),
relCh: make(chan *vTask),
deltaCh: make(chan *vDelta),
abandon: make(chan int64),
fileRefCh: make(chan chan map[int64]int),
closeC: make(chan struct{}),
}
s.setOptions(o)
s.tops = newTableOps(s)
s.setVersion(newVersion(s))
s.closeW.Add(1)
go s.refLoop()
s.setVersion(nil, newVersion(s))
s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
return
}
......@@ -90,7 +107,11 @@ func (s *session) close() {
}
s.manifest = nil
s.manifestWriter = nil
s.setVersion(&version{s: s, closing: true})
s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId})
// Close all background goroutines
close(s.closeC)
s.closeW.Wait()
}
// Release session lock.
......@@ -180,19 +201,27 @@ func (s *session) recover() (err error) {
}
s.manifestFd = fd
s.setVersion(staging.finish())
s.setVersion(rec, staging.finish(false))
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
}
// Commit session; need external synchronization.
func (s *session) commit(r *sessionRecord) (err error) {
func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
v := s.version()
defer v.release()
// spawn new version based on current version
nv := v.spawn(r)
nv := v.spawn(r, trivial)
// abandon useless version id to prevent blocking version processing loop.
defer func() {
if err != nil {
s.abandon <- nv.id
s.logf("commit@abandon useless vid D%d", nv.id)
}
}()
if s.manifest == nil {
// manifest journal writer not yet created, create one
......@@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) {
// finally, apply new version if no error rise
if err == nil {
s.setVersion(nv)
s.setVersion(r, nv)
}
return
......
......@@ -14,6 +14,13 @@ import (
"github.com/syndtr/goleveldb/leveldb/opt"
)
// Compaction types. Each compaction carries one of these values in its typ
// field so the kind of compaction can be told apart later.
const (
	undefinedCompaction = 0 // zero value; no compaction type assigned
	level0Compaction    = 1 // compaction whose source level is 0
	nonLevel0Compaction = 2 // compaction whose source level is not 0
	seekCompaction      = 3 // compaction picked from the version's cSeek entry
)
func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
v := s.version()
defer v.release()
......@@ -50,6 +57,7 @@ func (s *session) pickCompaction() *compaction {
var sourceLevel int
var t0 tFiles
var typ int
if v.cScore >= 1 {
sourceLevel = v.cLevel
cptr := s.getCompPtr(sourceLevel)
......@@ -63,18 +71,24 @@ func (s *session) pickCompaction() *compaction {
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
if sourceLevel == 0 {
typ = level0Compaction
} else {
typ = nonLevel0Compaction
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
sourceLevel = ts.level
t0 = append(t0, ts.table)
typ = seekCompaction
} else {
v.release()
return nil
}
}
return newCompaction(s, v, sourceLevel, t0)
return newCompaction(s, v, sourceLevel, t0, typ)
}
// Create compaction from given level and range; need external synchronization.
......@@ -109,13 +123,18 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit
}
}
return newCompaction(s, v, sourceLevel, t0)
typ := level0Compaction
if sourceLevel != 0 {
typ = nonLevel0Compaction
}
return newCompaction(s, v, sourceLevel, t0, typ)
}
func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction {
c := &compaction{
s: s,
v: v,
typ: typ,
sourceLevel: sourceLevel,
levels: [2]tFiles{t0, nil},
maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
......@@ -131,6 +150,7 @@ type compaction struct {
s *session
v *version
typ int
sourceLevel int
levels [2]tFiles
maxGPOverlaps int64
......@@ -181,10 +201,14 @@ func (c *compaction) expand() {
t0, t1 := c.levels[0], c.levels[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
if len(t0) != len(c.levels[0]) {
imin, imax = t0.getRange(c.s.icmp)
// For non-zero levels, the ukey can't hop across tables at all.
if c.sourceLevel == 0 {
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
if len(t0) != len(c.levels[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
......
......@@ -9,6 +9,7 @@ package leveldb
import (
"fmt"
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/storage"
......@@ -39,19 +40,213 @@ func (s *session) newTemp() storage.FileDesc {
return storage.FileDesc{Type: storage.TypeTemp, Num: num}
}
func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
ref += s.fileRef[fd.Num]
if ref > 0 {
s.fileRef[fd.Num] = ref
} else if ref == 0 {
delete(s.fileRef, fd.Num)
} else {
panic(fmt.Sprintf("negative ref: %v", fd))
}
return ref
// Session state.
// maxCachedNumber is the upper bound on how many version tasks the ref loop
// may keep cached.
const maxCachedNumber = 256

// maxCachedTime is the longest time the ref loop may keep a version task
// cached.
const maxCachedTime = 5 * time.Minute