Jumping back into Rust 🦀 after a break
What are the goals of this course?
- Give you a comprehensive understanding of the Rust syntax and language.
- Enable you to modify existing programs and write new programs in Rust.
- Show you common Rust idioms.
Topics I would like to explore next:
Non-goals
Rust is a large language and we won't be able to cover all of it in a few days. Some non-goals of this course are:
- Learning how to develop macros. For that, see the Rust Book or Rust by Example.
- Day 1: Basic Rust, syntax, control flow, creating and consuming values.
- Day 2: Memory management, ownership, compound data types, and the standard library.
- Day 3: Generics, traits, error handling, testing, and unsafe Rust.
I think I'm going to skip most of this and go right into the concurrency sections. They're more interesting to me than going back over items I've already spent a while on.
Concurrency In Rust
Set up the project:
cargo init concurrency
cd concurrency
cargo add tokio --features full
cargo run
Rust has full support for concurrency using OS threads with mutexes and channels.
The Rust type system plays an important role in making many concurrency bugs compile-time bugs. This is often referred to as fearless concurrency since you can rely on the compiler to ensure correctness at runtime.
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("Count in thread: {i}!");
thread::sleep(Duration::from_millis(5));
}
});
for i in 1..5 {
println!("Main thread: {i}");
thread::sleep(Duration::from_millis(5));
}
}
- Threads are all daemon threads, the main thread does not wait for them.
- Thread panics are independent of each other.
  - Panics can carry a payload, which can be unpacked with downcast_ref.
- Notice that the thread is stopped before it reaches 10: the main thread is not waiting.
- Use let handle = thread::spawn(...) and later handle.join() to wait for the thread to finish.
- Trigger a panic in the thread, notice how this doesn't affect main.
- Use the Result return value from handle.join() to get access to the panic payload. This is a good time to talk about Any.
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("Count in thread: {i}!");
if i == 2 { panic!("Thread panicked at {}", i); }
thread::sleep(Duration::from_millis(5));
}
});
for i in 1..5 {
println!("Main thread: {i}");
thread::sleep(Duration::from_millis(5));
}
let result = handle.join();
match result {
Ok(_) => println!("Thread completed successfully."),
Err(e) => {
println!("Thread panicked: {:?}", e);
if let Some(payload) = e.downcast_ref::<&str>() {
println!("Panic payload {}", payload);
} else if let Some(payload) = e.downcast_ref::<String>() {
println!("Panic payload {}", payload);
} else {
println!("Unknown panic payload type.");
}
}
}
}
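This will make the extra thread panic on its 2nd loop iteration. It still prints an error that confused me at first: the "thread '<unnamed>' panicked at ..." line comes from Rust's default panic hook, which writes to stderr when the panic starts. Our own match on handle.join() only runs afterwards, once the main thread joins the handle.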
Normal threads cannot borrow from their environment, so this does not compile:
use std::thread;
fn foo() {
let s = String::from("Hello");
thread::spawn(|| {
println!("Length: {}", s.len());
});
}
fn main() {
foo();
}
However, you can use a scoped thread for this:
use std::thread;
fn foo() {
let s = String::from("Hello");
thread::scope(|scope| {
scope.spawn(|| {
println!("Length: {}", s.len());
});
});
}
fn main() {
foo();
}
- The reason for that is that when the thread::scope function completes, all the threads are guaranteed to be joined, so they can return borrowed data.
- Normal Rust borrowing rules apply: you can either borrow mutably by one thread, or immutably by any number of threads.
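A minimal sketch of those two borrowing cases with scoped threads. The helper names sum_in_halves and push_in_scope are made up for this illustration and are not part of the course material:

```rust
use std::thread;

/// Sums a slice by splitting it in two and summing each half on its own
/// scoped thread. Both threads only borrow the data immutably.
fn sum_in_halves(data: &[i32]) -> i32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|scope| {
        let left_handle = scope.spawn(|| left.iter().sum::<i32>());
        let right_handle = scope.spawn(|| right.iter().sum::<i32>());
        left_handle.join().unwrap() + right_handle.join().unwrap()
    })
}

/// Pushes onto a vector from a single scoped thread, which holds the only
/// mutable borrow, then reads the vector once the scope has ended.
fn push_in_scope(values: &mut Vec<i32>, extra: i32) -> usize {
    thread::scope(|scope| {
        scope.spawn(|| values.push(extra));
    });
    // The scope joined its thread, so the mutable borrow is over here.
    values.len()
}

fn main() {
    let mut v = vec![1, 2, 3, 4];
    println!("sum: {}", sum_in_halves(&v));
    println!("len after push: {}", push_in_scope(&mut v, 5));
}
```

Spawning a second thread that also calls values.push inside the same scope should be rejected by the borrow checker, which is the "one mutable borrow" rule at work.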
Channels
Rust channels have two parts: a Sender<T> and a Receiver<T>. The two parts are connected via the channel, but you only see the end-points.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
tx.send(10).unwrap();
tx.send(20).unwrap();
println!("Received: {:?}", rx.recv());
println!("Received: {:?}", rx.recv());
let tx2 = tx.clone();
tx2.send(30).unwrap();
println!("Received: {:?}", rx.recv());
}
- mpsc stands for Multi-Producer, Single-Consumer. Sender and SyncSender implement Clone (so you can make multiple producers) but Receiver does not.
- send() and recv() return Result. If they return Err, it means the counterpart Sender or Receiver is dropped and the channel is closed.
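To see the Err cases for myself, here is a small sketch. The function names are hypothetical; only mpsc::channel, send, recv and clone come from the standard library:

```rust
use std::sync::mpsc;

/// Returns true if sending fails once the receiver has been dropped.
fn send_fails_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    tx.send(1).is_err()
}

/// Sends `count` values through two producers, drops both senders, then
/// drains the channel. Buffered values are still delivered; only after
/// that does recv() return Err.
fn drain_after_senders_dropped(count: i32) -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    for i in 0..count {
        // Alternate between the two producers.
        if i % 2 == 0 { tx.send(i).unwrap() } else { tx2.send(i).unwrap() }
    }
    drop(tx);
    drop(tx2);
    let mut received = Vec::new();
    while let Ok(value) = rx.recv() {
        received.push(value);
    }
    (received, rx.recv().is_err())
}

fn main() {
    println!("send after drop failed: {}", send_fails_after_receiver_dropped());
    println!("drained: {:?}", drain_after_senders_dropped(3));
}
```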
Unbounded Channels
You get an unbounded and asynchronous channel with mpsc::channel():
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Message {i}")).unwrap();
println!("{thread_id:?}: sent Message {i}");
}
println!("{thread_id:?}: done");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Main: got {msg}");
}
}
Bounded Channels
With bounded (synchronous) channels, send can block the current thread:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::sync_channel(4);
thread::spawn(move || {
let thread_id = thread::current().id();
for i in 1..10 {
tx.send(format!("Message {i}")).unwrap();
println!("{thread_id:?}: sent Message {i}");
}
println!("{thread_id:?}: done");
});
thread::sleep(Duration::from_millis(100));
for msg in rx.iter() {
println!("Main: got {msg}");
}
}
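The output looked a bit strange to me at first. The channel holds 4 messages, so the sender blocks on its fifth send until main wakes up from its 100 ms sleep and starts receiving. Each "sent" line is printed only after send returns, so the exact interleaving of the two threads' output depends on scheduling: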
Compiling concurrency v0.1.0 (/Users/justinbender/projects/rust_programs/concurrency)
Finished dev [unoptimized + debuginfo] target(s) in 0.14s
Running `target/debug/concurrency`
ThreadId(2): sent Message 1
ThreadId(2): sent Message 2
ThreadId(2): sent Message 3
ThreadId(2): sent Message 4
Main: got Message 1
Main: got Message 2
Main: got Message 3
Main: got Message 4
Main: got Message 5
ThreadId(2): sent Message 5
ThreadId(2): sent Message 6
ThreadId(2): sent Message 7
ThreadId(2): sent Message 8
ThreadId(2): sent Message 9
ThreadId(2): done
Main: got Message 6
Main: got Message 7
Main: got Message 8
Main: got Message 9
[Finished running. Exit status: 0]
- Calling send will block the current thread until there is space in the channel for the new message. The thread can be blocked indefinitely if there is nobody who reads from the channel.
- A call to send will abort with an error (that is why it returns Result) if the channel is closed. A channel is closed when the receiver is dropped.
- A bounded channel with a size of zero is called a "rendezvous channel". Every send will block the current thread until another thread calls recv.
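A rough sketch of the capacity behaviour, using try_send so nothing blocks forever. The helpers messages_until_full and rendezvous_roundtrip are names I made up for the example:

```rust
use std::sync::mpsc::{self, TrySendError};
use std::thread;

/// Counts how many messages fit into a bounded channel before try_send
/// reports that the buffer is full. Nobody receives during the loop.
fn messages_until_full(capacity: usize) -> usize {
    let (tx, _rx) = mpsc::sync_channel(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent) {
            Ok(()) => sent += 1,
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => unreachable!("_rx is still alive"),
        }
    }
}

/// Rendezvous channel: each send completes only when paired with a recv.
fn rendezvous_roundtrip(values: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel(0);
    let producer = thread::spawn(move || {
        for v in values {
            tx.send(v).unwrap(); // blocks until the main thread receives
        }
    });
    // The iterator ends when the producer finishes and drops tx.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    println!("capacity 4 holds {} messages", messages_until_full(4));
    println!("rendezvous: {:?}", rendezvous_roundtrip(vec![1, 2, 3]));
}
```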
Send and Sync
How does Rust know to forbid sharing access across threads? The answer is in two traits:
- Send: a type T is Send if it is safe to move a T across a thread boundary.
- Sync: a type T is Sync if it is safe to move a &T across a thread boundary.
Send and Sync are unsafe traits. The compiler will automatically derive them for your types as long as they only contain Send and Sync types. You can also implement them manually when you know it is valid.
- One can think of these traits as markers that the type has certain thread-safety properties.
- They can be used in the generic constraints as normal traits.
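As a sketch of using the markers as ordinary generic bounds: assert_send, assert_sync and run_on_thread below are hypothetical helpers written for this note, not standard library functions.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// These helpers have empty bodies: the trait bound is the whole check.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Runs `f` on a new thread and returns its result. The bounds mirror the
/// ones on thread::spawn: whatever crosses the boundary must be Send.
fn run_on_thread<T, F>(f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(f).join().unwrap()
}

fn main() {
    assert_send::<String>();
    assert_sync::<Arc<Mutex<Vec<i32>>>>();
    // assert_send::<std::rc::Rc<i32>>(); // would not compile: Rc is !Send

    let shared = Arc::new(Mutex::new(0));
    let worker = Arc::clone(&shared);
    let total = run_on_thread(move || {
        let mut guard = worker.lock().unwrap();
        *guard += 5;
        *guard
    });
    println!("total: {total}");
}
```

Uncommenting the Rc line is a quick way to see the compiler error that these traits produce.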
Send
A type T is Send if it is safe to move a T value to another thread.
The effect of moving ownership to another thread is that destructors will run in that thread. So the question is when you can allocate a value in one thread and deallocate it in another.
As an example, a connection to the SQLite library must only be accessed from a single thread.
Sync
A type T is Sync if it is safe to access a T value from multiple threads at the same time.
More precisely, the definition is:
T is Sync if and only if &T is Send
This statement is essentially a shorthand way of saying that if a type is thread-safe for shared use, it is also thread-safe to pass references to it across threads.
This is because a Sync type can be shared across multiple threads without the risk of data races or other synchronization issues. A reference to the type is therefore also safe to move to another thread, because the data it references can be accessed from any thread safely.
Examples
Send + Sync
Most types you come across are Send + Sync:
- i8, f32, bool, char, &str, ...
- (T1, T2), [T; N], &[T], struct { x: T }, ...
- String, Option<T>, Vec<T>, Box<T>, ...
- Arc<T>: Explicitly thread-safe via atomic reference count.
- Mutex<T>: Explicitly thread-safe via internal locking.
- AtomicBool, AtomicU8, ...: Uses special atomic instructions.
The generic types are typically Send + Sync when the type parameters are Send + Sync.
Send + !Sync
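These types can be moved to other threads, but they're not thread-safe. Typically because of interior mutability:
- mpsc::Sender<T> (this was !Sync in older Rust releases; as far as I can tell it has also been Sync since Rust 1.72)
- mpsc::Receiver<T>
- Cell<T>
- RefCell<T>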
!Send + Sync
These types are thread-safe, but they cannot be moved to another thread:
- MutexGuard<T>: Uses OS level primitives which must be deallocated on the thread which created them.
!Send + !Sync
These types are not thread-safe and cannot be moved to other threads:
- Rc<T>: each Rc<T> has a reference to an RcBox<T>, which contains a non-atomic reference count.
- *const T, *mut T: Rust assumes raw pointers may have special concurrency considerations.
Shared state
Rust uses the type system to enforce synchronization of shared data. This is primarily done via two types.
- Arc<T>, atomic reference counted T: handles sharing between threads and takes care to deallocate T when the last reference is dropped.
- Mutex<T>: ensures mutually exclusive access to the T value.
Arc
Arc<T> allows shared read-only access via Arc::clone:
use std::thread;
use std::sync::Arc;
fn main() {
let v = Arc::new(vec![10, 20, 30]);
let mut handles = Vec::new();
for _ in 1..5 {
let v = Arc::clone(&v);
handles.push(thread::spawn(move || {
let thread_id = thread::current().id();
println!("{thread_id:?}: {v:?}");
}));
}
handles.into_iter().for_each(|h| h.join().unwrap());
println!("v: {v:?}");
}
- Arc stands for "Atomic Reference Counted", a thread-safe version of Rc that uses atomic operations.
- Arc<T> implements Clone whether or not T does. It implements Send and Sync if and only if T implements them both.
- Arc::clone() has the cost of atomic operations that get executed, but after that the use of the T is free.
- Beware of reference cycles, Arc does not use a garbage collector to detect them. std::sync::Weak can help.
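A small sketch of the reference count and of Weak, to make those notes concrete. The function names strong_counts and weak_upgrade_results are my own; Arc::strong_count, Arc::downgrade and Weak::upgrade are the standard library calls being shown:

```rust
use std::sync::{Arc, Weak};

/// Returns the strong count before and after making `clones` extra handles.
fn strong_counts(clones: usize) -> (usize, usize) {
    let data = Arc::new(vec![10, 20, 30]);
    let before = Arc::strong_count(&data);
    // Each clone copies the handle and bumps the count; the Vec is shared.
    let handles: Vec<Arc<Vec<i32>>> = (0..clones).map(|_| Arc::clone(&data)).collect();
    let after = Arc::strong_count(&data);
    drop(handles);
    (before, after)
}

/// A Weak handle does not keep the value alive: upgrade() yields Some while
/// a strong handle exists and None after the last one is dropped.
fn weak_upgrade_results() -> (bool, bool) {
    let strong = Arc::new(String::from("shared"));
    let weak: Weak<String> = Arc::downgrade(&strong);
    let alive = weak.upgrade().is_some();
    drop(strong);
    let gone = weak.upgrade().is_none();
    (alive, gone)
}

fn main() {
    println!("counts: {:?}", strong_counts(3));
    println!("weak: {:?}", weak_upgrade_results());
}
```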
Mutex
Mutex<T> ensures mutual exclusion and allows mutable access to T behind a read-only interface:
use std::sync::Mutex;
fn main() {
let v = Mutex::new(vec![10, 20, 30]);
println!("v: {:?}", v.lock().unwrap());
{
let mut guard = v.lock().unwrap();
guard.push(40);
}
{
let mut guard2 = v.lock().unwrap();
guard2.push(50);
}
println!("v: {:?}", v.lock().unwrap());
}
Notice how we have an impl<T: Send> Sync for Mutex<T> blanket implementation.
- Mutex in Rust looks like a collection with just one element: the protected data.
  - It is not possible to forget to acquire the mutex before accessing the protected data.
- You can get an &mut T from an &Mutex<T> by taking the lock. The MutexGuard ensures that the &mut T doesn't outlive the lock being held.
- Mutex<T> implements both Send and Sync iff (if and only if) T implements Send.
- A read-write lock counterpart: RwLock.
- Why does lock() return a Result?
  - If the thread that held the Mutex panicked, the Mutex becomes "poisoned" to signal that the data it protected might be in an inconsistent state. Calling lock() on a poisoned mutex fails with a PoisonError. You can call into_inner() on the error to recover the data regardless.
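Poisoning is easier to believe once you trigger it. This is a minimal sketch under the assumption that a panic while the guard is held is what poisons the lock; recover_after_poison is a hypothetical helper name:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Poisons a mutex by panicking while the guard is held, then recovers the
/// data through PoisonError::into_inner.
fn recover_after_poison() -> (bool, Vec<i32>) {
    let data = Arc::new(Mutex::new(vec![10, 20, 30]));
    let worker_data = Arc::clone(&data);

    // The worker pushes a value and then panics with the lock still held.
    let result = thread::spawn(move || {
        let mut guard = worker_data.lock().unwrap();
        guard.push(40);
        panic!("worker failed while holding the lock");
    })
    .join();
    assert!(result.is_err());

    let was_poisoned = data.is_poisoned();
    let contents = match data.lock() {
        Ok(guard) => guard.clone(),
        // into_inner() hands back the guard despite the poison flag.
        Err(poisoned) => poisoned.into_inner().clone(),
    };
    (was_poisoned, contents)
}

fn main() {
    let (was_poisoned, contents) = recover_after_poison();
    println!("poisoned: {was_poisoned}, contents: {contents:?}");
}
```

The worker's panic message still shows up on stderr when this runs; that is expected.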
Example
Let us see Arc and Mutex in action:
use std::thread;
// use std::sync::{Arc, Mutex};
fn main() {
let v = vec![10, 20, 30];
let handle = thread::spawn(|| {
v.push(10);
});
v.push(1000);
handle.join().unwrap();
println!("v: {v:?}");
}
What can we do to make it work?
use std::thread;
use std::sync::{Arc, Mutex};
fn main() {
let v = Arc::new(Mutex::new(vec![10, 20, 30]));
let v2 = Arc::clone(&v);
let handle = thread::spawn(move || {
let mut v2 = v2.lock().unwrap();
v2.push(10);
});
{
let mut v = v.lock().unwrap();
v.push(1000);
}
handle.join().unwrap();
println!("v: {v:#?}");
}
- v is wrapped in both Arc and Mutex, because their concerns are orthogonal.
  - Wrapping a Mutex in an Arc is a common pattern to share mutable state between threads.
- v: Arc<_> needs to be cloned as v2 before it can be moved into another thread. Note move was added to the lambda signature.
- Blocks are introduced to narrow the scope of the MutexGuard as much as possible.
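The same Arc plus Mutex pattern scales to more than one worker. A sketch with a shared counter, where count_in_parallel is a made-up name for illustration:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `threads` workers that each add `per_thread` increments to one
/// shared counter, and returns the final value after joining them all.
fn count_in_parallel(threads: usize, per_thread: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each worker gets its own Arc handle to the same Mutex.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    // The guard is a temporary, so the lock is released
                    // at the end of this statement.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total: {}", count_in_parallel(4, 1000));
}
```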
Exercises
Let us practice our new concurrency skills with:
Exercise 1
- Dining philosophers: a classic problem in concurrency.
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Fork;
struct Philosopher {
name: String,
left_fork: Arc<Mutex<Fork>>,
right_fork: Arc<Mutex<Fork>>,
thoughts: mpsc::SyncSender<String>,
}
impl Philosopher {
fn think(&self) {
self.thoughts
.send(format!("Eureka! {} has a new idea!", &self.name))
.unwrap();
}
fn eat(&self) {
println!("{} is trying to eat", &self.name);
let _left = self.left_fork.lock().unwrap();
let _right = self.right_fork.lock().unwrap();
println!("{} is eating...", &self.name);
thread::sleep(Duration::from_millis(10));
}
}
static PHILOSOPHERS: &[&str] =
&["Socrates", "Plato", "Aristotle", "Thales", "Pythagoras"];
fn main() {
let (tx, rx) = mpsc::sync_channel(10);
let forks = (0..PHILOSOPHERS.len())
.map(|_| Arc::new(Mutex::new(Fork)))
.collect::<Vec<_>>();
for i in 0..forks.len() {
let tx = tx.clone();
let mut left_fork = Arc::clone(&forks[i]);
let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);
// To avoid a deadlock, we have to break the symmetry
// somewhere. This will swap the forks without deinitializing
// either of them
if i == forks.len() - 1 {
std::mem::swap(&mut left_fork, &mut right_fork);
}
let philosopher = Philosopher {
name: PHILOSOPHERS[i].to_string(),
thoughts: tx,
left_fork,
right_fork,
};
thread::spawn(move || {
for _ in 0..10 {
philosopher.eat();
philosopher.think();
}
});
}
drop(tx);
for thought in rx {
println!("{thought}");
}
}
- What's going on here? Does it make sense?
- We bring in mpsc (multi-producer, single-consumer channels), Arc (atomic reference counting), and Mutex (mutual exclusion).
- We bring in thread and Duration.
- We build a basic Fork struct with no fields.
- We also build a Philosopher struct with the fields name, left_fork, right_fork, and thoughts.
  - name: String
  - left_fork: an Arc around a Mutex around a Fork
  - right_fork: an Arc around a Mutex around a Fork
  - thoughts: an mpsc::SyncSender that carries String messages
- We implement two methods on Philosopher, think and eat. Both take only &self.
  - think: sends a "new idea" message for this philosopher, by name, over the mpsc::SyncSender.
  - eat: prints that the philosopher is trying to eat, then locks the left fork and then the right fork. Each lock() blocks until that fork is free, and the two guards (_left and _right) keep the locks held until the end of the method, so nobody else can use those forks in the meantime. Once both are held, it prints that the philosopher is eating and sleeps for 10 milliseconds.
- We create a static slice of string slices holding the philosophers' names. Being static, it lives for the whole program, so the data is only read and never has to be transferred.
Now we start the program within main.
- We create our sync_channel with a buffer of 10.
- We create the forks by mapping over 0 up to the number of philosophers, wrapping a Fork in a Mutex inside an Arc each time, and collecting the results into a Vec.
- We then loop over the fork indices. At first I wasn't sure why the vector is needed, but the forks have to exist up front: each fork is shared by two neighbouring philosophers, so every philosopher clones Arc handles to forks i and (i + 1) % forks.len().
- We clone tx and bind the left and right forks as mutable variables so that they can be swapped. Because neighbours share forks, a deadlock is possible if every philosopher picks up their left fork first.
- std::mem::swap takes mutable references to the left and right forks and exchanges them. Only the last philosopher is swapped, so that one picks up the forks in the opposite order, which breaks the circular wait.
- We create the philosopher from the values built inside the loop.
- We then spawn a thread that takes ownership of the philosopher and runs eat and think 10 times.
- Finally we drop the original tx in the main thread and iterate over rx, printing the thoughts. Dropping tx matters: the rx loop only ends once every sender is gone, and the philosophers' clones are dropped when their threads finish.
This is the solution for the first challenge. It's a bit confusing, and some parts are more involved than strictly needed. That's fine: it shows several techniques and how they work together.
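The swap is one way of enforcing a global lock order. Here is a sketch of the same idea written as "always lock the lower-numbered fork first". The names fork_order and dine are hypothetical, the forks are plain Mutex<()> values, and it assumes at least two seats:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns the two fork indices for seat `i`, lowest index first.
/// Locking in this global order is what rules out a circular wait.
fn fork_order(i: usize, seats: usize) -> (usize, usize) {
    let (left, right) = (i, (i + 1) % seats);
    if left < right { (left, right) } else { (right, left) }
}

/// Seats `seats` diners (assumed >= 2) who each eat `rounds` times and
/// returns the total number of meals eaten.
fn dine(seats: usize, rounds: usize) -> usize {
    let forks: Vec<Arc<Mutex<()>>> =
        (0..seats).map(|_| Arc::new(Mutex::new(()))).collect();
    let meals = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..seats)
        .map(|i| {
            let (first, second) = fork_order(i, seats);
            let first = Arc::clone(&forks[first]);
            let second = Arc::clone(&forks[second]);
            let meals = Arc::clone(&meals);
            thread::spawn(move || {
                for _ in 0..rounds {
                    // Both guards are released at the end of each round.
                    let _a = first.lock().unwrap();
                    let _b = second.lock().unwrap();
                    *meals.lock().unwrap() += 1;
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = *meals.lock().unwrap();
    total
}

fn main() {
    println!("meals eaten: {}", dine(5, 50));
}
```

For five seats only the last diner gets a reordered pair, (0, 4) instead of (4, 0), which is the same effect the std::mem::swap call has in the exercise solution.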
Exercise 2
- Multi-threaded link checker: a larger project where you'll use Cargo to download dependencies and then check links in parallel.
use std::{sync::Arc, sync::Mutex, sync::mpsc, thread};
use reqwest::{blocking::Client, Url};
use scraper::{Html, Selector};
use thiserror::Error;
#[derive(Error, Debug)]
enum Error {
#[error("request error: {0}")]
ReqwestError(#[from] reqwest::Error),
#[error("bad http response: {0}")]
BadResponse(String),
}
#[derive(Debug)]
struct CrawlCommand {
url: Url,
extract_links: bool,
}
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
println!("Checking {:#}", command.url);
let response = client.get(command.url.clone()).send()?;
if !response.status().is_success() {
return Err(Error::BadResponse(response.status().to_string()));
}
let mut link_urls = Vec::new();
if !command.extract_links {
return Ok(link_urls);
}
let base_url = response.url().to_owned();
let body_text = response.text()?;
let document = Html::parse_document(&body_text);
let selector = Selector::parse("a").unwrap();
let href_values = document
.select(&selector)
.filter_map(|element| element.value().attr("href"));
for href in href_values {
match base_url.join(href) {
Ok(link_url) => {
link_urls.push(link_url);
}
Err(err) => {
println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
}
}
}
Ok(link_urls)
}
struct CrawlState {
domain: String,
visited_pages: std::collections::HashSet<String>,
}
impl CrawlState {
fn new(start_url: &Url) -> CrawlState {
let mut visited_pages = std::collections::HashSet::new();
visited_pages.insert(start_url.as_str().to_string());
CrawlState {
domain: start_url.domain().unwrap().to_string(),
visited_pages,
}
}
/// Determine whether links within the given page should be extracted
fn should_extract_links(&self, url: &Url) -> bool {
let Some(url_domain) = url.domain() else {
return false;
};
url_domain == self.domain
}
/// Mark the given page as visited, returning false if it had already
/// been visited
fn mark_visited(&mut self, url: &Url) -> bool {
self.visited_pages.insert(url.as_str().to_string())
}
}
type CrawlResult = Result<Vec<Url>, (Url, Error)>;
fn spawn_crawler_threads(
command_receiver: mpsc::Receiver<CrawlCommand>,
result_sender: mpsc::Sender<CrawlResult>,
thread_count: u32,
) {
let command_receiver = Arc::new(Mutex::new(command_receiver));
for _ in 0..thread_count {
let result_sender = result_sender.clone();
let command_receiver = command_receiver.clone();
thread::spawn(move || {
let client = Client::new();
loop {
let command_result = {
let receiver_guard = command_receiver.lock().unwrap();
receiver_guard.recv()
};
let Ok(crawl_command) = command_result else {
// The sender got dropped. No more commands coming in.
break;
};
let crawl_result = match visit_page(&client, &crawl_command) {
Ok(link_urls) => Ok(link_urls),
Err(error) => Err((crawl_command.url, error)),
};
result_sender.send(crawl_result).unwrap();
}
});
}
}
fn control_crawl(
start_url: Url,
command_sender: mpsc::Sender<CrawlCommand>,
result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
let mut crawl_state = CrawlState::new(&start_url);
let start_command = CrawlCommand { url: start_url, extract_links: true };
command_sender.send(start_command).unwrap();
let mut pending_urls = 1;
let mut bad_urls = Vec::new();
while pending_urls > 0 {
let crawl_result = result_receiver.recv().unwrap();
pending_urls -= 1;
match crawl_result {
Ok(link_urls) => {
for url in link_urls {
if crawl_state.mark_visited(&url) {
let extract_links = crawl_state.should_extract_links(&url);
let crawl_command = CrawlCommand { url, extract_links };
command_sender.send(crawl_command).unwrap();
pending_urls += 1;
}
}
}
Err((url, error)) => {
bad_urls.push(url);
println!("Got crawling error: {:#}", error);
continue;
}
}
}
bad_urls
}
fn check_links(start_url: Url) -> Vec<Url> {
let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
spawn_crawler_threads(command_receiver, result_sender, 16);
control_crawl(start_url, command_sender, result_receiver)
}
fn main() {
let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
let bad_urls = check_links(start_url);
println!("Bad URLs: {:#?}", bad_urls);
}
- What's going on here? Does it make sense?
I want to start from the very bottom and work my way up.
fn main() {
let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
let bad_urls = check_links(start_url);
println!("Bad URLs: {:#?}", bad_urls);
}
- We parse https://www.google.org into a Url to use as our start URL.
- We call check_links with start_url and store the result in bad_urls.
- We print out the bad_urls.
What does this even mean? What's really going on?
fn check_links(start_url: Url) -> Vec<Url> {
let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
spawn_crawler_threads(command_receiver, result_sender, 16);
control_crawl(start_url, command_sender, result_receiver)
}
The types/structs inside of the channels
type CrawlResult = Result<Vec<Url>, (Url, Error)>;
#[derive(Debug)]
struct CrawlCommand {
url: Url,
extract_links: bool,
}
- We create two multi-producer, single-consumer channels: one carrying CrawlResult values and one carrying CrawlCommand values.
- We call spawn_crawler_threads with the command receiver and the result sender, along with 16 as the thread count. It is not a buffer size: mpsc::channel is unbounded.
- We run control_crawl, which takes the start_url, the command sender, and the result receiver.
fn spawn_crawler_threads(
command_receiver: mpsc::Receiver<CrawlCommand>,
result_sender: mpsc::Sender<CrawlResult>,
thread_count: u32,
) {
let command_receiver = Arc::new(Mutex::new(command_receiver));
for _ in 0..thread_count {
let result_sender = result_sender.clone();
let command_receiver = command_receiver.clone();
thread::spawn(move || {
let client = Client::new();
loop {
let command_result = {
let receiver_guard = command_receiver.lock().unwrap();
receiver_guard.recv()
};
let Ok(crawl_command) = command_result else {
// The sender got dropped. No more commands coming in.
break;
};
let crawl_result = match visit_page(&client, &crawl_command) {
Ok(link_urls) => Ok(link_urls),
Err(error) => Err((crawl_command.url, error)),
};
result_sender.send(crawl_result).unwrap();
}
});
}
}
- We take in the command receiver, result sender and thread count.
- We put our command receiver inside an Arc and a Mutex so it can be shared between threads. A Receiver cannot be cloned, so this is how several workers pull from one channel.
- We loop from 0 up to the thread count.
- Inside the loop we clone the result sender for that thread.
- We also clone the Arc around the receiver for each thread. This copies the handle, not the receiver itself.
- Now we spawn a thread.
- We create a blocking Client from reqwest.
- Next we start a loop that keeps running as long as receiving a command succeeds. If recv() returns an error, we break out of the loop.
- At the top of each iteration we lock the command receiver, call recv() on the guard, and release the lock at the end of that inner block, before the page is fetched.
- recv() only fails once the command sender has been dropped, which means no more commands are coming, so the thread stops.
- On success we have a crawl_command and run visit_page with references to the client and the command. If that succeeds we get the link URLs; if it fails, we return the crawl_command URL and the error in a tuple.
- We take that crawl result and send it back through result_sender.
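The shared-receiver pattern from these steps can be tried without any network code. This is a sketch that swaps page fetching for squaring numbers; square_all and the u64 job type are stand-ins I picked for illustration, not part of the link checker:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Squares every job on a pool of `workers` threads that all pull from one
/// shared Receiver, and returns the results sorted.
fn square_all(jobs: Vec<u64>, workers: usize) -> Vec<u64> {
    let (job_tx, job_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    let job_rx = Arc::new(Mutex::new(job_rx));

    for _ in 0..workers {
        let job_rx = Arc::clone(&job_rx);
        let result_tx = result_tx.clone();
        thread::spawn(move || loop {
            // Hold the lock only while waiting for the next job.
            let next = {
                let guard = job_rx.lock().unwrap();
                guard.recv()
            };
            let Ok(job) = next else { break }; // job sender was dropped
            result_tx.send(job * job).unwrap();
        });
    }
    // Drop the original so result_rx closes once every worker has exited.
    drop(result_tx);

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // no more jobs: workers see Err from recv() and stop

    let mut results: Vec<u64> = result_rx.iter().collect();
    results.sort_unstable();
    results
}

fn main() {
    println!("{:?}", square_all(vec![3, 1, 2], 4));
}
```

Unlike control_crawl, this sketch knows all its jobs up front, so it can close the job channel right away instead of counting pending work.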
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
println!("Checking {:#}", command.url);
let response = client.get(command.url.clone()).send()?;
if !response.status().is_success() {
return Err(Error::BadResponse(response.status().to_string()));
}
let mut link_urls = Vec::new();
if !command.extract_links {
return Ok(link_urls);
}
let base_url = response.url().to_owned();
let body_text = response.text()?;
let document = Html::parse_document(&body_text);
let selector = Selector::parse("a").unwrap();
let href_values = document
.select(&selector)
.filter_map(|element| element.value().attr("href"));
for href in href_values {
match base_url.join(href) {
Ok(link_url) => {
link_urls.push(link_url);
}
Err(err) => {
println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
}
}
}
Ok(link_urls)
}
- Let's explore visit_page, since the spawned crawler threads call it.
- We print which page we are checking.
- We send a GET request with the client. If the request fails, or the response status is not a success, we return an error; otherwise we move on.
- We create a vector for the new link URLs.
- If the command does not ask for link extraction, we return Ok with the empty link_urls.
- We then read the body of the response, parse it as HTML, and select the a tags, pulling out an iterator of href values.
- We loop through those href values, joining each one onto base_url. If the join comes back Ok, we push the link_url onto the link_urls vector we made earlier.
- If the join fails, we print the error and skip that link. After the loop we return link_urls.
- Let's move on to control_crawl, since we've set up the crawler threads.
fn control_crawl(
start_url: Url,
command_sender: mpsc::Sender<CrawlCommand>,
result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
let mut crawl_state = CrawlState::new(&start_url);
let start_command = CrawlCommand { url: start_url, extract_links: true };
command_sender.send(start_command).unwrap();
let mut pending_urls = 1;
let mut bad_urls = Vec::new();
while pending_urls > 0 {
let crawl_result = result_receiver.recv().unwrap();
pending_urls -= 1;
match crawl_result {
Ok(link_urls) => {
for url in link_urls {
if crawl_state.mark_visited(&url) {
let extract_links = crawl_state.should_extract_links(&url);
let crawl_command = CrawlCommand { url, extract_links };
command_sender.send(crawl_command).unwrap();
pending_urls += 1;
}
}
}
Err((url, error)) => {
bad_urls.push(url);
println!("Got crawling error: {:#}", error);
continue;
}
}
}
bad_urls
}
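control_crawl only queues a URL when mark_visited returns true, and that comes from HashSet::insert, which returns true only the first time a value is added. A small sketch of that deduplication step, with plain strings in place of Url and a hypothetical helper named newly_seen:

```rust
use std::collections::HashSet;

/// Returns only the URLs not seen before, remembering them in `visited`.
fn newly_seen<'a>(visited: &mut HashSet<String>, urls: &[&'a str]) -> Vec<&'a str> {
    urls.iter()
        .copied()
        // insert() returns true only when the value was not already present.
        .filter(|url| visited.insert(url.to_string()))
        .collect()
}

fn main() {
    let mut visited = HashSet::new();
    println!("{:?}", newly_seen(&mut visited, &["a", "b", "a"]));
    println!("{:?}", newly_seen(&mut visited, &["b", "c"]));
}
```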