Flutter——手撸线程(Isolate)池和子线程工作模块的设计

tech2024-01-07  76

前言

一些时候我们执行异步操作,如一个api请求,可直接使用async/await 完成。但有时候我们可能需要短时间内执行大量异步操作,如打印,写入文件等等,那么再用async/await 直接插入到event queue中势必会造成卡顿,为此我们应该将它们放在子线程中。

不过频繁创建/释放isolate也是对资源的浪费,所以我决定模仿java写一个线程池(基础版)。

基础版 略显稚嫩,还请海涵

结构图

主要分为三部分:

main isolate : flutter 主线程 proxy isolate : 子线程 负责接收主线程的指令,并下发给work isolate work isolate : 实际执行任务的子线程

流程

主线程提交任务,proxy线程接收并缓存下来,同时向线程池(3个线程)内部的空闲线程进行任务派发

生命周期:线程创建后,便会一直存在,与APP一致

之后通过反射在工作线程中调用预先定义好的方法。

对于开发人员,无需关注线程内部原理,只需要定义一个方法, 然后通过WorkerMainProxy类的invokeWorker就可以实现子线程执行。

接下来我们分别实现它们,还是老规矩,代码较多时我会将说明写在注释里。

main_isolate

一些常量和数据结构

因为isolate之间只能通过sendPort.send互相通信,所以我们先定义一些常量

const int kSendPortKey = 6633;//第二个元素则为 sendPort const int kTaskKey = 8844; // 第二个元素为task const int kTaskParamsKey = 10055; // 第二个元素为 方法对应的 参数 const int kTaskResult = 15500;//任务返回结果 /// const String kMethodName = 'kMethodName'; const String kNameArgs = 'kNameArgs';

port发送的message可以是 null,num,double,bool,或者包含上述类型的List和Map,以及SendPort,所以我们需要设计一下数据结构。

我这里统一使用数组 :

[key,data] key 是上方的常量值 当data为task时,类型会是Map,然后用到上面的字符串常量

WorkerMainProxy

这个类内部与proxy_isolate联系,并通过invokeWorker方法,将任务提交给proxy_isolate,我们看一下他的代码,说明我写在注释里

class WorkerMainProxy{ static WorkerMainProxy _instance; static WorkerMainProxy getInstance(){ if(_instance == null){ _instance = WorkerMainProxy._(); } return _instance; } factory WorkerMainProxy()=>getInstance(); WorkerMainProxy._(); //主线程的收听端口 final ReceivePort receivePort = ReceivePort(); //proxy Isolate isolate; //向proxy发送信息的port SendPort childPort; //因为线程初始化为异步,确保下达的任务不丢失,创建这个缓存 List<TaskWrapper> taskCache = []; //是否在初始化proxy线程 bool initializing = false; ///nameArgs key: params name ///value: params value . ///and type can only 'num,null,double,String' void invokeWorker({String methodName,Map<String,dynamic> nameArgs})async{ taskCache.add(TaskWrapper(methodName, nameArgs)); if(isolate == null && !initializing){ initializing = true; //初始化子线程,第一个参数为顶级函数,由子线程执行, //第二个参数为当前线程的sendport,子线程可以用这个向当前线程发送消息 isolate = await Isolate.spawn(proxyHandler, receivePort.sendPort); //收听子线程发送的消息 receivePort.listen((message) { if(message[0] == kSendPortKey){ //子线程的sendport传递过来 childPort = message[1]; sendTask(); }else if(message[0]==kTaskResult){ //初版不支持返回结果 ///要考虑多线程不同步的情况 //可能造成难以预料的结果,所以暂不考虑加入返回结果的功能 } }); }else if(isolate != null && childPort != null){ sendTask(); } } //当childPort初始化成功后,开始发送任务 void sendTask(){ if(taskCache.length > 0){ taskCache.forEach((element) { childPort.send([kTaskKey,element.methodName,element.nameArgs]); }); taskCache.clear(); } } }

功能很简单,接下来我们看 proxy_isolate的实现

proxy_isolate

结构图

proxyHandler

这个方法是属于proxy 线程(是isolate,叫线程叫习惯了,别被我搞乱了)的,

切记dart线程是不能共享内存的, 同时不要在子线程里使用dart:ui 或者flutter的东西,否则会报错

代码如下:

//任务缓存 List<TaskWrapper> taskLog = []; //收/发端口 final ReceivePort receiveMainPort = ReceivePort(); final SendPort sendPortOfProxy = receiveMainPort.sendPort; //线程池 final Map<int,WorkIsolateWrapper> workers = {}; void proxyHandler(SendPort mainPort)async{ //监听主线程的信息 receiveMainPort.listen((message) { if(message[0] == kTaskKey){ //对主线程的任务进行缓存 String name = message[1]; Map<String,dynamic> args = message[2]; TaskWrapper wrapper = TaskWrapper(name,args); taskLog.add(wrapper); } }); //将proxy的sendport传给主线程 mainPort.send([kSendPortKey,sendPortOfProxy]); /// create 3 work isolate ///创建三个工作线程 List.generate(3, (index) async{ //同理 final ReceivePort proxyPort = ReceivePort(); final SendPort proxySendPort = proxyPort.sendPort; Isolate.spawn(_workerIsolate, proxySendPort,paused: true) .then((isolate) { //生成ID int id = Random().nextInt(1000); while(workers.containsKey(id)){ id = Random().nextInt(1000); } var worker = WorkIsolateWrapper(id,proxyPort, proxySendPort, isolate); workers[id] = worker; //启动工作线程 worker.init(); }); }); //启动代理 runProxy(); } //创建了一个timer用于循环从任务缓存取出任务,并分发给空闲的工作线程 void runProxy(){ final timer = Timer.periodic(Duration(milliseconds: 1), (timer) { if(taskLog.length>0){ workers.forEach((key, value) { if(taskLog.length > 0){ if(value.isStandBy()){ //向空闲的线程发送任务 TaskWrapper task = taskLog.first; value.setStatus(false);// not free value.workSendPort.send([kTaskKey,{kMethodName:task.methodName, kNameArgs:task.nameArgs}]); //移除对应任务 taskLog.removeWhere((element) => element == task); } } }); } }); }

对工作线程进行了一下包装WorkIsolateWrapper,方便操作,我们来看一下他的内部结构

WorkIsolateWrapper

class WorkIsolateWrapper { final int id; final ReceivePort proxyPort; final SendPort proxySendPort; //work_isolate final Isolate _isolate; WorkIsolateWrapper(this.id,this.proxyPort, this.proxySendPort, this._isolate); ///是否空闲 bool _isFree = true; bool isStandBy()=> _isFree&&initSuccess; setStatus(bool status){ _isFree = status; } SendPort workSendPort; bool initSuccess = false; //这就是我们上面调用的init方法 //这个方法开始初始化 work isoalte,并把其sendPort保存下来。 init() { _isolate.resume(_isolate.pauseCapability); proxyPort.listen((message) { //监听工作线程 if (message[0] == kSendPortKey) { //保存工作线程的 sendPort workSendPort = message[1]; initSuccess = true; }else if(message[0] == kWorkDone){ ///work done setStatus(true); print('isolate $id 完成了 :${message[1].toString()}'); } }); } }

至此proxy_isolate就实现了,我们接下来看一下work_isolate的实现

work_isolate

实现

const int kWorkDone = 98766; //处理完后的回复tag void _workerIsolate(SendPort proxyPort){ //启动反射,这里我在后面介绍 initializeReflectable(); final ReceivePort receivePort = ReceivePort(); final SendPort sendPort = receivePort.sendPort; receivePort.listen((message) { if(message[0] == kTaskKey){ ///执行任务 //对msg进行解构再组装 Map method = message[1]; String mn = method[kMethodName]; Map<Symbol,dynamic> nameArguments = {}; if(method[kNameArgs] is Map){ ///为了避免顺序错误导致的参数异常,这里不使用positionalArguments (method[kNameArgs] as Map).forEach((key, value) { nameArguments[Symbol(key)] = value; }); //功能模板类,里面定义了一些列方法,通过反射进行调用 final WorkList workerList = WorkList(); final InstanceMirror instanceMirror = myReflect.reflect(workerList); //执行静态方法用的,这里不需要先注释掉 //final ClassMirror classMirror = myReflect.reflectType(WorkList); //调用方法 instanceMirror.invoke(mn, [],nameArguments); ///work done ///结构暂定为 [order flag, result(Map)] ///个人认为这种多线程处理任务,最好不要有返回结果 ....待设计 proxyPort.send([kWorkDone,{'method':mn,'args':nameArguments.toString()}]); } } }); //将工作线程的sendPort发送给proxy proxyPort.send([kSendPortKey,sendPort]); }

workList 反射模板

介绍

我们在worklist里定义好方法,子线程通过反射就可以调用这些方法,这样实际使用的时候,开发人员就不需要关心子线程的实现,只用在WorkList里面定义好自己的方法,然后通过WorkerMainProxy.invoerWorker()方法调用即可。

实现

首先我们需要两个插件:

reflectable: ^2.2.5 build_runner: ^1.7.0

之后我们在main.dart文件中增加如下代码:

///具体方法模板类 @myReflect class WorkList{ test({String n,String m}){ print(' test method $n'); } } ///插件反射对象 const myReflect = MyReflectable(); class MyReflectable extends Reflectable{ const MyReflectable():super(invokingCapability); }

同时在根目录中增加一个build.yaml文件用来生成代码

targets: $default: builders: reflectable: generate_for: - lib/main.dart

都弄好了后,我们在控制台执行

flutter packages pub run build_runner build

过一小会,你就会看到main.dart下方多了一个

main.refelctable.dart

这样我们就完成了反射模板workList的配置,每次编辑workList后都需要运行一下

flutter packages pub run build_runner build

测试

至此整个功能就开发完毕了,我们测试一下,点击按钮发起100次调用:

RaisedButton(onPressed: ()async{ List.generate(100, (index){ WorkerMainProxy.getInstance() ///参数一:方法名字,参数二:方法对应的命名参数, ///务必确保参数名与WorkList中的一致 .invokeWorker(methodName: 'test',nameArgs: {'n':'第$index次唤起','m':'第二个参数'}); }); }, child: Text('测试worker'),),

可以看到测试结果是符合我们预期的,初版功能就开发完毕了。

结语

以上只是初版功能,还有诸多需要完善的,后续我会不断增加,另外功能模块也可以进一步拓展,如 java中的cache线程池。

也欢迎大家补充,如有不足还请指出,

该功能已加入 Bedrock开发框架,希望大家多提意见 😃

Bedrock开发框架

我的其它文章

Flutter混合开发——一种另类却高效的的原生View嵌入方法

见微知著,Flutter在游戏开发的表现及跨平台带来的优势

Flutter——封装一个方便使用的Loading弹窗

Flutter 自定义View——仿同花顺自选股列表

最新回复(0)