...ing logging 4.0

はてなブログに移行しました。D言語の話とかいろいろ。

core.sync.*を使ってみる

色々教えてもらってすごくありがたいけれど,全然理解できなくてしょんぼり.勉強が足りなさすぎる・・・orz
とりあえず動いているように見える・・・が?

import std.stdio, core.thread, core.sync.condition, core.sync.mutex;
class Actor
{
	private const(int)[][] arr_;
	private bool disposed_;
	private Thread consumer_;
	private Condition condition_;
	private Mutex mutex_;
	private int count;
	private int sum;
	
	this()
	{
		condition_ = new Condition(mutex_ = new Mutex);
		consumer_ = new Thread({
			synchronized (mutex_) // ロックを取得
			{
				for(;;)
				{
					while (arr_.length == 0)
					{
						condition_.wait(); // notifyされるまでロックを解放(=スレッド停止)
						if (disposed_) break;
					}
					writeln(count++, ": ", arr_[0]);
						// MEMO: 文字列の表示がおかしいけどcountは正しいのでwriteln内部に何かある?
					foreach (e; arr_[0])
					{
						sum += e;
					}
					writeln(sum); // MEMO: 合計も正しい
					arr_ = arr_[1..$];
				}
			}
		});
		consumer_.start();
	}
	void send(const(int)[] arr)
	{
		synchronized (mutex_)
		{
			arr_ ~= arr;
			condition_.notify(); // waitしているスレッドを再度ロックさせてそのスレッドを再開
		}
	}
	void dispose()
	{
		synchronized (mutex_)
		{
			disposed_ = true;
			condition_.notify(); // waitしているスレッドを再度ロックさせてそのスレッドを再開
		}
	}
	@property bool disposed() const
	{
		synchronized (mutex_)
		{
			return disposed_;
		}
	}
}
void main()
{
	auto a = new Actor;
	auto tg = new ThreadGroup;
	// producer1
	tg.create = {
		for (int i; i < 5; i++)
		{
			a.send([1, 2, 3, i]);
		}
	};
	// producer2
	tg.create = {
		for (int i; i < 5; i++)
		{
			a.send([4, 5, 6, i]);
		}
	};
	tg.create = {
		readln();
		a.dispose();
	};
	tg.joinAll();
}
0: [1, 2, 3, 0]
6
1: [1, 2, 3, 1]
1: [1, 2, 3, 1]
13
2: [1, 2, 3, 2]
21
3: [1, 2, 3, 3]
30
4: [1, 2, 3, 4]
40
5: [4, 5, 6, 0]
55
6: [4, 5, 6, 1]
71
7: [4, 5, 6, 2]
88
8: [4, 5, 6, 3]
106
9: [4, 5, 6, 4]
125

出力結果が少しおかしい.