1 00:00:00,410 --> 00:00:02,710 In this video, we will separate input, 2 00:00:02,710 --> 00:00:04,880 statistics and output logic into 3 00:00:04,880 --> 00:00:05,890 separate threads. 4 00:00:05,890 --> 00:00:08,690 Unlike other videos in this series, we 5 00:00:08,690 --> 00:00:10,610 are not going to end up in a fully 6 00:00:10,610 --> 00:00:12,320 functional state at the end of this 7 00:00:12,320 --> 00:00:12,920 video. 8 00:00:12,920 --> 00:00:16,099 However, we will end up with three child 9 00:00:16,099 --> 00:00:18,470 threads that we spawn, which do 10 00:00:18,470 --> 00:00:21,169 everything that they need to except talk 11 00:00:21,169 --> 00:00:23,480 to each other. So we'll be able to start 12 00:00:23,480 --> 00:00:26,269 up the program and see that it starts up and 13 00:00:26,269 --> 00:00:30,109 shuts down in a coordinated fashion, but 14 00:00:30,109 --> 00:00:33,259 it won't actually process our data. Let's 15 00:00:33,259 --> 00:00:35,600 go ahead and do our imports. From the 16 00:00:35,600 --> 00:00:38,450 sync module we need Arc and Mutex: Arc 17 00:00:38,450 --> 00:00:40,520 is our atomic reference counter, and 18 00:00:40,520 --> 00:00:42,950 Mutex will protect access to mutable 19 00:00:42,950 --> 00:00:45,800 data. And then we need the thread module 20 00:00:45,800 --> 00:00:49,160 so we can spawn threads. Since I need to 21 00:00:49,160 --> 00:00:51,350 pass the three different arguments that 22 00:00:51,350 --> 00:00:53,270 we parsed to three different threads, I'm 23 00:00:53,270 --> 00:00:55,430 going to go ahead and destructure the 24 00:00:55,430 --> 00:00:57,740 struct into three local variables that I 25 00:00:57,740 --> 00:01:00,560 can move to each thread separately. The 26 00:01:00,560 --> 00:01:02,570 other option would be to make three 27 00:01:02,570 --> 00:01:06,950 clones of the args struct. 
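To make the destructuring step concrete, here is a minimal sketch. The Args struct and its field names are stand-ins that I am assuming for illustration; the real struct in the project may look different.

```rust
// Hypothetical stand-in for the parsed-arguments struct; field names are assumed.
struct Args {
    infile: String,
    outfile: String,
    silent: bool,
}

// Destructure the struct so each field becomes its own owned local,
// ready to be moved into a different thread.
fn split_args(args: Args) -> (String, String, bool) {
    let Args { infile, outfile, silent } = args;
    (infile, outfile, silent)
}

fn main() {
    let args = Args {
        infile: String::from("input.bin"),
        outfile: String::new(),
        silent: true,
    };
    let (infile, outfile, silent) = split_args(args);
    println!("infile={:?} outfile={:?} silent={}", infile, outfile, silent);
}
```

Destructuring moves each field out exactly once, so nothing is copied; cloning the whole struct three times would also work but duplicates the strings.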
So infile, 28 00:01:06,950 --> 00:01:09,920 outfile, and silent are three local variables 29 00:01:09,920 --> 00:01:12,499 that I created out of args. Now we can 30 00:01:12,499 --> 00:01:16,249 create quit, which will be an atomically 31 00:01:16,249 --> 00:01:19,850 reference counted, mutex guarded boolean 32 00:01:19,850 --> 00:01:22,939 that starts as false, and this we will 33 00:01:22,939 --> 00:01:25,999 use to coordinate the shutdown of our 34 00:01:25,999 --> 00:01:29,090 threads, since we're not adding channel 35 00:01:29,090 --> 00:01:32,240 communication in this video. Cloning an 36 00:01:32,240 --> 00:01:34,310 Arc just creates a new reference to it, 37 00:01:34,310 --> 00:01:36,499 pointing to the same data, so I'm making 38 00:01:36,499 --> 00:01:39,920 three clones so I can pass one to each 39 00:01:39,920 --> 00:01:42,619 child thread, and still have one in the 40 00:01:42,619 --> 00:01:45,020 main thread to use. The Arc will 41 00:01:45,020 --> 00:01:47,450 coordinate cleanup once nobody's using 42 00:01:47,450 --> 00:01:49,459 a reference to this data anymore, and the 43 00:01:49,459 --> 00:01:53,329 Mutex will protect our mutable access to 44 00:01:53,329 --> 00:01:56,509 the data. Spawning a thread returns a 45 00:01:56,509 --> 00:01:58,819 thread handle. We are going to spawn 46 00:01:58,819 --> 00:02:01,099 three threads, so we'll have three thread 47 00:02:01,099 --> 00:02:04,130 handles. First, for our read thread, we'll do 48 00:02:04,130 --> 00:02:06,679 read_handle here. The thread::spawn 49 00:02:06,679 --> 00:02:10,400 function takes a closure, and we will 50 00:02:10,400 --> 00:02:12,740 usually use move closures because 51 00:02:12,740 --> 00:02:14,090 otherwise borrowing 52 00:02:14,090 --> 00:02:16,430 gets really really tricky. 
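Here is a small sketch of the quit flag being cloned and moved into a thread with a move closure. The function name child_sets_flag is made up for this example, and only one of the three clones is actually handed to a thread, just to keep it short.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Returns true if a write made through one clone is visible through the others.
fn child_sets_flag() -> bool {
    // Atomically reference counted, mutex guarded boolean that starts as false.
    let quit = Arc::new(Mutex::new(false));
    // Each clone is a new reference to the same underlying boolean.
    let (quit1, quit2, quit3) = (quit.clone(), quit.clone(), quit.clone());

    // The move closure takes ownership of quit1; it takes no arguments.
    let handle = thread::spawn(move || {
        *quit1.lock().unwrap() = true;
    });
    handle.join().unwrap();

    // Lock one at a time: all clones share a single mutex, so holding two
    // guards at once in the same thread would deadlock.
    let seen_by_second = *quit2.lock().unwrap();
    let seen_by_third = *quit3.lock().unwrap();
    seen_by_second && seen_by_third
}

fn main() {
    println!("flag visible through every clone: {}", child_sets_flag());
}
```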
You can see 53 00:02:16,430 --> 00:02:19,700 that I'm passing in my quit reference, 54 00:02:19,700 --> 00:02:22,340 and I'm also passing in the local 55 00:02:22,340 --> 00:02:25,790 variables that the functions need. You 56 00:02:25,790 --> 00:02:27,230 can see that the closure doesn't take 57 00:02:27,230 --> 00:02:28,730 any arguments because there are no 58 00:02:28,730 --> 00:02:31,790 arguments between the pipes. So we've got our 59 00:02:31,790 --> 00:02:35,209 three handles here, read, stats and write 60 00:02:35,209 --> 00:02:38,720 handles. They're really similar, and I'm 61 00:02:38,720 --> 00:02:40,340 going to change these loop functions so 62 00:02:40,340 --> 00:02:43,069 that they'll return the same result type 63 00:02:43,069 --> 00:02:45,049 as well. Right now the stats loop is returning 64 00:02:45,049 --> 00:02:46,489 something different, so we'll fix that in 65 00:02:46,489 --> 00:02:49,040 a minute. But right now we want to join 66 00:02:49,040 --> 00:02:51,620 the threads, and crash our main thread if 67 00:02:51,620 --> 00:02:54,260 any of our threads have crashed. Sounds a 68 00:02:54,260 --> 00:02:55,760 little unintuitive, but we're gonna do 69 00:02:55,760 --> 00:02:58,010 some simplistic handling here. So when 70 00:02:58,010 --> 00:03:00,860 you join a thread handle, it waits till 71 00:03:00,860 --> 00:03:02,599 the thread returns, and when it returns 72 00:03:02,599 --> 00:03:05,360 it will give back a thread result that 73 00:03:05,360 --> 00:03:08,180 wraps what we return from the functions, 74 00:03:08,180 --> 00:03:09,890 which is just an I/O result. So we've got a 75 00:03:09,890 --> 00:03:12,410 nested result here. So the first thing 76 00:03:12,410 --> 00:03:14,180 I'm going to do is go through each of 77 00:03:14,180 --> 00:03:17,390 these handles, and just call unwrap, since 78 00:03:17,390 --> 00:03:19,340 the join will really only return an 79 00:03:19,340 --> 00:03:21,739 error if the thread crashes. 
So if the 80 00:03:21,739 --> 00:03:23,150 thread crashes, might as well crash the 81 00:03:23,150 --> 00:03:25,010 whole program, so I'll just call unwrap 82 00:03:25,010 --> 00:03:28,599 on each of the joins to get to the inner 83 00:03:28,599 --> 00:03:32,599 I/O result, or crash. So here I've got my 84 00:03:32,599 --> 00:03:35,359 read I/O result, my stats I/O result, and my 85 00:03:35,359 --> 00:03:38,239 write I/O result. And now that we're done 86 00:03:38,239 --> 00:03:40,190 gathering all of those, we can handle 87 00:03:40,190 --> 00:03:41,540 them. Let's look at each of them and 88 00:03:41,540 --> 00:03:43,940 return an error if any of them are 89 00:03:43,940 --> 00:03:45,829 errors. If they're successes we'll just 90 00:03:45,829 --> 00:03:47,299 ignore them, so we'll just use the 91 00:03:47,299 --> 00:03:50,299 question mark operator to unwrap the success 92 00:03:50,299 --> 00:03:52,849 and ignore it if it's there, or return 93 00:03:52,849 --> 00:03:54,889 an error from main. Now if we take a 94 00:03:54,889 --> 00:03:57,139 look at the rest of main, we see that 95 00:03:57,139 --> 00:04:00,019 it's the main loop that we need to 96 00:04:00,019 --> 00:04:02,900 refactor into loops in each of the 97 00:04:02,900 --> 00:04:04,910 threads. We'll keep the Ok at the end, 98 00:04:04,910 --> 00:04:07,370 and in just a minute we'll come get this 99 00:04:07,370 --> 00:04:10,130 top part and put it in the read loop, 100 00:04:10,130 --> 00:04:12,590 which we're going to create right now. So 101 00:04:12,590 --> 00:04:16,310 first we need to do our imports, which is 102 00:04:16,310 --> 00:04:18,289 just Arc and Mutex from the sync 103 00:04:18,289 --> 00:04:20,930 module, and then we need to go add our 104 00:04:20,930 --> 00:04:23,089 quit parameter, which is our atomically 105 00:04:23,089 --> 00:04:26,790 reference counted, mutex guarded boolean. 
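Going back to the join handling in main for a moment, the nested result can be sketched like this. The join_all function and its fail switch are invented for the example so that the error path can be exercised; the real main joins the read, stats and write handles instead.

```rust
use std::io::{self, Error, ErrorKind};
use std::thread;

// Spawns two stand-in threads, joins them, and surfaces any I/O error.
fn join_all(fail: bool) -> io::Result<()> {
    let first_handle = thread::spawn(|| -> io::Result<()> { Ok(()) });
    let second_handle = thread::spawn(move || -> io::Result<()> {
        if fail {
            Err(Error::new(ErrorKind::Other, "simulated failure"))
        } else {
            Ok(())
        }
    });

    // join() returns a thread result wrapping our io::Result.
    // The outer layer is only an error if the thread panicked, so unwrap it.
    let first_io_result = first_handle.join().unwrap();
    let second_io_result = second_handle.join().unwrap();

    // The question mark operator discards a success or returns the error.
    first_io_result?;
    second_io_result?;
    Ok(())
}

fn main() {
    println!("all ok: {:?}", join_all(false));
    println!("one failed: {:?}", join_all(true).is_err());
}
```

Joining every handle before looking at any inner result means all threads get to finish even when one of them reports an error.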
106 00:04:26,790 --> 00:04:29,100 Since we won't be using return values to 107 00:04:29,100 --> 00:04:32,220 communicate actual data, we're going to 108 00:04:32,220 --> 00:04:34,380 change our results to just be the empty 109 00:04:34,380 --> 00:04:37,080 result, just to communicate errors if 110 00:04:37,080 --> 00:04:38,970 there's an error. The reader logic 111 00:04:38,970 --> 00:04:41,100 doesn't change, but we do need to go down 112 00:04:41,100 --> 00:04:42,900 to this return result and take the 113 00:04:42,900 --> 00:04:45,990 buffer out of the Ok. And then we can 114 00:04:45,990 --> 00:04:48,660 go back to main and grab that block. We 115 00:04:48,660 --> 00:04:51,090 highlight it, and come back over to read, 116 00:04:51,090 --> 00:04:54,270 and paste in the beginnings of this loop 117 00:04:54,270 --> 00:04:56,790 right after we create the buffer. Let's 118 00:04:56,790 --> 00:04:59,010 go down and grab the read call that we 119 00:04:59,010 --> 00:05:01,830 used inside this function originally and 120 00:05:01,830 --> 00:05:04,590 let's cut that out, and then we can go 121 00:05:04,590 --> 00:05:06,450 and replace that call that we had 122 00:05:06,450 --> 00:05:09,030 outside in main with the real read 123 00:05:09,030 --> 00:05:11,820 function that gets the num_read. 
And then 124 00:05:11,820 --> 00:05:14,820 we can alter this match so that instead 125 00:05:14,820 --> 00:05:17,100 of looking for an empty buffer, it looks 126 00:05:17,100 --> 00:05:20,070 at the num_read variable, which we can 127 00:05:20,070 --> 00:05:23,250 match explicitly with a zero instead of 128 00:05:23,250 --> 00:05:25,560 a guard. Any other number gets 129 00:05:25,560 --> 00:05:27,990 returned directly, and an error breaks 130 00:05:27,990 --> 00:05:30,810 out and is ignored. Then we can clean this 131 00:05:30,810 --> 00:05:33,930 up and close the loop, and add a reminder 132 00:05:33,930 --> 00:05:36,930 that in a future video we'll need to 133 00:05:36,930 --> 00:05:40,580 send this buffer to the stats thread. 134 00:05:40,580 --> 00:05:43,140 Similarly, if we break out of the loop 135 00:05:43,140 --> 00:05:46,560 we will need to send an empty buffer to 136 00:05:46,560 --> 00:05:48,000 the stats thread to let it know that 137 00:05:48,000 --> 00:05:48,870 we're done. 138 00:05:48,870 --> 00:05:51,600 But for now we'll use quit to signal 139 00:05:51,600 --> 00:05:54,420 that we're done. We'll lock the mutex and 140 00:05:54,420 --> 00:05:56,250 then we'll unwrap the result that we 141 00:05:56,250 --> 00:05:59,070 receive. It would only be an error if a 142 00:05:59,070 --> 00:06:02,070 thread holding the lock crashed. And then 143 00:06:02,070 --> 00:06:04,290 we can dereference and alter the actual 144 00:06:04,290 --> 00:06:06,450 variable, and now we can go add an empty 145 00:06:06,450 --> 00:06:09,270 tuple Ok, and then go up and remove this 146 00:06:09,270 --> 00:06:11,310 total bytes line that was for stats; we 147 00:06:11,310 --> 00:06:13,290 don't need that here. 
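Put together, the read loop at this stage might look roughly like the sketch below. I am assuming a generic reader parameter and a CHUNK_SIZE constant so the example stands alone; the version in the video opens the input itself.

```rust
use std::io::{self, Read};
use std::sync::{Arc, Mutex};

// Assumed buffer size for this sketch.
const CHUNK_SIZE: usize = 16 * 1024;

// Generic over the reader (an assumption) so it can be tried on in-memory bytes.
fn read_loop<R: Read>(mut reader: R, quit: Arc<Mutex<bool>>) -> io::Result<()> {
    let mut buffer = [0u8; CHUNK_SIZE];
    loop {
        let num_read = match reader.read(&mut buffer) {
            Ok(0) => break,  // zero bytes read: end of input
            Ok(n) => n,      // any other count is used directly
            Err(_) => break, // an error breaks out and is ignored
        };
        // Placeholder: later, send this chunk to the stats thread.
        let _chunk = Vec::from(&buffer[..num_read]);
    }
    // Placeholder: later, send an empty buffer to the stats thread.
    // For now, flip the shared flag so the other threads know we are done.
    let mut quit = quit.lock().unwrap();
    *quit = true;
    Ok(())
}

fn main() -> io::Result<()> {
    let quit = Arc::new(Mutex::new(false));
    read_loop(&b"hello"[..], quit.clone())?;
    println!("quit is now {}", *quit.lock().unwrap());
    Ok(())
}
```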
Oh, and then we need 148 00:06:13,290 --> 00:06:15,990 to add match on our match here. We missed 149 00:06:15,990 --> 00:06:17,940 our match keyword, so that should fix our 150 00:06:17,940 --> 00:06:20,730 syntax. Then we just do a little bit of 151 00:06:20,730 --> 00:06:23,220 whitespace cleanup, and I think we're 152 00:06:23,220 --> 00:06:26,340 all ready to go. So let's go back and 153 00:06:26,340 --> 00:06:28,500 review our read module. So we do our 154 00:06:28,500 --> 00:06:31,980 imports, we added our quit, we return an empty 155 00:06:31,980 --> 00:06:34,560 result, this part was the same, we create our 156 00:06:34,560 --> 00:06:37,950 buffer, we loop, we read into our buffer, 157 00:06:37,950 --> 00:06:40,410 if there are zero bytes we break, otherwise 158 00:06:40,410 --> 00:06:41,070 we return the 159 00:06:41,070 --> 00:06:42,630 number of bytes, and if there's an error we 160 00:06:42,630 --> 00:06:46,020 break. We need to send the buffer that we 161 00:06:46,020 --> 00:06:48,210 create out of the slice to the stats 162 00:06:48,210 --> 00:06:50,550 thread in the future, or send an empty 163 00:06:50,550 --> 00:06:52,980 buffer at the end. Right now we'll lock 164 00:06:52,980 --> 00:06:55,260 our quit, we'll change it to true to let 165 00:06:55,260 --> 00:06:56,850 everyone else know we're done, and we'll 166 00:06:56,850 --> 00:07:01,610 exit. Next up we have our stats thread. 167 00:07:01,610 --> 00:07:04,650 This one won't take nearly as long. Start 168 00:07:04,650 --> 00:07:07,020 with our imports: we need the I/O result, 169 00:07:07,020 --> 00:07:08,550 because we don't have it yet and we want to 170 00:07:08,550 --> 00:07:12,240 return it. 
We want the same Arc and Mutex 171 00:07:12,240 --> 00:07:15,690 imports as before. Then we'll keep silent 172 00:07:15,690 --> 00:07:18,480 but remove all of the other arguments, 173 00:07:18,480 --> 00:07:21,060 and add our quit argument. Again, that's 174 00:07:21,060 --> 00:07:24,840 an Arc Mutex of a boolean. And then we 175 00:07:24,840 --> 00:07:26,490 want this to return the same thing as 176 00:07:26,490 --> 00:07:29,880 the other loops. So it's a result of an 177 00:07:29,880 --> 00:07:32,580 empty tuple, which means we need to return an 178 00:07:32,580 --> 00:07:36,240 Ok of empty at the end. And then we can go 179 00:07:36,240 --> 00:07:38,730 back up to the top and change this to a 180 00:07:38,730 --> 00:07:41,910 declaration of a mutable total_bytes, and 181 00:07:41,910 --> 00:07:45,360 start it at 0. Now let's make our loop, so 182 00:07:45,360 --> 00:07:47,910 put our loop here, close it down at the 183 00:07:47,910 --> 00:07:50,310 end of the print, and then the first 184 00:07:50,310 --> 00:07:52,470 thing that we need to do is receive our 185 00:07:52,470 --> 00:07:55,680 bytes from the read thread. So let's put 186 00:07:55,680 --> 00:07:57,210 a placeholder here to remind us of that, 187 00:07:57,210 --> 00:07:58,710 because we're not doing it in this video. 188 00:07:58,710 --> 00:08:01,170 And since we're not receiving those 189 00:08:01,170 --> 00:08:04,380 bytes, let's make an empty buffer of the 190 00:08:04,380 --> 00:08:06,540 same type that we'd expect to receive so 191 00:08:06,540 --> 00:08:09,420 we can compile. 
Now we can actually count 192 00:08:09,420 --> 00:08:11,100 the bytes in our buffer, though it'll 193 00:08:11,100 --> 00:08:13,320 just be 0 right now, and add that to our 194 00:08:13,320 --> 00:08:16,350 total bytes. The "if not silent" is still 195 00:08:16,350 --> 00:08:18,810 correct, this print is correct, but this 196 00:08:18,810 --> 00:08:21,780 last one, that goes outside the loop, and 197 00:08:21,780 --> 00:08:23,880 we don't need a boolean anymore guarding 198 00:08:23,880 --> 00:08:25,170 it because we know we're outside the 199 00:08:25,170 --> 00:08:27,600 loop. But once we've counted our data we 200 00:08:27,600 --> 00:08:29,610 do need to pass it on to the write loop, 201 00:08:29,610 --> 00:08:31,950 so that it can write it out to standard 202 00:08:31,950 --> 00:08:34,349 out or a file. And finally, since we don't 203 00:08:34,349 --> 00:08:36,479 have an actual buffer to check for a 0 204 00:08:36,479 --> 00:08:38,610 length, let's look at quit and see if 205 00:08:38,610 --> 00:08:40,979 it's time to quit out of this loop. Whether 206 00:08:40,979 --> 00:08:43,169 we leave this iteration due to either the 207 00:08:43,169 --> 00:08:44,700 break or the end of the loop, quit will 208 00:08:44,700 --> 00:08:46,380 go out of scope, which will release the 209 00:08:46,380 --> 00:08:49,320 lock. 
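As a rough sketch, the stats loop at this point could look like the following. The exact print format is my assumption; the placeholders mark where the channel communication will go later.

```rust
use std::io;
use std::sync::{Arc, Mutex};

fn stats_loop(silent: bool, quit: Arc<Mutex<bool>>) -> io::Result<()> {
    let mut total_bytes = 0;
    loop {
        // Placeholder: later, receive the vector of bytes from the read thread.
        let buffer: Vec<u8> = Vec::new(); // empty stand-in so this compiles
        total_bytes += buffer.len();
        if !silent {
            // Assumed format: overwrite the running total on one line.
            eprint!("\r{}", total_bytes);
        }
        // Placeholder: later, send the vector on to the write loop.

        // The guard shadows the outer quit only inside this iteration; it is
        // dropped on break or at the end of the loop body, releasing the lock.
        let quit = quit.lock().unwrap();
        if *quit {
            break;
        }
    }
    // Final print lives outside the loop (whether the real one is also gated
    // on silent is not something I can confirm).
    if !silent {
        eprintln!();
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // With the flag already true, the loop runs once and exits.
    stats_loop(false, Arc::new(Mutex::new(true)))
}
```

Note that with nothing to wait on yet, this loop spins until quit becomes true; receiving from a channel would later give it something to block on.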
So let's recap: we imported Arc and 210 00:08:49,320 --> 00:08:52,200 Mutex, we added our quit parameter, we 211 00:08:52,200 --> 00:08:54,370 return an empty result, we declare 212 00:08:54,370 --> 00:08:56,799 our total bytes, we start our loop, we 213 00:08:56,799 --> 00:08:58,029 need to receive the vector of bytes, 214 00:08:58,029 --> 00:08:59,770 we've got a placeholder for that, we 215 00:08:59,770 --> 00:09:02,290 count the bytes, we increment total bytes, 216 00:09:02,290 --> 00:09:04,120 if we're not silent we print out our 217 00:09:04,120 --> 00:09:06,520 stats, we need to send the vector over to 218 00:09:06,520 --> 00:09:08,470 the write loop, and then we check quit, and 219 00:09:08,470 --> 00:09:11,560 we've got our final print. And now we can 220 00:09:11,560 --> 00:09:13,779 go to our write module. This is the last 221 00:09:13,779 --> 00:09:16,570 module that we'll need to change. Great, 222 00:09:16,570 --> 00:09:18,700 we're almost there. Start with our 223 00:09:18,700 --> 00:09:21,490 imports: let's get our Arc and Mutex 224 00:09:21,490 --> 00:09:24,070 like we did in our others, and then let's 225 00:09:24,070 --> 00:09:27,520 go remove this buffer, and add our quit 226 00:09:27,520 --> 00:09:30,550 parameter. It's the same as before: Arc 227 00:09:30,550 --> 00:09:34,420 Mutex bool. Then we'll go modify our result 228 00:09:34,420 --> 00:09:37,210 to be an empty result, and then of course 229 00:09:37,210 --> 00:09:39,580 our Oks need to become empty, so we 230 00:09:39,580 --> 00:09:42,130 take out this false, take out this true. 231 00:09:42,130 --> 00:09:44,500 Let's fix these comments so they 232 00:09:44,500 --> 00:09:46,510 represent reality, so this one is: stop 233 00:09:46,510 --> 00:09:48,550 the program cleanly. This is just our 234 00:09:48,550 --> 00:09:50,310 success case so we don't need a comment. 
235 00:09:50,310 --> 00:09:53,200 This block of code for the writer all 236 00:09:53,200 --> 00:09:54,670 looks correct. I don't think we need to 237 00:09:54,670 --> 00:09:56,650 change anything here. So let's start 238 00:09:56,650 --> 00:09:59,500 below it. Let's create our loop and then 239 00:09:59,500 --> 00:10:02,200 put the logic inside of it. The first 240 00:10:02,200 --> 00:10:04,000 thing we need to do in our write loop is 241 00:10:04,000 --> 00:10:06,490 receive the bytes that we need to write. 242 00:10:06,490 --> 00:10:08,800 Of course we are not doing that in this 243 00:10:08,800 --> 00:10:10,480 video, so we will put a placeholder for 244 00:10:10,480 --> 00:10:13,000 that, and then we'll create an empty 245 00:10:13,000 --> 00:10:15,700 buffer so that we can compile and run 246 00:10:15,700 --> 00:10:21,100 our code. All right, now, write doesn't 247 00:10:21,100 --> 00:10:23,230 need to do anything if we haven't 248 00:10:23,230 --> 00:10:25,870 received anything, so the first thing we 249 00:10:25,870 --> 00:10:27,610 want to do with write is check if it's 250 00:10:27,610 --> 00:10:30,370 time to quit. So for this video we'll 251 00:10:30,370 --> 00:10:35,640 check our quit once again, but 252 00:10:35,640 --> 00:10:38,250 this lock will last to the end of the block, 253 00:10:38,250 --> 00:10:40,980 and that could hold the lock through a long write 254 00:10:40,980 --> 00:10:42,240 if we don't quit, 255 00:10:42,240 --> 00:10:44,100 so we're gonna put quit inside of its 256 00:10:44,100 --> 00:10:47,279 own scope. That way, at the end of this 257 00:10:47,279 --> 00:10:49,589 scope, quit will go out of scope and 258 00:10:49,589 --> 00:10:53,490 release the lock. So we can move on. 
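The scoping trick for the lock can be sketched like this. I am assuming a generic writer parameter so the example runs on its own, and I am leaving out the special-case error handling that the real write function has.

```rust
use std::io::{self, Write};
use std::sync::{Arc, Mutex};

// Generic over the writer (an assumption) so it can be tried on a Vec<u8>.
fn write_loop<W: Write>(mut writer: W, quit: Arc<Mutex<bool>>) -> io::Result<()> {
    loop {
        // Placeholder: later, receive the bytes to write from the stats thread.
        let buffer: Vec<u8> = Vec::new(); // empty stand-in so this compiles
        {
            // The guard lives only inside these braces.
            let quit = quit.lock().unwrap();
            if *quit {
                break;
            }
        } // guard dropped here, so the lock is not held during the write below
        writer.write_all(&buffer)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut sink: Vec<u8> = Vec::new();
    // With the flag already true, the loop exits before writing anything.
    write_loop(&mut sink, Arc::new(Mutex::new(true)))?;
    println!("wrote {} bytes", sink.len());
    Ok(())
}
```

Without the inner braces the guard would stay alive until the end of the loop body, so a slow write would keep the other threads from checking or setting quit.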
This 259 00:10:53,490 --> 00:10:56,550 write looks correct, the handling of 260 00:10:56,550 --> 00:10:58,230 these conditions looks correct, the 261 00:10:58,230 --> 00:11:00,510 returns look correct. I think we're all 262 00:11:00,510 --> 00:11:02,550 done with write, so let's go back and 263 00:11:02,550 --> 00:11:05,339 clean up main. All of this logic has 264 00:11:05,339 --> 00:11:07,230 already been implemented, so I can simply 265 00:11:07,230 --> 00:11:09,779 remove it. There is one last thing: read 266 00:11:09,779 --> 00:11:12,209 loop, stats loop, write loop. We didn't 267 00:11:12,209 --> 00:11:14,670 rename them, so let's go to our functions 268 00:11:14,670 --> 00:11:17,040 and actually put underscore loop at the 269 00:11:17,040 --> 00:11:19,740 end of each of them; otherwise I really 270 00:11:19,740 --> 00:11:22,529 don't think it will compile. So now if I 271 00:11:22,529 --> 00:11:24,750 go back to main, the IDE should have 272 00:11:24,750 --> 00:11:28,019 colored them in our tan function color. Good. So 273 00:11:28,019 --> 00:11:29,700 if we go there and add that semicolon, 274 00:11:29,700 --> 00:11:32,360 then the compiler will know what to do. 275 00:11:32,360 --> 00:11:34,950 Let's go back to the terminal, clear the 276 00:11:34,950 --> 00:11:37,560 screen and run it again. Good, now we can 277 00:11:37,560 --> 00:11:39,959 try some things out. Now remember, the 278 00:11:39,959 --> 00:11:42,120 read loop reads the data in, and once 279 00:11:42,120 --> 00:11:44,279 it's read it all in it sets the quit 280 00:11:44,279 --> 00:11:46,920 boolean to true, which causes the 281 00:11:46,920 --> 00:11:49,380 other threads to exit. So that's all 282 00:11:49,380 --> 00:11:51,750 we'll see. So if we echo hello to cargo 283 00:11:51,750 --> 00:11:54,180 run, we see zero because we didn't pass 284 00:11:54,180 --> 00:11:56,910 any data to stats, and everything exited 285 00:11:56,910 --> 00:11:59,670 successfully. 
This isn't very exciting 286 00:11:59,670 --> 00:12:02,850 output, I'll concede, but this is 287 00:12:02,850 --> 00:12:04,350 demonstrating what we wanted to 288 00:12:04,350 --> 00:12:06,630 demonstrate. We are spawning three child 289 00:12:06,630 --> 00:12:10,050 threads, they communicate shutdown via a 290 00:12:10,050 --> 00:12:12,810 mutex, and then they are all joined to 291 00:12:12,810 --> 00:12:15,600 the main thread, which exits. In the next 292 00:12:15,600 --> 00:12:19,019 video, we will use MPSC channels to 293 00:12:19,019 --> 00:12:24,019 communicate between threads.