南七星之家 发表于 2024-7-28 19:58:54

golang常用库之-KV数据库之pebble

golang常用库之-KV数据库之pebble

rocksdb是一款由Facebook使用C/C++开发的嵌入式的长期化的KV数据库。
Pebble 是 Cockroach 参考 RocksDB 并用 Go 语言开发的高性能 KV 存储引擎。
pebble

github地点:https://github.com/cockroachdb/pebble
Pebble 是一个受 LevelDB/RocksDB 开导的键值存储,专注于 CockroachDB 的性能和内部使用。Pebble 继承了 RocksDB 文件格式和一些扩展名,比方范围删除逻辑删除、表级绽放过滤器和 MANIFEST 格式更新。
Pebble 故意不寻求在 RocksDB 中包含所有功能,而是专门针对 CockroachDB 所需的用例和功能集, RocksDB 有大量 Pebble 中没有实现的功能。
Pebble 对 RocksDB 举行了多项改进!
Pebble 在 CockroachDB v20.1(2020 年 5 月发布)中作为 RocksDB 的替代存储引擎引入,并在当时成功用于生产。Pebble 在 CockroachDB v20.2(2020 年 11 月发布)中成为默认存储引擎。Pebble 正在被 CockroachDB 的用户大规模地用于生产,而且被认为是稳定且可用于生产的。
官方示例代码

package main

import (
        "fmt"
        "log"

        "github.com/cockroachdb/pebble"
)

func main() {
        db, err := pebble.Open("demo", &pebble.Options{})
        if err != nil {
                log.Fatal(err)
        }
        key := []byte("hello")
        if err := db.Set(key, []byte("world"), pebble.Sync); err != nil {
                log.Fatal(err)
        }
        value, closer, err := db.Get(key)
        if err != nil {
                log.Fatal(err)
        }
        fmt.Printf("%s %s\n", key, value)
        if err := closer.Close(); err != nil {
                log.Fatal(err)
        }
        if err := db.Close(); err != nil {
                log.Fatal(err)
        }
}
实战

来自项目https://github.com/NethermindEth/juno 的
db/pebble/db.go
package pebble

import (
        "sync"
        "testing"

        "github.com/NethermindEth/juno/db"
        "github.com/NethermindEth/juno/utils"
        "github.com/cockroachdb/pebble"
        "github.com/cockroachdb/pebble/vfs"
)

const (
        // minCache is the minimum amount of memory in megabytes to allocate to pebble read and write caching.
        minCache = 8
)

var _ db.DB = (*DB)(nil)

type DB struct {
        pebble   *pebble.DB
        wMutex   *sync.Mutex
        listener db.EventListener
}

// New opens a new database at the given path
func New(path string, cache uint, maxOpenFiles int, logger pebble.Logger) (db.DB, error) {
        // Ensure that the specified cache size meets a minimum threshold.
        cache = max(minCache, cache)
        pDB, err := newPebble(path, &pebble.Options{
                Logger:       logger,
                Cache:      pebble.NewCache(int64(cache * utils.Megabyte)),
                MaxOpenFiles: maxOpenFiles,
        })
        if err != nil {
                return nil, err
        }
        return pDB, nil
}

// NewMem opens a new in-memory database
func NewMem() (db.DB, error) {
        return newPebble("", &pebble.Options{
                FS: vfs.NewMem(),
        })
}

// NewMemTest opens a new in-memory database, panics on error
func NewMemTest(t *testing.T) db.DB {
        memDB, err := NewMem()
        if err != nil {
                t.Fatalf("create in-memory db: %v", err)
        }
        t.Cleanup(func() {
                if err := memDB.Close(); err != nil {
                        t.Errorf("close in-memory db: %v", err)
                }
        })
        return memDB
}

func newPebble(path string, options *pebble.Options) (*DB, error) {
        pDB, err := pebble.Open(path, options)
        if err != nil {
                return nil, err
        }
        return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
}

// WithListener registers an EventListener
func (d *DB) WithListener(listener db.EventListener) db.DB {
        d.listener = listener
        return d
}

// NewTransaction : see db.DB.NewTransaction
func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
        txn := &Transaction{
                listener: d.listener,
        }
        if update {
                d.wMutex.Lock()
                txn.lock = d.wMutex
                txn.batch = d.pebble.NewIndexedBatch()
        } else {
                txn.snapshot = d.pebble.NewSnapshot()
        }

        return txn, nil
}

// Close : see io.Closer.Close
func (d *DB) Close() error {
        return d.pebble.Close()
}

// View : see db.DB.View
func (d *DB) View(fn func(txn db.Transaction) error) error {
        return db.View(d, fn)
}

// Update : see db.DB.Update
func (d *DB) Update(fn func(txn db.Transaction) error) error {
        return db.Update(d, fn)
}

// Impl : see db.DB.Impl
func (d *DB) Impl() any {
        return d.pebble
}


[*] DB 布局体:

[*]pebble: Pebble 数据库的实例。
[*]wMutex: 用于控制写操作的互斥锁。
[*]listener: 数据库事件监听器。

[*] New 函数:

[*]用于打开一个给定路径的数据库。
[*]确保指定的缓存巨细满足最小阈值要求。
[*]创建一个新的 Pebble 实例,并返回数据库实例。

[*] newPebble 函数: 用于创建一个新的 Pebble 实例
func newPebble(path string, options *pebble.Options) (*DB, error) {
        pDB, err := pebble.Open(path, options)
        if err != nil {
                return nil, err
        }
        return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
}
使用 pebble.Open 函数打开一个 Pebble 数据库,传入给定的路径和选项。
假如打开成功,则创建一个新的 DB 布局体实例,并初始化其中的字段:
pebble 字段使用刚刚打开的 Pebble 数据库实例。
wMutex 字段为一个新的互斥锁。
listener 字段为一个新的 db.SelectiveListener 实例的指针。


[*]WithListener 方法:
用于注册一个事件监听器。
[*]Close: 关闭数据库。
[*]View: 执行数据库的只读操作。
[*]Update: 执行数据库的读写操作。
[*]Impl: 返回底层的 Pebble 实例。
[*]NewTransaction: 创建一个新的事件。
NewTransaction 比较特殊是自定义业务有关的方法, “NewTransaction 返回一个在该数据库上的事件。假如请求创建一个更新事件,而另一个更新事件正在举行中,那么该方法应该会被阻塞。”
func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
        txn := &Transaction{
                listener: d.listener,
        }
        if update {
                d.wMutex.Lock()
                txn.lock = d.wMutex
                txn.batch = d.pebble.NewIndexedBatch()
        } else {
                txn.snapshot = d.pebble.NewSnapshot()
        }

        return txn, nil
}


[*]假如 update 为 true,则表现创建一个更新事件。此时,会获取 DB 对象的写入互斥锁 wMutex,将该互斥锁赋给事件对象的 lock 字段,并使用 Pebble 数据库的 NewIndexedBatch 方法创建一个新的批处置惩罚对象,并将其赋给事件对象的 batch 字段。
[*]假如 update 为 false,则表现创建一个只读事件。此时,会使用 Pebble 数据库的 NewSnapshot 方法创建一个新的快照对象,并将其赋给事件对象的 snapshot 字段。
   如许做的目的是为了包管数据库的一致性和隔离性。假如多个更新事件同时举行,可能会导致数据库状态出现不一致的情况。通过阻塞方式,可以确保同一时间只有一个更新事件在举行。
db/pebble/transaction.go
package pebble

import (
        "errors"
        "io"
        "sync"
        "time"

        "github.com/NethermindEth/juno/db"
        "github.com/NethermindEth/juno/utils"
        "github.com/cockroachdb/pebble"
)

var ErrDiscardedTransaction = errors.New("discarded txn")

var _ db.Transaction = (*Transaction)(nil)

type Transaction struct {
        batch    *pebble.Batch
        snapshot *pebble.Snapshot
        lock   *sync.Mutex
        listener db.EventListener
}

// Discard : see db.Transaction.Discard
func (t *Transaction) Discard() error {
        if t.batch != nil {
                if err := t.batch.Close(); err != nil {
                        return err
                }
                t.batch = nil
        }
        if t.snapshot != nil {
                if err := t.snapshot.Close(); err != nil {
                        return err
                }
                t.snapshot = nil
        }

        if t.lock != nil {
                t.lock.Unlock()
                t.lock = nil
        }
        return nil
}

// Commit : see db.Transaction.Commit
func (t *Transaction) Commit() error {
        start := time.Now()
        defer func() { t.listener.OnCommit(time.Since(start)) }()
        if t.batch != nil {
                return utils.RunAndWrapOnError(t.Discard, t.batch.Commit(pebble.Sync))
        }
        return utils.RunAndWrapOnError(t.Discard, ErrDiscardedTransaction)
}

// Set : see db.Transaction.Set
func (t *Transaction) Set(key, val []byte) error {
        start := time.Now()
        if t.batch == nil {
                return errors.New("read only transaction")
        }
        if len(key) == 0 {
                return errors.New("empty key")
        }

        defer func() { t.listener.OnIO(true, time.Since(start)) }()
        return t.batch.Set(key, val, pebble.Sync)
}

// Delete : see db.Transaction.Delete
func (t *Transaction) Delete(key []byte) error {
        start := time.Now()
        if t.batch == nil {
                return errors.New("read only transaction")
        }

        defer func() { t.listener.OnIO(true, time.Since(start)) }()
        return t.batch.Delete(key, pebble.Sync)
}

// Get : see db.Transaction.Get
func (t *Transaction) Get(key []byte, cb func([]byte) error) error {
        start := time.Now()
        var val []byte
        var closer io.Closer

        var err error
        if t.batch != nil {
                val, closer, err = t.batch.Get(key)
        } else if t.snapshot != nil {
                val, closer, err = t.snapshot.Get(key)
        } else {
                return ErrDiscardedTransaction
        }

        defer t.listener.OnIO(false, time.Since(start))
        if err != nil {
                if errors.Is(err, pebble.ErrNotFound) {
                        return db.ErrKeyNotFound
                }

                return err
        }
        return utils.RunAndWrapOnError(closer.Close, cb(val))
}

// Impl : see db.Transaction.Impl
func (t *Transaction) Impl() any {
        if t.batch != nil {
                return t.batch
        }

        if t.snapshot != nil {
                return t.snapshot
        }
        return nil
}

// NewIterator : see db.Transaction.NewIterator
func (t *Transaction) NewIterator() (db.Iterator, error) {
        var (
                iter *pebble.Iterator
                errerror
        )
        if t.batch != nil {
                iter, err = t.batch.NewIter(nil)
                if err != nil {
                        return nil, err
                }
        } else if t.snapshot != nil {
                iter, err = t.snapshot.NewIter(nil)
                if err != nil {
                        return nil, err
                }
        } else {
                return nil, ErrDiscardedTransaction
        }

        return &iterator{iter: iter}, nil
}
pebble常用方法

NewSnapshot方法

NewSnapshot 方法,用于在 DB 类型上创建一个数据库的快照。
这个方法会返回一个 Snapshot 类型的对象,该对象代表了当前数据库状态的一个点时视图。
NewIndexedBatch 方法

NewIndexedBatch 方法,用于在 DB 类型上创建一个新的读写批处置惩罚对象。
参考

[视频、保举]数据存储与检索(详解b+树存储引擎(innodb、boltdb、buntdb等)、lsm树存储引擎(bitcask、moss、pebble、leveldb等))
参考URL: https://www.bilibili.com/video/BV1Zv411G7ty/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: golang常用库之-KV数据库之pebble