IT评测·应用市场-qidao123.com

标题: Python Joblib库使用学习总结 [打印本页]

作者: 小秦哥    时间: 2023-6-10 22:05
标题: Python Joblib库使用学习总结
实践环境

python 3.6.2
Joblib
简介

Joblib是一组在Python中提供轻量级流水线的工具。特别是:
Joblib已被优化得很快速,很健壮了,特别是在大数据上,并对numpy数组进行了特定的优化。
主要功能

parallel for loops

常见用法

Joblib提供了一个简单的助手类,用于使用多进程为循环实现并行。核心思想是将要执行的代码编写为生成器表达式,并将其转换为并行计算
  1. >>> from math import sqrt
  2. >>> [sqrt(i ** 2) for i in range(10)]
  3. [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
复制代码
使用以下代码,可以分布到2个CPU上:
  1. >>> from math import sqrt
  2. >>> from joblib import Parallel, delayed
  3. >>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
  4. [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
复制代码
输出可以是一个生成器,在可以获取结果时立即返回结果,即使后续任务尚未完成。输出的顺序始终与输入的顺序相匹配:输出的顺序总是匹配输入的顺序:
  1. >>> from math import sqrt
  2. >>> from joblib import Parallel, delayed
  3. >>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往后版本才支持return_generator参数
  4. >>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
  5. >>> print(type(output_generator))
  6. <class 'generator'>
  7. >>> print(next(output_generator))
  8. 0.0
  9. >>> print(next(output_generator))
  10. 1.0
  11. >>> print(list(output_generator))
  12. [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
复制代码
此生成器允许减少joblib.Parallel的内存占用调用
基于线程的并行VS基于进程的并行

默认情况下,joblib.Parallel使用'loky'后端模块启动单独的Python工作进程,以便在分散的CPU上同时执行任务。对于一般的Python程序来说,这是一个合理的默认值,但由于输入和输出数据需要在队列中序列化以便同工作进程进行通信,因此可能会导致大量开销(请参阅序列化和进程)。
当你知道你调用的函数是基于一个已编译的扩展,并且该扩展在大部分计算过程中释放了Python全局解释器锁(GIL)时,使用线程而不是Python进程作为并发工作者会更有效。例如,在Cython函数的with nogil 块中编写CPU密集型代码。
如果希望代码有效地使用线程,只需传递preferre='threads'作为joblib.Parallel构造函数的参数即可。在这种情况下,joblib将自动使用"threading"后端,而不是默认的"loky"后端
  1. >>> Parallel(n_jobs=2, prefer=threads')(
  2. ...     delayed(sqrt)(i ** 2) for i in range(10))
  3. [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
复制代码
也可以在上下文管理器的帮助下手动选择特定的后端实现:
  1. >>> from joblib import parallel_backend
  2. >>> with parallel_backend('threading', n_jobs=2):
  3. ...    Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
  4. ...
  5. [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
复制代码
后者在调用内部使用joblib.Parallel的库时特别有用,不会将后端部分作为其公共API的一部分公开。
'loky'后端可能并不总是可获取。
一些罕见的系统不支持多处理(例如Pyodide)。在这种情况下,loky后端不可用,使用线程作为默认后端。
除了内置的joblib后端之外,还可以使用几个特定于集群的后端:
序列化与进程

要在多个python进程之间共享函数定义,必须依赖序列化协议。python中的标准协议是pickle ,但它在标准库中的默认实现有几个限制。例如,它不能序列化交互式定义的函数或在__main__模块中定义的函数。
为了避免这种限制,loky后端现在依赖于cloudpickle以序列化python对象。cloudpickle是pickle协议的另一种实现方式,允许序列化更多的对象,特别是交互式定义的函数。因此,对于大多数用途,loky后端应该可以完美的工作。
cloudpickle的主要缺点就是它可能比标准类库中的pickle慢,特别是,对于大型python字典或列表来说,这一点至关重要,因为它们的序列化时间可能慢100倍。有两种方法可以更改 joblib的序列化过程以缓和此问题:
共享内存语义

joblib的默认后端将在独立的Python进程中运行每个函数调用,因此它们不能更改主程序中定义的公共Python对象。
然而,如果并行函数确实需要依赖于线程的共享内存语义,则应显示的使用require='sharemem',例如:
  1. >>> shared_set = set()
  2. >>> def collect(x):
  3. ...    shared_set.add(x)
  4. ...
  5. >>> Parallel(n_jobs=2, require='sharedmem')(
  6. ...     delayed(collect)(i) for i in range(5))
  7. [None, None, None, None, None]
  8. >>> sorted(shared_set)
  9. [0, 1, 2, 3, 4]
复制代码
请记住,从性能的角度来看,依赖共享内存语义可能是次优的,因为对共享Python对象的并发访问将受到锁争用的影响。
注意,不使用共享内存的情况下,任务进程之间的内存资源是相互独立的,举例说明如下:
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import time
  4. import threading
  5. from joblib import Parallel, delayed, parallel_backend
  6. from collections import deque
  7. GLOBAL_LIST = []
  8. class TestClass():
  9.     def __init__(self):
  10.         self.job_queue = deque()
  11.     def add_jobs(self):
  12.         i = 0
  13.         while i < 3:
  14.             time.sleep(1)
  15.             i += 1
  16.             GLOBAL_LIST.append(i)
  17.             self.job_queue.append(i)
  18.             print('obj_id:', id(self),  'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST)
  19. def get_job_queue_list(obj):
  20.     i = 0
  21.     while not obj.job_queue and i < 3:
  22.         time.sleep(1)
  23.         i += 1
  24.         print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST)
  25.     return obj.job_queue
  26. if __name__ == "__main__":
  27.     obj = TestClass()
  28.     def test_fun():
  29.         with parallel_backend("multiprocessing", n_jobs=2):
  30.             Parallel()(delayed(get_job_queue_list)(obj) for i in range(2))
  31.     thread = threading.Thread(target=test_fun, name="parse_log")
  32.     thread.start()
  33.     time.sleep(1)
  34.     obj.add_jobs()
  35.     print('global_list_len:', len(GLOBAL_LIST))
复制代码
控制台输出:
  1. obj_id: 1554577912664 job_queue: deque([]) global_list: []
  2. obj_id: 1930069893920 job_queue: deque([]) global_list: []
  3. obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
  4. obj_id: 1554577912664 job_queue: deque([]) global_list: []
  5. obj_id: 1930069893920 job_queue: deque([]) global_list: []
  6. obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
  7. obj_id: 1554577912664 job_queue: deque([]) global_list: []
  8. obj_id: 1930069893920 job_queue: deque([]) global_list: []
  9. obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
  10. global_list_len: 3
复制代码
通过输出可知,通过joblib.Parallel开启的进程,其占用内存和主线程占用的内存资源是相互独立
复用worer池

一些算法需要对并行函数进行多次连续调用,同时对中间结果进行处理。在一个循环中多次调用joblib.Parallel次优的,因为它会多次创建和销毁一个workde(线程或进程)池,这可能会导致大量开销。
在这种情况下,使用joblib.Parallel类的上下文管理器API更有效,以便对joblib.Parallel对象的多次调用可以复用同一worker池。
  1. from joblib import Parallel, delayed
  2. from math import sqrt
  3. with Parallel(n_jobs=2) as parallel:
  4.    accumulator = 0.
  5.    n_iter = 0
  6.    while accumulator < 1000:
  7.        results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
  8.        accumulator += sum(results)  # synchronization barrier
  9.        n_iter += 1
  10. print(accumulator, n_iter)  #输出: 1136.5969161564717 14                          
复制代码
请注意,现在基于进程的并行默认使用'loky'后端,该后端会自动尝试自己维护和重用worker池,即使是在没有上下文管理器的调用中也是如此
笔者实践发现,即便采用这种实现方式,其运行效率也是非常低下的,应该尽量避免这种设计(实践环境 Python3.6)
...略

Parallel参考文档
  1. class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
复制代码
常用参数说明
参考文档

https://joblib.readthedocs.io/en/latest/
https://joblib.readthedocs.io/
https://joblib.readthedocs.io/en/latest/parallel.html#common-usage

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4