class: inverse, middle, center # Concurrency
&
Parallelism # in
Rust
--- ## Parallelism vs. Concurrency
See
??? 1. Параллелизм вычислений * конкуррентность: несколько вычислений идут независимо друг для друга, могут выполнятся на одном и том же физическом процессоре, конкуррируют за среду выполнения. * пареллелизм: несколько вычислений идут одновременно в отдельных потоках, требуют многопроцессорной системы, вычисления не конкуррируют за среду выполнения. 2. Параллелизм данных * конкуррентность: несколько вычислений выполняются в общей среде и могут управлять общим изменяемым состоянием, конкурируют на доступ к ресурсам. * параллелизм: несколько вычислений выполняются независимо друг от друга на разных наборах данных, никак не пересекаются друг с другом, конкурренция отсутствует. --- ## Data races ```rust use std::thread; fn main() { let mut vec = Vec::new(); for n in 0..10 { thread::spawn(move || { vec.push(n); }); } println!("{:?}", vec); } ``` ??? В обычной ситуации (в случае с Си) здесь была бы гонка данных из-за одновременной модифицации массива из разных потоков. В Rust работает семантинка перемещения, которая гарантирует, что есть только один владелец с правом изменения. --- ## Data races ```rust use std::thread; fn main() { let mut vec = Vec::new(); for n in 0..10 { // ./race.rs:7:23: 11:10 note: `vec` moved into // closure environment here because it has // type `collections::vec::Vec
`, // which is non-copyable thread::spawn(move || { vec.push(n); }); } println!("{:?}", vec); } ``` --- ## Building blocks: Mutex & Arc ```rust use std::thread; use std::sync::{Arc, Mutex}; fn main() { let vec = Arc::new(Mutex::new(Vec::new())); let jobs = (0..10).map(|n| { let vec = vec.clone(); thread::spawn(move || { vec.lock().unwrap().push(n); }) }).collect::
>(); jobs.into_iter().fold((), |_, job| { job.join().unwrap(); }); println!("{:?}", vec); } ``` --- ## Building blocks: Atomics ```rust use std::thread; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; use std::ptr; #[derive(Debug)] struct Cons
{ data: T, next: *mut Cons
} #[derive(Debug)] struct List
{ head: AtomicPtr
> } ``` --- ## Building blocks: Atomics ```rust impl
List
{ fn new() -> List
{ List { head: AtomicPtr::new(ptr::null_mut()) } } fn push(&self, item: T) { let mut boxed = Box::into_raw(Box::new(Cons { data: item, next: ptr::null_mut() })); loop { let old_head = self.head.load(Ordering::Relaxed); unsafe { (*boxed).next = old_head; } if self.head.compare_and_swap( old_head, boxed,Ordering::SeqCst) == old_head { break; } } } } ``` --- ## Building blocks: Atomics ```rust fn main() { let list = Arc::new(List::new()); (0..10).map(|n| { let list = list.clone(); thread::spawn(move || { list.push(n); }) }).fold((), |_, job| { job.join().unwrap(); }); println!("{:?}", list); } ``` --- ## Building blocks: Channels ```rust use std::thread; use std::sync::mpsc::channel; fn main() { let (tx, rx) = channel(); const N_JOBS: usize = 10; let jobs = (0..N_JOBS).map(|n| { let tx = tx.clone(); thread::spawn(move || { for m in 0..n { tx.send(m).unwrap(); } }) }).collect::
>(); let sum = rx.iter().take(45).fold(0, |a, n| a + n); jobs.into_iter().fold((), |_, job| { job.join().unwrap(); }); println!("sum = {}", sum); } ``` --- ## Send & Sync * Send: safe to move value between threads * Thread-safe data transfer. * Sync: safe to read value simultanously * Thread-safe aliasing, i.e. sharing reference between threads. * Copy: memcpy()-able type * Opt-in copy semantic, instead of default move semantic. * Automatically thread-safe, as it's always copied in a bit-perfect manner when transfered. --- ## Libraries * Thread pool * Crossbeam * Rayon * Coroutine * Mio & friends * ...and others at crates.io --- ## Thread pool ```rust extern crate threadpool; use std::sync::mpsc::channel; use threadpool::ThreadPool; fn main() { let (tx, rx) = channel(); const N_WORKERS: usize = 4; const N_JOBS: usize = 10; let pool = ThreadPool::new(N_WORKERS); for job_n in 0..N_JOBS { let tx = tx.clone(); pool.execute(move || { for m in 0..job_n { tx.send(m).unwrap(); } }) } let sum = rx.iter().take(45).fold(0, |a, n| a + n); println!("sum = {}", sum); } ``` --- ## Crossbeam ```rust use std::thread; fn main() { let vec = vec![1, 2, 3, 4, 5]; let mut jobs = Vec::new(); // error: `vec` does not live long enough for x in &vec { jobs.push(thread::spawn(move || { println!("x = {}", x); })); } jobs.into_iter().fold((), |_, job| { job.join().unwrap(); }); } ``` --- ## Crossbeam ```rust extern crate crossbeam; fn main() { let vec = vec![1, 2, 3, 4, 5]; crossbeam::scope(|scope| { for x in &vec { scope.spawn(move || { println!("x = {}", x); }); } }); } ``` --- ## Crossbeam What about mutability? ```rust extern crate crossbeam; fn main() { let mut vec = vec![1, 2, 3, 4, 5]; crossbeam::scope(|mut scope| { for x in &mut vec { scope.spawn(move || { *x += 1; println!("x = {}", x); }); } }); } ``` --- ## Crossbeam And what about mutable container? ```rust extern crate crossbeam; fn main() { let mut vec = vec![]; crossbeam::scope(|scope| { for n in 0..10 { let p = &mut vec; scope.spawn(move || { p.push(n); }); } }); } ``` --- ## Rayon ```rust #![feature(iter_arith)] fn main() { let vec = vec![0..10]; let sum = vec.iter().map(|n| n * 2).sum(); println!("sum = {}", sum); } ``` --- ## Rayon Let's make it parallel! ```rust extern crate rayon; use rayon::prelude::*; fn main() { let vec = vec![0..10]; let sum = vec.par_iter().map(|n| n * 2).sum(); println!("sum = {}", sum); } ``` --- ## Coroutine ```rust extern crate coroutine; use coroutine::asymmetric::Coroutine as coro; fn main() { const N_JOBS: usize = 10; let co = coro::spawn(|me| { for n in 0..N_JOBS { for m in 0..n { me.yield_with(m); } } }); let sum = co.fold(0, |a, n| a + n); println!("sum = {}", sum); } ``` --- ## Mio ```rust extern crate mio; use mio::tcp::*; const SERVER: mio::Token = mio::Token(0); struct Pong { server: TcpListener, } impl mio::Handler for Pong { type Timeout = (); type Message = (); fn ready(&mut self, event_loop: &mut mio::EventLoop
, token: mio::Token, events: mio::EventSet) { match token { SERVER => { /*...*/ } _ => panic!("Received unknown token"), } } } ``` --- ## Mio ```rust fn main() { let address = "0.0.0.0:6567".parse().unwrap(); let server = TcpListener::bind(&address).unwrap(); let mut event_loop = mio::EventLoop::new().unwrap(); event_loop.register(&server, SERVER); println!("running pingpong server"); event_loop.run(&mut Pong { server: server }); } ``` --- ## Mioco ```rust extern crate mioco; fn main() { mioco::start(|| -> io::Result<()> { let addr = listend_addr(); let listener = try!(TcpListener::bind(&addr)); println!("Starting tcp echo server on {:?}", try!(listener.local_addr())); loop { let mut conn = try!(listener.accept()); mioco::spawn(move || -> io::Result<()> { let mut buf = [0u8; 1024 * 16]; loop { let size = try!(conn.read(&mut buf)); if size == 0 {/* eof */ break; } let _ = try!(conn.write_all(&mut buf[0..size])); } Ok(()) }); } }).unwrap().unwrap(); } ``` --- class: center, middle # That's all, folks! Thank you for your patience! --- class: center, middle # Questions?