并发模型
并发模型是用来实现差别应用场景中并发任务的编程模型,常见的并发模型分为基于内存共享的并发模型和基于消息通信的并发模型。
Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高,因此得到了广泛的支持和利用。
当前鸿蒙ArkTS提供了TaskPool和Worker两种并发本领,TaskPool和Worker都基于Actor并发模型实现。
内存共享并发模型指多线程同时执行任务,这些线程依赖同一内存并且都有权限访问,线程访问内存前需要抢占并锁定内存的利用权,没有抢占到内存的线程需要等待其他线程开释利用权再执行。
Actor并发模型每一个线程都是一个独立Actor,每个Actor有自己独立的内存,Actor之间通过消息通报机制触发对方Actor的举动,差别Actor之间不能直接访问对方的内存空间。Actor并发模型对比内存共享并发模型的上风在于差别线程间内存隔离,不会产生差别线程竞争同一内存资源的问题。开发者不需要考虑对内存上锁导致的一系列功能、性能问题,提拔了开发效率。
由于Actor并发模型线程之间不共享内存,需要通过线程间通信机制传输并发任务和任务结果。
TaskPool简介
任务池(TaskPool)作用是为应用步伐提供一个多线程的运行环境,降低团体资源的斲丧、进步体系的团体性能,且您无需关心线程实例的生命周期。
TaskPool支持开发者在宿主线程封装任务抛给任务队列,体系选择合适的工作线程,进行任务的分发及执行,再将结果返回给宿主线程。接口直观易用,支持任务的执行、取消,以及指定优先级的本领,同时通过体系统一线程管理,联合动态调理及负载均衡算法,可以节约体系资源。体系默认会启动一个任务工作线程,当任务较多时会扩容,工作线程数量上限跟当前设备的物理核数相关,详细数量内部管理,保证最优的调理及执行效率,长时间没有任务分发时会缩容,减少工作线程数量。
TaskPool留意事项
- 实现任务的函数需要利用@Concurrent装饰器标注,且仅支持在.ets文件中利用。
- 从API version 11开始,跨并发实例通报带方法的实例对象时,该类必须利用装饰器@Sendable装饰器标注,且仅支持在.ets文件中利用。
- 任务函数在TaskPool工作线程的执行耗时不能超过3分钟(不包含Promise和async/await异步调用的耗时,例如网络下载、文件读写等I/O任务的耗时),否则会被逼迫退出。
- 实现任务的函数入参需满足序列化支持的范例,详情请参见线程间通信对象。
- ArrayBuffer参数在TaskPool中默认转移,需要设置转移列表的话可通过接口setTransferList()设置。
- 由于差别线程中上下文对象是差别的,因此TaskPool工作线程只能利用线程安全的库,例如UI相关的非线程安全库不能利用。
- 序列化传输的数据量巨细限定为16MB。
- Priority的IDLE优先级是用来标记需要在背景运行的耗时任务(例如数据同步、备份),它的优先级别是最低的。这种优先级标记的任务只会在全部线程都空闲的环境下触发执行,并且只会占用一个线程来执行。
- Promise不支持跨线程通报,如果TaskPool返回pending或rejected状态的Promise,会返回失败;对于fulfilled状态的Promise,TaskPool会解析返回的结果,如果结果可以跨线程通报,则返回成功。
- 不支持在TaskPool工作线程中利用AppStorage。
TaskPool应用实例
生产者消费者模型应用taskPool的详细代码实现
1.生产者
- import { taskpool } from '@kit.ArkTS';
- import { stingToUint8, uint8TransformString } from './utils';
- @Concurrent
- export async function producer(ArrayBuffer: Int32Array, dataBuffer: Uint8Array, newStr: string) {
- let i32a = ArrayBuffer;
- let array = dataBuffer
- if (array[array.length-1] !== 0) {
- taskpool.Task.sendData(false)
- let runner = new taskpool.SequenceRunner()
- console.log("-----atomics-producer-push-fal-" + newStr)
- return
- }
- let jsonStr: string = uint8TransformString(array)
- let arr: string[] = []
- try {
- arr= JSON.parse(jsonStr) as string[]
- } catch (e) {
- taskpool.Task.sendData(false)
- return
- }
- arr.push(newStr)
- let newArrJson = JSON.stringify(arr) ?? ''
- //console.log("newArrJson" + newArrJson)
- let isFinish = stingToUint8(newArrJson,array,4)
- if (!isFinish) {
- arr.pop()
- let newArrJson1 = JSON.stringify(arr) ?? ''
- stingToUint8(newArrJson1,array,4)
- taskpool.Task.sendData(false)
- console.log("-----atomics-producer-push-fal-" + newStr)
- }else{
- console.log("-----atomics-producer-push-sec-" + newStr)
- }
- Atomics.notify(i32a, 0, 1)
- Promise.resolve()
- }
复制代码 2.消费者
- import { getStringArrayFromJson, testMethod, uint8TransformString} from './utils';
- import { buffer, taskpool } from '@kit.ArkTS';
- import { ThreadUtils } from './ThreadUtils';
- @Concurrent
- export async function consumerTask(ArrayBuffer: Int32Array, dataBuffer: Uint8Array): Promise<void> {
- let i32a = ArrayBuffer;
- let array = dataBuffer
- while (true) {
- let jsonStr: string = uint8TransformString(array)
- let arr = getStringArrayFromJson(jsonStr)
- if (arr.length == 0) {
- Atomics.wait(i32a, 0, 0);
- } else {
- let i = 4
- for (let index = 0; index < array.byteLength; index++) {
- if (i >= array.byteLength) {
- break
- }
- Atomics.store(array, i++, 0)
- }
- taskpool.Task.sendData(true)
- let writeResult: boolean = true
- while ((writeResult == true || writeResult == false)) {
- let ele = arr.shift()
- if (!ele) {
- break
- }
- writeResult = await ThreadUtils.getInstance().writeToFile(ele)
- console.log('-----atomics-consumer-' + ele)
- }
- }
- }
- }
复制代码 3.字符串和字节码相互转换工具
- export function testMethod(str: string) {
- console.log('--test-function-str-' + str)
- }
- export function uint8TransformString(array:Uint8Array): string{
- let jsonStr: string = JSON.stringify([])
- let tempArr: number[] = []
- let j = 0
- for (let index = 0; index < array.length; index++) {
- if (array[index] == 0) {
- continue
- }
- tempArr[j++] = array[index]
- }
- let temp = new Uint8Array(tempArr)
- if (temp.byteLength > 0) {
- let str = '';
- for (let i = 0; i < temp.length; ) {
- let byte1 = temp[i];
- let codePoint: number
- if (byte1 >> 7 === 0) { // 1字节
- codePoint = byte1;
- i += 1;
- } else if (byte1 >> 5 === 0b110) { // 2字节
- codePoint = ((byte1 & 0b11111) << 6) | (temp[i + 1] & 0b111111);
- i += 2;
- } else if (byte1 >> 4 === 0b1110) { // 3字节
- codePoint = ((byte1 & 0b1111) << 12) | ((temp[i + 1] & 0b111111) << 6) | (temp[i + 2] & 0b111111);
- i += 3;
- } else {
- // 错误处理:不支持的字节序列
- i += 1; // 跳过当前字节
- continue;
- }
- str += String.fromCodePoint(codePoint)
- console.info('字节流转成可理解的字符串:' + str);
- }
- jsonStr = str
- }
- return jsonStr
- }
- //
- export function stingToUint8(json: string, array:Uint8Array,formIndex: number = 0) : boolean{
- let i = formIndex
- let isFinish = true
- for (let index = 0; index < json.length; index++) {
- if (i >= array.byteLength) {
- if (index < json.length - 1) {
- isFinish = false
- }
- break
- }
- const element = json.charCodeAt(index);
- if (element > 0x7FF) {
- Atomics.store(array, i++, (0xE0 | (element >> 12)))
- Atomics.store(array, i++, (0x80 | ((element >> 6) & 0x3F)))
- Atomics.store(array, i++, (0x80 | (element & 0x3F)))
- } else if (element > 0x7F) {
- Atomics.store(array, i++, (0xC0 | (element >> 6)))
- Atomics.store(array, i++, (0x80 | (element & 0x3F)))
- } else {
- Atomics.store(array, i++, (element))
- }
- }
- //剩余空间赋值0
- for (let index = i; index < array.length; index++) {
- array[index] = 0
- }
- return isFinish
- }
复制代码 4.单例工具
- import { taskpool } from '@kit.ArkTS';
- import { it } from '@ohos/hypium';
- import { consumerTask } from './consumer';
- import { producer } from './product';
- export class ThreadUtils {
- private tempLogList: Array<string> = new Array()
- private static instance: ThreadUtils
- private sab :SharedArrayBuffer
- private ui8 :Uint8Array
- private i32a :Int32Array
- private constructor(bufferSize:number = 1024) {
- this.sab = new SharedArrayBuffer(bufferSize)
- this.ui8 = new Uint8Array(this.sab)
- this.i32a = new Int32Array(this.sab)
- this.startConsumer()
- };
- writeLog(log: string) {
- if (this.flag) {
- this.tempLogList.push(log)
- }else {
- this.product(log)
- }
- }
- public static getInstance(bufferSize:number = 1024): ThreadUtils {
- if (!ThreadUtils.instance) {
- ThreadUtils.instance = new ThreadUtils(bufferSize);
- }
- return ThreadUtils.instance;
- }
- async writeToFile(content: string): Promise<boolean> {
- return new Promise((resolve, reject) => {
- setTimeout(() => {
- console.log("日志写入完成=" + content)
- console.log('pop element=' + content)
- resolve(true)
- }, 4000)
- })
- }
- lastTask:taskpool.Task | undefined
- flag = false
- async product(log: string):Promise<boolean> {
- return new Promise<boolean>((resolve,reject)=>{
- let newLog = log
- let task = new taskpool.Task(producer, this.i32a, this.ui8, newLog)
- if (this.lastTask) {
- task.addDependency(this.lastTask)
- }
- this.lastTask = task
- task.onReceiveData((success: boolean) => {
- if (!success) {
- this.flag = true
- this.tempLogList.unshift(log)
- resolve(false)
- }
- })
- taskpool.execute(task).then(()=>{
- console.log('------taskpool.execute.then-----')
- resolve(true)
- });
- })
- }
- isWhile = false
- async startConsumer() {
- let task = new taskpool.Task(consumerTask, this.i32a, this.ui8)
- task.onReceiveData(async (hasSpace: boolean) => {
- if (hasSpace) {
- this.flag = false
- if (this.tempLogList.length > 0 && this.isWhile == false){
- let item = this.tempLogList.shift()
- console.log('---item---'+ item)
- this.isWhile = true
- let com = true
- while (item && this.flag == false && com){
- com = await this.product(item)
- item = this.tempLogList.shift()
- }
- this.isWhile = false
- }
- }
- })
- taskpool.execute(task)
- }
- }
复制代码 5.页面UI应用
- import { buffer, taskpool } from '@kit.ArkTS';
- import { consumerTask } from '../consumer';
- import { producer } from '../product';
- import { router } from '@kit.ArkUI';
- import { ThreadUtils } from '../ThreadUtils';
- @Entry
- @Component
- struct Index {
- timer = -1
- count = 0
- logTool = ThreadUtils.getInstance(32)
- aboutToAppear(): void {
- }
- @State inputText:string =''
- build() {
- Column({space: 20}) {
- TextInput({text: $$this.inputText})
- .width('80%')
- Button() {
- Text("生产日志").padding(10)
- }
- .backgroundColor(Color.Gray)
- .onClick(async () => {
- this.timer = setInterval(()=>{
- this.logTool.writeLog ('item' + this.count)
- this.count += 1
- },1000)
- })
- Button() {
- Text("停止生产").padding(10)
- }
- .backgroundColor(Color.Gray)
- .onClick(async () => {
- clearInterval(this.timer)
- // router.pushUrl({
- // url: 'pages/TaskPoolPage'
- // })
- })
- }
- .alignItems(HorizontalAlign.Center)
- .justifyContent(FlexAlign.Center)
- .height('100%')
- .width('100%')
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |