并发编程:上午练习
哲学家就餐问题
(返回练习)
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
/// A fork on the table. It carries no data; it exists only to be wrapped in a
/// `Mutex` so that at most one philosopher can hold it at a time.
struct Fork;
/// A dining philosopher that can eat (which needs both forks) and think.
struct Philosopher {
/// Name used in printed messages and in the thoughts sent over the channel.
name: String,
/// Fork that `eat` locks first.
left_fork: Arc<Mutex<Fork>>,
/// Fork that `eat` locks second.
right_fork: Arc<Mutex<Fork>>,
/// Sending half of the bounded channel on which `think` publishes ideas.
thoughts: mpsc::SyncSender<String>,
}
impl Philosopher {
    /// Publishes a new idea, tagged with the philosopher's name, on the
    /// `thoughts` channel.
    ///
    /// # Panics
    /// Panics if the receiving end of the channel has been dropped.
    fn think(&self) {
        let idea = format!("Eureka! {} has a new idea!", self.name);
        self.thoughts.send(idea).unwrap();
    }

    /// Picks up the left fork, then the right fork, and eats for a short
    /// moment. Both forks are released when the guards go out of scope at the
    /// end of the method.
    ///
    /// # Panics
    /// Panics if either fork mutex is poisoned.
    fn eat(&self) {
        let who = &self.name;
        println!("{} is trying to eat", who);

        // Acquisition order matters: left first, then right.
        let _first_guard = self.left_fork.lock().unwrap();
        let _second_guard = self.right_fork.lock().unwrap();

        println!("{} is eating...", who);
        let meal_time = Duration::from_millis(10);
        thread::sleep(meal_time);
    }
}
/// Names of the philosophers seated at the table. `main` creates one fork and
/// spawns one thread per entry.
static PHILOSOPHERS: &[&str] =
&["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];
/// Seats every philosopher, runs each one on its own thread for 100
/// eat/think rounds, and prints the thoughts as they arrive.
fn main() {
    let (thought_tx, thought_rx) = mpsc::sync_channel(10);

    // One fork per philosopher.
    let seat_count = PHILOSOPHERS.len();
    let forks: Vec<_> = std::iter::repeat_with(|| Arc::new(Mutex::new(Fork)))
        .take(seat_count)
        .collect();

    for (seat, name) in PHILOSOPHERS.iter().enumerate() {
        let own = Arc::clone(&forks[seat]);
        let next = Arc::clone(&forks[(seat + 1) % seat_count]);

        // To avoid a deadlock, the symmetry has to be broken somewhere: the
        // philosopher in the last seat picks the forks up in the opposite
        // order from everyone else.
        let (left_fork, right_fork) = if seat + 1 == seat_count {
            (next, own)
        } else {
            (own, next)
        };

        let philosopher = Philosopher {
            name: (*name).to_string(),
            thoughts: thought_tx.clone(),
            left_fork,
            right_fork,
        };

        thread::spawn(move || {
            for _ in 0..100 {
                philosopher.eat();
                philosopher.think();
            }
        });
    }

    // Drop the original sender so the receive loop ends once every
    // philosopher thread has finished and dropped its clone.
    drop(thought_tx);
    for thought in thought_rx {
        println!("{thought}");
    }
}
链接检查器
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use reqwest::blocking::Client;
use reqwest::Url;
use scraper::{Html, Selector};
use thiserror::Error;
/// Failures that can occur while checking a single page.
#[derive(Error, Debug)]
enum Error {
/// An error reported by `reqwest`, e.g. while sending the request or
/// reading the response body.
#[error("request error: {0}")]
ReqwestError(#[from] reqwest::Error),
/// The server answered with a non-success status; the payload is that
/// status rendered as text.
#[error("bad http response: {0}")]
BadResponse(String),
}
/// A unit of work for a crawler thread: one URL to check.
#[derive(Debug)]
struct CrawlCommand {
/// Page to request.
url: Url,
/// Whether the links on the page should be collected after a successful
/// response.
extract_links: bool,
}
fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
println!("Checking {:#}", command.url);
let response = client.get(command.url.clone()).send()?;
if !response.status().is_success() {
return Err(Error::BadResponse(response.status().to_string()));
}
let mut link_urls = Vec::new();
if !command.extract_links {
return Ok(link_urls);
}
let base_url = response.url().to_owned();
let body_text = response.text()?;
let document = Html::parse_document(&body_text);
let selector = Selector::parse("a").unwrap();
let href_values = document
.select(&selector)
.filter_map(|element| element.value().attr("href"));
for href in href_values {
match base_url.join(href) {
Ok(link_url) => {
link_urls.push(link_url);
}
Err(err) => {
println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
}
}
}
Ok(link_urls)
}
/// Bookkeeping for a crawl: the domain of the start URL and the set of pages
/// already seen.
struct CrawlState {
/// Domain of the start URL; `should_extract_links` compares page domains
/// against it.
domain: String,
/// String form of every URL that has been marked as visited.
visited_pages: std::collections::HashSet<String>,
}
impl CrawlState {
    /// Creates the state for a crawl starting at `start_url`, which is
    /// recorded as already visited.
    ///
    /// # Panics
    /// Panics if `start_url` has no domain.
    fn new(start_url: &Url) -> CrawlState {
        let visited_pages =
            std::collections::HashSet::from([start_url.as_str().to_string()]);
        let domain = start_url.domain().unwrap().to_string();
        CrawlState { domain, visited_pages }
    }

    /// Reports whether links on the given page should be extracted, which is
    /// the case only when the page's domain equals the crawl's domain.
    fn should_extract_links(&self, url: &Url) -> bool {
        url.domain() == Some(self.domain.as_str())
    }

    /// Records the given page as visited. Returns `true` if it was newly
    /// recorded and `false` if it had already been visited.
    fn mark_visited(&mut self, url: &Url) -> bool {
        let key = url.as_str().to_string();
        self.visited_pages.insert(key)
    }
}
/// Outcome of visiting one page: the links found on it, or the page's URL
/// paired with the error that occurred.
type CrawlResult = Result<Vec<Url>, (Url, Error)>;
fn spawn_crawler_threads(
command_receiver: mpsc::Receiver<CrawlCommand>,
result_sender: mpsc::Sender<CrawlResult>,
thread_count: u32,
) {
let command_receiver = Arc::new(Mutex::new(command_receiver));
for _ in 0..thread_count {
let result_sender = result_sender.clone();
let command_receiver = command_receiver.clone();
thread::spawn(move || {
let client = Client::new();
loop {
let command_result = {
let receiver_guard = command_receiver.lock().unwrap();
receiver_guard.recv()
};
let Ok(crawl_command) = command_result else {
// The sender got dropped. No more commands coming in.
break;
};
let crawl_result = match visit_page(&client, &crawl_command) {
Ok(link_urls) => Ok(link_urls),
Err(error) => Err((crawl_command.url, error)),
};
result_sender.send(crawl_result).unwrap();
}
});
}
}
/// Drives the crawl: sends the start URL to the workers, then keeps
/// processing results until no command is outstanding.
///
/// Every link reported by a worker that has not been seen before is sent as
/// a new command; link extraction is requested only for pages on the start
/// URL's domain. Returns the URLs whose visit failed.
///
/// # Panics
/// Panics if sending a command or receiving a result fails because the other
/// end of the channel is gone.
fn control_crawl(
    start_url: Url,
    command_sender: mpsc::Sender<CrawlCommand>,
    result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
    let mut crawl_state = CrawlState::new(&start_url);
    command_sender
        .send(CrawlCommand { url: start_url, extract_links: true })
        .unwrap();

    // Number of commands sent whose result has not been received yet.
    let mut in_flight = 1;
    let mut bad_urls = Vec::new();

    while in_flight > 0 {
        let outcome = result_receiver.recv().unwrap();
        in_flight -= 1;

        let link_urls = match outcome {
            Ok(link_urls) => link_urls,
            Err((url, error)) => {
                bad_urls.push(url);
                println!("Got crawling error: {:#}", error);
                continue;
            }
        };

        for url in link_urls {
            if !crawl_state.mark_visited(&url) {
                // Already queued or checked earlier.
                continue;
            }
            let extract_links = crawl_state.should_extract_links(&url);
            command_sender
                .send(CrawlCommand { url, extract_links })
                .unwrap();
            in_flight += 1;
        }
    }

    bad_urls
}
/// Crawls from `start_url` using a pool of 16 worker threads and returns the
/// URLs that could not be fetched successfully.
fn check_links(start_url: Url) -> Vec<Url> {
    const CRAWLER_THREADS: u32 = 16;

    // Commands flow from the controller to the workers, results flow back.
    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();

    spawn_crawler_threads(command_receiver, result_sender, CRAWLER_THREADS);
    control_crawl(start_url, command_sender, result_receiver)
}
/// Checks the links reachable from a fixed start page and prints the URLs
/// that failed.
fn main() {
    let bad_urls = check_links(reqwest::Url::parse("https://www.google.org").unwrap());
    println!("Bad URLs: {:#?}", bad_urls);
}