Rustでstrategy patternを実装してマルチスレッドにする。
タイトルほぼそのまま。最近はPythonもSVGも捨ててRustにハマってます。
動機
研究の実験コードをRustで書こうと血迷った結果、threadpoolを書く必要が出てきて、そのthreadpoolのqueueをFIFOやpriority queueに変える必要がでてきた。
具体的にはtraitをjavaでいうinterfaceのようにして、振る舞いを変更したい。
古来のOOPにおいては、strategy patternという名前がついているので、それをrustでやりたい。
調べる
シングルスレッドは、偉大なる先人が既にやっていた。
結論
シングルスレッドにおいては、trait Behaviour
に対して、Box<Behaviour+'a>
をつかう。
雰囲気はこんなかんじ。
trait Behaviour { fn run(&self); } struct YoujoBehaviour{} impl Behaviour for YoujoBehaviour{ fn run(&self){println!("はわわ");} } struct Caller<'a> { behaviour: Box<Behaviour+'a>, } impl<'a> Caller<'a> { fn run(&self) { self.behaviour.run(); } } fn main(){ let c = Caller{behaviour: Box::new(YoujoBehaviour{})}; c.run() }
マルチスレッド化
treadpoolのqueueのstrategyを変えるようにしたいので、ArcやMutexでいい感じにする。ただし、multithreadにする関係で、いくつかのtraitを持つ必要がある、SizedとSendである。このあたり良くわかっていない。rustのdocumentのconcurrencyをよんだけれどあたまがわるいのでいまいちわからない。
https://doc.rust-lang.org/book/concurrency.html
ちなみに、この実装ではqueueを変更したらworker(thread)も再生成しないといけない。worker立ち上げっぱなしでqueueを変更するのは大変そう。
use std::sync::{Arc, Mutex}; use std::vec::Vec; trait Queue<T> { fn new() -> Self where Self: Sized; // required fn enqueue(&mut self, item: T); fn dequeue(&mut self) -> Option<T>; } #[derive(Debug)] struct FIFOQueue<T> { data: Vec<T>, } impl<T> Queue<T> for FIFOQueue<T> { fn new() -> Self { FIFOQueue { data: Vec::new() } } fn enqueue(&mut self, item: T) { self.data.insert(0, item); } fn dequeue(&mut self) -> Option<T> { self.data.pop() } } #[derive(Debug)] struct FILOQueue<T> { data: Vec<T>, } impl<T> Queue<T> for FILOQueue<T> { fn new() -> Self { FILOQueue { data: Vec::new() } } fn enqueue(&mut self, item: T) { self.data.push(item); } fn dequeue(&mut self) -> Option<T> { self.data.pop() } } type SharableQueue<'a, T> = Queue<T> + 'a + Sized + Send; // important!! SizedとSendが別threadから参照するために必要の様子 struct Searcher { queue: Arc<Mutex<SharableQueue<'static, usize>>>, // multi-threadにする関係上、寿命を'staticにしないと行けない様子。 // Searcherが死んでもthreadが生きてる場合があるからと思われる(よくわかっていない) // 現状の実装ではSeracherが死んだらthreadも死ぬはずなので、がんばればlifetimeもっと短くできそう // でも、こういうとこしっかりしてるのみてると、rustが好きになる } impl Searcher { fn run(&self, thread_num: usize) { (0..thread_num) .map(|i| { let queue = self.queue.clone(); std::thread::spawn(move || { let item = queue.lock().unwrap().dequeue(); match item { Some(val) => println!("[thread{}]dequeue {}", i, val), None => {} } }) }) .collect::<Vec<_>>() .into_iter() .map(|w| w.join()) .collect::<Vec<_>>(); } fn set_queue(&mut self, queue: Arc<Mutex<SharableQueue<'static, usize>>>) { self.queue = queue; } } fn main() { let fifo = Arc::new(Mutex::new(FIFOQueue::<usize>::new())); let filo = Arc::new(Mutex::new(FILOQueue::<usize>::new())); for val in 1..16 { fifo.lock().unwrap().enqueue(val); filo.lock().unwrap().enqueue(val); } println!("fifo"); println!("{:?}", *fifo.lock().unwrap()); let mut searcher = Searcher { queue: fifo.clone() }; searcher.run(8); println!(""); println!("filo"); println!("{:?}", *filo.lock().unwrap()); searcher.set_queue(filo.clone()); searcher.run(8); }
ちゃんとうごいた。
ただし、Ignoring drop flag in destructor for 〜〜 because the struct is unsized. See issue #16758
とでてくる。
英語力が低いのでgithubのissueを読んでも何が行けないのかよくわからない。たぶん、コンパイラの実装がなんらかなのかな。負荷かけてためしたところmemory leakはしなかったのでまぁいいかなと。
% rustc strategy.rs warning: Ignoring drop flag in destructor for std::sync::Mutex<Queue<usize> + Send + Sized + 'static> because the struct is unsized. See issue #16758 % ./strategy fifo FIFOQueue { data: [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] } [thread2]dequeue 1 [thread0]dequeue 2 [thread3]dequeue 3 [thread1]dequeue 4 [thread4]dequeue 5 [thread5]dequeue 6 [thread6]dequeue 7 [thread7]dequeue 8 filo FILOQueue { data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] } [thread0]dequeue 15 [thread1]dequeue 14 [thread2]dequeue 13 [thread3]dequeue 12 [thread4]dequeue 11 [thread5]dequeue 10 [thread6]dequeue 9 [thread7]dequeue 8
ソースコード
以上のソースコードはgistにあげておきました。WTFPLライセンスということにしておきます。
いま書いてるもの
Pythonから叩ける、分枝限定法をマルチスレッドで実現するRustで書かれたもの。
いいかんじにできたらblogに書きます。