Concurrency: Morning Exercise

Dining Philosophers

(back to exercise)

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

struct Fork;

struct Philosopher {
    name: String,
    left_fork: Arc<Mutex<Fork>>,
    right_fork: Arc<Mutex<Fork>>,
    thoughts: mpsc::SyncSender<String>,
}

impl Philosopher {
    fn think(&self) {
        self.thoughts
            .send(format!("Eureka! {} has a new idea!", &self.name))
            .unwrap();
    }

    fn eat(&self) {
        println!("{} is trying to eat", &self.name);
        let _left = self.left_fork.lock().unwrap();
        let _right = self.right_fork.lock().unwrap();

        println!("{} is eating...", &self.name);
        thread::sleep(Duration::from_millis(10));
    }
}

static PHILOSOPHERS: &[&str] =
    &["Socrates", "Hypatia", "Plato", "Aristotle", "Pythagoras"];

fn main() {
    // Bounded channel: philosophers block on send() if main falls behind.
    let (tx, rx) = mpsc::sync_channel(10);

    // One fork between each pair of neighboring philosophers.
    let forks = (0..PHILOSOPHERS.len())
        .map(|_| Arc::new(Mutex::new(Fork)))
        .collect::<Vec<_>>();

    for i in 0..forks.len() {
        let tx = tx.clone();
        let mut left_fork = Arc::clone(&forks[i]);
        let mut right_fork = Arc::clone(&forks[(i + 1) % forks.len()]);

        // To avoid a deadlock, we have to break the symmetry
        // somewhere. This will swap the forks without deinitializing
        // either of them.
        if i == forks.len() - 1 {
            std::mem::swap(&mut left_fork, &mut right_fork);
        }

        let philosopher = Philosopher {
            name: PHILOSOPHERS[i].to_string(),
            thoughts: tx,
            left_fork,
            right_fork,
        };

        thread::spawn(move || {
            for _ in 0..100 {
                philosopher.eat();
                philosopher.think();
            }
        });
    }

    // Drop the original sender so the channel closes once every
    // philosopher thread has finished and dropped its clone.
    drop(tx);
    for thought in rx {
        println!("{thought}");
    }
}
```
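
The one non-obvious line above is the `std::mem::swap` call: the last philosopher grabs its forks in the opposite order, so the five threads no longer acquire their locks in one symmetric cycle. Stated as a general rule: when threads must hold several locks at once, make them all acquire the locks in a single global order. Here is a minimal sketch of that rule in isolation (the two counters and the thread bodies are illustrative, not part of the exercise):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let a = Arc::new(Mutex::new(0));
    let b = Arc::new(Mutex::new(0));

    let handles: Vec<_> = (0..2)
        .map(|_| {
            let (a, b) = (Arc::clone(&a), Arc::clone(&b));
            thread::spawn(move || {
                for _ in 0..1000 {
                    // Every thread locks `a` before `b`. With a single
                    // global lock order, no cycle of threads can each
                    // hold one lock while waiting for another.
                    let mut a = a.lock().unwrap();
                    let mut b = b.lock().unwrap();
                    *a += 1;
                    *b += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("{} {}", a.lock().unwrap(), b.lock().unwrap());
}
```

If one thread instead locked `b` before `a`, each thread could end up holding one lock while waiting for the other; that cyclic wait is exactly what the fork swap prevents.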

Multi-threaded Link Checker

(back to exercise)

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

use reqwest::blocking::Client;
use reqwest::Url;
use scraper::{Html, Selector};
use thiserror::Error;

#[derive(Error, Debug)]
enum Error {
    #[error("request error: {0}")]
    ReqwestError(#[from] reqwest::Error),
    #[error("bad http response: {0}")]
    BadResponse(String),
}

#[derive(Debug)]
struct CrawlCommand {
    url: Url,
    extract_links: bool,
}

fn visit_page(client: &Client, command: &CrawlCommand) -> Result<Vec<Url>, Error> {
    println!("Checking {:#}", command.url);
    let response = client.get(command.url.clone()).send()?;
    if !response.status().is_success() {
        return Err(Error::BadResponse(response.status().to_string()));
    }

    let mut link_urls = Vec::new();
    if !command.extract_links {
        return Ok(link_urls);
    }

    let base_url = response.url().to_owned();
    let body_text = response.text()?;
    let document = Html::parse_document(&body_text);

    let selector = Selector::parse("a").unwrap();
    let href_values = document
        .select(&selector)
        .filter_map(|element| element.value().attr("href"));
    for href in href_values {
        match base_url.join(href) {
            Ok(link_url) => {
                link_urls.push(link_url);
            }
            Err(err) => {
                println!("On {base_url:#}: ignored unparsable {href:?}: {err}");
            }
        }
    }
    Ok(link_urls)
}

struct CrawlState {
    domain: String,
    visited_pages: std::collections::HashSet<String>,
}

impl CrawlState {
    fn new(start_url: &Url) -> CrawlState {
        let mut visited_pages = std::collections::HashSet::new();
        visited_pages.insert(start_url.as_str().to_string());
        CrawlState { domain: start_url.domain().unwrap().to_string(), visited_pages }
    }

    /// Determine whether links within the given page should be extracted.
    fn should_extract_links(&self, url: &Url) -> bool {
        let Some(url_domain) = url.domain() else {
            return false;
        };
        url_domain == self.domain
    }

    /// Mark the given page as visited, returning false if it had already
    /// been visited.
    fn mark_visited(&mut self, url: &Url) -> bool {
        self.visited_pages.insert(url.as_str().to_string())
    }
}

type CrawlResult = Result<Vec<Url>, (Url, Error)>;

fn spawn_crawler_threads(
    command_receiver: mpsc::Receiver<CrawlCommand>,
    result_sender: mpsc::Sender<CrawlResult>,
    thread_count: u32,
) {
    // An mpsc receiver is single-consumer; wrapping it in Arc<Mutex<..>>
    // lets all crawler threads pull commands from the same queue.
    let command_receiver = Arc::new(Mutex::new(command_receiver));

    for _ in 0..thread_count {
        let result_sender = result_sender.clone();
        let command_receiver = command_receiver.clone();
        thread::spawn(move || {
            let client = Client::new();
            loop {
                // Hold the lock only for the recv() call itself.
                let command_result = {
                    let receiver_guard = command_receiver.lock().unwrap();
                    receiver_guard.recv()
                };
                let Ok(crawl_command) = command_result else {
                    // The sender got dropped. No more commands coming in.
                    break;
                };
                let crawl_result = match visit_page(&client, &crawl_command) {
                    Ok(link_urls) => Ok(link_urls),
                    Err(error) => Err((crawl_command.url, error)),
                };
                result_sender.send(crawl_result).unwrap();
            }
        });
    }
}

fn control_crawl(
    start_url: Url,
    command_sender: mpsc::Sender<CrawlCommand>,
    result_receiver: mpsc::Receiver<CrawlResult>,
) -> Vec<Url> {
    let mut crawl_state = CrawlState::new(&start_url);
    let start_command = CrawlCommand { url: start_url, extract_links: true };
    command_sender.send(start_command).unwrap();
    // Number of commands sent but not yet answered; the crawl is done
    // when this drops back to zero.
    let mut pending_urls = 1;

    let mut bad_urls = Vec::new();
    while pending_urls > 0 {
        let crawl_result = result_receiver.recv().unwrap();
        pending_urls -= 1;

        match crawl_result {
            Ok(link_urls) => {
                for url in link_urls {
                    if crawl_state.mark_visited(&url) {
                        let extract_links = crawl_state.should_extract_links(&url);
                        let crawl_command = CrawlCommand { url, extract_links };
                        command_sender.send(crawl_command).unwrap();
                        pending_urls += 1;
                    }
                }
            }
            Err((url, error)) => {
                bad_urls.push(url);
                println!("Got crawling error: {:#}", error);
                continue;
            }
        }
    }
    bad_urls
}

fn check_links(start_url: Url) -> Vec<Url> {
    let (result_sender, result_receiver) = mpsc::channel::<CrawlResult>();
    let (command_sender, command_receiver) = mpsc::channel::<CrawlCommand>();
    spawn_crawler_threads(command_receiver, result_sender, 16);
    control_crawl(start_url, command_sender, result_receiver)
}

fn main() {
    let start_url = reqwest::Url::parse("https://www.google.org").unwrap();
    let bad_urls = check_links(start_url);
    println!("Bad URLs: {:#?}", bad_urls);
}
```
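
Two details of this solution generalize beyond link checking. First, `mpsc::Receiver` is single-consumer, so `spawn_crawler_threads` shares it between workers through an `Arc<Mutex<...>>` and holds the lock only for the `recv()` call. Second, `control_crawl` counts outstanding commands and stops once the count drains to zero, which is how the program knows the crawl has finished. The sketch below isolates the shared-receiver pattern as a tiny worker pool (the job type, worker count, and printouts are made up for illustration):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // An mpsc receiver has a single consumer, so a pool of workers
    // must share it behind a mutex and take turns calling recv().
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<_> = (0..4)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // The guard is a temporary inside this statement, so the
                // lock is dropped as soon as recv() returns.
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(n) => println!("worker {id} handled job {n}"),
                    // All senders are gone and the queue is drained.
                    Err(_) => break,
                }
            })
        })
        .collect();

    for job in 0..20 {
        tx.send(job).unwrap();
    }
    // Dropping the last sender closes the channel and stops the workers.
    drop(tx);

    for worker in workers {
        worker.join().unwrap();
    }
}
```

This is the same shutdown mechanism the crawler relies on: once every sender is dropped, `recv()` returns an error and the worker threads exit their loops.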