鸿蒙多线程应用-taskPool

打印 上一主题 下一主题

主题 948|帖子 948|积分 2844

 并发模型

      并发模型是用来实现差别应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。
        Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和利用。
       当前鸿蒙ArkTS提供了TaskPool和Worker两种并发本领,TaskPool和Worker都基于Actor并发模型实现。
       内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的利用权,没有抢占到内存的线程需要等待其他线程开释利用权再执行。
       Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息通报机制触发对方Actor的举动,差别Actor之间不能直接访问对方的内存空间。Actor并发模型对比内存共享并发模型的上风在于差别线程间内存隔离,不会产生差别线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题,提拔了开发效率。
       由于Actor并发模型线程之间不共享内存,需要通过线程间通信机制传输并发任务和任务结果。
TaskPool简介

      任务池(TaskPool)作用是为应用步伐提供一个多线程的运行环境,降低团体资源的斲丧、进步体系的团体性能,且您无需关心线程实例的生命周期。
     TaskPool支持开发者在宿主线程封装任务抛给任务队列,体系选择合适的工作线程,进行任务的分发及执行,再将结果返回给宿主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的本领,同时通过体系统一线程管理,联合动态调理及负载均衡算法,可以节约体系资源。体系默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,详细数量内部管理,保证最优的调理及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。
TaskPool留意事项



  • 实现任务的函数需要利用@Concurrent装饰器标注,且仅支持在.ets文件中利用。
  • 从API version 11开始,跨并发实例通报带方法的实例对象时,该类必须利用装饰器@Sendable装饰器标注,且仅支持在.ets文件中利用。
  • 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被逼迫退出。
  • 实现任务的函数入参需满足序列化支持的范例,详情请参见线程间通信对象。
  • ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
  • 由于差别线程中上下文对象是差别的,因此TaskPool工作线程只能利用线程安全的库,例如UI相关的非线程安全库不能利用。
  • 序列化传输的数据量巨细限定为16MB。
  • Priority的IDLE优先级是用来标记需要在背景运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级标记的任务只会在全部线程都空闲的环境下触发执行,并且只会占用一个线程来执行。
  • Promise不支持跨线程通报,如果TaskPool返回pending或rejected状态的Promise,会返回失败;对于fulfilled状态的Promise,TaskPool会解析返回的结果,如果结果可以跨线程通报,则返回成功。
  • 不支持在TaskPool工作线程中利用AppStorage。
TaskPool应用实例

       生产者消费者模型应用taskPool的详细代码实现
1.生产者
  1. import { taskpool } from '@kit.ArkTS';
  2. import { stingToUint8, uint8TransformString } from './utils';
  3. @Concurrent
  4. export async function producer(ArrayBuffer: Int32Array, dataBuffer: Uint8Array, newStr: string) {
  5.   let i32a = ArrayBuffer;
  6.   let array = dataBuffer
  7.   if (array[array.length-1] !== 0) {
  8.     taskpool.Task.sendData(false)
  9.     let runner = new taskpool.SequenceRunner()
  10.     console.log("-----atomics-producer-push-fal-" + newStr)
  11.     return
  12.   }
  13.   let jsonStr: string = uint8TransformString(array)
  14.   let arr: string[] = []
  15.   try {
  16.     arr= JSON.parse(jsonStr) as string[]
  17.   } catch (e) {
  18.     taskpool.Task.sendData(false)
  19.     return
  20.   }
  21.   arr.push(newStr)
  22.   let newArrJson = JSON.stringify(arr) ?? ''
  23.   //console.log("newArrJson" + newArrJson)
  24.   let isFinish = stingToUint8(newArrJson,array,4)
  25.   if (!isFinish) {
  26.     arr.pop()
  27.     let newArrJson1 = JSON.stringify(arr) ?? ''
  28.     stingToUint8(newArrJson1,array,4)
  29.     taskpool.Task.sendData(false)
  30.     console.log("-----atomics-producer-push-fal-" + newStr)
  31.   }else{
  32.     console.log("-----atomics-producer-push-sec-" + newStr)
  33.   }
  34.   Atomics.notify(i32a, 0, 1)
  35.   Promise.resolve()
  36. }
复制代码
2.消费者
  1. import { getStringArrayFromJson, testMethod, uint8TransformString} from './utils';
  2. import { buffer, taskpool } from '@kit.ArkTS';
  3. import { ThreadUtils } from './ThreadUtils';
  4. @Concurrent
  5. export async function consumerTask(ArrayBuffer: Int32Array, dataBuffer: Uint8Array): Promise<void> {
  6.   let i32a = ArrayBuffer;
  7.   let array = dataBuffer
  8.   while (true) {
  9.     let jsonStr: string = uint8TransformString(array)
  10.     let arr = getStringArrayFromJson(jsonStr)
  11.     if (arr.length == 0) {
  12.       Atomics.wait(i32a, 0, 0);
  13.     } else {
  14.       let i = 4
  15.       for (let index = 0; index < array.byteLength; index++) {
  16.         if (i >= array.byteLength) {
  17.           break
  18.         }
  19.         Atomics.store(array, i++, 0)
  20.       }
  21.       taskpool.Task.sendData(true)
  22.       let writeResult: boolean = true
  23.       while ((writeResult == true || writeResult == false)) {
  24.         let ele = arr.shift()
  25.         if (!ele) {
  26.           break
  27.         }
  28.         writeResult = await ThreadUtils.getInstance().writeToFile(ele)
  29.         console.log('-----atomics-consumer-' + ele)
  30.       }
  31.     }
  32.   }
  33. }
复制代码
3.字符串和字节码相互转换工具
  1. export function testMethod(str: string) {
  2.   console.log('--test-function-str-' + str)
  3. }
  4. export function uint8TransformString(array:Uint8Array): string{
  5.   let jsonStr: string = JSON.stringify([])
  6.   let tempArr: number[] = []
  7.   let j = 0
  8.   for (let index = 0; index < array.length; index++) {
  9.     if (array[index] == 0) {
  10.       continue
  11.     }
  12.     tempArr[j++] = array[index]
  13.   }
  14.   let temp = new Uint8Array(tempArr)
  15.   if (temp.byteLength > 0) {
  16.     let str = '';
  17.     for (let i = 0; i < temp.length; ) {
  18.       let byte1 = temp[i];
  19.       let codePoint: number
  20.       if (byte1 >> 7 === 0) { // 1字节
  21.         codePoint = byte1;
  22.         i += 1;
  23.       } else if (byte1 >> 5 === 0b110) { // 2字节
  24.         codePoint = ((byte1 & 0b11111) << 6) | (temp[i + 1] & 0b111111);
  25.         i += 2;
  26.       } else if (byte1 >> 4 === 0b1110) { // 3字节
  27.         codePoint = ((byte1 & 0b1111) << 12) | ((temp[i + 1] & 0b111111) << 6) | (temp[i + 2] & 0b111111);
  28.         i += 3;
  29.       } else {
  30.         // 错误处理:不支持的字节序列
  31.         i += 1; // 跳过当前字节
  32.         continue;
  33.       }
  34.       str += String.fromCodePoint(codePoint)
  35.       console.info('字节流转成可理解的字符串:' + str);
  36.     }
  37.     jsonStr = str
  38.   }
  39.   return jsonStr
  40. }
  41. //
  42. export function stingToUint8(json: string, array:Uint8Array,formIndex: number = 0) : boolean{
  43.   let i = formIndex
  44.   let isFinish = true
  45.   for (let index = 0; index < json.length; index++) {
  46.     if (i >= array.byteLength) {
  47.       if (index < json.length - 1) {
  48.         isFinish = false
  49.       }
  50.       break
  51.     }
  52.     const element = json.charCodeAt(index);
  53.     if (element > 0x7FF) {
  54.       Atomics.store(array, i++, (0xE0 | (element >> 12)))
  55.       Atomics.store(array, i++, (0x80 | ((element >> 6) & 0x3F)))
  56.       Atomics.store(array, i++, (0x80 | (element & 0x3F)))
  57.     } else if (element > 0x7F) {
  58.       Atomics.store(array, i++, (0xC0 | (element >> 6)))
  59.       Atomics.store(array, i++, (0x80 | (element & 0x3F)))
  60.     } else {
  61.       Atomics.store(array, i++, (element))
  62.     }
  63.   }
  64.   //剩余空间赋值0
  65.   for (let index = i; index < array.length; index++) {
  66.     array[index] = 0
  67.   }
  68.   return isFinish
  69. }
复制代码
4.单例工具
  1. import { taskpool } from '@kit.ArkTS';
  2. import { it } from '@ohos/hypium';
  3. import { consumerTask } from './consumer';
  4. import { producer } from './product';
  5. export class ThreadUtils {
  6.   private tempLogList: Array<string> = new Array()
  7.   private static instance: ThreadUtils
  8.   private sab :SharedArrayBuffer
  9.   private ui8 :Uint8Array
  10.   private i32a :Int32Array
  11.   private constructor(bufferSize:number = 1024) {
  12.     this.sab = new SharedArrayBuffer(bufferSize)
  13.     this.ui8 = new Uint8Array(this.sab)
  14.     this.i32a = new Int32Array(this.sab)
  15.     this.startConsumer()
  16.   };
  17.   writeLog(log: string) {
  18.     if (this.flag) {
  19.       this.tempLogList.push(log)
  20.     }else {
  21.       this.product(log)
  22.     }
  23.   }
  24.   public static getInstance(bufferSize:number = 1024): ThreadUtils {
  25.     if (!ThreadUtils.instance) {
  26.       ThreadUtils.instance = new ThreadUtils(bufferSize);
  27.     }
  28.     return ThreadUtils.instance;
  29.   }
  30.   async writeToFile(content: string): Promise<boolean> {
  31.     return new Promise((resolve, reject) => {
  32.       setTimeout(() => {
  33.         console.log("日志写入完成=" + content)
  34.         console.log('pop element=' + content)
  35.         resolve(true)
  36.       }, 4000)
  37.     })
  38.   }
  39.   lastTask:taskpool.Task | undefined
  40.   flag = false
  41.   async product(log: string):Promise<boolean> {
  42.     return new Promise<boolean>((resolve,reject)=>{
  43.       let newLog = log
  44.       let task = new taskpool.Task(producer, this.i32a, this.ui8, newLog)
  45.       if (this.lastTask) {
  46.         task.addDependency(this.lastTask)
  47.       }
  48.       this.lastTask = task
  49.       task.onReceiveData((success: boolean) => {
  50.         if (!success) {
  51.           this.flag = true
  52.           this.tempLogList.unshift(log)
  53.           resolve(false)
  54.         }
  55.       })
  56.       taskpool.execute(task).then(()=>{
  57.         console.log('------taskpool.execute.then-----')
  58.         resolve(true)
  59.       });
  60.     })
  61.   }
  62.   isWhile = false
  63.   async startConsumer() {
  64.     let task = new taskpool.Task(consumerTask, this.i32a, this.ui8)
  65.     task.onReceiveData(async (hasSpace: boolean) => {
  66.       if (hasSpace) {
  67.         this.flag = false
  68.         if (this.tempLogList.length > 0 && this.isWhile == false){
  69.           let item = this.tempLogList.shift()
  70.           console.log('---item---'+ item)
  71.           this.isWhile = true
  72.           let com = true
  73.           while (item && this.flag == false && com){
  74.             com = await this.product(item)
  75.             item = this.tempLogList.shift()
  76.           }
  77.           this.isWhile = false
  78.         }
  79.       }
  80.     })
  81.     taskpool.execute(task)
  82.   }
  83. }
复制代码
5.页面UI应用
  1. import { buffer, taskpool } from '@kit.ArkTS';
  2. import { consumerTask } from '../consumer';
  3. import { producer } from '../product';
  4. import { router } from '@kit.ArkUI';
  5. import { ThreadUtils } from '../ThreadUtils';
  6. @Entry
  7. @Component
  8. struct Index {
  9.   timer = -1
  10.   count = 0
  11.   logTool = ThreadUtils.getInstance(32)
  12.   aboutToAppear(): void {
  13.   }
  14.   @State inputText:string =''
  15.   build() {
  16.     Column({space: 20}) {
  17.       TextInput({text: $$this.inputText})
  18.         .width('80%')
  19.       Button() {
  20.         Text("生产日志").padding(10)
  21.       }
  22.       .backgroundColor(Color.Gray)
  23.       .onClick(async () => {
  24.         this.timer = setInterval(()=>{
  25.           this.logTool.writeLog ('item' + this.count)
  26.           this.count += 1
  27.         },1000)
  28.       })
  29.       Button() {
  30.         Text("停止生产").padding(10)
  31.       }
  32.       .backgroundColor(Color.Gray)
  33.       .onClick(async () => {
  34.         clearInterval(this.timer)
  35.         // router.pushUrl({
  36.         //   url: 'pages/TaskPoolPage'
  37.         // })
  38.       })
  39.     }
  40.     .alignItems(HorizontalAlign.Center)
  41.     .justifyContent(FlexAlign.Center)
  42.     .height('100%')
  43.     .width('100%')
  44.   }
  45. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

耶耶耶耶耶

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表