这个类会在后台主动更新缓存数据,你只需要调用方法来获取数据即可。
主动更新缓存类
以下是 AutoUpdatingCache 类的实现:
- import threading
- import time
- class AutoUpdatingCache:
- def __init__(self, update_function, expiry_time=60):
- """
- 初始化缓存类。
- :param update_function: 一个函数,用于生成或更新缓存数据。
- :param expiry_time: 缓存的更新周期(秒)。
- """
- self.update_function = update_function
- self.expiry_time = expiry_time
- self.cache_data = None
- self.last_updated = 0
- self.lock = threading.Lock()
- self._start_background_update()
- def _start_background_update(self):
- # 启动后台线程更新缓存
- self.update_thread = threading.Thread(target=self._update_cache_periodically)
- self.update_thread.daemon = True
- self.update_thread.start()
- def _update_cache_periodically(self):
- while True:
- current_time = time.time()
- if current_time - self.last_updated >= self.expiry_time:
- self._update_cache()
- time.sleep(1) # 每秒检查一次
- def _update_cache(self):
- with self.lock:
- try:
- print("Updating cache...")
- new_data = self.update_function()
- self.cache_data = new_data
- self.last_updated = time.time()
- print("Cache updated!")
- except Exception as e:
- print(f"Error updating cache: {e}")
- def get_data(self):
- with self.lock:
- if self.cache_data is not None:
- return self.cache_data
- else:
- return "Cache is initializing, please try again later."
复制代码 使用说明
- 界说一个数据生成函数
首先,需要界说一个用于生成或更新缓存数据的函数。这个函数可以是任何耗时的操作,例如从数据库查询、计算复杂结果等。
- import time
- def generate_cache_data():
- # 模拟耗时操作
- time.sleep(5)
- return {"value": "fresh data", "timestamp": time.time()}
复制代码 - 创建缓存类的实例
将数据生成函数通报给 AutoUpdatingCache 类,并设置缓存更新周期。
- cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
复制代码 - 获取缓存数据
在需要的地方调用 get_data() 方法即可获取缓存数据。
- data = cache.get_data()
- print(data)
复制代码 完备示例
将以上步骤组合起来:
- import threadingimport timeclass AutoUpdatingCache: def __init__(self, update_function, expiry_time=60): self.update_function = update_function self.expiry_time = expiry_time self.cache_data = None self.last_updated = 0 self.lock = threading.Lock() self._start_background_update() def _start_background_update(self): self.update_thread = threading.Thread(target=self._update_cache_periodically) self.update_thread.daemon = True self.update_thread.start() def _update_cache_periodically(self): while True: current_time = time.time() if current_time - self.last_updated >= self.expiry_time: self._update_cache() time.sleep(1) def _update_cache(self): with self.lock: try: print("Updating cache...") new_data = self.update_function() self.cache_data = new_data self.last_updated = time.time() print("Cache updated!") except Exception as e: print(f"Error updating cache: {e}") def get_data(self): with self.lock: if self.cache_data is not None: return self.cache_data else: return "Cache is initializing, please try again later."# 数据生成函数def generate_cache_data(): time.sleep(5) # 模拟耗时操作 return {"value": "fresh data", "timestamp": time.time()}# 创建缓存实例cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
- # 模拟获取数据for _ in range(10): data = cache.get_data() print(data) time.sleep(10)
复制代码 代码表明
- AutoUpdatingCache 类
- init 方法:
- 初始化缓存,设置数据生成函数和缓存更新周期。
- 启动后台线程 _update_cache_periodically。
- _update_cache_periodically 方法:
- 无穷循环,每隔一秒检查缓存是否需要更新。
- 假如当前时间距离前次更新时间超过了 expiry_time,则调用 _update_cache。
- _update_cache 方法:
- 使用 update_function 更新缓存数据。
- 使用锁机制 threading.Lock 确保线程安全。
- get_data 方法:
- 获取缓存数据。
- 假如缓存数据为空(初始化中),返回提示信息。
- 数据生成函数
- generate_cache_data 函数模拟一个耗时操作,生成新的缓存数据。
- 使用示例
- 创建缓存实例并在循环中每隔 10 秒获取一次数据,观察缓存的更新环境。
注意事项
- 线程安全:
- 使用 threading.Lock 确保在多线程环境下数据访问的安全性。
- 非常处理:
- 后台线程:
- 将线程设置为守护线程(daemon=True),使得主程序退出时,线程主动竣事。
应用场景
你可以将这个缓存类应用在 Web 应用程序中,例如在 Sanic 的路由中:
- from sanic import Sanicfrom sanic.response import jsonapp = Sanic("CacheApp")@app.route("/data")async def get_cached_data(request): data = cache.get_data() return json({"data": data})if __name__ == "__main__": # 确保缓存在应用启动前初始化 cache = AutoUpdatingCache(update_function=generate_cache_data, expiry_time=30)
- app.run(host="0.0.0.0", port=8000)
复制代码 如许,用户在访问 /data 路由时,总是能得到缓存中的数据,而缓存会在后台主动更新,不会由于更新缓存而导致哀求超时。
|