WorkManager基本使用及源码分析(五) - Processor

2021年02月06日 99 字 Jetpack


目录


源码篇

前面从work-runtime库的AndroidManifest.xml中了解到,WorkManager工作主要通过ContentProvider、Service、BroadcastReceiver协同工作,而关注几个主要Service工作流程发现,最终都会执行到Processor.startWork()方法,此篇将重点关注Processor.startWork()是何方神圣。

首先我们简单过一下 mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras)的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean startWork(@NonNull String id,  @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
WorkerWrapper workWrapper;
// 保证任务队列(MAP)只进或只出
synchronized (mLock) {
// doing 验证id不在队列中
// 初始化WorkerWrapper
workWrapper = new WorkerWrapper.Builder(...).build();
ListenableFuture<Boolean> future = workWrapper.getFuture();
// 为执行结果添加监听,注意这里监听回调何时调用后面会说明
// 特别注意:future.addListener中 FutureListener 是一个Runnable对象
// 特别注意: FutureListener 是一个Runnable对象
future.addListener(
new FutureListener(this, id, future),
mWorkTaskExecutor.getMainThreadExecutor());
// 将此任务加入队列
mEnqueuedWorkMap.put(id, workWrapper);
}
// 使用工作线程执行任务
mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);
}

mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras)主要做了三个操作:

  • 将任务要素封装为WorkerWrapper并为其执行期望添加监听
  • 任务要素入队
  • 工作线程执行处理任务

相对主要的,我们继续跟进工作线程处理任务流程execute(Runnable), excute将WorkerWrapper继续封装为Task并加入任务队列,若当前无活跃Task则安排任务执行,任务执行完毕后继续检查任务队列,直到任务队列空置为止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void execute(@NonNull Runnable command) {
// 保证任务队列只进或只出
synchronized (mLock) {
// 封装任务并入队
mTasks.add(new Task(this, command));
// 若无活跃任务,则尝试安排任务
if (mActive == null) {
scheduleNext();
}
}
}

// Synthetic access
void scheduleNext() {
// 保证任务队列只进或只出
synchronized (mLock) {
// 有任务且任务出队
if ((mActive = mTasks.poll()) != null) {
// 执行任务
mExecutor.execute(mActive);
}
}
}

mExecutor.execute(mActive)会通过Task执行execute(Runnable)传入的Runnable,即执行WorkerWrapper(这里有点绕,可以多看两遍~),Task不论WorkerWrapper执行结果最终会继续轮询任务队列(此处不赘述)。此时,我们将目光移到WorkerWrapper.run():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@WorkerThread
@Override
public void run() {
mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);
mWorkDescription = createWorkDescription(mTags);
runWorker();
}
private void runWorker() {
// 各个状态判断最终调用到resolve(false);
resolve(true); // OR resolve(false);

// 尝试将工作设置为运行状态。注意,这可能会失败,因为自从上次在这个函数的顶部检查之后,另一个线程可能已经修改了数据库。
if (trySetRunning()) {
if (tryCheckForInterruptionAndResolve()) {
return;
}

final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
// Call mWorker.startWork() on the main thread.
mWorkTaskExecutor.getMainThreadExecutor()
.execute(new Runnable() {
@Override
public void run() {
try {
// 调用Worker执行工作任务
mInnerFuture = mWorker.startWork();
// 将任务结果回调添加到期望回调中
future.setFuture(mInnerFuture);
} catch (Throwable e) {
future.setException(e);
}

}
});

// Avoid synthetic accessors.
final String workDescription = mWorkDescription;
future.addListener(new Runnable() {
@Override
@SuppressLint("SyntheticAccessor")
public void run() {
// 最终调用到resolve(false);
resolve(false);
}
}, mWorkTaskExecutor.getBackgroundExecutor());
} else {
// 最终调用到resolve(false);
resolve(boolean);
}

}

private void resolve(final boolean needsReschedule) {
// 数据库事务操作

mFuture.set(needsReschedule);
}

runWorker() 中包含各种任务状态的判断,并尝试将任务置为运行状态,若设置运行状态成功,则调用Worker.startWork() 即最终会走向个人定义的Worker的doWork()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public final @NonNull ListenableFuture<Result> startWork() {
mFuture = SettableFuture.create();
getBackgroundExecutor().execute(new Runnable() {
@Override
public void run() {
try {
Result result = doWork();
mFuture.set(result);
} catch (Throwable throwable) {
mFuture.setException(throwable);
}

}
});
return mFuture;
}

后续跟进resolve, 此方法除了数据库相关操作(更新Worker状态)外,最终会执行mFuture.set(needsReschedule),即目标期望回调。

在跟进绕来绕去的mFuture.set(needsReschedule)之前,先回想一下,还记得WorkManagerImpl.startWork代码片段里的初始化代码段,future.addListener中 FutureListener 是一个Runnable对象,而addListener入参最终会被封装到AbstractFuture.Listener中:

1
2
3
4
5
// 此处Runnable task实际为FutureListener
Listener(Runnable task, Executor executor) {
this.task = task;
this.executor = executor;
}

切回正题,继续跟进mFuture.set(needsReschedule),此处set(boolean)最终会调用AbstractFuture.set(),继而调用AbstractFuture.complete

1
2
3
4
5
6
7
8
protected boolean set(@Nullable V value) {
Object valueToSet = value == null ? NULL : value;
if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
complete(this);
return true;
}
return false;
}

AbstractFuture.complete会遍历期望回调结合,找出其中代表执行结果回调的项并执行Runnable回调封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void complete(AbstractFuture<?> future) {
Listener next = null;
outer:
while (true) {
future.releaseWaiters();
future.afterDone();
next = future.clearListeners(next);
future = null;
while (next != null) {
Listener curr = next;
next = next.next;
Runnable task = curr.task;
if (task instanceof SetFuture) {
// do something
continue outer;
} else {
// 执行Runnable回调封装
executeListener(task, curr.executor);
}
}
break;
}
}

这里executeListener(task, curr.executor)中的task参数即在Processor中封装WorkWrapper时的FutureListener参数,即此处最终会执行FutureListener.run()方法,从而调用mExecutionListener.onExecuted(mWorkSpecId, needsReschedule):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class FutureListener implements Runnable {
// something

@Override
public void run() {
boolean needsReschedule;
try {
needsReschedule = mFuture.get();
} catch (InterruptedException | ExecutionException e) {
// Should never really happen(?)
needsReschedule = true;
}
mExecutionListener.onExecuted(mWorkSpecId, needsReschedule);
}
}

我们回想一下Processor.startWork()中关于设置futureListener的代码:

1
2
3
future.addListener(
new FutureListener(this, id, future),
mWorkTaskExecutor.getMainThreadExecutor());

此处FutureListener中的第一个参数ExecutionListener是“this”,即会调用ExecutionListener.onExecuted():

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void onExecuted(
@NonNull final String workSpecId,
boolean needsReschedule) {

synchronized (mLock) {
mEnqueuedWorkMap.remove(workSpecId);
// mOuterListeners来自哪里,还记得本节SystemJobService最开始需要记住的addExecutionListener吗,没错,就是这里。
for (ExecutionListener executionListener : mOuterListeners) {
executionListener.onExecuted(workSpecId, needsReschedule);
}
}
}

让我们来看看mOuterListeners赋值:

1
2
3
4
5
public void addExecutionListener(@NonNull ExecutionListener executionListener) {
synchronized (mLock) {
mOuterListeners.add(executionListener);
}
}

嗨,跑了一圈,我们又回来了,即上面onExecuted中遍历执行的executionListener.onExecuted(workSpecId, needsReschedule)
以SystemJobService为例,会调用SystemJobService.onExecuted():

1
2
3
4
5
6
7
8
9
10
11
@Override
public void onExecuted(@NonNull String workSpecId, boolean needsReschedule) {
Logger.get().debug(TAG, String.format("%s executed on JobScheduler", workSpecId));
JobParameters parameters;
synchronized (mJobParameters) {
parameters = mJobParameters.remove(workSpecId);
}
if (parameters != null) {
jobFinished(parameters, needsReschedule);
}
}

至此,从XXXService调用SystemXXXService执行任务到最终任务结束及回调流程形成了一个完整的链。