golang常用库之-KV数据库之pebble
rocksdb是一款由Facebook使用C/C++开发的嵌入式的长期化的KV数据库。
Pebble 是 Cockroach 参考 RocksDB 并用 Go 语言开发的高性能 KV 存储引擎。
pebble
github地点:https://github.com/cockroachdb/pebble
Pebble 是一个受 LevelDB/RocksDB 开导的键值存储,专注于 CockroachDB 的性能和内部使用。Pebble 继承了 RocksDB 文件格式和一些扩展名,比方范围删除逻辑删除、表级绽放过滤器和 MANIFEST 格式更新。
Pebble 故意不寻求在 RocksDB 中包含所有功能,而是专门针对 CockroachDB 所需的用例和功能集, RocksDB 有大量 Pebble 中没有实现的功能。
Pebble 对 RocksDB 举行了多项改进!
Pebble 在 CockroachDB v20.1(2020 年 5 月发布)中作为 RocksDB 的替代存储引擎引入,并在当时成功用于生产。Pebble 在 CockroachDB v20.2(2020 年 11 月发布)中成为默认存储引擎。Pebble 正在被 CockroachDB 的用户大规模地用于生产,而且被认为是稳定且可用于生产的。
官方示例代码
- package main
- import (
- "fmt"
- "log"
- "github.com/cockroachdb/pebble"
- )
- func main() {
- db, err := pebble.Open("demo", &pebble.Options{})
- if err != nil {
- log.Fatal(err)
- }
- key := []byte("hello")
- if err := db.Set(key, []byte("world"), pebble.Sync); err != nil {
- log.Fatal(err)
- }
- value, closer, err := db.Get(key)
- if err != nil {
- log.Fatal(err)
- }
- fmt.Printf("%s %s\n", key, value)
- if err := closer.Close(); err != nil {
- log.Fatal(err)
- }
- if err := db.Close(); err != nil {
- log.Fatal(err)
- }
- }
复制代码 实战
来自项目https://github.com/NethermindEth/juno 的
db/pebble/db.go
- package pebble
- import (
- "sync"
- "testing"
- "github.com/NethermindEth/juno/db"
- "github.com/NethermindEth/juno/utils"
- "github.com/cockroachdb/pebble"
- "github.com/cockroachdb/pebble/vfs"
- )
- const (
- // minCache is the minimum amount of memory in megabytes to allocate to pebble read and write caching.
- minCache = 8
- )
- var _ db.DB = (*DB)(nil)
- type DB struct {
- pebble *pebble.DB
- wMutex *sync.Mutex
- listener db.EventListener
- }
- // New opens a new database at the given path
- func New(path string, cache uint, maxOpenFiles int, logger pebble.Logger) (db.DB, error) {
- // Ensure that the specified cache size meets a minimum threshold.
- cache = max(minCache, cache)
- pDB, err := newPebble(path, &pebble.Options{
- Logger: logger,
- Cache: pebble.NewCache(int64(cache * utils.Megabyte)),
- MaxOpenFiles: maxOpenFiles,
- })
- if err != nil {
- return nil, err
- }
- return pDB, nil
- }
- // NewMem opens a new in-memory database
- func NewMem() (db.DB, error) {
- return newPebble("", &pebble.Options{
- FS: vfs.NewMem(),
- })
- }
- // NewMemTest opens a new in-memory database, panics on error
- func NewMemTest(t *testing.T) db.DB {
- memDB, err := NewMem()
- if err != nil {
- t.Fatalf("create in-memory db: %v", err)
- }
- t.Cleanup(func() {
- if err := memDB.Close(); err != nil {
- t.Errorf("close in-memory db: %v", err)
- }
- })
- return memDB
- }
- func newPebble(path string, options *pebble.Options) (*DB, error) {
- pDB, err := pebble.Open(path, options)
- if err != nil {
- return nil, err
- }
- return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
- }
- // WithListener registers an EventListener
- func (d *DB) WithListener(listener db.EventListener) db.DB {
- d.listener = listener
- return d
- }
- // NewTransaction : see db.DB.NewTransaction
- func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
- txn := &Transaction{
- listener: d.listener,
- }
- if update {
- d.wMutex.Lock()
- txn.lock = d.wMutex
- txn.batch = d.pebble.NewIndexedBatch()
- } else {
- txn.snapshot = d.pebble.NewSnapshot()
- }
- return txn, nil
- }
- // Close : see io.Closer.Close
- func (d *DB) Close() error {
- return d.pebble.Close()
- }
- // View : see db.DB.View
- func (d *DB) View(fn func(txn db.Transaction) error) error {
- return db.View(d, fn)
- }
- // Update : see db.DB.Update
- func (d *DB) Update(fn func(txn db.Transaction) error) error {
- return db.Update(d, fn)
- }
- // Impl : see db.DB.Impl
- func (d *DB) Impl() any {
- return d.pebble
- }
复制代码
- DB 布局体:
- pebble: Pebble 数据库的实例。
- wMutex: 用于控制写操作的互斥锁。
- listener: 数据库事件监听器。
- New 函数:
- 用于打开一个给定路径的数据库。
- 确保指定的缓存巨细满足最小阈值要求。
- 创建一个新的 Pebble 实例,并返回数据库实例。
- newPebble 函数: 用于创建一个新的 Pebble 实例
- func newPebble(path string, options *pebble.Options) (*DB, error) {
- pDB, err := pebble.Open(path, options)
- if err != nil {
- return nil, err
- }
- return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
- }
复制代码 使用 pebble.Open 函数打开一个 Pebble 数据库,传入给定的路径和选项。
假如打开成功,则创建一个新的 DB 布局体实例,并初始化其中的字段:
pebble 字段使用刚刚打开的 Pebble 数据库实例。
wMutex 字段为一个新的互斥锁。
listener 字段为一个新的 db.SelectiveListener 实例的指针。
- WithListener 方法:
用于注册一个事件监听器。
- Close: 关闭数据库。
- View: 执行数据库的只读操作。
- Update: 执行数据库的读写操作。
- Impl: 返回底层的 Pebble 实例。
- NewTransaction: 创建一个新的事件。
NewTransaction 比较特殊是自定义业务有关的方法, “NewTransaction 返回一个在该数据库上的事件。假如请求创建一个更新事件,而另一个更新事件正在举行中,那么该方法应该会被阻塞。”
- func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
- txn := &Transaction{
- listener: d.listener,
- }
- if update {
- d.wMutex.Lock()
- txn.lock = d.wMutex
- txn.batch = d.pebble.NewIndexedBatch()
- } else {
- txn.snapshot = d.pebble.NewSnapshot()
- }
- return txn, nil
- }
复制代码
- 假如 update 为 true,则表现创建一个更新事件。此时,会获取 DB 对象的写入互斥锁 wMutex,将该互斥锁赋给事件对象的 lock 字段,并使用 Pebble 数据库的 NewIndexedBatch 方法创建一个新的批处置惩罚对象,并将其赋给事件对象的 batch 字段。
- 假如 update 为 false,则表现创建一个只读事件。此时,会使用 Pebble 数据库的 NewSnapshot 方法创建一个新的快照对象,并将其赋给事件对象的 snapshot 字段。
如许做的目的是为了包管数据库的一致性和隔离性。假如多个更新事件同时举行,可能会导致数据库状态出现不一致的情况。通过阻塞方式,可以确保同一时间只有一个更新事件在举行。
db/pebble/transaction.go
- package pebble
- import (
- "errors"
- "io"
- "sync"
- "time"
- "github.com/NethermindEth/juno/db"
- "github.com/NethermindEth/juno/utils"
- "github.com/cockroachdb/pebble"
- )
- var ErrDiscardedTransaction = errors.New("discarded txn")
- var _ db.Transaction = (*Transaction)(nil)
- type Transaction struct {
- batch *pebble.Batch
- snapshot *pebble.Snapshot
- lock *sync.Mutex
- listener db.EventListener
- }
- // Discard : see db.Transaction.Discard
- func (t *Transaction) Discard() error {
- if t.batch != nil {
- if err := t.batch.Close(); err != nil {
- return err
- }
- t.batch = nil
- }
- if t.snapshot != nil {
- if err := t.snapshot.Close(); err != nil {
- return err
- }
- t.snapshot = nil
- }
- if t.lock != nil {
- t.lock.Unlock()
- t.lock = nil
- }
- return nil
- }
- // Commit : see db.Transaction.Commit
- func (t *Transaction) Commit() error {
- start := time.Now()
- defer func() { t.listener.OnCommit(time.Since(start)) }()
- if t.batch != nil {
- return utils.RunAndWrapOnError(t.Discard, t.batch.Commit(pebble.Sync))
- }
- return utils.RunAndWrapOnError(t.Discard, ErrDiscardedTransaction)
- }
- // Set : see db.Transaction.Set
- func (t *Transaction) Set(key, val []byte) error {
- start := time.Now()
- if t.batch == nil {
- return errors.New("read only transaction")
- }
- if len(key) == 0 {
- return errors.New("empty key")
- }
- defer func() { t.listener.OnIO(true, time.Since(start)) }()
- return t.batch.Set(key, val, pebble.Sync)
- }
- // Delete : see db.Transaction.Delete
- func (t *Transaction) Delete(key []byte) error {
- start := time.Now()
- if t.batch == nil {
- return errors.New("read only transaction")
- }
- defer func() { t.listener.OnIO(true, time.Since(start)) }()
- return t.batch.Delete(key, pebble.Sync)
- }
- // Get : see db.Transaction.Get
- func (t *Transaction) Get(key []byte, cb func([]byte) error) error {
- start := time.Now()
- var val []byte
- var closer io.Closer
- var err error
- if t.batch != nil {
- val, closer, err = t.batch.Get(key)
- } else if t.snapshot != nil {
- val, closer, err = t.snapshot.Get(key)
- } else {
- return ErrDiscardedTransaction
- }
- defer t.listener.OnIO(false, time.Since(start))
- if err != nil {
- if errors.Is(err, pebble.ErrNotFound) {
- return db.ErrKeyNotFound
- }
- return err
- }
- return utils.RunAndWrapOnError(closer.Close, cb(val))
- }
- // Impl : see db.Transaction.Impl
- func (t *Transaction) Impl() any {
- if t.batch != nil {
- return t.batch
- }
- if t.snapshot != nil {
- return t.snapshot
- }
- return nil
- }
- // NewIterator : see db.Transaction.NewIterator
- func (t *Transaction) NewIterator() (db.Iterator, error) {
- var (
- iter *pebble.Iterator
- err error
- )
- if t.batch != nil {
- iter, err = t.batch.NewIter(nil)
- if err != nil {
- return nil, err
- }
- } else if t.snapshot != nil {
- iter, err = t.snapshot.NewIter(nil)
- if err != nil {
- return nil, err
- }
- } else {
- return nil, ErrDiscardedTransaction
- }
- return &iterator{iter: iter}, nil
- }
复制代码 pebble常用方法
NewSnapshot方法
NewSnapshot 方法,用于在 DB 类型上创建一个数据库的快照。
这个方法会返回一个 Snapshot 类型的对象,该对象代表了当前数据库状态的一个点时视图。
NewIndexedBatch 方法
NewIndexedBatch 方法,用于在 DB 类型上创建一个新的读写批处置惩罚对象。
参考
[视频、保举]数据存储与检索(详解b+树存储引擎(innodb、boltdb、buntdb等)、lsm树存储引擎(bitcask、moss、pebble、leveldb等))
参考URL: https://www.bilibili.com/video/BV1Zv411G7ty/
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |