From 9573f9c8695d508d5e7e3ec23241f90bda49099a Mon Sep 17 00:00:00 2001 From: renovate <renovate-bot@autistici.org> Date: Sun, 13 Nov 2022 13:15:44 +0000 Subject: [PATCH] Update module github.com/syndtr/goleveldb to v1 --- go.mod | 2 +- go.sum | 4 + vendor/github.com/golang/snappy/go.mod | 1 - .../syndtr/goleveldb/leveldb/batch.go | 5 - .../github.com/syndtr/goleveldb/leveldb/db.go | 46 +--- .../syndtr/goleveldb/leveldb/db_compaction.go | 13 +- .../syndtr/goleveldb/leveldb/db_iter.go | 43 ++-- .../goleveldb/leveldb/db_transaction.go | 16 +- .../syndtr/goleveldb/leveldb/filter/bloom.go | 2 +- .../syndtr/goleveldb/leveldb/opt/options.go | 21 +- .../syndtr/goleveldb/leveldb/session.go | 55 +--- .../goleveldb/leveldb/session_compaction.go | 38 +-- .../syndtr/goleveldb/leveldb/session_util.go | 240 +----------------- .../syndtr/goleveldb/leveldb/table.go | 91 +------ .../syndtr/goleveldb/leveldb/version.go | 81 ++---- vendor/modules.txt | 4 +- 16 files changed, 105 insertions(+), 557 deletions(-) delete mode 100644 vendor/github.com/golang/snappy/go.mod diff --git a/go.mod b/go.mod index f302ca6..67dab38 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,6 @@ require ( github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.1.1 // indirect github.com/pborman/uuid v1.2.1 - github.com/syndtr/goleveldb v0.0.0-20190923125748-758128399b1d + github.com/syndtr/goleveldb v1.0.0 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect ) diff --git a/go.sum b/go.sum index b977e72..cd6cc2b 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= @@ -40,6 +42,8 @@ github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/syndtr/goleveldb v0.0.0-20190923125748-758128399b1d h1:OgkXbz/O0zsJoaB+z6n/a3bNGCbCWhBPLfGr6qaBprM= github.com/syndtr/goleveldb v0.0.0-20190923125748-758128399b1d/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/vendor/github.com/golang/snappy/go.mod b/vendor/github.com/golang/snappy/go.mod deleted file mode 100644 index f6406bb..0000000 --- a/vendor/github.com/golang/snappy/go.mod +++ /dev/null @@ -1 +0,0 @@ -module github.com/golang/snappy diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go index 823be93..2259200 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go @@ -238,11 +238,6 @@ func newBatch() interface{} { return &Batch{} } -// MakeBatch returns empty batch with preallocated buffer. -func MakeBatch(n int) *Batch { - return &Batch{data: make([]byte, 0, n)} -} - func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { var index batchIndex for i, o := 0, 0; o < len(data); i++ { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index 74e9826..90fedf7 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -38,12 +38,6 @@ type DB struct { inWritePaused int32 // The indicator whether write operation is paused by compaction aliveSnaps, aliveIters int32 - // Compaction statistic - memComp uint32 // The cumulative number of memory compaction - level0Comp uint32 // The cumulative number of level0 compaction - nonLevel0Comp uint32 // The cumulative number of non-level0 compaction - seekComp uint32 // The cumulative number of seek compaction - // Session. s *session @@ -474,7 +468,7 @@ func recoverTable(s *session, o *opt.Options) error { } // Commit. - return s.commit(rec, false) + return s.commit(rec) } func (db *DB) recoverJournal() error { @@ -544,7 +538,7 @@ func (db *DB) recoverJournal() error { rec.setJournalNum(fd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec, false); err != nil { + if err := db.s.commit(rec); err != nil { fr.Close() return err } @@ -623,7 +617,7 @@ func (db *DB) recoverJournal() error { // Commit. rec.setJournalNum(db.journalFd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec, false); err != nil { + if err := db.s.commit(rec); err != nil { // Close journal on error. if db.journal != nil { db.journal.Close() @@ -963,29 +957,15 @@ func (db *DB) GetProperty(name string) (value string, err error) { value = "Compactions\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + "-------+------------+---------------+---------------+---------------+---------------\n" - var totalTables int - var totalSize, totalRead, totalWrite int64 - var totalDuration time.Duration for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) if len(tables) == 0 && duration == 0 { continue } - totalTables += len(tables) - totalSize += tables.size() - totalRead += read - totalWrite += write - totalDuration += duration value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), float64(read)/1048576.0, float64(write)/1048576.0) } - value += "-------+------------+---------------+---------------+---------------+---------------\n" - value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", - totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(), - float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0) - case p == "compcount": - value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp)) case p == "iostats": value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f", float64(db.s.stor.reads())/1048576.0, @@ -1037,16 +1017,11 @@ type DBStats struct { BlockCacheSize int OpenedTablesCount int - LevelSizes Sizes + LevelSizes []int64 LevelTablesCounts []int - LevelRead Sizes - LevelWrite Sizes + LevelRead []int64 + LevelWrite []int64 LevelDurations []time.Duration - - MemComp uint32 - Level0Comp uint32 - NonLevel0Comp uint32 - SeekComp uint32 } // Stats populates s with database statistics. @@ -1083,17 +1058,16 @@ func (db *DB) Stats(s *DBStats) error { for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) - + if len(tables) == 0 && duration == 0 { + continue + } s.LevelDurations = append(s.LevelDurations, duration) s.LevelRead = append(s.LevelRead, read) s.LevelWrite = append(s.LevelWrite, write) s.LevelSizes = append(s.LevelSizes, tables.size()) s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables)) } - s.MemComp = atomic.LoadUint32(&db.memComp) - s.Level0Comp = atomic.LoadUint32(&db.level0Comp) - s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp) - s.SeekComp = atomic.LoadUint32(&db.seekComp) + return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 6b70eb2..0c1b9a5 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -8,7 +8,6 @@ package leveldb import ( "sync" - "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb/errors" @@ -261,7 +260,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) { db.compCommitLk.Lock() defer db.compCommitLk.Unlock() // Defer is necessary. db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { - return db.s.commit(rec, true) + return db.s.commit(rec) }, nil) } @@ -325,12 +324,10 @@ func (db *DB) memCompaction() { db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) - // Save compaction stats for _, r := range rec.addedTables { stats.write += r.size } db.compStats.addStat(flushLevel, stats) - atomic.AddUint32(&db.memComp, 1) // Drop frozen memdb. db.dropFrozenMem() @@ -591,14 +588,6 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { for i := range stats { db.compStats.addStat(c.sourceLevel+1, &stats[i]) } - switch c.typ { - case level0Compaction: - atomic.AddUint32(&db.level0Comp, 1) - case nonLevel0Compaction: - atomic.AddUint32(&db.nonLevel0Comp, 1) - case seekCompaction: - atomic.AddUint32(&db.seekComp, 1) - } } func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go index e6e8ca5..03c24cd 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -78,17 +78,13 @@ func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Rang } rawIter := db.newRawIterator(auxm, auxt, islice, ro) iter := &dbIter{ - db: db, - icmp: db.s.icmp, - iter: rawIter, - seq: seq, - strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), - disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0, - key: make([]byte, 0), - value: make([]byte, 0), - } - if !iter.disableSampling { - iter.samplingGap = db.iterSamplingRate() + db: db, + icmp: db.s.icmp, + iter: rawIter, + seq: seq, + strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), + key: make([]byte, 0), + value: make([]byte, 0), } atomic.AddInt32(&db.aliveIters, 1) runtime.SetFinalizer(iter, (*dbIter).Release) @@ -111,14 +107,13 @@ const ( // dbIter represent an interator states over a database session. type dbIter struct { - db *DB - icmp *iComparer - iter iterator.Iterator - seq uint64 - strict bool - disableSampling bool - - samplingGap int + db *DB + icmp *iComparer + iter iterator.Iterator + seq uint64 + strict bool + + smaplingGap int dir dir key []byte value []byte @@ -127,14 +122,10 @@ type dbIter struct { } func (i *dbIter) sampleSeek() { - if i.disableSampling { - return - } - ikey := i.iter.Key() - i.samplingGap -= len(ikey) + len(i.iter.Value()) - for i.samplingGap < 0 { - i.samplingGap += i.db.iterSamplingRate() + i.smaplingGap -= len(ikey) + len(i.iter.Value()) + for i.smaplingGap < 0 { + i.smaplingGap += i.db.iterSamplingRate() i.db.sampleSeek(ikey) } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go index 21d1e51..1a00001 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -69,9 +69,6 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // -// The returned iterator has locks on its own resources, so it can live beyond -// the lifetime of the transaction who creates them. -// // WARNING: Any slice returned by interator (e.g. slice returned by calling // Iterator.Key() or Iterator.Key() methods), its content should not be modified // unless noted otherwise. @@ -212,7 +209,7 @@ func (tr *Transaction) Commit() error { tr.stats.startTimer() var cerr error for retry := 0; retry < 3; retry++ { - cerr = tr.db.s.commit(&tr.rec, false) + cerr = tr.db.s.commit(&tr.rec) if cerr != nil { tr.db.logf("transaction@commit error R·%d %q", retry, cerr) select { @@ -255,14 +252,13 @@ func (tr *Transaction) discard() { // Discard transaction. for _, t := range tr.tables { tr.db.logf("transaction@discard @%d", t.fd.Num) - // Iterator may still use the table, so we use tOps.remove here. - tr.db.s.tops.remove(t.fd) + if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil { + tr.db.s.reuseFileNum(t.fd.Num) + } } } // Discard discards the transaction. -// This method is noop if transaction is already closed (either committed or -// discarded) // // Other methods should not be called after transaction has been discarded. func (tr *Transaction) Discard() { @@ -286,10 +282,8 @@ func (db *DB) waitCompaction() error { // until in-flight transaction is committed or discarded. // The returned transaction handle is safe for concurrent use. // -// Transaction is very expensive and can overwhelm compaction, especially if +// Transaction is expensive and can overwhelm compaction, especially if // transaction size is small. Use with caution. -// The rule of thumb is if you need to merge at least same amount of -// `Options.WriteBuffer` worth of data then use transaction, otherwise don't. // // The transaction must be closed once done, either by committing or discarding // the transaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go index 56ccbfb..bab0e99 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go @@ -16,7 +16,7 @@ func bloomHash(key []byte) uint32 { type bloomFilter int -// Name: The bloom filter serializes its parameters and is backward compatible +// The bloom filter serializes its parameters and is backward compatible // with respect to them. Therefor, its parameters are not added to its // name. func (bloomFilter) Name() string { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go index c02c1e9..528b164 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -278,14 +278,6 @@ type Options struct { // The default is false. DisableLargeBatchTransaction bool - // DisableSeeksCompaction allows disabling 'seeks triggered compaction'. - // The purpose of 'seeks triggered compaction' is to optimize database so - // that 'level seeks' can be minimized, however this might generate many - // small compaction which may not preferable. - // - // The default is false. - DisableSeeksCompaction bool - // ErrorIfExist defines whether an error should returned if the DB already // exist. // @@ -317,8 +309,6 @@ type Options struct { // IteratorSamplingRate defines approximate gap (in bytes) between read // sampling of an iterator. The samples will be used to determine when // compaction should be triggered. - // Use negative value to disable iterator sampling. - // The iterator sampling is disabled if DisableSeeksCompaction is true. // // The default is 1MiB. IteratorSamplingRate int @@ -536,13 +526,6 @@ func (o *Options) GetDisableLargeBatchTransaction() bool { return o.DisableLargeBatchTransaction } -func (o *Options) GetDisableSeeksCompaction() bool { - if o == nil { - return false - } - return o.DisableSeeksCompaction -} - func (o *Options) GetErrorIfExist() bool { if o == nil { return false @@ -565,10 +548,8 @@ func (o *Options) GetFilter() filter.Filter { } func (o *Options) GetIteratorSamplingRate() int { - if o == nil || o.IteratorSamplingRate == 0 { + if o == nil || o.IteratorSamplingRate <= 0 { return DefaultIteratorSamplingRate - } else if o.IteratorSamplingRate < 0 { - return 0 } return o.IteratorSamplingRate } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index 7310209..3f391f9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -47,24 +47,15 @@ type session struct { o *cachedOptions icmp *iComparer tops *tOps + fileRef map[int64]int manifest *journal.Writer manifestWriter storage.Writer manifestFd storage.FileDesc - stCompPtrs []internalKey // compaction pointers; need external synchronization - stVersion *version // current version - ntVersionId int64 // next version id to assign - refCh chan *vTask - relCh chan *vTask - deltaCh chan *vDelta - abandon chan int64 - closeC chan struct{} - closeW sync.WaitGroup - vmu sync.Mutex - - // Testing fields - fileRefCh chan chan map[int64]int // channel used to pass current reference stat + stCompPtrs []internalKey // compaction pointers; need external synchronization + stVersion *version // current version + vmu sync.Mutex } // Creates new initialized session instance. @@ -77,21 +68,13 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: newIStorage(stor), - storLock: storLock, - refCh: make(chan *vTask), - relCh: make(chan *vTask), - deltaCh: make(chan *vDelta), - abandon: make(chan int64), - fileRefCh: make(chan chan map[int64]int), - closeC: make(chan struct{}), + stor: newIStorage(stor), + storLock: storLock, + fileRef: make(map[int64]int), } s.setOptions(o) s.tops = newTableOps(s) - - s.closeW.Add(1) - go s.refLoop() - s.setVersion(nil, newVersion(s)) + s.setVersion(newVersion(s)) s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } @@ -107,11 +90,7 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) - - // Close all background goroutines - close(s.closeC) - s.closeW.Wait() + s.setVersion(&version{s: s, closing: true}) } // Release session lock. @@ -201,27 +180,19 @@ func (s *session) recover() (err error) { } s.manifestFd = fd - s.setVersion(rec, staging.finish(false)) + s.setVersion(staging.finish()) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. -func (s *session) commit(r *sessionRecord, trivial bool) (err error) { +func (s *session) commit(r *sessionRecord) (err error) { v := s.version() defer v.release() // spawn new version based on current version - nv := v.spawn(r, trivial) - - // abandon useless version id to prevent blocking version processing loop. - defer func() { - if err != nil { - s.abandon <- nv.id - s.logf("commit@abandon useless vid D%d", nv.id) - } - }() + nv := v.spawn(r) if s.manifest == nil { // manifest journal writer not yet created, create one @@ -232,7 +203,7 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) { // finally, apply new version if no error rise if err == nil { - s.setVersion(r, nv) + s.setVersion(nv) } return diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go index 4c1d336..089cd00 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go @@ -14,13 +14,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" ) -const ( - undefinedCompaction = iota - level0Compaction - nonLevel0Compaction - seekCompaction -) - func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int { v := s.version() defer v.release() @@ -57,7 +50,6 @@ func (s *session) pickCompaction() *compaction { var sourceLevel int var t0 tFiles - var typ int if v.cScore >= 1 { sourceLevel = v.cLevel cptr := s.getCompPtr(sourceLevel) @@ -71,24 +63,18 @@ func (s *session) pickCompaction() *compaction { if len(t0) == 0 { t0 = append(t0, tables[0]) } - if sourceLevel == 0 { - typ = level0Compaction - } else { - typ = nonLevel0Compaction - } } else { if p := atomic.LoadPointer(&v.cSeek); p != nil { ts := (*tSet)(p) sourceLevel = ts.level t0 = append(t0, ts.table) - typ = seekCompaction } else { v.release() return nil } } - return newCompaction(s, v, sourceLevel, t0, typ) + return newCompaction(s, v, sourceLevel, t0) } // Create compaction from given level and range; need external synchronization. @@ -123,18 +109,13 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit } } - typ := level0Compaction - if sourceLevel != 0 { - typ = nonLevel0Compaction - } - return newCompaction(s, v, sourceLevel, t0, typ) + return newCompaction(s, v, sourceLevel, t0) } -func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction { +func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction { c := &compaction{ s: s, v: v, - typ: typ, sourceLevel: sourceLevel, levels: [2]tFiles{t0, nil}, maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)), @@ -150,7 +131,6 @@ type compaction struct { s *session v *version - typ int sourceLevel int levels [2]tFiles maxGPOverlaps int64 @@ -201,14 +181,10 @@ func (c *compaction) expand() { t0, t1 := c.levels[0], c.levels[1] imin, imax := t0.getRange(c.s.icmp) - - // For non-zero levels, the ukey can't hop across tables at all. - if c.sourceLevel == 0 { - // We expand t0 here just incase ukey hop across tables. - t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) - if len(t0) != len(c.levels[0]) { - imin, imax = t0.getRange(c.s.icmp) - } + // We expand t0 here just incase ukey hop across tables. + t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) + if len(t0) != len(c.levels[0]) { + imin, imax = t0.getRange(c.s.icmp) } t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) // Get entire range covered by compaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go index fc56b63..40cb2cf 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -9,7 +9,6 @@ package leveldb import ( "fmt" "sync/atomic" - "time" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/storage" @@ -40,214 +39,20 @@ func (s *session) newTemp() storage.FileDesc { return storage.FileDesc{Type: storage.TypeTemp, Num: num} } -// Session state. - -const ( - // maxCachedNumber represents the maximum number of version tasks - // that can be cached in the ref loop. - maxCachedNumber = 256 - - // maxCachedTime represents the maximum time for ref loop to cache - // a version task. - maxCachedTime = 5 * time.Minute -) - -// vDelta indicates the change information between the next version -// and the currently specified version -type vDelta struct { - vid int64 - added []int64 - deleted []int64 -} - -// vTask defines a version task for either reference or release. -type vTask struct { - vid int64 - files []tFiles - created time.Time -} - -func (s *session) refLoop() { - var ( - fileRef = make(map[int64]int) // Table file reference counter - ref = make(map[int64]*vTask) // Current referencing version store - deltas = make(map[int64]*vDelta) - referenced = make(map[int64]struct{}) - released = make(map[int64]*vDelta) // Released version that waiting for processing - abandoned = make(map[int64]struct{}) // Abandoned version id - next, last int64 - ) - // addFileRef adds file reference counter with specified file number and - // reference value - addFileRef := func(fnum int64, ref int) int { - ref += fileRef[fnum] - if ref > 0 { - fileRef[fnum] = ref - } else if ref == 0 { - delete(fileRef, fnum) - } else { - panic(fmt.Sprintf("negative ref: %v", fnum)) - } - return ref - } - // skipAbandoned skips useless abandoned version id. - skipAbandoned := func() bool { - if _, exist := abandoned[next]; exist { - delete(abandoned, next) - return true - } - return false - } - // applyDelta applies version change to current file reference. - applyDelta := func(d *vDelta) { - for _, t := range d.added { - addFileRef(t, 1) - } - for _, t := range d.deleted { - if addFileRef(t, -1) == 0 { - s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t}) - } - } - } - - timer := time.NewTimer(0) - <-timer.C // discard the initial tick - defer timer.Stop() - - // processTasks processes version tasks in strict order. - // - // If we want to use delta to reduce the cost of file references and dereferences, - // we must strictly follow the id of the version, otherwise some files that are - // being referenced will be deleted. - // - // In addition, some db operations (such as iterators) may cause a version to be - // referenced for a long time. In order to prevent such operations from blocking - // the entire processing queue, we will properly convert some of the version tasks - // into full file references and releases. - processTasks := func() { - timer.Reset(maxCachedTime) - // Make sure we don't cache too many version tasks. - for { - // Skip any abandoned version number to prevent blocking processing. - if skipAbandoned() { - next += 1 - continue - } - // Don't bother the version that has been released. - if _, exist := released[next]; exist { - break - } - // Ensure the specified version has been referenced. - if _, exist := ref[next]; !exist { - break - } - if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime { - break - } - // Convert version task into full file references and releases mode. - // Reference version(i+1) first and wait version(i) to release. - // FileRef(i+1) = FileRef(i) + Delta(i) - for _, tt := range ref[next].files { - for _, t := range tt { - addFileRef(t.fd.Num, 1) - } - } - // Note, if some compactions take a long time, even more than 5 minutes, - // we may miss the corresponding delta information here. - // Fortunately it will not affect the correctness of the file reference, - // and we can apply the delta once we receive it. - if d := deltas[next]; d != nil { - applyDelta(d) - } - referenced[next] = struct{}{} - delete(ref, next) - delete(deltas, next) - next += 1 - } - - // Use delta information to process all released versions. - for { - if skipAbandoned() { - next += 1 - continue - } - if d, exist := released[next]; exist { - if d != nil { - applyDelta(d) - } - delete(released, next) - next += 1 - continue - } - return - } - } - - for { - processTasks() - - select { - case t := <-s.refCh: - if _, exist := ref[t.vid]; exist { - panic("duplicate reference request") - } - ref[t.vid] = t - if t.vid > last { - last = t.vid - } - - case d := <-s.deltaCh: - if _, exist := ref[d.vid]; !exist { - if _, exist2 := referenced[d.vid]; !exist2 { - panic("invalid release request") - } - // The reference opt is already expired, apply - // delta here. - applyDelta(d) - continue - } - deltas[d.vid] = d - - case t := <-s.relCh: - if _, exist := referenced[t.vid]; exist { - for _, tt := range t.files { - for _, t := range tt { - if addFileRef(t.fd.Num, -1) == 0 { - s.tops.remove(t.fd) - } - } - } - delete(referenced, t.vid) - continue - } - if _, exist := ref[t.vid]; !exist { - panic("invalid release request") - } - released[t.vid] = deltas[t.vid] - delete(deltas, t.vid) - delete(ref, t.vid) - - case id := <-s.abandon: - if id >= next { - abandoned[id] = struct{}{} - } - - case <-timer.C: - - case r := <-s.fileRefCh: - ref := make(map[int64]int) - for f, c := range fileRef { - ref[f] = c - } - r <- ref - - case <-s.closeC: - s.closeW.Done() - return - } +func (s *session) addFileRef(fd storage.FileDesc, ref int) int { + ref += s.fileRef[fd.Num] + if ref > 0 { + s.fileRef[fd.Num] = ref + } else if ref == 0 { + delete(s.fileRef, fd.Num) + } else { + panic(fmt.Sprintf("negative ref: %v", fd)) } + return ref } +// Session state. + // Get current version. This will incr version ref, must call // version.release (exactly once) after use. func (s *session) version() *version { @@ -264,30 +69,13 @@ func (s *session) tLen(level int) int { } // Set current version to v. -func (s *session) setVersion(r *sessionRecord, v *version) { +func (s *session) setVersion(v *version) { s.vmu.Lock() defer s.vmu.Unlock() // Hold by session. It is important to call this first before releasing // current version, otherwise the still used files might get released. v.incref() if s.stVersion != nil { - if r != nil { - var ( - added = make([]int64, 0, len(r.addedTables)) - deleted = make([]int64, 0, len(r.deletedTables)) - ) - for _, t := range r.addedTables { - added = append(added, t.num) - } - for _, t := range r.deletedTables { - deleted = append(deleted, t.num) - } - select { - case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}: - case <-v.s.closeC: - s.log("reference loop already exist") - } - } // Release current version. s.stVersion.releaseNB() } @@ -308,7 +96,7 @@ func (s *session) setNextFileNum(num int64) { func (s *session) markFileNum(num int64) { nextFileNum := num + 1 for { - old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum + old, x := s.stNextFileNum, nextFileNum if old > x { x = old } @@ -326,7 +114,7 @@ func (s *session) allocFileNum() int64 { // Reuse given file number. func (s *session) reuseFileNum(num int64) { for { - old, x := atomic.LoadInt64(&s.stNextFileNum), num + old, x := s.stNextFileNum, num if old != x+1 { x = old } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go index b7759b2..1fac60d 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,7 +7,6 @@ package leveldb import ( - "bytes" "fmt" "sort" "sync/atomic" @@ -151,30 +150,6 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { }) } -// Searches smallest index of tables whose its file number -// is smaller than the given number. -func (tf tFiles) searchNumLess(num int64) int { - return sort.Search(len(tf), func(i int) bool { - return tf[i].fd.Num < num - }) -} - -// Searches smallest index of tables whose its smallest -// key is after the given key. -func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { - return sort.Search(len(tf), func(i int) bool { - return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0 - }) -} - -// Searches smallest index of tables whose its largest -// key is after the given key. -func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int { - return sort.Search(len(tf), func(i int) bool { - return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0 - }) -} - // Returns true if given key range overlaps with one or more // tables key range. If unsorted is true then binary search will not be used. func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { @@ -206,50 +181,6 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo // expanded. // The dst content will be overwritten. func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { - // Short circuit if tf is empty - if len(tf) == 0 { - return nil - } - // For non-zero levels, there is no ukey hop across at all. - // And what's more, the files in these levels are strictly sorted, - // so use binary search instead of heavy traverse. - if !overlapped { - var begin, end int - // Determine the begin index of the overlapped file - if umin != nil { - index := tf.searchMinUkey(icmp, umin) - if index == 0 { - begin = 0 - } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 { - // The min ukey overlaps with the index-1 file, expand it. - begin = index - 1 - } else { - begin = index - } - } - // Determine the end index of the overlapped file - if umax != nil { - index := tf.searchMaxUkey(icmp, umax) - if index == len(tf) { - end = len(tf) - } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 { - // The max ukey overlaps with the index file, expand it. - end = index + 1 - } else { - end = index - } - } else { - end = len(tf) - } - // Ensure the overlapped file indexes are valid. - if begin >= end { - return nil - } - dst = make([]*tFile, end-begin) - copy(dst, tf[begin:end]) - return dst - } - dst = dst[:0] for i := 0; i < len(tf); { t := tf[i] @@ -262,9 +193,11 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { umax = t.imax.ukey() // Restart search if it is overlapped. - dst = dst[:0] - i = 0 - continue + if overlapped { + dst = dst[:0] + i = 0 + continue + } } dst = append(dst, t) @@ -483,18 +416,16 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. -func (t *tOps) remove(fd storage.FileDesc) { - t.cache.Delete(0, uint64(fd.Num), func() { - if err := t.s.stor.Remove(fd); err != nil { - t.s.logf("table@remove removing @%d %q", fd.Num, err) +func (t *tOps) remove(f *tFile) { + t.cache.Delete(0, uint64(f.fd.Num), func() { + if err := t.s.stor.Remove(f.fd); err != nil { + t.s.logf("table@remove removing @%d %q", f.fd.Num, err) } else { - t.s.logf("table@remove removed @%d", fd.Num) + t.s.logf("table@remove removed @%d", f.fd.Num) } if t.evictRemoved && t.bcache != nil { - t.bcache.EvictNS(uint64(fd.Num)) + t.bcache.EvictNS(uint64(f.fd.Num)) } - // Try to reuse file num, useful for discarded transaction. - t.s.reuseFileNum(fd.Num) }) } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go index 9535e35..73f272a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go @@ -9,7 +9,6 @@ package leveldb import ( "fmt" "sync/atomic" - "time" "unsafe" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -23,8 +22,7 @@ type tSet struct { } type version struct { - id int64 // unique monotonous increasing version id - s *session + s *session levels []tFiles @@ -41,11 +39,8 @@ type version struct { released bool } -// newVersion creates a new version with an unique monotonous increasing id. func newVersion(s *session) *version { - id := atomic.AddInt64(&s.ntVersionId, 1) - nv := &version{s: s, id: id - 1} - return nv + return &version{s: s} } func (v *version) incref() { @@ -55,11 +50,11 @@ func (v *version) incref() { v.ref++ if v.ref == 1 { - select { - case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}: - // We can use v.levels directly here since it is immutable. - case <-v.s.closeC: - v.s.log("reference loop already exist") + // Incr file ref. + for _, tt := range v.levels { + for _, t := range tt { + v.s.addFileRef(t.fd, 1) + } } } } @@ -71,11 +66,13 @@ func (v *version) releaseNB() { } else if v.ref < 0 { panic("negative version ref") } - select { - case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}: - // We can use v.levels directly here since it is immutable. - case <-v.s.closeC: - v.s.log("reference loop already exist") + + for _, tt := range v.levels { + for _, t := range tt { + if v.s.addFileRef(t.fd, -1) == 0 { + v.s.tops.remove(t) + } + } } v.released = true @@ -144,7 +141,6 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue } ukey := ikey.ukey() - sampleSeeks := !v.s.o.GetDisableSeeksCompaction() var ( tset *tSet @@ -162,7 +158,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue // Since entries never hop across level, finding key/value // in smaller level make later levels irrelevant. v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool { - if sampleSeeks && level >= 0 && !tseek { + if level >= 0 && !tseek { if tset == nil { tset = &tSet{level, t} } else { @@ -277,10 +273,10 @@ func (v *version) newStaging() *versionStaging { } // Spawn a new version based on this version. -func (v *version) spawn(r *sessionRecord, trivial bool) *version { +func (v *version) spawn(r *sessionRecord) *version { staging := v.newStaging() staging.commit(r) - return staging.finish(trivial) + return staging.finish() } func (v *version) fillRecord(r *sessionRecord) { @@ -450,7 +446,7 @@ func (p *versionStaging) commit(r *sessionRecord) { } } -func (p *versionStaging) finish(trivial bool) *version { +func (p *versionStaging) finish() *version { // Build new version. nv := newVersion(p.base.s) numLevel := len(p.levels) @@ -467,12 +463,6 @@ func (p *versionStaging) finish(trivial bool) *version { if level < len(p.levels) { scratch := p.levels[level] - // Short circuit if there is no change at all. - if len(scratch.added) == 0 && len(scratch.deleted) == 0 { - nv.levels[level] = baseTabels - continue - } - var nt tFiles // Prealloc list if possible. if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 { @@ -490,41 +480,6 @@ func (p *versionStaging) finish(trivial bool) *version { nt = append(nt, t) } - // Avoid resort if only files in this level are deleted - if len(scratch.added) == 0 { - nv.levels[level] = nt - continue - } - - // For normal table compaction, one compaction will only involve two levels - // of files. And the new files generated after merging the source level and - // source+1 level related files can be inserted as a whole into source+1 level - // without any overlap with the other source+1 files. - // - // When the amount of data maintained by leveldb is large, the number of files - // per level will be very large. While qsort is very inefficient for sorting - // already ordered arrays. Therefore, for the normal table compaction, we use - // binary search here to find the insert index to insert a batch of new added - // files directly instead of using qsort. - if trivial && len(scratch.added) > 0 { - added := make(tFiles, 0, len(scratch.added)) - for _, r := range scratch.added { - added = append(added, tableFileFromRecord(r)) - } - if level == 0 { - added.sortByNum() - index := nt.searchNumLess(added[len(added)-1].fd.Num) - nt = append(nt[:index], append(added, nt[index:]...)...) - } else { - added.sortByKey(p.base.s.icmp) - _, amax := added.getRange(p.base.s.icmp) - index := nt.searchMin(p.base.s.icmp, amax) - nt = append(nt[:index], append(added, nt[index:]...)...) - } - nv.levels[level] = nt - continue - } - // New tables. for _, r := range scratch.added { nt = append(nt, tableFileFromRecord(r)) diff --git a/vendor/modules.txt b/vendor/modules.txt index 1d2f65d..f69b957 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -8,7 +8,7 @@ github.com/PuerkitoBio/purell ## explicit # github.com/andybalholm/cascadia v1.2.0 github.com/andybalholm/cascadia -# github.com/golang/snappy v0.0.1 +# github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/golang/snappy # github.com/google/go-cmp v0.5.6 ## explicit @@ -23,7 +23,7 @@ github.com/google/uuid # github.com/pborman/uuid v1.2.1 ## explicit github.com/pborman/uuid -# github.com/syndtr/goleveldb v0.0.0-20190923125748-758128399b1d +# github.com/syndtr/goleveldb v1.0.0 ## explicit github.com/syndtr/goleveldb/leveldb github.com/syndtr/goleveldb/leveldb/cache -- GitLab