大连密封材料 发表于 2024-11-27 12:47:57

python3 主动更新的缓存类

这个类会在后台主动更新缓存数据,你只需要调用方法来获取数据即可。
主动更新缓存类

以下是 AutoUpdatingCache 类的实现:
import threading
import time

class AutoUpdatingCache:
    def __init__(self, update_function, expiry_time=60):
      """
      初始化缓存类。

      :param update_function: 一个函数,用于生成或更新缓存数据。
      :param expiry_time: 缓存的更新周期(秒)。
      """
      self.update_function = update_function
      self.expiry_time = expiry_time
      self.cache_data = None
      self.last_updated = 0
      self.lock = threading.Lock()
      self._start_background_update()

    def _start_background_update(self):
      # 启动后台线程更新缓存
      self.update_thread = threading.Thread(target=self._update_cache_periodically)
      self.update_thread.daemon = True
      self.update_thread.start()

    def _update_cache_periodically(self):
      while True:
            current_time = time.time()
            if current_time - self.last_updated >= self.expiry_time:
                self._update_cache()
            time.sleep(1)# 每秒检查一次

    def _update_cache(self):
      with self.lock:
            try:
                print("Updating cache...")
                new_data = self.update_function()
                self.cache_data = new_data
                self.last_updated = time.time()
                print("Cache updated!")
            except Exception as e:
                print(f"Error updating cache: {e}")

    def get_data(self):
      with self.lock:
            if self.cache_data is not None:
                return self.cache_data
            else:
                return "Cache is initializing, please try again later."
使用说明


[*] 界说一个数据生成函数
首先,需要界说一个用于生成或更新缓存数据的函数。这个函数可以是任何耗时的操作,例如从数据库查询、计算复杂结果等。
import time

def generate_cache_data():
    # 模拟耗时操作
    time.sleep(5)
    return {"value": "fresh data", "timestamp": time.time()}

[*] 创建缓存类的实例
将数据生成函数通报给 AutoUpdatingCache 类,并设置缓存更新周期。
cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)

[*] 获取缓存数据
在需要的地方调用 get_data() 方法即可获取缓存数据。
data = cache.get_data()
print(data)

完备示例

将以上步骤组合起来:
import threadingimport timeclass AutoUpdatingCache:    def __init__(self, update_function, expiry_time=60):      self.update_function = update_function      self.expiry_time = expiry_time      self.cache_data = None      self.last_updated = 0      self.lock = threading.Lock()      self._start_background_update()    def _start_background_update(self):      self.update_thread = threading.Thread(target=self._update_cache_periodically)      self.update_thread.daemon = True      self.update_thread.start()    def _update_cache_periodically(self):      while True:            current_time = time.time()            if current_time - self.last_updated >= self.expiry_time:                self._update_cache()            time.sleep(1)    def _update_cache(self):      with self.lock:            try:                print("Updating cache...")                new_data = self.update_function()                self.cache_data = new_data                self.last_updated = time.time()                print("Cache updated!")            except Exception as e:                print(f"Error updating cache: {e}")    def get_data(self):      with self.lock:            if self.cache_data is not None:                return self.cache_data            else:                return "Cache is initializing, please try again later."# 数据生成函数def generate_cache_data():    time.sleep(5)# 模拟耗时操作    return {"value": "fresh data", "timestamp": time.time()}# 创建缓存实例cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
# 模拟获取数据for _ in range(10):    data = cache.get_data()    print(data)    time.sleep(10) 代码表明



[*] AutoUpdatingCache 类

[*]init 方法:

[*]初始化缓存,设置数据生成函数和缓存更新周期。
[*]启动后台线程 _update_cache_periodically。

[*]_update_cache_periodically 方法:

[*]无穷循环,每隔一秒检查缓存是否需要更新。
[*]假如当前时间距离前次更新时间超过了 expiry_time,则调用 _update_cache。

[*]_update_cache 方法:

[*]使用 update_function 更新缓存数据。
[*]使用锁机制 threading.Lock 确保线程安全。

[*]get_data 方法:

[*]获取缓存数据。
[*]假如缓存数据为空(初始化中),返回提示信息。


[*] 数据生成函数

[*]generate_cache_data 函数模拟一个耗时操作,生成新的缓存数据。

[*] 使用示例

[*]创建缓存实例并在循环中每隔 10 秒获取一次数据,观察缓存的更新环境。

注意事项



[*] 线程安全:

[*]使用 threading.Lock 确保在多线程环境下数据访问的安全性。

[*] 非常处理:

[*]在更新缓存时,捕获可能的非常,防止线程瓦解。

[*] 后台线程:

[*]将线程设置为守护线程(daemon=True),使得主程序退出时,线程主动竣事。

应用场景

你可以将这个缓存类应用在 Web 应用程序中,例如在 Sanic 的路由中:
from sanic import Sanicfrom sanic.response import jsonapp = Sanic("CacheApp")@app.route("/data")async def get_cached_data(request):    data = cache.get_data()    return json({"data": data})if __name__ == "__main__":    # 确保缓存在应用启动前初始化    cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
    app.run(host="0.0.0.0", port=8000) 如许,用户在访问 /data 路由时,总是能得到缓存中的数据,而缓存会在后台主动更新,不会由于更新缓存而导致哀求超时。
页: [1]
查看完整版本: python3 主动更新的缓存类