forked from DlangRen/Programming-in-D
-
Notifications
You must be signed in to change notification settings - Fork 1
/
concurrency.d
1343 lines (1031 loc) · 34 KB
/
concurrency.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
Ddoc
$(DERS_BOLUMU $(IX concurrency, message passing) $(IX message passing concurrency) 基于消息传递的并发)
$(P
并发(concurrency)与上一章的并行(parallelism)很相似,但还是存在较多差异。这两个概念都涉及多线程,且并行是基于并发的,刚接触它们时感到有些迷惑也属正常。
)
$(P
$(IX parallelism vs. concurrency) $(IX concurrency vs. parallelism) 并发和并行的区别主要有以下几点:
)
$(UL
$(LI
并行的主要目的是利用微处理器的多核提高程序的性能。而并发则是一个在单核环境里也可以使用的概念。并发可以让一个程序同时在多个线程上运行。比如说服务器程序就是并发的,它需要同时处理多个客户端的请求。
)
$(LI
在并行中,任务之间相互独立。事实上如果同时运行的任务依赖其他任务的结果就可能会引发程序错误。而对于并发,线程间的相互依赖是很常见的。
)
$(LI
虽然两者都涉及线程操作,但并行线程被封装成了任务。而并发则需要显式利用线程。
)
$(LI
并行易于使用,并且只要任务相互独立,便可以轻松地生成可正常工作的程序。并发只有在基于 $(I 消息传递) 实现时才比较简单。若使用传统的基于锁的数据共享实现的并发模型,则很难写出正确的并发程序。
)
)
$(P
D 语言支持两种并发模型:消息传递和数据共享。本章主要讲解消息传递,而数据共享则被放置到下一章。
)
$(H5 相关概念)
$(P
$(IX thread) $(B 线程):操作系统执行程序的工作单元叫做 $(I 线程)。D 语言程序在操作系统指定的线程上执行 $(C main()) 函数。通常情况下程序的所有操作都将在这个线程中完成。程序也可以自由地创建线程以实现在同一时间执行多个任务的功能。实际上,上一章讲解的任务是基于线程的,只不过这些线程是由 $(C std.parallelism) 自动启动。
)
$(P
操作系统会不定期的将线程暂停一段时间。也就是说,即使是一个简单的自增操作也可能会在执行到一半时被操作系统暂停:
)
---
++i;
---
$(P
上面这个看似简单的操作实际上包含三个步骤:读取变量的值、将其加一、将结果写回变量所在的内存。线程可能会暂停在这三步中的任何一步上,停顿一段时间后才会继续。
)
$(P
$(IX message) $(B 消息):在线程间传递的数据叫做消息。任何类型任何长短的数据都可以被称为消息。
)
$(P
$(IX thread id) $(B 线程 ID):每个线程都有一个 ID,你可以用它来指定消息的接收者。
)
$(P
$(IX owner) $(B 所有者):任何启动另一个线程的线程即为该新线程的所有者。
)
$(P
$(IX worker) $(B 工作线程):由所有者启动的任何线种都叫做工作线程。
)
$(H5 $(IX spawn) 启动线程)
$(P
$(C spawn()) 需要一个函数指针作为参数,并且会从该函数启动一个新线程。该函数(包括它可能调用的其他函数)包含的所有操作都会在新线程中执行。使用 $(C spawn()) 启动的线程和使用 $(LINK2 /ders/d.en/parallelism.html, $(C task())) 启动的线程之间最大的差异在于,$(C spawn()) 允许线程间消息传递。
)
$(P
新线程启动后,所有者和工作线程将会独立执行,看上去它们就像是独立的程序:
)
---
import std.stdio;
import std.concurrency;
import core.thread;
void worker() {
foreach (i; 0 .. 5) {
Thread.sleep(500.msecs);
writeln(i, " (worker)");
}
}
void main() {
$(HILITE spawn(&worker));
foreach (i; 0 .. 5) {
Thread.sleep(300.msecs);
writeln(i, " (main)");
}
writeln("main is done.");
}
---
$(P
本章中的例子调用 $(C Thread.sleep) 减慢线程执行的速度来更方便的展示线程运行的情况。这个程序的输出表明有两个线程:一个用于运行 $(C main()),另一个则由 $(C spawn()) 启动。它们同时相互独立地执行:
)
$(SHELL
0 (main)
0 (worker)
1 (main)
2 (main)
1 (worker)
3 (main)
2 (worker)
4 (main)
main is done.
3 (worker)
4 (worker)
)
$(P
程序在所有线程执行完毕后才会退出。从上面的输出中我们可以看到,在函数 $(C main()) 输出 “main is done.” 并退出之后,$(C worker()) 仍然在继续执行。
)
$(P
线程函数所需的参数可以通过 $(C spawn()) 的第二个及以后面的参数传入。下面程序中的两个工作线程分别输出四个数字。线程函数的参数为初始数字:
)
---
import std.stdio;
import std.concurrency;
import core.thread;
void worker($(HILITE int firstNumber)) {
foreach (i; 0 .. 4) {
Thread.sleep(500.msecs);
writeln(firstNumber + i);
}
}
void main() {
foreach (i; 1 .. 3) {
spawn(&worker, $(HILITE i * 10));
}
}
---
$(P
其中一个线程的输出被高亮了:
)
$(SHELL
10
$(HILITE 20)
11
$(HILITE 21)
12
$(HILITE 22)
13
$(HILITE 23)
)
$(P
程序的输出顺序可能会和上面有所不同,具体情况取决于操作系统对线程的调度。
)
$(P
$(IX CPU bound) $(IX I/O bound) $(IX thread performance) 每个操作系统都会限制线程同时运行的个数。这种限制可能是对用户的,也可能是对整个操作系统的,当然也可能是对其他某些级别。如果处于忙碌中的工作线程的数量超过系统内核的数量,那么系统的整体性能有可能下降。在指定运行时间消耗大量 CPU 资源的线程叫做 $(I CPU 密集型)。与之相对的是消耗大量时间等待事件、用户输入、来自互联网的数据或调用了 $(C Thread.sleep) 等情况的线程。这种线程被称作 $(I I/O 密集型)。如果大部分线程都是 I/O 密集型的,那么程序不需要担心由于线程数超过内核数而造成性能下降的问题。出于性能设计的考量,我们需要谨慎分析并确定线程的类型。
)
$(H5 $(IX Tid) $(IX thisTid) $(IX ownerTid) 线程 ID)
$(P
$(C thisTid()) 返回$(I 当前)线程的 ID。通常调用它的时候不需要带圆括号:
)
---
import std.stdio;
import std.concurrency;
void printTid(string tag) {
writefln("%s: %s", tag, $(HILITE thisTid));
}
void worker() {
printTid("Worker");
}
void main() {
spawn(&worker);
printTid("Owner ");
}
---
$(P
$(C thisTid()) 返回的类型为 $(C Tid),它对程序没有意义。它甚至没重载 $(C toString()):
)
$(SHELL
Owner : Tid(22310e53100)
Worker: Tid(22310e53000)
)
$(P
我们之前一直没有用到的 $(C spawn()) 的返回值即为工作线程的 ID:
)
---
$(HILITE Tid myWorker) = spawn(&worker);
---
$(P
相应地,使用 $(C ownerTid()) 可以获得工作线程的所有者的 ID。
)
$(P
总之,调用 $(C ownerTid) 获取其所有者 ID,通过 $(C spawn()) 的返回值获取工作线程 ID。
)
$(H5 $(IX send) $(IX receiveOnly) 消息传递)
$(P
$(C send()) 可用于发送消息,而 $(C receiveOnly()) 可用于等待待定类型的消息。(此外还有 $(C prioritySend())、$(C receive()) 和 $(C receiveTimeout()) ——在后面章节介绍它们。)
)
$(P
在下面程序里,所有者线程会向工作线程发送 $(C int) 类型的消息,并等待工作线程返回 $(C double) 类型的消息。工作线程会不停地返回消息,一直到所有者线程发送一个 $(C int) 型的负数为止。下面是所有者线程:
)
---
void $(CODE_DONT_TEST)main() {
Tid worker = spawn(&workerFunc);
foreach (value; 1 .. 5) {
$(HILITE worker.send)(value);
double result = $(HILITE receiveOnly!double)();
writefln("sent: %s, received: %s", value, result);
}
/* 向工作线程发送一个负数
*使其终止*/
$(HILITE worker.send)(-1);
}
---
$(P
$(C main()) 将 $(C spawn()) 的返回值储存在 $(C worker) 变量中并通过它来给工作线程发送消息。
)
$(P
另一方面,工作线程需要 $(C int) 类型的消息并对其进行计算,之后将计算得到的 $(C double) 类型的结果返回给其所有者:
)
---
void workerFunc() {
int value = 0;
while (value >= 0) {
value = $(HILITE receiveOnly!int)();
double result = to!double(value) / 5;
$(HILITE ownerTid.send)(result);
}
}
---
$(P
主线程会将它发送的和接收的消息一起输出:
)
$(SHELL
sent: 1, received: 0.2
sent: 2, received: 0.4
sent: 3, received: 0.6
sent: 4, received: 0.8
)
$(P
也可以在消息中一次发送多个值,这些值都会成为同一消息的一部分。下面这个消息就是由三个部分组成:
)
---
ownerTid.send($(HILITE thisTid, 42, 1.5));
---
$(P
如果在一次消息中传递多个值的话,接收者会将它们看作一个元组。此时,$(C receiveOnly()) 的模版参数的类型要与每一个元组成员的类型对应:
)
---
/* 等待一个包含 Tid、int 和 double 类型的消息。*/
auto message = receiveOnly!($(HILITE Tid, int, double))();
auto sender = message[0]; // Tid 类型
auto integer = message[1]; // int 类型
auto floating = message[2]; // double 类型
---
$(P
$(IX MessageMismatch) 如果类型不匹配,则程序将会抛出一个 $(C MessageMismatch) 异常:
)
---
import std.concurrency;
void workerFunc() {
ownerTid.send("hello"); $(CODE_NOTE 发送 $(HILITE string))
}
void main() {
spawn(&workerFunc);
auto message = receiveOnly!double(); $(CODE_NOTE 期望 $(HILITE double))
}
---
$(P
输出:
)
$(SHELL
std.concurrency.$(HILITE MessageMismatch)@std/concurrency.d(235):
Unexpected message type: expected 'double', got 'immutable(char)[]'
)
$(P
所有者无法捕获由工作线程抛出的异常。一种解决方案是在工作线程中捕获潜在的由接收信息引发的异常。随后就会看到这个。
)
$(H6 示例)
$(P
现在我们在一个模拟程序里实践一下刚了解到的内容。
)
$(P
下面这个程序模拟的是两个互不相关的机器人在二维空间中随机移动。每个机器人的移动都是由一个独立的线程控制的。线程在启动时需要传入三个参数:
)
$(UL
$(LI 机器人的编号 (id):这个参数会随着消息传回线程所有者,这样我们就可以通过它确认消息的来源。
)
$(LI 起点:机器人的初始位置。
)
$(LI 每一步的间隔时间:决定机器人何时走下一步。
)
)
$(P
这些信息可以储存在下面这个 $(C Job) 结构中:
)
---
struct Job {
size_t robotId;
Position origin;
Duration restDuration;
}
---
$(P
移动机器人的线程会不断地将对应机器人的 ID 和它的移动情况发送给所有者线程:
)
---
void robotMover(Job job) {
Position from = job.origin;
while (true) {
Thread.sleep(job.restDuration);
Position to = randomNeighbor(from);
Movement movement = Movement(from, to);
from = to;
ownerTid.send($(HILITE MovementMessage)(job.robotId, movement));
}
}
---
$(P
The owner simply waits for these messages in an unconditional loop. 它通过消息中的机器人 ID 来识别机器人。所有者会简单地将其运动情况输出:
)
---
while (true) {
auto message = receiveOnly!$(HILITE MovementMessage)();
writefln("%s %s",
robots[message.robotId], message.movement);
}
---
$(P
本例中的所有消息都是从工作线程向线程所有者传递的。当然在许多程序中消息传递不止这么简单。
)
$(P
下面是完整的程序:
)
---
import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;
struct Position {
int line;
int column;
string toString() {
return format("%s,%s", line, column);
}
}
struct Movement {
Position from;
Position to;
string toString() {
return ((from == to)
? format("%s (idle)", from)
: format("%s -> %s", from, to));
}
}
class Robot {
string image;
Duration restDuration;
this(string image, Duration restDuration) {
this.image = image;
this.restDuration = restDuration;
}
override string toString() {
return format("%s(%s)", image, restDuration);
}
}
/* 返回一个坐标在 0,0 周边的随机位置。*/
Position randomPosition() {
return Position(uniform!"[]"(-10, 10),
uniform!"[]"(-10, 10));
}
/* 返回一个坐标,它相对从指定坐标最多变化一步。*/
int randomStep(int current) {
return current + uniform!"[]"(-1, 1);
}
/* 返回指定位置周围的坐标。它既可能是
* 八个方向中的一个,也可能是
* 指定那个位置本身。*/
Position randomNeighbor(Position position) {
return Position(randomStep(position.line),
randomStep(position.column));
}
struct Job {
size_t robotId;
Position origin;
Duration restDuration;
}
struct MovementMessage {
size_t robotId;
Movement movement;
}
void robotMover(Job job) {
Position from = job.origin;
while (true) {
Thread.sleep(job.restDuration);
Position to = randomNeighbor(from);
Movement movement = Movement(from, to);
from = to;
ownerTid.send(MovementMessage(job.robotId, movement));
}
}
void main() {
/* 不同移动时间间隔的机器人。*/
Robot[] robots = [ new Robot("A", 600.msecs),
new Robot("B", 2000.msecs),
new Robot("C", 5000.msecs) ];
/* 为每一个机器人启动一个移动线程。*/
foreach (robotId, robot; robots) {
spawn(&robotMover, Job(robotId,
randomPosition(),
robot.restDuration));
}
/* 准备好接收有关机器人的移动情况
* 的信息。*/
while (true) {
auto message = receiveOnly!MovementMessage();
/* 输出机器人的运动情况。*/
writefln("%s %s",
robots[message.robotId], message.movement);
}
}
---
$(P
程序会不停地显示所有机器人的运动信息,除非手动终止:
)
$(SHELL
A(600 ms) 6,2 -> 7,3
A(600 ms) 7,3 -> 8,3
A(600 ms) 8,3 -> 7,3
B(2 secs) -7,-4 -> -6,-3
A(600 ms) 7,3 -> 6,2
A(600 ms) 6,2 -> 7,1
A(600 ms) 7,1 (idle)
B(2 secs) -6,-3 (idle)
A(600 ms) 7,1 -> 7,2
A(600 ms) 7,2 -> 7,3
C(5 secs) -4,-4 -> -3,-5
A(600 ms) 7,3 -> 6,4
...
)
$(P
这个程序展现了并发的强大之处:机器人的移动可以在单独的线程中独立计算,而且它们之间无需相互交换信息。所有者线程仅仅是将收件箱中的消息一个一个取出来并$(I 按顺序)输出。
)
$(H5 $(IX delegate, message passing) 接收不同类型的消息)
$(P
$(C receiveOnly()) 只能接收一种类型的消息。而 $(C receive()) 可以接收多种类型的消息。它会把这些消息分发给各个消息处理委托。当接收到消息时,它会将消息类型与每个委托的类型进行比较。如果委托参数的类型与消息类型相同,则把消息交由对应的委托处理。
)
$(P
例如,下面这个 $(C receive()) 使用了两个委托分别用来处理类型为 $(C int) 和 $(C string) 的消息:
)
---
$(CODE_NAME workerFunc)void workerFunc() {
bool isDone = false;
while (!isDone) {
void intHandler($(HILITE int) message) {
writeln("handling int message: ", message);
if (message == -1) {
writeln("exiting");
isDone = true;
}
}
void stringHandler($(HILITE string) message) {
writeln("handling string message: ", message);
}
receive($(HILITE &intHandler), $(HILITE &stringHandler));
}
}
---
$(P
$(C int) 消息匹配 $(C intHandler()),而 $(C string) 消息匹配 $(C stringHandler())。上面的工作线程可以用下面程序来测试:
)
---
$(CODE_XREF workerFunc)import std.stdio;
import std.concurrency;
// ...
void main() {
auto worker = spawn(&workerFunc);
worker.send(10);
worker.send(42);
worker.send("hello");
worker.send(-1); // ← 终止工作线程
}
---
$(P
程序的输出说明了接收端的函数是如何匹配和处理消息的:
)
$(SHELL
handling int message: 10
handling int message: 42
handling string message: hello
handling int message: -1
exiting
)
$(P
lambda 函数和定义了 $(C opCall()) 成员函数的对象都可以传递给 $(C receive()) 作为消息处理器。下面这个工作线程使用 lambda 函数处理消息。程序还定义了一个 $(C Exit) 类型来通知线程退出。相对于使用像 -1 这样的任意值,用一个特定的类型来传递特定的消息会让程序更易读。
)
$(P
有 3 个匿名函数被传递给了 $(C receive()) 来作为消息处理器。它们的花括号已被高亮:
)
---
import std.stdio;
import std.concurrency;
struct Exit {
}
void workerFunc() {
bool isDone = false;
while (!isDone) {
receive(
(int message) $(HILITE {)
writeln("int message: ", message);
$(HILITE }),
(string message) $(HILITE {)
writeln("string message: ", message);
$(HILITE }),
(Exit message) $(HILITE {)
writeln("exiting");
isDone = true;
$(HILITE }));
}
}
void main() {
auto worker = spawn(&workerFunc);
worker.send(10);
worker.send(42);
worker.send("hello");
worker.send($(HILITE Exit()));
}
---
$(H6 接收任意类型的消息)
$(P
$(IX Variant, concurrency) $(C std.variant.Variant) 类型可以封装任意类型的数据。如果消息无法与参数列表前面指定的各个处理函数相匹配,那么它们将会与一个 $(C Variant) 类型的处理函数匹配:
)
---
import std.stdio;
import std.concurrency;
void workerFunc() {
receive(
(int message) { /* ... */ },
(double message) { /* ... */ },
($(HILITE Variant) message) {
writeln("Unexpected message: ", message);
});
}
struct SpecialMessage {
// ...
}
void main() {
auto worker = spawn(&workerFunc);
worker.send(SpecialMessage());
}
---
$(P
输出:
)
$(SHELL
Unexpected message: SpecialMessage()
)
$(P
有关 $(C Variant) 的详细内容已超出本章范围。
)
$(H5 $(IX receiveTimeout) 在指定的时间内等待消息)
$(P
可能经过一段时间后就不再需要继续等待消息了。消息的发送者可能正在忙碌或因异常终止。$(C receiveTimeout()) 可以防止出现无限等待消息这样的情况。
)
$(P
$(C receiveTimeout()) 的第一个参数决定等待消息时要等待多长时间。如果在指定时间内接收到了消息,函数返回值为 $(C true) ;如果超时则返回 $(C false)。
)
---
import std.stdio;
import std.concurrency;
import core.thread;
void workerFunc() {
Thread.sleep(3.seconds);
ownerTid.send("hello");
}
void main() {
spawn(&workerFunc);
writeln("Waiting for a message");
bool received = false;
while (!received) {
received = $(HILITE receiveTimeout)(600.msecs,
(string message) {
writeln("received: ", message);
});
if (!received) {
writeln("... no message yet");
/* ... 可在此处执行其他操作... */
}
}
}
---
$(P
上面的线程所有者将等待消息 600 毫秒。如果消息超时它还会继续执行其他操作:
)
$(SHELL
Waiting for a message
... no message yet
... no message yet
... no message yet
... no message yet
received: hello
)
$(H5 $(IX exception, concurrency) 工作线程中的异常)
$(P
上一章的 $(C std.parallelism) 自动捕获 task 执行中抛出的异常并在所有者的线程中重新抛出。它使得所有者线程可以捕获工作线程的异常:
)
---
try {
theTask.yieldForce();
} catch (Exception exc) {
writefln("Detected an error in the task: '%s'",
exc.msg);
}
---
$(P
$(C std.concurrency) 并未提供这种捕获异常的方法。但你也可以在工作线程中手动捕获异常并将其发送给所有者。就像我们下面看到的那样,可以将 $(C OwnerTerminated) 和 $(C LinkTerminated) 当作消息传递。
)
$(P
下面这个 $(C calculate()) 接收一个 $(C string) 消息,将其转换为 $(C double) 并加 0.5,之后将运算的结果作为消息传递回去:
)
---
$(CODE_NAME calculate)void calculate() {
while (true) {
auto message = receiveOnly!string();
ownerTid.send(to!double(message) + 0.5);
}
}
---
$(P
如果该字符串不能转换为一个 $(C double) 值,则调用 $(C to!double()) 会抛出异常。由于异常会立刻终止工作线程,所有者只能收到第一条消息的反馈:
)
---
$(CODE_XREF calculate)import std.stdio;
import std.concurrency;
import std.conv;
// ...
void main() {
Tid calculator = spawn(&calculate);
calculator.send("1.2");
calculator.send("hello"); // ← 错误的输入
calculator.send("3.4");
foreach (i; 0 .. 3) {
auto message = receiveOnly!double();
writefln("result %s: %s", i, message);
}
}
---
$(P
由于工作线程已被终止,所有者只会收到将“1.2”变为 1.7 的消息的反馈。而它并不知道工作线程已经终止,所有者线程会被阻塞来等待永远不会到达的消息:
)
$(SHELL
result 0: 1.7
$(SHELL_NOTE 等待永远不会到达的消息)
)
$(P
工作线程能做的就是手动捕获异常并将其作为特殊的错误信息发送给所有者。下面这个程序就把出错的原因封装在 $(C CalculationFailure) 消息中传递回去。此外,这个程序还使用了特殊的消息类型来通知工作线程退出:
)
---
import std.stdio;
import std.concurrency;
import std.conv;
struct CalculationFailure {
string reason;
}
struct Exit {
}
void calculate() {
bool isDone = false;
while (!isDone) {
receive(
(string message) {
try {
ownerTid.send(to!double(message) + 0.5);
} $(HILITE catch) (Exception exc) {
ownerTid.send(CalculationFailure(exc.msg));
}
},
(Exit message) {
isDone = true;
});
}
}
void main() {
Tid calculator = spawn(&calculate);
calculator.send("1.2");
calculator.send("hello"); // ← 错误的输入
calculator.send("3.4");
calculator.send(Exit());
foreach (i; 0 .. 3) {
writef("result %s: ", i);
receive(
(double message) {
writeln(message);
},
(CalculationFailure message) {
writefln("ERROR! '%s'", message.reason);
});
}
}
---
$(P
这次错误的原因会被所有者输出:
)
$(SHELL
result 0: 1.7
result 1: ERROR! 'no digits seen'
result 2: 3.9
)
$(P
另外一种方法是直接将异常对象发送回所有者。所有者既可以处理异常对象也可以重新抛出:
)
---
// ... 工作线程中 ...
try {
// ...
} catch ($(HILITE shared(Exception)) exc) {
ownerTid.send(exc);
}},
// ... 所有者线程 ...
receive(
// ...
($(HILITE shared(Exception)) exc) {
throw exc;
});
---
$(P
下一章会解释为什么此处必须使用 $(C shared) 说明符。
)
$(H5 检测线程终止)
$(P
线程可以检测消息的接收者是否已经终止。
)
$(H6 $(IX OwnerTerminated) $(C OwnerTerminated) 异常)
$(P
如果所有者线程已被终止,工作线程在接收消息时会抛出这个异常。下方程序中处在中间层的线程所有者在发送两条消息后就立即退出。这会导致工作线程抛出 $(C OwnerTerminated) 异常:
)
---
import std.stdio;
import std.concurrency;
void main() {
spawn(&intermediaryFunc);
}
void intermediaryFunc() {
auto worker = spawn(&workerFunc);
worker.send(1);
worker.send(2);
} // ← 发送两条消息后立刻终止
void workerFunc() {
while (true) {
auto m = receiveOnly!int(); // ← 如果
// 拥有者线程已经终止,
// 它将抛出异常。
writeln("Message: ", m);
}
}
---
$(P
输出:
)
$(SHELL
Message: 1
Message: 2
std.concurrency.$(HILITE OwnerTerminated)@std/concurrency.d(248):
Owner terminated
)
$(P
工作线程也可以通过捕获这个异常来优雅地退出:
)
---
void workerFunc() {
bool isDone = false;
while (!isDone) {
try {
auto m = receiveOnly!int();
writeln("Message: ", m);
} catch ($(HILITE OwnerTerminated) exc) {
writeln("The owner has terminated.");
isDone = true;
}
}
}
---
$(P
输出:
)
$(SHELL
Message: 1
Message: 2
The owner has terminated.
)
$(P
之后我们会看到也可以将这个异常当作消息发送。
)
$(H6 $(IX LinkTerminated) $(IX spawnLinked) $(C LinkTerminated) 异常)
$(P
$(C spawnLinked()) 与 $(C spawn()) 用法相同。当由 $(C spawnLinked()) 创建的线程终止时,拥有者线程将会抛出 $(C LinkTerminated) 异常。
)
---
import std.stdio;
import std.concurrency;