目录
源码篇
前面从work-runtime库的AndroidManifest.xml中了解到,WorkManager工作主要通过ContentProvider、Service、BroadcastReceiver协同工作,而关注几个主要Service工作流程发现,最终都会执行到Processor.startWork()
方法,此篇将重点关注Processor.startWork()
是何方神圣。
首先我们简单过一下 mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras)
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public boolean startWork(@NonNull String id, @Nullable WorkerParameters.RuntimeExtras runtimeExtras) { WorkerWrapper workWrapper; // 保证任务队列(MAP)只进或只出 synchronized (mLock) { // doing 验证id不在队列中 // 初始化WorkerWrapper workWrapper = new WorkerWrapper.Builder(...).build(); ListenableFuture<Boolean> future = workWrapper.getFuture(); // 为执行结果添加监听,注意这里监听回调何时调用后面会说明 // 特别注意:future.addListener中 FutureListener 是一个Runnable对象 // 特别注意: FutureListener 是一个Runnable对象 future.addListener( new FutureListener(this, id, future), mWorkTaskExecutor.getMainThreadExecutor()); // 将此任务加入队列 mEnqueuedWorkMap.put(id, workWrapper); } // 使用工作线程执行任务 mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper); }
|
mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras)
主要做了三个操作:
- 将任务要素封装为WorkerWrapper并为其执行期望添加监听
- 任务要素入队
- 工作线程执行处理任务
相对主要的,我们继续跟进工作线程处理任务流程execute(Runnable)
, excute将WorkerWrapper继续封装为Task并加入任务队列,若当前无活跃Task则安排任务执行,任务执行完毕后继续检查任务队列,直到任务队列空置为止:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public void execute(@NonNull Runnable command) { synchronized (mLock) { mTasks.add(new Task(this, command)); if (mActive == null) { scheduleNext(); } } }
void scheduleNext() { synchronized (mLock) { if ((mActive = mTasks.poll()) != null) { mExecutor.execute(mActive); } } }
|
mExecutor.execute(mActive)
会通过Task执行execute(Runnable)
传入的Runnable,即执行WorkerWrapper
(这里有点绕,可以多看两遍~),Task不论WorkerWrapper执行结果最终会继续轮询任务队列(此处不赘述)。此时,我们将目光移到WorkerWrapper.run()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @WorkerThread @Override public void run() { mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId); mWorkDescription = createWorkDescription(mTags); runWorker(); } private void runWorker() { // 各个状态判断最终调用到resolve(false); resolve(true); // OR resolve(false);
// 尝试将工作设置为运行状态。注意,这可能会失败,因为自从上次在这个函数的顶部检查之后,另一个线程可能已经修改了数据库。 if (trySetRunning()) { if (tryCheckForInterruptionAndResolve()) { return; }
final SettableFuture<ListenableWorker.Result> future = SettableFuture.create(); // Call mWorker.startWork() on the main thread. mWorkTaskExecutor.getMainThreadExecutor() .execute(new Runnable() { @Override public void run() { try { // 调用Worker执行工作任务 mInnerFuture = mWorker.startWork(); // 将任务结果回调添加到期望回调中 future.setFuture(mInnerFuture); } catch (Throwable e) { future.setException(e); }
} });
// Avoid synthetic accessors. final String workDescription = mWorkDescription; future.addListener(new Runnable() { @Override @SuppressLint("SyntheticAccessor") public void run() { // 最终调用到resolve(false); resolve(false); } }, mWorkTaskExecutor.getBackgroundExecutor()); } else { // 最终调用到resolve(false); resolve(boolean); }
}
private void resolve(final boolean needsReschedule) { // 数据库事务操作 mFuture.set(needsReschedule); }
|
runWorker()
中包含各种任务状态的判断,并尝试将任务置为运行状态,若设置运行状态成功,则调用Worker.startWork()
即最终会走向个人定义的Worker的doWork()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Override public final @NonNull ListenableFuture<Result> startWork() { mFuture = SettableFuture.create(); getBackgroundExecutor().execute(new Runnable() { @Override public void run() { try { Result result = doWork(); mFuture.set(result); } catch (Throwable throwable) { mFuture.setException(throwable); }
} }); return mFuture; }
|
后续跟进resolve
, 此方法除了数据库相关操作(更新Worker状态)外,最终会执行mFuture.set(needsReschedule)
,即目标期望回调。
在跟进绕来绕去的mFuture.set(needsReschedule)
之前,先回想一下,还记得WorkManagerImpl.startWork
代码片段里的初始化代码段,future.addListener中 FutureListener 是一个Runnable对象,而addListener入参最终会被封装到AbstractFuture.Listener
中:
1 2 3 4 5
| Listener(Runnable task, Executor executor) { this.task = task; this.executor = executor; }
|
切回正题,继续跟进mFuture.set(needsReschedule)
,此处set(boolean)最终会调用AbstractFuture.set()
,继而调用AbstractFuture.complete
:
1 2 3 4 5 6 7 8
| protected boolean set(@Nullable V value) { Object valueToSet = value == null ? NULL : value; if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { complete(this); return true; } return false; }
|
AbstractFuture.complete
会遍历期望回调结合,找出其中代表执行结果回调的项并执行Runnable回调封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| static void complete(AbstractFuture<?> future) { Listener next = null; outer: while (true) { future.releaseWaiters(); future.afterDone(); next = future.clearListeners(next); future = null; while (next != null) { Listener curr = next; next = next.next; Runnable task = curr.task; if (task instanceof SetFuture) { // do something continue outer; } else { // 执行Runnable回调封装 executeListener(task, curr.executor); } } break; } }
|
这里executeListener(task, curr.executor)
中的task参数即在Processor中封装WorkWrapper
时的FutureListener
参数,即此处最终会执行FutureListener.run()
方法,从而调用mExecutionListener.onExecuted(mWorkSpecId, needsReschedule)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private static class FutureListener implements Runnable { @Override public void run() { boolean needsReschedule; try { needsReschedule = mFuture.get(); } catch (InterruptedException | ExecutionException e) { needsReschedule = true; } mExecutionListener.onExecuted(mWorkSpecId, needsReschedule); } }
|
我们回想一下Processor.startWork()
中关于设置futureListener的代码:
1 2 3
| future.addListener( new FutureListener(this, id, future), mWorkTaskExecutor.getMainThreadExecutor());
|
此处FutureListener中的第一个参数ExecutionListener是“this”,即会调用ExecutionListener.onExecuted()
:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public void onExecuted( @NonNull final String workSpecId, boolean needsReschedule) {
synchronized (mLock) { mEnqueuedWorkMap.remove(workSpecId); for (ExecutionListener executionListener : mOuterListeners) { executionListener.onExecuted(workSpecId, needsReschedule); } } }
|
让我们来看看mOuterListeners
赋值:
1 2 3 4 5
| public void addExecutionListener(@NonNull ExecutionListener executionListener) { synchronized (mLock) { mOuterListeners.add(executionListener); } }
|
嗨,跑了一圈,我们又回来了,即上面onExecuted
中遍历执行的executionListener.onExecuted(workSpecId, needsReschedule)
,
以SystemJobService为例,会调用SystemJobService.onExecuted()
:
1 2 3 4 5 6 7 8 9 10 11
| @Override public void onExecuted(@NonNull String workSpecId, boolean needsReschedule) { Logger.get().debug(TAG, String.format("%s executed on JobScheduler", workSpecId)); JobParameters parameters; synchronized (mJobParameters) { parameters = mJobParameters.remove(workSpecId); } if (parameters != null) { jobFinished(parameters, needsReschedule); } }
|
至此,从XXXService调用SystemXXXService执行任务到最终任务结束及回调流程形成了一个完整的链。