golang常用库之-KV数据库之pebble

打印 上一主题 下一主题

主题 826|帖子 826|积分 2480

golang常用库之-KV数据库之pebble

rocksdb是一款由Facebook使用C/C++开发的嵌入式的长期化的KV数据库。
Pebble 是 Cockroach 参考 RocksDB 并用 Go 语言开发的高性能 KV 存储引擎。
pebble

github地点:https://github.com/cockroachdb/pebble
Pebble 是一个受 LevelDB/RocksDB 开导的键值存储,专注于 CockroachDB 的性能和内部使用。Pebble 继承了 RocksDB 文件格式和一些扩展名,比方范围删除逻辑删除、表级绽放过滤器和 MANIFEST 格式更新。
Pebble 故意不寻求在 RocksDB 中包含所有功能,而是专门针对 CockroachDB 所需的用例和功能集, RocksDB 有大量 Pebble 中没有实现的功能。
Pebble 对 RocksDB 举行了多项改进!
Pebble 在 CockroachDB v20.1(2020 年 5 月发布)中作为 RocksDB 的替代存储引擎引入,并在当时成功用于生产。Pebble 在 CockroachDB v20.2(2020 年 11 月发布)中成为默认存储引擎。Pebble 正在被 CockroachDB 的用户大规模地用于生产,而且被认为是稳定且可用于生产的。
官方示例代码

  1. package main
  2. import (
  3.         "fmt"
  4.         "log"
  5.         "github.com/cockroachdb/pebble"
  6. )
  7. func main() {
  8.         db, err := pebble.Open("demo", &pebble.Options{})
  9.         if err != nil {
  10.                 log.Fatal(err)
  11.         }
  12.         key := []byte("hello")
  13.         if err := db.Set(key, []byte("world"), pebble.Sync); err != nil {
  14.                 log.Fatal(err)
  15.         }
  16.         value, closer, err := db.Get(key)
  17.         if err != nil {
  18.                 log.Fatal(err)
  19.         }
  20.         fmt.Printf("%s %s\n", key, value)
  21.         if err := closer.Close(); err != nil {
  22.                 log.Fatal(err)
  23.         }
  24.         if err := db.Close(); err != nil {
  25.                 log.Fatal(err)
  26.         }
  27. }
复制代码
实战

来自项目https://github.com/NethermindEth/juno 的
db/pebble/db.go
  1. package pebble
  2. import (
  3.         "sync"
  4.         "testing"
  5.         "github.com/NethermindEth/juno/db"
  6.         "github.com/NethermindEth/juno/utils"
  7.         "github.com/cockroachdb/pebble"
  8.         "github.com/cockroachdb/pebble/vfs"
  9. )
  10. const (
  11.         // minCache is the minimum amount of memory in megabytes to allocate to pebble read and write caching.
  12.         minCache = 8
  13. )
  14. var _ db.DB = (*DB)(nil)
  15. type DB struct {
  16.         pebble   *pebble.DB
  17.         wMutex   *sync.Mutex
  18.         listener db.EventListener
  19. }
  20. // New opens a new database at the given path
  21. func New(path string, cache uint, maxOpenFiles int, logger pebble.Logger) (db.DB, error) {
  22.         // Ensure that the specified cache size meets a minimum threshold.
  23.         cache = max(minCache, cache)
  24.         pDB, err := newPebble(path, &pebble.Options{
  25.                 Logger:       logger,
  26.                 Cache:        pebble.NewCache(int64(cache * utils.Megabyte)),
  27.                 MaxOpenFiles: maxOpenFiles,
  28.         })
  29.         if err != nil {
  30.                 return nil, err
  31.         }
  32.         return pDB, nil
  33. }
  34. // NewMem opens a new in-memory database
  35. func NewMem() (db.DB, error) {
  36.         return newPebble("", &pebble.Options{
  37.                 FS: vfs.NewMem(),
  38.         })
  39. }
  40. // NewMemTest opens a new in-memory database, panics on error
  41. func NewMemTest(t *testing.T) db.DB {
  42.         memDB, err := NewMem()
  43.         if err != nil {
  44.                 t.Fatalf("create in-memory db: %v", err)
  45.         }
  46.         t.Cleanup(func() {
  47.                 if err := memDB.Close(); err != nil {
  48.                         t.Errorf("close in-memory db: %v", err)
  49.                 }
  50.         })
  51.         return memDB
  52. }
  53. func newPebble(path string, options *pebble.Options) (*DB, error) {
  54.         pDB, err := pebble.Open(path, options)
  55.         if err != nil {
  56.                 return nil, err
  57.         }
  58.         return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
  59. }
  60. // WithListener registers an EventListener
  61. func (d *DB) WithListener(listener db.EventListener) db.DB {
  62.         d.listener = listener
  63.         return d
  64. }
  65. // NewTransaction : see db.DB.NewTransaction
  66. func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
  67.         txn := &Transaction{
  68.                 listener: d.listener,
  69.         }
  70.         if update {
  71.                 d.wMutex.Lock()
  72.                 txn.lock = d.wMutex
  73.                 txn.batch = d.pebble.NewIndexedBatch()
  74.         } else {
  75.                 txn.snapshot = d.pebble.NewSnapshot()
  76.         }
  77.         return txn, nil
  78. }
  79. // Close : see io.Closer.Close
  80. func (d *DB) Close() error {
  81.         return d.pebble.Close()
  82. }
  83. // View : see db.DB.View
  84. func (d *DB) View(fn func(txn db.Transaction) error) error {
  85.         return db.View(d, fn)
  86. }
  87. // Update : see db.DB.Update
  88. func (d *DB) Update(fn func(txn db.Transaction) error) error {
  89.         return db.Update(d, fn)
  90. }
  91. // Impl : see db.DB.Impl
  92. func (d *DB) Impl() any {
  93.         return d.pebble
  94. }
复制代码


  • DB 布局体:

    • pebble: Pebble 数据库的实例。
    • wMutex: 用于控制写操作的互斥锁。
    • listener: 数据库事件监听器。

  • New 函数:

    • 用于打开一个给定路径的数据库。
    • 确保指定的缓存巨细满足最小阈值要求。
    • 创建一个新的 Pebble 实例,并返回数据库实例。

  • newPebble 函数: 用于创建一个新的 Pebble 实例
  1. func newPebble(path string, options *pebble.Options) (*DB, error) {
  2.         pDB, err := pebble.Open(path, options)
  3.         if err != nil {
  4.                 return nil, err
  5.         }
  6.         return &DB{pebble: pDB, wMutex: new(sync.Mutex), listener: &db.SelectiveListener{}}, nil
  7. }
复制代码
使用 pebble.Open 函数打开一个 Pebble 数据库,传入给定的路径和选项。
假如打开成功,则创建一个新的 DB 布局体实例,并初始化其中的字段:
pebble 字段使用刚刚打开的 Pebble 数据库实例。
wMutex 字段为一个新的互斥锁。
listener 字段为一个新的 db.SelectiveListener 实例的指针。


  • WithListener 方法:
    用于注册一个事件监听器。
  • Close: 关闭数据库。
  • View: 执行数据库的只读操作。
  • Update: 执行数据库的读写操作。
  • Impl: 返回底层的 Pebble 实例。
  • NewTransaction: 创建一个新的事件。
    NewTransaction 比较特殊是自定义业务有关的方法, “NewTransaction 返回一个在该数据库上的事件。假如请求创建一个更新事件,而另一个更新事件正在举行中,那么该方法应该会被阻塞。”
  1. func (d *DB) NewTransaction(update bool) (db.Transaction, error) {
  2.         txn := &Transaction{
  3.                 listener: d.listener,
  4.         }
  5.         if update {
  6.                 d.wMutex.Lock()
  7.                 txn.lock = d.wMutex
  8.                 txn.batch = d.pebble.NewIndexedBatch()
  9.         } else {
  10.                 txn.snapshot = d.pebble.NewSnapshot()
  11.         }
  12.         return txn, nil
  13. }
复制代码


  • 假如 update 为 true,则表现创建一个更新事件。此时,会获取 DB 对象的写入互斥锁 wMutex,将该互斥锁赋给事件对象的 lock 字段,并使用 Pebble 数据库的 NewIndexedBatch 方法创建一个新的批处置惩罚对象,并将其赋给事件对象的 batch 字段。
  • 假如 update 为 false,则表现创建一个只读事件。此时,会使用 Pebble 数据库的 NewSnapshot 方法创建一个新的快照对象,并将其赋给事件对象的 snapshot 字段。
   如许做的目的是为了包管数据库的一致性和隔离性。假如多个更新事件同时举行,可能会导致数据库状态出现不一致的情况。通过阻塞方式,可以确保同一时间只有一个更新事件在举行。
  db/pebble/transaction.go
  1. package pebble
  2. import (
  3.         "errors"
  4.         "io"
  5.         "sync"
  6.         "time"
  7.         "github.com/NethermindEth/juno/db"
  8.         "github.com/NethermindEth/juno/utils"
  9.         "github.com/cockroachdb/pebble"
  10. )
  11. var ErrDiscardedTransaction = errors.New("discarded txn")
  12. var _ db.Transaction = (*Transaction)(nil)
  13. type Transaction struct {
  14.         batch    *pebble.Batch
  15.         snapshot *pebble.Snapshot
  16.         lock     *sync.Mutex
  17.         listener db.EventListener
  18. }
  19. // Discard : see db.Transaction.Discard
  20. func (t *Transaction) Discard() error {
  21.         if t.batch != nil {
  22.                 if err := t.batch.Close(); err != nil {
  23.                         return err
  24.                 }
  25.                 t.batch = nil
  26.         }
  27.         if t.snapshot != nil {
  28.                 if err := t.snapshot.Close(); err != nil {
  29.                         return err
  30.                 }
  31.                 t.snapshot = nil
  32.         }
  33.         if t.lock != nil {
  34.                 t.lock.Unlock()
  35.                 t.lock = nil
  36.         }
  37.         return nil
  38. }
  39. // Commit : see db.Transaction.Commit
  40. func (t *Transaction) Commit() error {
  41.         start := time.Now()
  42.         defer func() { t.listener.OnCommit(time.Since(start)) }()
  43.         if t.batch != nil {
  44.                 return utils.RunAndWrapOnError(t.Discard, t.batch.Commit(pebble.Sync))
  45.         }
  46.         return utils.RunAndWrapOnError(t.Discard, ErrDiscardedTransaction)
  47. }
  48. // Set : see db.Transaction.Set
  49. func (t *Transaction) Set(key, val []byte) error {
  50.         start := time.Now()
  51.         if t.batch == nil {
  52.                 return errors.New("read only transaction")
  53.         }
  54.         if len(key) == 0 {
  55.                 return errors.New("empty key")
  56.         }
  57.         defer func() { t.listener.OnIO(true, time.Since(start)) }()
  58.         return t.batch.Set(key, val, pebble.Sync)
  59. }
  60. // Delete : see db.Transaction.Delete
  61. func (t *Transaction) Delete(key []byte) error {
  62.         start := time.Now()
  63.         if t.batch == nil {
  64.                 return errors.New("read only transaction")
  65.         }
  66.         defer func() { t.listener.OnIO(true, time.Since(start)) }()
  67.         return t.batch.Delete(key, pebble.Sync)
  68. }
  69. // Get : see db.Transaction.Get
  70. func (t *Transaction) Get(key []byte, cb func([]byte) error) error {
  71.         start := time.Now()
  72.         var val []byte
  73.         var closer io.Closer
  74.         var err error
  75.         if t.batch != nil {
  76.                 val, closer, err = t.batch.Get(key)
  77.         } else if t.snapshot != nil {
  78.                 val, closer, err = t.snapshot.Get(key)
  79.         } else {
  80.                 return ErrDiscardedTransaction
  81.         }
  82.         defer t.listener.OnIO(false, time.Since(start))
  83.         if err != nil {
  84.                 if errors.Is(err, pebble.ErrNotFound) {
  85.                         return db.ErrKeyNotFound
  86.                 }
  87.                 return err
  88.         }
  89.         return utils.RunAndWrapOnError(closer.Close, cb(val))
  90. }
  91. // Impl : see db.Transaction.Impl
  92. func (t *Transaction) Impl() any {
  93.         if t.batch != nil {
  94.                 return t.batch
  95.         }
  96.         if t.snapshot != nil {
  97.                 return t.snapshot
  98.         }
  99.         return nil
  100. }
  101. // NewIterator : see db.Transaction.NewIterator
  102. func (t *Transaction) NewIterator() (db.Iterator, error) {
  103.         var (
  104.                 iter *pebble.Iterator
  105.                 err  error
  106.         )
  107.         if t.batch != nil {
  108.                 iter, err = t.batch.NewIter(nil)
  109.                 if err != nil {
  110.                         return nil, err
  111.                 }
  112.         } else if t.snapshot != nil {
  113.                 iter, err = t.snapshot.NewIter(nil)
  114.                 if err != nil {
  115.                         return nil, err
  116.                 }
  117.         } else {
  118.                 return nil, ErrDiscardedTransaction
  119.         }
  120.         return &iterator{iter: iter}, nil
  121. }
复制代码
pebble常用方法

NewSnapshot方法

NewSnapshot 方法,用于在 DB 类型上创建一个数据库的快照。
这个方法会返回一个 Snapshot 类型的对象,该对象代表了当前数据库状态的一个点时视图。
NewIndexedBatch 方法

NewIndexedBatch 方法,用于在 DB 类型上创建一个新的读写批处置惩罚对象。
参考

[视频、保举]数据存储与检索(详解b+树存储引擎(innodb、boltdb、buntdb等)、lsm树存储引擎(bitcask、moss、pebble、leveldb等))
参考URL: https://www.bilibili.com/video/BV1Zv411G7ty/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

南七星之家

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表