Jumping back into Rust 🦀 after a break

Justin Bender

What are the goals of this course?

  • Give you a comprehensive understanding of the Rust syntax and language.
  • Enable you to modify existing programs and write new programs in Rust.
  • Show you common Rust idioms.

Topics I would like to explore next:

Non-goals

Rust is a large language and we won't be able to cover all of it in a few days. Some non-goals of this course are:

  • Learning how to develop macros. For that, see the Rust Book or Rust by Example.

The course itself is laid out over three days:

  • Day 1: Basic Rust, syntax, control flow, creating and consuming values.
  • Day 2: Memory management, ownership, compound data types, and the standard library.
  • Day 3: Generics, traits, error handling, testing, and unsafe Rust.

I think I'm going to skip most of this and go right into the concurrency sections. That's more interesting to me than going back over material I've already spent a while on.


Concurrency In Rust

Set up the project:

cargo init concurrency
cd concurrency
cargo add tokio --features full
cargo run

Rust has full support for concurrency using OS threads with mutexes and channels.

The Rust type system plays an important role in turning many concurrency bugs into compile-time bugs. This is often referred to as fearless concurrency, since you can rely on the compiler to ensure correctness at runtime.

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("Count in thread: {i}!");
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }
}
  • Threads are all daemon threads: the main thread does not wait for them.

  • Thread panics are independent of each other.

    • Panics can carry a payload, which can be unpacked with downcast_ref.
  • Notice that the thread is stopped before it reaches 10, because the main thread is not waiting.

  • Use let handle = thread::spawn(...) and later handle.join() to wait for the thread to finish.

  • Trigger a panic in the thread, notice how this doesn’t affect main.

  • Use the Result return value from handle.join() to get access to the panic payload. This is a good time to talk about Any.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Count in thread: {i}!");
            if i == 2 { panic!("Thread panicked at {}", i); }
            thread::sleep(Duration::from_millis(5));
        }
    });

    for i in 1..5 {
        println!("Main thread: {i}");
        thread::sleep(Duration::from_millis(5));
    }

    let result = handle.join();

    match result {
        Ok(_) => println!("Thread completed successfully."),
        Err(e) => {
            println!("Thread panicked: {:?}", e);
            if let Some(payload) = e.downcast_ref::<&str>() {
                println!("Panic payload {}", payload);
            } else if let Some(payload) = e.downcast_ref::<String>() {
                println!("Panic payload {}", payload);
            } else {
                println!("Unknown panic payload type.");
            }
        }
    }
}

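This makes the extra thread panic on its second iteration. It still prints an error that I don't fully understand yet. As far as I can tell, the first message on stderr comes from the default panic hook, which runs in the panicking thread before join() returns, and the Any { .. } on the "Thread panicked" line is simply how a boxed dyn Any payload prints with {:?}. We will continue on in the hope that it becomes clearer later.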
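Since the notes above mention Any, here is a small sketch of how I think the payload handling can be pulled apart from the thread example. The helper names describe_payload and run_and_describe are made up for illustration; downcast_ref and std::panic::panic_any are the standard library calls.

```rust
use std::any::Any;
use std::thread;

/// Hypothetical helper: turns a panic payload into a readable string.
fn describe_payload(payload: &(dyn Any + Send)) -> String {
    // `panic!` with a plain literal carries a `&'static str`.
    if let Some(text) = payload.downcast_ref::<&str>() {
        format!("&str: {text}")
    // `panic!` with runtime format arguments carries a `String`.
    } else if let Some(text) = payload.downcast_ref::<String>() {
        format!("String: {text}")
    // `panic_any` can carry any `Any + Send` value, here an `i32`.
    } else if let Some(code) = payload.downcast_ref::<i32>() {
        format!("i32: {code}")
    } else {
        "unknown payload type".to_string()
    }
}

/// Hypothetical helper: runs `work` on a new thread and describes how it ended.
fn run_and_describe<F: FnOnce() + Send + 'static>(work: F) -> String {
    match thread::spawn(work).join() {
        Ok(()) => "finished without panicking".to_string(),
        Err(payload) => describe_payload(&*payload),
    }
}

fn main() {
    let step = 2;
    println!("{}", run_and_describe(|| println!("all good")));
    println!("{}", run_and_describe(move || panic!("failed at step {step}")));
    println!("{}", run_and_describe(|| std::panic::panic_any(42_i32)));
}
```

Each panicking closure still writes the default panic message to stderr; the returned strings are what main gets to work with after join().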

Normal threads cannot borrow from their environment, so the following does not compile: the spawned closure may outlive foo, and with it s.

use std::thread;

fn foo() {
    let s = String::from("Hello");
    thread::spawn(|| {
        println!("Length: {}", s.len());
    });
}

fn main() {
    foo();
}

However, you can use a scoped thread for this:

use std::thread;

fn foo() {
    let s = String::from("Hello");
    thread::scope(|scope| {
        scope.spawn(|| {
            println!("Length: {}", s.len());
        });
    });
}

fn main() {
    foo();
}
  • The reason for that is that when the thread::scope function completes, all the threads are guaranteed to be joined, so they can return borrowed data.
  • Normal Rust borrowing rules apply: you can either borrow mutably by one thread, or immutably by any number of threads.
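To convince myself of the borrowing rules, here is a minimal sketch where two scoped threads borrow the same slice immutably and hand a value back through join(). The function name parallel_sum is my own, not something from the course.

```rust
use std::thread;

/// Sums the two halves of `data` on two scoped threads that borrow it.
fn parallel_sum(data: &[i32]) -> i32 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|scope| {
        // Both threads only read, so any number of them may borrow at once.
        let left_handle = scope.spawn(|| left.iter().sum::<i32>());
        let right_handle = scope.spawn(|| right.iter().sum::<i32>());
        // `join` gives back each closure's return value.
        left_handle.join().unwrap() + right_handle.join().unwrap()
    })
}

fn main() {
    let numbers = vec![1, 2, 3, 4, 5, 6];
    println!("Sum: {}", parallel_sum(&numbers));
    // `numbers` was only borrowed, so it is still usable here.
    println!("Still have {} numbers", numbers.len());
}
```

No move and no cloning is needed, because the scope guarantees both threads are done before parallel_sum returns.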

Channels

Rust channels have two parts: a Sender<T> and a Receiver<T>. The two parts are connected via the channel, but you only see the end-points.

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    tx.send(10).unwrap();
    tx.send(20).unwrap();

    println!("Received: {:?}", rx.recv());
    println!("Received: {:?}", rx.recv());

    let tx2 = tx.clone();
    tx2.send(30).unwrap();
    println!("Received: {:?}", rx.recv());
}
  • mpsc stands for Multi-Producer, Single-Consumer. Sender and SyncSender implement Clone (so you can make multiple producers) but Receiver does not.
  • send() and recv() return Result. If they return Err, it means the counterpart Sender or Receiver is dropped and the channel is closed.
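A quick sketch of the "channel is closed" behavior, under the assumption that nothing else holds the other end. The two function names are hypothetical.

```rust
use std::sync::mpsc;

/// Returns true if sending fails once the receiver has been dropped.
fn send_fails_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx);
    tx.send(1).is_err()
}

/// Drains the buffered messages, then reports whether `recv` errors
/// once every sender (including clones) is gone.
fn recv_fails_after_senders_dropped() -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    tx.send(10).unwrap();
    tx2.send(20).unwrap();
    drop(tx);
    drop(tx2);
    // Messages already in the channel are still delivered.
    let received = vec![rx.recv().unwrap(), rx.recv().unwrap()];
    (received, rx.recv().is_err())
}

fn main() {
    println!("send after drop failed: {}", send_fails_after_receiver_dropped());
    println!("recv after drop: {:?}", recv_fails_after_senders_dropped());
}
```

This is also why a for loop over a receiver ends on its own: iteration stops at the first Err from recv().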

Unbounded Channels

You get an unbounded and asynchronous channel with mpsc::channel():

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Message {i}")).unwrap();
            println!("{thread_id:?}: sent Message {i}");
        }
        println!("{thread_id:?}: done");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Main: got {msg}");
    }
}

Bounded Channels

With bounded (synchronous) channels, send can block the current thread:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::sync_channel(4);

    thread::spawn(move || {
        let thread_id = thread::current().id();
        for i in 1..10 {
            tx.send(format!("Message {i}")).unwrap();
            println!("{thread_id:?}: sent Message {i}");
        }
        println!("{thread_id:?}: done");
    });
    thread::sleep(Duration::from_millis(100));

    for msg in rx.iter() {
        println!("Main: got {msg}");
    }
}

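The output looked a bit strange to me at first. The buffer holds four messages, so the sender fills it and then blocks on the fifth send while main is still sleeping. Once main starts receiving, that send completes, but the sender thread may not get to its println! before main has already received and printed Message 5, which is why "got Message 5" shows up before "sent Message 5".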

   Compiling concurrency v0.1.0 (/Users/justinbender/projects/rust_programs/concurrency)
    Finished dev [unoptimized + debuginfo] target(s) in 0.14s
     Running `target/debug/concurrency`
ThreadId(2): sent Message 1
ThreadId(2): sent Message 2
ThreadId(2): sent Message 3
ThreadId(2): sent Message 4
Main: got Message 1
Main: got Message 2
Main: got Message 3
Main: got Message 4
Main: got Message 5
ThreadId(2): sent Message 5
ThreadId(2): sent Message 6
ThreadId(2): sent Message 7
ThreadId(2): sent Message 8
ThreadId(2): sent Message 9
ThreadId(2): done
Main: got Message 6
Main: got Message 7
Main: got Message 8
Main: got Message 9
[Finished running. Exit status: 0]
  • Calling send will block the current thread until there is space in the channel for the new message. The thread can be blocked indefinitely if there is nobody who reads from the channel.
  • A call to send will abort with an error (that is why it returns Result) if the channel is closed. A channel is closed when the receiver is dropped.
  • A bounded channel with a size of zero is called a "rendezvous channel". Every send will block the current thread until another thread calls recv.
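To see the buffer size without blocking, try_send can be used instead of send. This is only a sketch; messages_until_full is a made-up name, and the receiver is kept alive but never read from.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Fills a bounded channel and reports how many messages fit before
/// `try_send` says the buffer is full.
fn messages_until_full(capacity: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays open.
    let (tx, _rx) = mpsc::sync_channel::<u32>(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent as u32) {
            Ok(()) => sent += 1,
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}

fn main() {
    println!("capacity 4 accepts {} messages", messages_until_full(4));
    println!("capacity 0 accepts {} messages", messages_until_full(0));
}
```

With a capacity of zero nothing is accepted, because a rendezvous channel only hands a message over when a receiver is already waiting.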

Send and Sync

How does Rust know to forbid sharing access across threads? The answer is in two traits:

  • Send: a type T is Send if it is safe to move a T across a thread boundary.
  • Sync: a type T is Sync if it is safe to move a &T across a thread boundary.

Send and Sync are unsafe traits. The compiler will automatically derive them for your types as long as they only contain Send and Sync types. You can also implement them manually when you know it is valid.

  • One can think of these traits as markers that the type has certain thread-safety properties.
  • They can be used in the generic constraints as normal traits.
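Here is a small sketch of Send and Sync used as ordinary generic bounds. Both helpers, round_trip and read_twice, are hypothetical names I picked for the example.

```rust
use std::cell::Cell;
use std::thread;

/// Moves `value` to a new thread and back. `thread::spawn` requires
/// everything the closure captures to be `Send`.
fn round_trip<T: Send + 'static>(value: T) -> T {
    thread::spawn(move || value).join().unwrap()
}

/// Reads `value` from two scoped threads at once. Both threads hold a
/// `&T`, which is only allowed when `T: Sync`.
fn read_twice<T: Sync, R: Send>(value: &T, read: fn(&T) -> R) -> (R, R) {
    thread::scope(|scope| {
        let first = scope.spawn(|| read(value));
        let second = scope.spawn(|| read(value));
        (first.join().unwrap(), second.join().unwrap())
    })
}

fn main() {
    let text = round_trip(String::from("hello"));
    // Cell<i32> is Send, so moving it to another thread is fine.
    let cell = round_trip(Cell::new(5));
    println!("{text} {}", cell.get());
    println!("{:?}", read_twice(&text, String::len));
    // Sharing the Cell by reference would be rejected at compile time,
    // because Cell<i32> is not Sync:
    // read_twice(&cell, Cell::get);
}
```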

Send

A type T is Send if it is safe to move a T value to another thread.

The effect of moving ownership to another thread is that destructors will run in that thread. So the question is when you can allocate a value in one thread and deallocate it in another.

As an example, a connection to the SQLite library must only be accessed from a single thread.

Sync

A type T is Sync if it is safe to access a T value from multiple threads at the same time.

More precisely, the definition is:

T is Sync if and only if &T is Send

This statement is essentially a shorthand way of saying that if a type is thread-safe for shared use, it is also thread-safe to pass references to it across threads.

This is because a Sync type can be shared across multiple threads without the risk of data races or other synchronization issues. A reference to such a type is therefore safe to move to another thread, because the data it points to can be accessed from any thread safely.

Examples

Send + Sync

Most types you come across are Send + Sync:

  • i8, f32, bool, char, &str, ...
  • (T1, T2), [T; N], &[T], struct { x: T }, ...
  • String, Option<T>, Vec<T>, Box<T>, ...
  • Arc<T>: Explicitly thread-safe via atomic reference count.
  • Mutex<T>: Explicitly thread-safe via internal locking
  • AtomicBool, AtomicU8, ...: Uses special atomic instructions.

The generic types are typically Send + Sync when the type parameters are Send + Sync.

Send + !Sync

These types can be moved to other threads, but they're not thread-safe. Typically because of interior mutability.

  • mpsc::Sender<T> (note: since Rust 1.72 this also implements Sync)
  • mpsc::Receiver<T>
  • Cell<T>
  • RefCell<T>

!Send + Sync

These types are thread safe, but they cannot be moved to another thread:

  • MutexGuard<T>: Uses OS level primitives which must be deallocated on the thread which created them.

!Send + !Sync

These types are not thread-safe and cannot be moved to other threads:

  • Rc<T>: each Rc<T> has a reference to an RcBox<T>, which contains a non-atomic reference count.
  • *const T, *mut T: Rust assumes raw pointers may have special concurrency considerations.

Shared state

Rust uses the type system to enforce synchronization of shared data. This is primarily done via two types.

  • Arc<T>, atomic reference counted T: handles shared between threads and takes care to deallocate T when the last reference is dropped.
  • Mutex<T>: ensures mutually exclusive access to the T value.

Arc

Arc<T> allows shared read-only access via Arc::clone:

use std::thread;
use std::sync::Arc;

fn main() {
    let v = Arc::new(vec![10, 20, 30]);
    let mut handles = Vec::new();
    for _ in 1..5 {
        let v = Arc::clone(&v);
        handles.push(thread::spawn(move || {
            let thread_id = thread::current().id();
            println!("{thread_id:?}: {v:?}");
        }));
    }
    handles.into_iter().for_each(|h| h.join().unwrap());
    println!("v: {v:?}");
}
  • Arc stands for "Atomic Reference Counted", a thread-safe version of Rc that uses atomic operations.
  • Arc<T> implements Clone whether or not T does. It implements Send and Sync if and only if T implements them both.
  • Arc::clone() has the cost of atomic operations that get executed, but after that the use of the T is free.
  • Beware of reference cycles, Arc does not use a garbage collector to detect them.
    • std::sync::Weak can help.
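A minimal sketch of how the reference count and Weak behave, assuming a single thread for simplicity. The function name weak_demo is made up.

```rust
use std::sync::{Arc, Weak};

/// Returns the strong count with two owners, the count after one owner
/// is dropped, and whether the Weak can still be upgraded once all
/// owners are gone.
fn weak_demo() -> (usize, usize, bool) {
    let owner = Arc::new(vec![10, 20, 30]);
    // A Weak does not keep the value alive and does not count as an owner.
    let observer: Weak<Vec<i32>> = Arc::downgrade(&owner);

    let second_owner = Arc::clone(&owner);
    let while_shared = Arc::strong_count(&owner);

    drop(second_owner);
    let after_drop = Arc::strong_count(&owner);

    // Dropping the last owner deallocates the Vec.
    drop(owner);
    let still_alive = observer.upgrade().is_some();

    (while_shared, after_drop, still_alive)
}

fn main() {
    println!("{:?}", weak_demo());
}
```

In a cyclic structure, making the back-pointing edge a Weak is what lets the count reach zero.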

Mutex

Mutex<T> ensures mutual exclusion and allows mutable access to T behind a read-only interface:

use std::sync::Mutex;

fn main() {
    let v = Mutex::new(vec![10, 20, 30]);
    println!("v: {:?}", v.lock().unwrap());

    {
        let mut guard = v.lock().unwrap();
        guard.push(40);
    }
    {
        let mut guard2 = v.lock().unwrap();
        guard2.push(50);
    }

    println!("v: {:?}", v.lock().unwrap());
}

Notice how we have an impl<T: Send> Sync for Mutex<T> blanket implementation.

  • Mutex in Rust looks like a collection with just one element - the protected data.
    • It is not possible to forget to acquire the mutex before accessing the protected data.
  • You can get an &mut T from an &Mutex<T> by taking the lock. The MutexGuard ensures that the &mut T doesn't outlive the lock being held.
  • Mutex<T> implements both Send and Sync iff (if and only if) T implements Send.
  • RwLock is the read-write lock counterpart.
  • Why does lock() return a Result?
    • If the thread that held the Mutex panicked, the Mutex becomes "poisoned" to signal that the data it protects might be in an inconsistent state. Calling lock() on a poisoned mutex fails with a PoisonError. You can call into_inner() on the error to recover the data regardless.
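Poisoning was easier for me to follow with a concrete run. This sketch deliberately panics while holding the guard and then recovers the data; poisoned_then_recovered is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Poisons a mutex on purpose, then recovers its contents anyway.
fn poisoned_then_recovered() -> (bool, Vec<i32>) {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));
    let data_for_thread = Arc::clone(&data);

    // The thread panics while the guard is alive, which poisons the mutex.
    let outcome = thread::spawn(move || {
        let mut guard = data_for_thread.lock().unwrap();
        guard.push(4);
        panic!("simulated failure while holding the lock");
    })
    .join();
    assert!(outcome.is_err());

    let was_poisoned = data.is_poisoned();
    let recovered = match data.lock() {
        Ok(guard) => guard.clone(),
        // `into_inner` on the PoisonError hands back the guard regardless.
        Err(poison_error) => poison_error.into_inner().clone(),
    };
    (was_poisoned, recovered)
}

fn main() {
    println!("{:?}", poisoned_then_recovered());
}
```

The push of 4 is visible after recovery, which is exactly the "might be in an inconsistent state" warning: the data reflects whatever the thread did before it panicked.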

Example

Let us see Arc and Mutex in action:

use std::thread;
// use std::sync::{Arc, Mutex};

fn main() {
    let v = vec![10, 20, 30];
    let handle = thread::spawn(|| {
        v.push(10);
    });
    v.push(1000);

    handle.join().unwrap();
    println!("v: {v:?}");
}

What can we do to make it work?

use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
    let v = Arc::new(Mutex::new(vec![10, 20, 30]));
    let v2 = Arc::clone(&v);

    let handle = thread::spawn(move || {
        let mut v2 = v2.lock().unwrap();
        v2.push(10);
    });

    {
        let mut v = v.lock().unwrap();
        v.push(1000);
    }

    handle.join().unwrap();
    println!("v: {v:#?}");
}
  • v is wrapped in both Arc and Mutex, because their concerns are orthogonal.
    • Wrapping a Mutex in an Arc is a common pattern to share mutable state between threads.
  • v: Arc<_> needs to be cloned as v2 before it can be moved into another thread. Note move was added to the lambda signature.
  • Blocks are introduced to narrow the scope of the MutexGuard as much as possible.
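The same Arc plus Mutex pattern scales to more than one extra thread. A sketch with a shared counter, where count_in_parallel is a name I made up:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Increments a shared counter from several threads and returns the total.
fn count_in_parallel(threads: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each thread gets its own Arc handle to the same Mutex.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // The guard is a temporary here, so the lock is
                    // released at the end of this statement.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total: {}", count_in_parallel(4, 1000));
}
```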

Exercises

Let us practice our new concurrency skills with:

Exercise 1

  • Dining philosophers: a classic problem in concurrency.
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

struct Fork;

struct Philosopher {
    name: String,
    left_fork: Arc<Mutex<Fork>>,
    right_fork: Arc<Mutex<Fork>>,
    thoughts: mpsc::SyncSender<String>,
}

impl Philosopher {
    fn think(&self) {
        self.thoughts
            .send(format!("Eureka! {} has a new idea!", &self.name))
            .unwrap();
    }

    fn eat(&self) {
        println!("{} is trying to eat", &self.name);
        let _left = self.left_fork.lock().unwrap();
        let _right = self.right_fork.lock().unwrap();

        println!("{} is eating...", &self.name);
        thread::sleep(Duration::from_millis(10));
    }
}

static PHILOSOPHERS: &[&str] =
&["Socrates", "Plato", "Aristotle", "Thales", "Pythagoras"];

fn main() {
    let (tx, rx) = mpsc::sync_channel(10);

    let forks = (0..PHILOSOPHERS.len())
        .map(|_| Arc::new(Mutex::new(Fork)))
        .collect::<Vec<_>>();

    for i in 0..forks.len() {
        let tx = tx.clone();
        let mut left_fork = Arc::clone(&forks[i]);
        let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);

        // To avoid a deadlock, we have to break the symmetry
        // somewhere. This will swap the forks without deinitializing
        // either of them
        if i == forks.len() - 1 {
            std::mem::swap(&mut left_fork, &mut right_fork);
        }

        let philosopher = Philosopher {
            name: PHILOSOPHERS[i].to_string(),
            thoughts: tx,
            left_fork,
            right_fork,
        };

        thread::spawn(move || {
            for _ in 0..10 {
                philosopher.eat();
                philosopher.think();
            }
        });
    }
    drop(tx);
    for thought in rx {
        println!("{thought}");
    }
}
  • What's going on here? Does it make sense?
  • We bring in mpsc (multi-producer, single-consumer channels), Arc (atomic reference counting), and Mutex (mutual exclusion).
  • We bring in thread and Duration.
  • We build a basic Fork struct with no fields.
  • We also build a Philosopher struct with the fields name, left_fork, right_fork, and thoughts.
    • name: String
    • left_fork: an Arc wrapping a Mutex wrapping a Fork
    • right_fork: an Arc wrapping a Mutex wrapping a Fork
    • thoughts: an mpsc::SyncSender<String>, the sending half of a bounded channel
  • We implement two methods on Philosopher, think and eat. Both take only &self.
    • think: sends a "new idea" message, including the philosopher's name, through the SyncSender.
    • eat: prints that the philosopher is trying to eat, then locks left_fork and then right_fork. I wasn't 100% sure at first what the locks do here. As far as I can tell, each lock() blocks until that fork is free, and the _left and _right guards keep both forks locked until they are dropped at the end of the method. In between, we print that the philosopher is eating and sleep for 10 milliseconds.
  • We create a static slice of string slices holding the philosophers' names. Being static, it exists for the life of the program, so the names never have to be moved around, only read.

Now we start the program within main.

  • We create our sync_channel with a buffer of 10.
  • We create the forks: for each index from 0 up to the number of philosophers, we wrap a Fork in a Mutex, wrap that in an Arc, and collect the results into a Vec.
  • Next we loop over the fork indices. What is the point of building this vector and looping over it? I wasn't 100% sure at first. It is more than a plain counter: each philosopher needs the fork at index i and the one at index i + 1 (wrapping around), so every fork has to exist up front and be shared between two neighbors.
  • We clone tx and clone the Arcs for the left and right forks into mutable variables. Neighboring philosophers end up holding handles to the same fork, which is what makes a deadlock possible.
  • For the last philosopher only, we call std::mem::swap with mutable references to the left and right forks. This breaks the circular wait. It looked like magic to me at first. As far as I can tell, it works because after the swap every philosopher locks the lower-indexed fork first, so the chain of waiting threads can never close into a cycle.
  • We create the philosopher from the values built inside the loop, including the left and right fork.
  • We then spawn a thread that takes ownership of the philosopher and runs eat and think 10 times.
  • We then drop tx in the main thread and iterate over rx, printing the thoughts. Dropping the original tx matters: the loop over rx only ends once every sender is gone, and the clones go away when the philosopher threads finish.

This is the solution for the first challenge. It's a bit confusing, and some parts may be more involved than strictly needed. That's fine: it shows us several techniques and how the internals work.
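To check my understanding of why breaking the symmetry avoids the deadlock, here is a stripped-down sketch that makes the lock order explicit instead of using std::mem::swap. The names lock_order and dine are hypothetical, the forks are plain Mutex<()> values, and it assumes at least two philosophers (with one, both forks would be the same mutex).

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns the fork indices in the order philosopher `i` should lock
/// them: always the lower index first.
fn lock_order(i: usize, fork_count: usize) -> (usize, usize) {
    let left = i;
    let right = (i + 1) % fork_count;
    if left < right { (left, right) } else { (right, left) }
}

/// Lets every philosopher eat `rounds` times and returns the number of
/// meals. Assumes `philosophers >= 2`.
fn dine(philosophers: usize, rounds: usize) -> usize {
    let forks: Vec<_> = (0..philosophers)
        .map(|_| Arc::new(Mutex::new(())))
        .collect();
    let meals = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..philosophers)
        .map(|i| {
            let (first, second) = lock_order(i, philosophers);
            let first = Arc::clone(&forks[first]);
            let second = Arc::clone(&forks[second]);
            let meals = Arc::clone(&meals);
            thread::spawn(move || {
                for _ in 0..rounds {
                    // Both guards are held until the end of the iteration.
                    let _first = first.lock().unwrap();
                    let _second = second.lock().unwrap();
                    *meals.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let total = *meals.lock().unwrap();
    total
}

fn main() {
    println!("meals eaten: {}", dine(5, 100));
}
```

For five philosophers, only the last one ends up with a different order than "left then right", which matches what the swap in the exercise solution does.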

Exercise 2

  • Multi-threaded link checker: a larger project where you'll use Cargo to download dependencies and then check links in parallel.
use std::{sync::Arc, sync::Mutex, sync::mpsc, thread};

use reqwest::{blocking::Client, Url};
use scraper::{Html, Selector};
use thiserror::Error;

#[derive(Error, Debug)]
enum Error {
    #[error("request error: {0}")]
    ReqwestError(#[from] reqwest::Error),
    #[error("bad http response: {0}")]
    BadResponse(String),
}

#[derive(Debug)]
struct CrawlCommand {
    url: Url,
    extract_links: bool,
}

fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
    println!("Checking {:#}", command.url);
    let response = client.get(command.url.clone()).send()?;
    if !response.status().is_success() {
        return Err(Error::BadResponse(response.status().to_string()));
    }

    let mut link_urls = Vec::new();
    if !command.extract_links {
        return Ok(link_urls);
    }

    let base_url = response.url().to_owned();
    let body_text = response.text()?;
    let document = Html::parse_document(&body_text);

    let selector = Selector::parse("a").unwrap();
    let href_values = document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"));
    for href in href_values {
        match base_url.join(href) {
            Ok(link_url) => {
                link_urls.push(link_url);
            }
            Err(err) => {
                println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
            }
        }
    }
    Ok(link_urls)
}

struct CrawlState {
    domain: String,
    visited_pages: std::collections::HashSet<String>,
}

impl CrawlState {
    fn new(start_url: &Url) -> CrawlState {
        let mut visited_pages = std::collections::HashSet::new();
        visited_pages.insert(start_url.as_str().to_string());
        CrawlState {
            domain: start_url.domain().unwrap().to_string(),
            visited_pages,
        }
    }

    /// Determine whether links within the given page should be extracted
    fn should_extract_links(&self, url: &Url) -> bool {
        let Some(url_domain) = url.domain() else {
            return false;
        };
        url_domain == self.domain
    }

    /// Mark the given page as visited, returning false if it had already
    /// been visited
    fn mark_visited(&mut self, url: &Url) -> bool {
        self.visited_pages.insert(url.as_str().to_string())
    }
}

type CrawlResult = Result<Vec<Url>, (Url, Error)>;
fn spawn_crawler_threads(
    command_receiver: mpsc::Receiver<CrawlCommand>,
    result_sender: mpsc::Sender<CrawlResult>,
    thread_count: u32,
) {
    let command_receiver = Arc::new(Mutex::new(command_receiver));

    for _ in 0..thread_count {
        let result_sender = result_sender.clone();
        let command_receiver = command_receiver.clone();
        thread::spawn(move || {
            let client = Client::new();
            loop {
                let command_result = {
                    let receiver_guard = command_receiver.lock().unwrap();
                    receiver_guard.recv()
                };
                let Ok(crawl_command) = command_result else {
                    // The sender got dropped. No more commands coming in.
                    break;
                };
                let crawl_result = match visit_page(&client, &crawl_command) {
                    Ok(link_urls) => Ok(link_urls),
                    Err(error) => Err((crawl_command.url, error)),
                };
                result_sender.send(crawl_result).unwrap();
            }
        });
    }
}

fn control_crawl(
    start_url: Url,
    command_sender: mpsc::Sender<CrawlCommand>,
    result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
    let mut crawl_state = CrawlState::new(&start_url);
    let start_command = CrawlCommand { url: start_url, extract_links: true };
    command_sender.send(start_command).unwrap();
    let mut pending_urls = 1;

    let mut bad_urls = Vec::new();
    while pending_urls > 0 {
        let crawl_result = result_receiver.recv().unwrap();
        pending_urls -= 1;

        match crawl_result {
            Ok(link_urls) => {
                for url in link_urls {
                    if crawl_state.mark_visited(&url) {
                        let extract_links = crawl_state.should_extract_links(&url);
                        let crawl_command = CrawlCommand { url, extract_links };
                        command_sender.send(crawl_command).unwrap();
                        pending_urls += 1;
                    }
                }
            }
            Err((url, error)) => {
                bad_urls.push(url);
                println!("Got crawling error: {:#}", error);
                continue;
            }
        }
    }
    bad_urls
}

fn check_links(start_url: Url) -> Vec<Url> {
    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
    spawn_crawler_threads(command_receiver, result_sender, 16);
    control_crawl(start_url, command_sender, result_receiver)
}

fn main() {
    let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
    let bad_urls = check_links(start_url);
    println!("Bad URLs: {:#?}", bad_urls);
}
  • What's going on here? Does it make sense?

I want to start from the very bottom and work my way up.

fn main() {
    let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
    let bad_urls = check_links(start_url);
    println!("Bad URLs: {:#?}", bad_urls);
}
  • We parse https://www.google.org into a Url to use as our start URL.
  • We call check_links with start_url and store the result in bad_urls.
  • We print out the bad_urls.

What does this even mean? What's really going on?

fn check_links(start_url: Url) -> Vec<Url> {
    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
    spawn_crawler_threads(command_receiver, result_sender, 16);
    control_crawl(start_url, command_sender, result_receiver)
}

The types/structs inside of the channels

type CrawlResult = Result<Vec<Url>, (Url, Error)>;

#[derive(Debug)]
struct CrawlCommand {
    url: Url,
    extract_links: bool,
}
  • We create two multi-producer, single-consumer channels: one carrying CrawlResult values and another carrying CrawlCommand values.
  • We call spawn_crawler_threads with the command receiver and the result sender, along with 16 for the thread count.
  • We run control_crawl, which takes the start_url, the command sender, and the result receiver.
fn spawn_crawler_threads(
    command_receiver: mpsc::Receiver<CrawlCommand>,
    result_sender: mpsc::Sender<CrawlResult>,
    thread_count: u32,
) {
    let command_receiver = Arc::new(Mutex::new(command_receiver));

    for _ in 0..thread_count {
        let result_sender = result_sender.clone();
        let command_receiver = command_receiver.clone();
        thread::spawn(move || {
            let client = Client::new();
            loop {
                let command_result = {
                    let receiver_guard = command_receiver.lock().unwrap();
                    receiver_guard.recv()
                };
                let Ok(crawl_command) = command_result else {
                    // The sender got dropped. No more commands coming in.
                    break;
                };
                let crawl_result = match visit_page(&client, &crawl_command) {
                    Ok(link_urls) => Ok(link_urls),
                    Err(error) => Err((crawl_command.url, error)),
                };
                result_sender.send(crawl_result).unwrap();
            }
        });
    }
}
  • We take in the command receiver, result sender, and thread count.
  • We put the command receiver inside an Arc and a Mutex so it can be shared between threads, since a Receiver cannot be cloned.
  • We loop thread_count times.
  • Inside the loop we clone the result sender for that thread.
  • We also clone the Arc, so each thread gets its own handle to the same shared receiver.
  • Now we spawn a thread.
  • Inside it, we create a blocking Client from reqwest.
  • Next we start a loop that keeps running for as long as receiving a command succeeds. If recv() returns an error, we break out of the loop.
  • At the top of each iteration, we lock the command receiver and call recv() on the guard. The guard is dropped at the end of that inner block, so the lock is released before the page is fetched.
  • recv() only fails once the command sender has been dropped, meaning no more commands are coming, and that is when we break.
  • If it succeeds, crawl_command holds the received command and we run visit_page with a reference to the client and the command. On success we get back the link URLs; on error we return the crawl_command URL and the error together in a tuple.
  • Finally, we send that crawl result through result_sender.
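The shared-receiver worker pattern above can be tried without any network code. This is a sketch using only the standard library, with squaring numbers standing in for visiting pages; square_all is a hypothetical name.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Squares each input on a pool of `workers` threads that all pull
/// jobs from one shared receiver. Results come back sorted.
fn square_all(inputs: Vec<u64>, workers: usize) -> Vec<u64> {
    let (job_tx, job_rx) = mpsc::channel::<u64>();
    let (result_tx, result_rx) = mpsc::channel::<u64>();
    // A Receiver cannot be cloned, so the workers share it behind a lock.
    let job_rx = Arc::new(Mutex::new(job_rx));

    for _ in 0..workers {
        let job_rx = Arc::clone(&job_rx);
        let result_tx = result_tx.clone();
        thread::spawn(move || loop {
            // Hold the lock only while waiting for the next job.
            let job = {
                let guard = job_rx.lock().unwrap();
                guard.recv()
            };
            // An error means the job sender was dropped: no more work.
            let Ok(n) = job else { break };
            result_tx.send(n * n).unwrap();
        });
    }
    // Only the workers hold result senders from here on.
    drop(result_tx);

    for n in inputs {
        job_tx.send(n).unwrap();
    }
    // Closing the job channel lets the workers leave their loops.
    drop(job_tx);

    // Ends once every worker has dropped its result sender.
    let mut results: Vec<u64> = result_rx.iter().collect();
    results.sort_unstable();
    results
}

fn main() {
    println!("{:?}", square_all(vec![3, 1, 2], 4));
}
```

The crawler cannot simply close the command channel up front like this, because new commands are discovered while crawling; that is why control_crawl counts pending URLs instead.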
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
    println!("Checking {:#}", command.url);
    let response = client.get(command.url.clone()).send()?;
    if !response.status().is_success() {
        return Err(Error::BadResponse(response.status().to_string()));
    }

    let mut link_urls = Vec::new();
    if !command.extract_links {
        return Ok(link_urls);
    }

    let base_url = response.url().to_owned();
    let body_text = response.text()?;
    let document = Html::parse_document(&body_text);

    let selector = Selector::parse("a").unwrap();
    let href_values = document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"));
    for href in href_values {
        match base_url.join(href) {
            Ok(link_url) => {
                link_urls.push(link_url);
            }
            Err(err) => {
                println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
            }
        }
    }
    Ok(link_urls)
}
  • Let's explore visit_page, since the spawned crawler threads call into it.
  • We print which page we are checking.
  • We send a GET request with the client and get back a response. If the request fails or the status is not a success, we return an error; otherwise we move on.
  • We create a vector for the new link URLs.
  • If extract_links is false on the command, we return Ok with the still-empty link_urls.
  • Otherwise we read the body of the response, parse it as HTML, and select the a tags, producing an iterator over their href values.
  • We then loop through those href values, joining each one onto base_url. If that comes back Ok, we push the link_url onto the link_urls vector we made earlier.
  • If joining fails, we print the error and carry on. After the loop we return link_urls.
  • Let's move on to control_crawl now that we've set up the crawl threads.
fn control_crawl(
    start_url: Url,
    command_sender: mpsc::Sender<CrawlCommand>,
    result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
    let mut crawl_state = CrawlState::new(&start_url);
    let start_command = CrawlCommand { url: start_url, extract_links: true };
    command_sender.send(start_command).unwrap();
    let mut pending_urls = 1;

    let mut bad_urls = Vec::new();
    while pending_urls > 0 {
        let crawl_result = result_receiver.recv().unwrap();
        pending_urls -= 1;

        match crawl_result {
            Ok(link_urls) => {
                for url in link_urls {
                    if crawl_state.mark_visited(&url) {
                        let extract_links = crawl_state.should_extract_links(&url);
                        let crawl_command = CrawlCommand { url, extract_links };
                        command_sender.send(crawl_command).unwrap();
                        pending_urls += 1;
                    }
                }
            }
            Err((url, error)) => {
                bad_urls.push(url);
                println!("Got crawling error: {:#}", error);
                continue;
            }
        }
    }
    bad_urls
}
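control_crawl only queues a URL when mark_visited returns true, which relies on HashSet::insert returning true for values that were not in the set yet. A small sketch of that behavior, using plain strings instead of Url; both function names here are stand-ins I made up.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for CrawlState::mark_visited, keyed on strings.
fn mark_visited(visited: &mut HashSet<String>, url: &str) -> bool {
    // `insert` returns true only when the value was not already present.
    visited.insert(url.to_string())
}

/// Counts how many of `links` would be queued for crawling, given that
/// `start` has already been visited.
fn count_new_links(start: &str, links: &[&str]) -> usize {
    let mut visited = HashSet::new();
    visited.insert(start.to_string());
    links
        .iter()
        .filter(|link| mark_visited(&mut visited, link))
        .count()
}

fn main() {
    let links = ["/a", "/b", "/b", "/start", "/c"];
    println!("new links: {}", count_new_links("/start", &links));
}
```

Each link is counted once, no matter how often it appears, and the start page is never queued again.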