1 00:00:00,400 --> 00:00:03,699 In this video, we will use MPSC channels 2 00:00:03,699 --> 00:00:06,160 to communicate between threads. Here are 3 00:00:06,160 --> 00:00:08,500 the four threads of execution in our 4 00:00:08,500 --> 00:00:10,450 program. We start with the main thread 5 00:00:10,450 --> 00:00:12,969 that launches the three child threads: 6 00:00:12,969 --> 00:00:15,490 the read thread, the stats thread, and the 7 00:00:15,490 --> 00:00:17,710 write thread. In this video, we're gonna 8 00:00:17,710 --> 00:00:20,260 set up two channels: a channel to 9 00:00:20,260 --> 00:00:22,540 communicate from the read thread to the 10 00:00:22,540 --> 00:00:24,490 stats thread, and a channel to 11 00:00:24,490 --> 00:00:26,680 communicate from the stats thread to the 12 00:00:26,680 --> 00:00:29,050 write thread. We are going to send 13 00:00:29,050 --> 00:00:32,738 buffers of u8 across channels, and use an 14 00:00:32,738 --> 00:00:35,940 empty buffer as a sentinel to stop, 15 00:00:35,940 --> 00:00:39,820 instead of using an atomically reference 16 00:00:39,820 --> 00:00:42,309 counted, mutex-guarded boolean to 17 00:00:42,309 --> 00:00:44,859 indicate that we need to stop. Let's 18 00:00:44,859 --> 00:00:48,219 start by importing the mpsc module 19 00:00:48,219 --> 00:00:51,309 instead of Arc and Mutex, then let's 20 00:00:51,309 --> 00:00:53,800 go down and create our channels. We will 21 00:00:53,800 --> 00:00:56,409 receive a tuple containing a sender and 22 00:00:56,409 --> 00:00:59,409 a receiver side of the channel. I'm going 23 00:00:59,409 --> 00:01:02,260 to name the channels after the thread 24 00:01:02,260 --> 00:01:04,840 that is receiving the data, and then I'll 25 00:01:04,840 --> 00:01:07,900 use tx to mean the transmitting side 26 00:01:07,900 --> 00:01:11,740 to that thread, and rx to mean the 27 00:01:11,740 --> 00:01:14,500 receiving side in that thread. 
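The channel creation just described can be sketched roughly as follows. This is not the code shown on screen: the function name `stats_channel_demo` is hypothetical, and the `stats_tx` / `stats_rx` names simply follow the naming convention from the narration.

```rust
use std::sync::mpsc;

/// Hypothetical helper: creates the channel that feeds the stats thread,
/// pushes one data buffer and the empty sentinel through it, and returns
/// what the receiving end saw, in order.
fn stats_channel_demo() -> Vec<Vec<u8>> {
    // mpsc::channel() returns a (Sender, Receiver) tuple.
    // tx = transmit to the stats thread, rx = receive in the stats thread.
    let (stats_tx, stats_rx) = mpsc::channel::<Vec<u8>>();

    // This channel is unbounded, so send() does not block here.
    stats_tx.send(b"hello\n".to_vec()).unwrap();
    // An empty buffer is the "please stop" sentinel.
    stats_tx.send(Vec::new()).unwrap();

    let first = stats_rx.recv().unwrap();
    let second = stats_rx.recv().unwrap();
    vec![first, second]
}
```

Both ends live on one thread here only to keep the sketch small; in the program each end is handed to a different thread.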
So 28 00:01:14,500 --> 00:01:16,870 stats_tx will be used to transmit something to 29 00:01:16,870 --> 00:01:19,900 the stats thread, and stats_rx will be 30 00:01:19,900 --> 00:01:22,480 used to receive something in the stats 31 00:01:22,480 --> 00:01:25,360 thread. We also receive data in the write 32 00:01:25,360 --> 00:01:27,670 thread, so let's make that channel. We 33 00:01:27,670 --> 00:01:29,410 have our transmit-to-the-write-thread 34 00:01:29,410 --> 00:01:31,750 and receive-in-the-write-thread ends of 35 00:01:31,750 --> 00:01:34,360 the channel. I should mention that MPSC 36 00:01:34,360 --> 00:01:37,630 stands for multiple producer, single 37 00:01:37,630 --> 00:01:39,940 consumer, so you can actually take that 38 00:01:39,940 --> 00:01:43,330 sending side, the tx side of the channel, 39 00:01:43,330 --> 00:01:45,550 and clone it, and send the clones to 40 00:01:45,550 --> 00:01:47,500 different places, different threads, and 41 00:01:47,500 --> 00:01:50,230 use them all to write to the same 42 00:01:50,230 --> 00:01:51,910 channel, but you can only have one 43 00:01:51,910 --> 00:01:53,530 receiving side of the channel. 44 00:01:53,530 --> 00:01:57,040 So now let's clean up our mutexes and 45 00:01:57,040 --> 00:01:59,980 replace them in our arguments with our 46 00:01:59,980 --> 00:02:02,770 channels. Updating our function 47 00:02:02,770 --> 00:02:04,930 signatures will fix these errors, but 48 00:02:04,930 --> 00:02:06,820 first let's talk about how this data 49 00:02:06,820 --> 00:02:09,549 flows through. We give the read thread a 50 00:02:09,549 --> 00:02:12,489 sender to the stats thread, we give the 51 00:02:12,489 --> 00:02:14,090 stats thread its own 52 00:02:14,090 --> 00:02:16,459 receiver and the sender to the write 53 00:02:16,459 --> 00:02:18,920 thread, and then we give the write thread 54 00:02:18,920 --> 00:02:21,890 its own receiver. 
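To illustrate the "multiple producer, single consumer" point, here is a minimal sketch of several threads writing into one channel through cloned senders. The function `fan_in` and its `producers` parameter are hypothetical and are not part of the program built in this video, which only uses one sender per channel.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `producers` threads that all send their id
/// into the same channel, then returns the received ids in sorted order.
fn fan_in(producers: u8) -> Vec<u8> {
    let (tx, rx) = mpsc::channel::<u8>();
    let mut handles = Vec::new();

    for id in 0..producers {
        // Each producer thread gets its own clone of the sending side.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(id).unwrap();
        }));
    }

    // Drop the original sender so the receiver's iterator ends once
    // every clone has been dropped by its thread.
    drop(tx);

    // There is only ever one receiving side.
    let mut received: Vec<u8> = rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // Arrival order depends on thread scheduling, so sort for a stable result.
    received.sort();
    received
}
```

The receiver itself cannot be cloned, which is why only the sending side is duplicated.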
So now let's go to the 55 00:02:21,890 --> 00:02:25,430 read thread and update it to accept that 56 00:02:25,430 --> 00:02:28,430 sender to the stats thread. So the first 57 00:02:28,430 --> 00:02:30,379 thing we need to do is go to our imports 58 00:02:30,379 --> 00:02:33,049 and update them: we want the Sender from 59 00:02:33,049 --> 00:02:36,440 the mpsc module. Then we can go down and 60 00:02:36,440 --> 00:02:40,370 replace our quit argument with our stats 61 00:02:40,370 --> 00:02:45,980 sender, which is a Sender of Vec of u8. And now we 62 00:02:45,980 --> 00:02:48,530 can go down and actually do our to-dos. 63 00:02:48,530 --> 00:02:51,680 First we need to send this buffer, 64 00:02:51,680 --> 00:02:54,890 so stats_tx.send will send that across the 65 00:02:54,890 --> 00:02:57,890 channel. Now sending could fail, so this 66 00:02:57,890 --> 00:02:59,420 returns a Result, which you need to do 67 00:02:59,420 --> 00:03:02,120 something with. Let's look and see if 68 00:03:02,120 --> 00:03:05,019 it's an error, and if it is an error 69 00:03:05,019 --> 00:03:09,410 let's just exit out cleanly, because that 70 00:03:09,410 --> 00:03:11,720 means that our stats thread has been 71 00:03:11,720 --> 00:03:13,400 shut down, so we're probably shutting 72 00:03:13,400 --> 00:03:16,700 down. And then of course, if we've run out 73 00:03:16,700 --> 00:03:18,859 of data to read, we know we need to shut 74 00:03:18,859 --> 00:03:21,590 down. So let's send that empty buffer 75 00:03:21,590 --> 00:03:24,500 over to the stats thread, but in this 76 00:03:24,500 --> 00:03:26,060 case we'll ignore the result 77 00:03:26,060 --> 00:03:28,400 altogether because we're shutting down. OK, 78 00:03:28,400 --> 00:03:30,260 let's go back up and recap what we did.
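A rough sketch of the read loop as narrated might look like this. The real function's signature is not visible in the transcript, so the generic `reader` parameter and the `CHUNK_SIZE` constant are assumptions made to keep the example self-contained.

```rust
use std::io::Read;
use std::sync::mpsc::Sender;

// Assumed buffer size; the value used in the video is not in the transcript.
const CHUNK_SIZE: usize = 16 * 1024;

/// Reads chunks from `reader` and sends each one to the stats thread,
/// followed by an empty buffer as the shutdown sentinel.
fn read_loop<R: Read>(mut reader: R, stats_tx: Sender<Vec<u8>>) {
    let mut buffer = [0u8; CHUNK_SIZE];
    loop {
        let num_read = match reader.read(&mut buffer) {
            Ok(0) => break, // out of data
            Ok(n) => n,
            Err(_) => break, // treat a read error as end of input
        };
        // send() fails only if the receiving side has been dropped, which
        // means the stats thread is gone, so exit cleanly.
        if stats_tx.send(buffer[..num_read].to_vec()).is_err() {
            break;
        }
    }
    // Empty buffer = sentinel. The result is ignored on purpose because
    // we are shutting down either way.
    let _ = stats_tx.send(Vec::new());
}
```

Calling it with `&b"hello\n"[..]` as the reader would deliver one six-byte buffer and then the empty sentinel to the receiver.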
79 00:03:30,260 --> 00:03:33,380 We imported Sender, we updated our 80 00:03:33,380 --> 00:03:36,709 argument, we came down here, we send our 81 00:03:36,709 --> 00:03:40,340 buffer over to the stats thread, we check 82 00:03:40,340 --> 00:03:41,930 to see if it's an error and exit out 83 00:03:41,930 --> 00:03:44,720 cleanly, and then we send our empty 84 00:03:44,720 --> 00:03:47,389 vector if we're done. Now we can go on to 85 00:03:47,389 --> 00:03:49,269 the stats thread and handle that. 86 00:03:49,269 --> 00:03:52,130 First our imports: this time we'll need 87 00:03:52,130 --> 00:03:55,489 both the Receiver and the Sender. Those 88 00:03:55,489 --> 00:03:58,880 come from the mpsc module, and then we 89 00:03:58,880 --> 00:04:01,010 can go down and add both of those to our 90 00:04:01,010 --> 00:04:04,400 arguments. Our stats receiver is a 91 00:04:04,400 --> 00:04:07,760 Receiver of Vec of u8, and then of course 92 00:04:07,760 --> 00:04:10,910 our write sender, to send to the write 93 00:04:10,910 --> 00:04:14,690 thread, is a Sender of Vec of u8. Whoa, this 94 00:04:14,690 --> 00:04:17,209 line is getting really long. Let's go see 95 00:04:17,209 --> 00:04:19,548 what cargo fmt would do to this line. 96 00:04:19,548 --> 00:04:21,620 I'll just switch over to the terminal 97 00:04:21,620 --> 00:04:23,450 and run it real quick, and then switch 98 00:04:23,450 --> 00:04:26,209 back, and now you can see the vertical 99 00:04:26,209 --> 00:04:27,440 formatting style 100 00:04:27,440 --> 00:04:30,680 for argument lists. Notice it added 101 00:04:30,680 --> 00:04:33,290 an extra comma after the write sender. 102 00:04:33,290 --> 00:04:35,390 Now let's go down and implement our 103 00:04:35,390 --> 00:04:37,790 to-do. 
Let's actually receive the buffer. 104 00:04:37,790 --> 00:04:39,860 We are going to call recv on our 105 00:04:39,860 --> 00:04:41,750 channel receiver, which is a blocking 106 00:04:41,750 --> 00:04:43,880 call that returns a Result once we get 107 00:04:43,880 --> 00:04:45,650 data, which we're just going to unwrap 108 00:04:45,650 --> 00:04:47,150 in this case, and then we're going to 109 00:04:47,150 --> 00:04:48,920 grab the number of bytes in the buffer. 110 00:04:48,920 --> 00:04:50,930 I'll show you why in just a second. 111 00:04:50,930 --> 00:04:53,210 I'll change total_bytes to increment by 112 00:04:53,210 --> 00:04:54,950 the number of bytes that I've just 113 00:04:54,950 --> 00:04:57,290 collected, and then come down and 114 00:04:57,290 --> 00:04:59,090 implement the other to-do, which is 115 00:04:59,090 --> 00:05:00,800 sending the vector over to the write 116 00:05:00,800 --> 00:05:03,860 thread. So in this case, I know it's 117 00:05:03,860 --> 00:05:05,240 returning a Result, I know I want to 118 00:05:05,240 --> 00:05:06,590 check the error, so I'm just going to 119 00:05:06,590 --> 00:05:08,840 start with my if statement and I will 120 00:05:08,840 --> 00:05:11,030 break out cleanly if we are not able to 121 00:05:11,030 --> 00:05:13,430 send. We know the pipes are gonna break 122 00:05:13,430 --> 00:05:15,110 as we're trying to write to them. What 123 00:05:15,110 --> 00:05:17,410 you may not have realized is that send 124 00:05:17,410 --> 00:05:20,750 consumes the value that you give it, so 125 00:05:20,750 --> 00:05:22,520 we no longer have buffer to look at, and 126 00:05:22,520 --> 00:05:25,310 right here I want to see how many bytes 127 00:05:25,310 --> 00:05:27,440 I received and break out, but I can't 128 00:05:27,440 --> 00:05:29,570 look at buffer.len(). That's why I 129 00:05:29,570 --> 00:05:32,180 saved num_bytes, so I can look at it now. 
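The stats loop described above could be sketched like this. It is a simplified reconstruction from the narration: the real function presumably takes more arguments (such as the silent flag) and prints progress, and returning the total from the function is an assumption made so the sketch can be checked.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Receives buffers from the read thread, counts their bytes, and forwards
/// them to the write thread. Returns the total number of bytes seen.
fn stats_loop(stats_rx: Receiver<Vec<u8>>, write_tx: Sender<Vec<u8>>) -> usize {
    let mut total_bytes = 0;
    loop {
        // Blocking call; unwrap panics if the sender was dropped without
        // ever sending the sentinel.
        let buffer = stats_rx.recv().unwrap();

        // Save the length now: send() below takes ownership of `buffer`,
        // so buffer.len() would not compile after that point.
        let num_bytes = buffer.len();
        total_bytes += num_bytes;

        // Forward the buffer (including the empty sentinel) to the write
        // thread; break out cleanly if that thread has gone away.
        if write_tx.send(buffer).is_err() {
            break;
        }
        // An empty buffer is the shutdown sentinel.
        if num_bytes == 0 {
            break;
        }
    }
    total_bytes
}
```

Note that the sentinel is forwarded before the loop exits, so the write thread also learns that it should stop.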
130 00:05:32,180 --> 00:05:34,790 So let's check to see if num_bytes is 0 131 00:05:34,790 --> 00:05:37,669 and if it is, we'll break out. Finally, 132 00:05:37,669 --> 00:05:39,860 let's go fix a bug that I noticed that we 133 00:05:39,860 --> 00:05:41,990 ended up with in the last section. We're 134 00:05:41,990 --> 00:05:44,120 only supposed to print if we're not 135 00:05:44,120 --> 00:05:46,400 silent, so I'll just surround this with 136 00:05:46,400 --> 00:05:48,260 an if not silent. Now we should be good 137 00:05:48,260 --> 00:05:51,560 to go on our stats module. Quick recap: we 138 00:05:51,560 --> 00:05:53,930 updated our imports, we updated our 139 00:05:53,930 --> 00:05:56,870 argument list to get our receiver and 140 00:05:56,870 --> 00:05:57,890 our sender, 141 00:05:57,890 --> 00:06:00,620 we receive a buffer from the read thread, 142 00:06:00,620 --> 00:06:03,290 we count the number of bytes in it, and 143 00:06:03,290 --> 00:06:05,600 then we send it over to the write thread, 144 00:06:05,600 --> 00:06:08,540 break if there's an error, and then if we 145 00:06:08,540 --> 00:06:10,340 didn't actually receive anything we also 146 00:06:10,340 --> 00:06:12,620 need to break out, and we fixed the bug 147 00:06:12,620 --> 00:06:14,750 with our output. And that's good for the 148 00:06:14,750 --> 00:06:17,510 stats module. Let's move over to the 149 00:06:17,510 --> 00:06:19,760 write module. Once again we're 150 00:06:19,760 --> 00:06:22,910 going to begin by removing Arc and Mutex 151 00:06:22,910 --> 00:06:27,110 in favor of our mpsc Receiver, but we've got 152 00:06:27,110 --> 00:06:29,330 to remember to put mpsc on there or it 153 00:06:29,330 --> 00:06:31,640 won't come in. 
Alright, let's go and 154 00:06:31,640 --> 00:06:34,820 change our argument: this is the write 155 00:06:34,820 --> 00:06:37,700 receiver, which is a Receiver of a Vec 156 00:06:37,700 --> 00:06:40,789 of u8, and then we can hop right down to 157 00:06:40,789 --> 00:06:41,090 our 158 00:06:41,090 --> 00:06:43,669 to-do, and receive our buffer just like we 159 00:06:43,669 --> 00:06:46,070 did before. Once again we'll use the 160 00:06:46,070 --> 00:06:48,560 blocking recv and just unwrap on 161 00:06:48,560 --> 00:06:51,500 error. Instead of this quit block here, 162 00:06:51,500 --> 00:06:54,410 let's look for the empty buffer sentinel, 163 00:06:54,410 --> 00:06:57,500 so if buffer is empty then we're going 164 00:06:57,500 --> 00:06:59,090 to break out of our loop and shut down 165 00:06:59,090 --> 00:07:02,240 cleanly. And I believe that is all we 166 00:07:02,240 --> 00:07:04,930 have left to do. Let's go give it a try. 167 00:07:04,930 --> 00:07:07,960 Let's start by running cargo check again: 168 00:07:07,960 --> 00:07:09,800 great, no errors. 169 00:07:09,800 --> 00:07:12,290 How about cargo clippy, any style changes? 170 00:07:12,290 --> 00:07:14,690 Nothing, great. So let's run cargo fmt 171 00:07:14,690 --> 00:07:17,150 just in case, and then let's clear the 172 00:07:17,150 --> 00:07:19,190 screen and do some testing. First let's 173 00:07:19,190 --> 00:07:22,040 do our tried and true echo hello to our 174 00:07:22,040 --> 00:07:25,160 program, and see that it outputs our six 175 00:07:25,160 --> 00:07:28,400 and our hello. There it goes, great. Let's 176 00:07:28,400 --> 00:07:30,320 try to isolate the two outputs to make 177 00:07:30,320 --> 00:07:32,270 sure that we don't have that extra 178 00:07:32,270 --> 00:07:35,900 newline bug, remember our - - okay. 
So 179 00:07:35,900 --> 00:07:38,450 there's our six if we reroute our output. 180 00:07:38,450 --> 00:07:41,539 What if we do silent? Okay, there's our 181 00:07:41,539 --> 00:07:43,760 hello, no six. Looks correct, I don't see 182 00:07:43,760 --> 00:07:46,220 any stray newlines this time. Let's pipe 183 00:07:46,220 --> 00:07:48,830 yes to our program and just redirect our 184 00:07:48,830 --> 00:07:50,960 output to /dev/null for a while, and see 185 00:07:50,960 --> 00:07:53,780 if we can just accept a bunch of input. 186 00:07:53,780 --> 00:07:56,740 Great, looks like it's running just fine. 187 00:07:56,740 --> 00:08:00,139 So let's change that up a little bit: 188 00:08:00,139 --> 00:08:03,800 instead let's pipe over to head and try 189 00:08:03,800 --> 00:08:05,870 breaking our pipe and making sure that 190 00:08:05,870 --> 00:08:08,530 we exit cleanly. 191 00:08:08,530 --> 00:08:11,470 Okay, there we go, everything appears to 192 00:08:11,470 --> 00:08:14,830 be working as intended. In the next video, 193 00:08:14,830 --> 00:08:16,990 we will refactor our code to use 194 00:08:16,990 --> 00:08:19,180 crossbeam channels instead of MPSC 195 00:08:19,180 --> 00:08:24,180 channels.