cocuh's note

type(あうとぷっと) -> 駄文

Rustでstrategy patternを実装してマルチスレッドにする。

タイトルほぼそのまま。最近はPythonSVGも捨ててRustにハマってます。

動機

研究の実験コードをRustで書こうと血迷った結果、threadpoolを書く必要が出てきて、そのthreadpoolのqueueをFIFOやpriority queueに変える必要がでてきた。
具体的にはtraitをjavaでいうinterfaceのようにして、振る舞いを変更したい。
古来のOOPにおいては、strategy patternという名前がついているので、それをrustでやりたい。

調べる

シングルスレッドは、偉大なる先人が既にやっていた。

github.com

結論

シングルスレッドにおいては、trait Behaviourに対して、Box<Behaviour+'a> をつかう。

雰囲気はこんなかんじ。

trait Behaviour {
  fn run(&self);
}
struct YoujoBehaviour{}
impl Behaviour for YoujoBehaviour{
  fn run(&self){println!("はわわ");}
}

struct Caller<'a> {
  behaviour: Box<Behaviour+'a>,
}
impl<'a> Caller<'a> {
  fn run(&self) {
    self.behaviour.run();
  }
}

fn main(){
  let c = Caller{behaviour: Box::new(YoujoBehaviour{})};
  c.run()
}

マルチスレッド化

treadpoolのqueueのstrategyを変えるようにしたいので、ArcやMutexでいい感じにする。ただし、multithreadにする関係で、いくつかのtraitを持つ必要がある、SizedとSendである。このあたり良くわかっていない。rustのdocumentのconcurrencyをよんだけれどあたまがわるいのでいまいちわからない。

https://doc.rust-lang.org/book/concurrency.html

ちなみに、この実装ではqueueを変更したらworker(thread)も再生成しないといけない。worker立ち上げっぱなしでqueueを変更するのは大変そう。

use std::sync::{Arc, Mutex};
use std::vec::Vec;

trait Queue<T> {
    fn new() -> Self where Self: Sized; // required
    fn enqueue(&mut self, item: T);
    fn dequeue(&mut self) -> Option<T>;
}

#[derive(Debug)]
struct FIFOQueue<T> {
    data: Vec<T>,
}
impl<T> Queue<T> for FIFOQueue<T> {
    fn new() -> Self {
        FIFOQueue { data: Vec::new() }
    }
    fn enqueue(&mut self, item: T) {
        self.data.insert(0, item);
    }
    fn dequeue(&mut self) -> Option<T> {
        self.data.pop()
    }
}

#[derive(Debug)]
struct FILOQueue<T> {
    data: Vec<T>,
}
impl<T> Queue<T> for FILOQueue<T> {
    fn new() -> Self {
        FILOQueue { data: Vec::new() }
    }
    fn enqueue(&mut self, item: T) {
        self.data.push(item);
    }
    fn dequeue(&mut self) -> Option<T> {
        self.data.pop()
    }
}

type SharableQueue<'a, T> = Queue<T> + 'a + Sized + Send;
// important!! SizedとSendが別threadから参照するために必要の様子

struct Searcher {
    queue: Arc<Mutex<SharableQueue<'static, usize>>>,
    // multi-threadにする関係上、寿命を'staticにしないと行けない様子。
    // Searcherが死んでもthreadが生きてる場合があるからと思われる(よくわかっていない)
    // 現状の実装ではSeracherが死んだらthreadも死ぬはずなので、がんばればlifetimeもっと短くできそう
    // でも、こういうとこしっかりしてるのみてると、rustが好きになる
}
impl Searcher {
    fn run(&self, thread_num: usize) {
        (0..thread_num)
            .map(|i| {
                let queue = self.queue.clone();
                std::thread::spawn(move || {
                    let item = queue.lock().unwrap().dequeue();
                    match item {
                        Some(val) => println!("[thread{}]dequeue {}", i, val),
                        None => {}
                    }
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|w| w.join())
            .collect::<Vec<_>>();
    }
    fn set_queue(&mut self, queue: Arc<Mutex<SharableQueue<'static, usize>>>) {
        self.queue = queue;
    }
}

fn main() {
    let fifo = Arc::new(Mutex::new(FIFOQueue::<usize>::new()));
    let filo = Arc::new(Mutex::new(FILOQueue::<usize>::new()));
    for val in 1..16 {
        fifo.lock().unwrap().enqueue(val);
        filo.lock().unwrap().enqueue(val);
    }

    println!("fifo");
    println!("{:?}", *fifo.lock().unwrap());
    let mut searcher = Searcher { queue: fifo.clone() };
    searcher.run(8);

    println!("");
    println!("filo");
    println!("{:?}", *filo.lock().unwrap());
    searcher.set_queue(filo.clone());
    searcher.run(8);
}

ちゃんとうごいた。
ただし、Ignoring drop flag in destructor for 〜〜 because the struct is unsized. See issue #16758 とでてくる。
英語力が低いのでgithubのissueを読んでも何が行けないのかよくわからない。たぶん、コンパイラの実装がなんらかなのかな。負荷かけてためしたところmemory leakはしなかったのでまぁいいかなと。

% rustc strategy.rs
warning: Ignoring drop flag in destructor for std::sync::Mutex<Queue<usize> + Send + Sized + 'static> because the struct is unsized. See issue #16758
% ./strategy
fifo
FIFOQueue { data: [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] }
[thread2]dequeue 1
[thread0]dequeue 2
[thread3]dequeue 3
[thread1]dequeue 4
[thread4]dequeue 5
[thread5]dequeue 6
[thread6]dequeue 7
[thread7]dequeue 8

filo
FILOQueue { data: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] }
[thread0]dequeue 15
[thread1]dequeue 14
[thread2]dequeue 13
[thread3]dequeue 12
[thread4]dequeue 11
[thread5]dequeue 10
[thread6]dequeue 9
[thread7]dequeue 8

ソースコード

以上のソースコードはgistにあげておきました。WTFPLライセンスということにしておきます。

youjo.rs · GitHub

strategy.rs · GitHub

いま書いてるもの

Pythonから叩ける、分枝限定法をマルチスレッドで実現するRustで書かれたもの。

いいかんじにできたらblogに書きます。

github.com