f 数据仓库与分析-【突破内存限制】Dask大数据并行计算框架:Python数据科学家的救星 | 轻松处理TB级数据 - Powered by qidao123.com技术社区

【突破内存限制】Dask大数据并行计算框架:Python数据科学家的救星 | 轻松 ...

打印 上一主题 下一主题

主题 2078|帖子 2078|积分 6234

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Dask:大规模数据并行计算框架

在处理大规模数据集时,传统的Python数据分析库(如NumPy和Pandas)每每会因内存限制而无法满足需求。Dask作为一个灵活的并行计算框架,通过扩展这些熟悉的接口,使数据科学家能够处理超出内存限制的大型数据集,同时保持熟悉的编程体验。本文将具体介绍Dask的核心概念、利用方法及实际应用案例。
1. Dask简介

1.1 什么是Dask?

Dask是一个灵活的开源Python库,用于并行计算。它具有以下核心特点:


  • 扩展熟悉的接口:提供与NumPy、Pandas和Python尺度库兼容的API,使现有代码可以轻松迁移
  • 支持超大规模数据:能够处理超出内存限制的数据集
  • 动态使命调度:高效调度并利用命计算
  • 适用于单机和分布式集群:同一套代码可以在条记本电脑和大型集群上运行
1.2 为什么须要Dask?

当面临以下情况时,Dask特殊有效:


  • 数据集巨细超过RAM容量
  • 计算过程须要加快
  • 处理流式或及时数据
  • 须要利用多核或多机进行计算
  • 希望在保持熟悉的Python接口的同时处理大数据
1.3 Dask vs 其他技术

与其他大数据处理技术相比,Dask有其独特优势:
    技术   优势   劣势         Dask   与Python生态系统无缝集成,易于学习和利用   在特定范畴的优化不如专用系统       Spark   成熟的大数据生态系统,SQL支持完善   Python接口是包装层,有性能和灵活性限制       Ray   优化了微使命和呆板学习工作负载   数据处理功能不如Dask完善       单机工具(Pandas/NumPy)   简朴,丰富的功能   受限于单机内存   2. 安装与配置

2.1 安装Dask

  1. # 基本安装
  2. pip install dask
  3. # 安装完整功能(包括分布式组件和可视化)
  4. pip install "dask[complete]"
  5. # 使用conda安装
  6. conda install dask
复制代码
2.2 导入根本组件

  1. import dask
  2. import dask.dataframe as dd
  3. import dask.array as da
  4. import dask.bag as db
  5. from dask.distributed import Client
复制代码
3. Dask的核心概念

3.1 延迟计算(Lazy Evaluation)

Dask利用延迟计算模式,构建操纵的有向无环图(DAG),只有在调用compute()或persist()时才会实际执行计算。
  1. import dask
  2. import time
  3. def inc(x):
  4.     time.sleep(1)  # 模拟耗时计算
  5.     return x + 1
  6. def add(x, y):
  7.     time.sleep(1)  # 模拟耗时计算
  8.     return x + y
  9. # 创建延迟计算对象
  10. x = dask.delayed(inc)(1)
  11. y = dask.delayed(inc)(2)
  12. z = dask.delayed(add)(x, y)
  13. # 此时尚未执行任何计算
  14. print("已创建计算图,但尚未执行")
  15. # 可视化计算图
  16. z.visualize(filename='task_graph.png')
  17. # 执行计算
  18. result = z.compute()
  19. print(f"计算结果: {
  20.      result}")
复制代码
3.2 使命图(Task Graph)

使命图是Dask的核心概念,它是由操纵和数据依赖关系构成的有向无环图(DAG)。
  1. # 上面例子中的任务图可视化展示了以下依赖关系:
  2. # inc(1) -> x
  3. # inc(2) -> y
  4. # add(x, y) -> z
复制代码
3.3 分区与块(Partitions & Chunks)

Dask将大型数据集分割为更小的块,这些块可以单独处理,然后再组合结果。
  1. import numpy as np
  2. import dask.array as da
  3. # 创建一个大型NumPy数组(假设无法完全放入内存)
  4. # 生成8000x8000的数组,共享内存约有512MB
  5. large_array = np.random.random((8000, 8000))
  6. # 使用Dask Array处理,将数据分割成1000x1000的块
  7. dask_array = da.from_array(large_array, chunks=(1000, 1000))
  8. # 每个块并行计算均值
  9. mean_result = dask_array.mean().compute()
  10. print(f"数组均值: {
  11.      mean_result}")
复制代码
3.4 调度器(Schedulers)

Dask提供多种调度器以顺应不同的计算情况:


  • 线程调度器:单机多核并行,适合NumPy和Pandas加快
  • 进程调度器:避免全局解释器锁(GIL)限制
  • 分布式调度器:用于集群计算
  1. from dask.distributed import Client
  2. # 创建本地分布式客户端
  3. client = Client()
  4. print(client)
  5. # 查看集群信息和仪表板地址
  6. print(client.dashboard_link)
复制代码
4. Dask数据结构

4.1 Dask DataFrame

Dask DataFrame是Pandas DataFrame的分布式版本,API几乎相同。
  1. import pandas as pd
  2. import dask.dataframe as dd
  3. import numpy as np
  4. # 创建示例Pandas DataFrame
  5. pdf = pd.DataFrame({
  6.    
  7.     'id': range(10000),
  8.     'value': np.random.random(10000),
  9.     'category': np.random.choice(['A', 'B', 'C', 'D'], 10000)
  10. })
  11. # 转换为Dask DataFrame,按行分区
  12. ddf = dd.from_pandas(pdf, npartitions=4)
  13. print(f"分区数: {
  14.      ddf.npartitions}")
  15. # 与Pandas类似的操作
  16. result = ddf.groupby('category')['value'].mean().compute()
  17. print(result)
  18. # 从磁盘读取大型CSV文件(超出内存大小的文件)
  19. # ddf = dd.read_csv('large_file.csv', blocksize='64MB')
  20. # 延迟计算,直至调用compute()
  21. filtered = ddf[ddf.value > 0.5]
  22. grouped = filtered.groupby('category')['value'].mean()
  23. # 执行计算
  24. result = grouped.compute()
  25. print(result)
复制代码
4.2 Dask Array

Dask Array是NumPy ndarray的分布式版本,处理大型多维数组。
  1. import dask.array as da
  2. import numpy as np
  3. # 创建一个16GB的随机数组(8000x8000x256),远超大多数机器的内存
  4. shape = (8000, 8000, 256)
  5. chunks = (1000, 1000, 64)  # 每个块约为500MB
  6. # 创建懒加载的大型随机数组
  7. x = da.random.random(shape, chunks=chunks)
  8. print(f"数组形状: {
  9.      x.shape}")
  10. print(f"块形状: {
  11.      x.chunks}")
  12. print(f"块数量: {
  13.      len(x.chunks[0]) * len(x.chunks[1]) * len(x.chunks[2])}")
  14. # 执行矩阵计算
  15. y = (x + x.T).mean(axis=0)
  16. # 实际计算结果
  17. result = y[0, 0].compute()
  18. print(f"计算结果: {
  19.      result}")
复制代码
4.3 Dask Bag

Dask Bag适用于非结构化或半结构化数据,雷同Python的列表。
  1. import dask.bag as db
  2. # 从文本文件创建Bag
  3. # bag = db.read_text('large_text_file.txt').map(json.loads)
  4. # 创建示例Bag
  5. data = [{
  6.    "name": f"user_{
  7.      i}", "value": i %
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

美食家大橙子

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表