Root One

数学中心のブログです。

並列処理でzeta(2)を計算

今回は次の有名な等式(の右辺)をプログラム(C++)で並列処理をして近似計算してみようというのがテーマです。
 \displaystyle \frac{\pi^2}{6} = 1 + \frac{1}{2^2} + \frac{1}{3^2} + \frac{1}{4^2} +\frac{1}{5^2} +  \cdots
単純に足し上げる場合と並列処理をした場合で時間がどれくらい異なるかを実験してみました。
ちなみに正確な値は
1.64493406684822\cdots
となっています。
並列化に用いる手法はいろいろとあるようですが、ここでは std::thread を用いるものを考えます。

単純に足し上げるケース

最初に並列処理せずに第1項から順番に1億項まで足し上げるプログラムについて考えます。

#include <thread>
#include <iostream>
#include <vector>
using namespace std;
double zeta2(double n) {
	double sum = 0.0;
	for (long long i = 1; i < n; i++)
	{
		sum +=1.0 / (i*i);
	}
	return sum;
}
int main()
{
	auto start = clock();
	long long N = 100000000;
	cout.precision(14);
	cout << "zeta(2):" << zeta2(N) << endl;
	auto end = clock();
	cout << "time:" << (end - start) / (double)CLOCKS_PER_SEC << endl;
}

実行結果.

zeta(2):1.6449340578346
time:0.623

計測時間は環境によりますが、実行結果を見ると1億項の計算に0.6秒程度かかるようです。

並列処理 (2スレッド)

上の計算を並列化します。一番最初に考えられるのは、偶数項と奇数項に分ける方法です。つまり
 \displaystyle \sum_{n=1}^{N} \frac{1}{n^2} =\sum_{n \text{ is odd}} \frac{1}{n^2} +\sum_{n \text{ is even}}\frac{1}{n^2}
として右辺を二つのスレッドに分けて計算します。

#include <thread>
#include <iostream>
#include <vector>
using namespace std;
double zeta2_odd(int n) {
	double sum = 0.0;
	for (int i = 1; 2 * i - 1 < n; i++)
	{
		double d = 2 * i - 1;
		sum += 1.0 / (d * d);
	}
	return sum;
}
double zeta2_even(int n) {
	double sum = 0.0;
	for (int i = 1; 2 * i < n; i++)
	{
		double d = 2 * i;
		sum += 1.0 / (d * d);
	}
	return sum;
}
int main()
{
	auto start = clock();
	long long N = 100000000;
	double sum1 = 0.0;
	double sum2 = 0.0;
	thread th1([&sum1, N] {sum1 += zeta2_odd(N); });
	thread th2([&sum2, N] {sum2 += zeta2_even(N); });
	th1.join();
	th2.join();
	double sum = sum1 + sum2;
	cout.precision(14);
	cout << "zeta(2):" << sum << endl;
	auto end = clock();
	cout << "time:" << (end - start) / (double)CLOCKS_PER_SEC << endl;
}

実行結果.

zeta(2):1.6449340573291
time:0.374

計測時間は 0.623 から 0.374 に改善されました。
ただし、この結果は8コアのパソコンで計測しており、おそらく1コアだとあまり速度は改善されない?のかもしれません。

複数スレッドで計算

2スレッドで計算すると計算速度が向上したので、少し大胆に10スレッドに増やしてみます。計算の分割方法ですが、分母をスレッド数で割ったあまりで分割して計算します。
 \displaystyle \sum_{n=1} \frac{1}{(10n-9)^2} +\sum_{n=1}\frac{1}{(10n-8)^2}+\cdots + \sum_{n=1}\frac{1}{(10n)^2}

#include <thread>
#include <iostream>
#include <vector>
using namespace std;
double zeta2(double n, double m, double k) {
	double sum = 0.0;
	for (long long i = 1; m * i - k < n; i++)
	{
		double d = m * i - k;
		sum = sum + 1.0 / (d * d);
	}
	return sum;
}
int main()
{
	auto start = clock();
	long long N = 100000000;
	const int num_thread = 10;
	vector<double> sumV(num_thread);
	double sum = 0.0;
	thread th[num_thread];
	for (int i = 0; i < num_thread; i++)
	{
		th[i] = thread([N, &sumV, i, num_thread] {sumV[i] += zeta2(N, num_thread, i); });
	}
	for (int i = 0; i < num_thread; i++)
	{
		th[i].join();
	}
	for (int i = 0; i < sumV.size(); i++)
	{
		sum += sumV[i];
	}
	cout.precision(14);
	cout << "zeta(2):" << sum << endl;
	auto end = clock();
	cout << "time:" << (end - start) / (double)CLOCKS_PER_SEC << endl;
}

実行結果.

zeta(2):1.6449340569441
time:0.181

最初の計測時間と比較すると、0.623から0.181となり3倍以上速くなったことがわかります。
10スレッド用意したので10倍とはならなかったですが、3倍以上速くなれば充分嬉しいですね。
今回はCPUで並列処理をしましたが、GPUを用いるとどれくらい速くなるのかについても気になるところです。

追記(OpenMPを用いたケース)

上記の場合だとOpenMPを使ったほうがずっと簡潔に書けます。

#include <iostream>
#include <numeric> 
#include <vector>
#include <omp.h>
#include <time.h>
using namespace std;

double zeta2(double n, double m, double k) {
	double sum = 0.0;
	for (long long i = 1; m * i - k < n; i++)
	{
		double d = m * i - k;
		sum = sum + 1.0 / (d * d);
	}
	return sum;
}
int main()
{
	auto start = omp_get_wtime(); 
	int N = 100000000;
	double sum = 0;
	const int num_threads = 10; // スレッドの数.
	vector<double> v(num_threads);
	omp_set_num_threads(num_threads); 
	#pragma omp parallel for 
	for (int i = 0; i < num_threads; i++)
	{
		v[i] = zeta2(N, num_threads, i);
	}
	sum = accumulate(v.begin(), v.end(), 0.0); // 和の計算.
	cout.precision(15);
	cout << sum << endl;
	auto end = omp_get_wtime();
	cout << "time:" << end - start << endl; 
}

実行結果.

1.6449340569441
time:0.18281970010139

すべてはmain関数から始まる(と思っていた話)

最近数学から離れていて、C++の話が中心になっています. 今回もC++です.
早速本題に入っていきます。

main関数は「はじめに実行される」と教わったような...

おそらく、プログラムを勉強するときに最初に作成するのが
Hello World
を表示するプログラムだと思います。c++だと次のような感じになります。

#include <iostream>
using namespace std;
int main()
{
    cout << "Hello World\n";
}

実行結果.

Hello World

このようなプログラムとほぼ同時に、
main関数というのは最初に実行されるもの
と教わることが多いのではないかと思います。
しかし、先日、mainの外部にクラスのインスタンスを作れば、コンストラクタがその前に
実行されるということを知りました。
つまり、mainが最初に実行される関数とは限らないということになります。
それどころか、コンストラクタでexitしてしまえば、mainに到達する前にプログラムを終了させることも可能です。

#include <iostream>
using namespace std;
class MAIN {
public:
    MAIN() {
        cout << "My MAIN Hello!" << endl;
        exit(0);
    }
};
MAIN m;  // コンストラクタがよばれる

int main()
{
    cout << "本当のメイン関数\n";
}

実行結果.

My MAIN Hello!

実行結果を見るとわかるのですが、mainのcoutが実行されていません。つまり、main関数実行前にプログラムは終了していることになります。
だから何だという話がありますが、
一つのメリットとしては、プロトタイプ宣言しなくても関数をMAINの後ろに置くことができるという点でしょうか。
例えば、「MAIN」関数の前にgcdを定義して次のように書いたとします。

#include <iostream>
using namespace std;
class MAIN {
public:
    int gcd(int a, int b) {
        if (b == 0) {
            return abs(a);
        }
        return gcd(b, a % b);
    }
    MAIN() {
        int a = 15;
        int b = 20;
        cout << "gcd(" <<a <<"," << b << ")=" <<  gcd(a,b) << endl;
        exit(0);
    }
};
MAIN m; // コンストラクタがよばれる

int main()
{
    cout << "本当のメイン関数\n";
}

実行結果.

gcd(15,20)=5

これは問題なく実行されるわけですが、gcdの位置をMAINの後ろに持ってくることもできます。

#include <iostream>
using namespace std;
class MAIN {
public:
    MAIN() {
        int a = 15;
        int b = 20;
        cout << "gcd(" <<a <<"," << b << ")=" <<  gcd(a,b) << endl;
        exit(0);
    }
    int gcd(int a, int b) {
        if (b == 0) {
            return abs(a);
        }
        return gcd(b, a % b);
    }
};
MAIN m;

int main()
{
    cout << "本当のメイン関数\n";
}

実行結果.

gcd(15,20)=5

もちろん、このような書き方を推奨したいわけではありません。一応メリットもないわけではないというお話です。
他にもメリットはありそうですが、それは一旦置いておいて、
今回の記事を要約すると「main関数の前に実行される関数を作ることができる」ということになります。

C++で関数に配列を渡す時の難点

配列利用時の不満点

C言語でも同じですが、C++でも配列を外部の関数に渡すと、外部の関数内で配列のサイズの取得ができなくなってしまいます。
そこで、配列を渡す時に、次のように一緒にサイズも渡さなければなりません。
渡された配列の要素に1を加えるだけの関数 addOne.

#include <iostream>
using namespace std;

void addOne(int* a, int size) {
	for (int i = 0; i < size; i++)
	{
		a[i]++;
	}
}
int main()
{
	int a[] = { 1,2,3 };
	int len = end(a) - begin(a);
	addOne(a, len);
	for (int i = 0; i < len; i++)
	{
		cout << a[i] << endl;
	}
}

実行結果.

2
3
4

さらにC++の場合、配列のサイズ取得は、できたとしても上のように少し面倒です。sizeofを利用する方法もありますが、やはりそれも面倒です。
以上、2点が配列に関する利用者の不満点なのではないかと思います。

代替手段 (vectorの利用)

上記の不満は、vector を使えば解決します。

#include <iostream>
#include <vector>
using namespace std;

void addOne(vector<int>& v) {
	for (int i = 0; i < v.size(); i++)
	{
		v[i]++;
	}
}
int main()
{
	vector<int> v{ 1,2,3 };
	addOne(v);
	for (int i = 0; i < v.size(); i++)
	{
		cout << v[i] << endl;
	}
}

実行結果.

2
3
4

vectorは要素数の取得も簡単で、しかも要素数変更までできるという優れものです。
これを利用しない手はないのですが、しかし、実行速度は状況によっては落ちてしまうかもしれません。
ということで、もう一つの代替手段を考えます。

代替手段 (array の利用)

array はおそらくvectorよりも高速に処理されるものと思われます。

#include <iostream>
#include <array>
using namespace std;

template <int N>
void addOne(array<int,N>& ar) {
	for (int i = 0; i < ar.size(); i++)
	{
		ar[i]++;
	}
}
int main()
{
	array<int,3> ar{ 1,2,3 };
	addOne(ar);
	for (int i = 0; i < ar.size(); i++)
	{
		cout << ar[i] << endl;
	}
}

実行結果.

2
3
4

素数の取得も簡単で、要素数を関数に渡す必要もありません。
ただし、非型テンプレートパラメータを使っています。

処理速度の比較

通常の配列、array, vectorで速度比較している方がすでにいらっしゃるようなので、
ここでは詳しく述べませんが、個人的に少し調べたところ、
debugモードで、
通常の配列 < array < vector
の順番で、結構明確な差が出ました。
一方releaseモードでは、
通常の配列とarray の速度差はわからないほどになりました。
実験に使ったコードでは、
0を初期値として持つ20万個の要素を定義して、関数内で+1するという
処理にかかる時間を計測しました。

いろいろな方法で和を考える(C++)

今回は、プログラミング言語C++で、いろいろな方法で配列の和について考えていきたいと思います。

一番基本的な方法

まず、for文を用いて配列の和を計算します。

#include <iostream>
using namespace std;
int main()
{
	int a[] = { 1,2,3 };
	int sum = 0;
	for (int i = 0; i < 3; i++)
	{
		sum += a[i];
	}
	cout << sum << endl;
	return 0;
}
実行結果. 6 

上の例は単純ではありますが、次のように

end(a)-begin(a)

を用いて、配列のサイズを取得するほうが保守性の面で優れています。

#include <iostream>
using namespace std;
int main()
{
	int a[] = { 1,2,3 };
	int sum = 0;
	for (int i = 0; i < end(a)-begin(a); i++)
	{
		sum += a[i];
	}
	cout << sum << endl;
	return 0;
}

範囲for文を用いる方法

範囲for文を用いると、要素数の取得をする必要もありません。

#include <iostream>
using namespace std;
int main()
{
	int a[] = { 1,2,3 };
	int sum = 0;
	for (auto& e : a)
	{
		sum += e;
	}
	cout << sum << endl;
	return 0;
}
実行結果. 6 

範囲for文のautoの後の&記号はなくても良いですが、参照機能を使ったほうが、コピーを作らないので効率が良いはずです。

for_eachを用いる方法

ここからラムダ式を使っていきますので、少し難度が上がります。

#include <iostream>
#include <algorithm>
using namespace std;

int main()
{
	int a[] = { 1,2,3 };
	int sum = 0;
	for_each(begin(a), end(a), [&sum](int x)->void {sum += x; });
	cout << sum << endl;
	return 0;
}
実行結果. 6 

for_eachの第3引数のラムダ式の引数は、配列の要素を動きます。

accumulateを用いる方法

accumulateはC#でいえばAggregateに対応する関数です。
C#のAggregateについては、
https://shabonlearning.com/cSharp/linq4.html
で述べています。

#include <iostream>
#include <numeric>
using namespace std;

int main()
{
	int a[] = { 1,2,3 };
	int sum = accumulate(begin(a), end(a), 0, [](int x, int y)-> int {return x + y; });
	cout << sum << endl;
	return 0;
}
実行結果. 6 

上の例では、一般的な書き方をしましたが、和の場合は第4引数のラムダ式を省略することもできます。

少し脱線して、連分数の計算

accumulateの第4引数のラムダ式に分数関数を適用すると連分数の計算もできます。

#include <iostream>
#include <numeric>
using namespace std;

int main()
{
	double a[] = { 1,1,1 };
	double result = accumulate(begin(a), end(a), 1.0, [](double x, double y)-> double {return y + 1 / x; });
	cout << result << endl;
	return 0;
}
実行結果. 1.66667

このプログラムでは
 \displaystyle 1+\cfrac{1}{1+\cfrac{1}{1+\cfrac{1}{1}}}=\frac{5}{3}
という連分数を計算することになります。

数式が上手く表示されない現象

最近気づいたのですが、

大部分の数式が上手く表示されなくなってしまいました。

tex:をつければ上手くいくのですが、

x^2=1

と表示したくて、

\[x^2=1\]

と書くと、2020/3/27 現在、上手く表示されません。

一方で、

begin{align} end{align}

を使って表現した数式は、上手く表示されているもようです。

修正状況

9割くらい修正しました。

 

C++のソート

C++のソートについてまとめてみました。

ソートと重複要素削除 (外部サイト)

ソートの難しいところ

1次元のソートは簡単ですが、難しいのは多次元のソートです。

例えば、

1 7
2 3
1 2
2 7
2 2
1 3

 というデータを月順にソートすると

1 7
1 2
1 3
2 3
2 7
2 2

というような結果が得られますが、これを日にち順にソートしなおすと

1 2
2 2
1 3
2 3
1 7
2 7

 というように、月の順序まで変わってしまう可能性があります。

日付順のソートをしたい場合は、このようなことが起きてしまうとまずい

わけですが、Excelであれば、ソートの優先度を「月、日」の順に設定することが

できて、上のようなことを回避することができます。

C++では、このような優先度別ソートは、sort関数の第3引数を調整すると可能です。

sortの第3引数はどのように順序をつけるかを定める関数を指定するところになるのですが、この関数の設定法はいくつか考えられます。

今回のケースであれば、重みを付けてソートするのがおすすめです。

重みを付けるとは

「日」は最大で31にしかならないことを考慮して、

M月D日であれば

M*31+D

という値を考えるということです。(Mの重みを31倍にした.)

こうして、M*31+Dを基準にしてソートしてみると、

自動的に月が日よりも優先されてソートされることになります。

実際のコード例

#include <iostream>
#include <vector>
#include <algorithm>
using namespace std;
class MyDate {
public:
	int Month;
	int Day;
	MyDate(int month, int day) {
		Month = month;
		Day = day;
	};
	int getValue() const {
		return Month * 31 + Day;
	}
};

ostream& operator <<(ostream& ost, const MyDate& p) {
	ost <<  p.Month << "月" << p.Day << "日";
	return ost;
}

int main() {
	vector<MyDate> dates = { MyDate(1,7),MyDate(2,3),MyDate(1,2),
		                     MyDate(2,7),MyDate(2,2),MyDate(1,3) };
	cout << "ソート前のデータ" << endl;
	for (auto d : dates) {
		cout << d << endl;
	}
	sort(dates.begin(), dates.end(), [](const MyDate& d1, const MyDate& d2)-> bool {return d1.getValue() < d2.getValue(); });
	cout << "ソート後のデータ" << endl;

	for (auto d : dates) {
		cout << d << endl;
	}

}

実行結果.

 ソート前のデータ
1月7日
2月3日
1月2日
2月7日
2月2日
1月3日
ソート後のデータ
1月2日
1月3日
1月7日
2月2日
2月3日
2月7日

プログラムのポイント

ostream& operator <<(ostream& ost, const MyDate& p) {
	ost <<  p.Month << "月" << p.Day << "日";
	return ost;
}

は自作クラスのMyDateの変数をcoutで表示できるようにするもので、ここはソートには直接関係しない部分です。

今重要なのは、MyDateクラスのgetValue関数で、ここが重みを付けたデータの和を考えるものとなっています。そしてこれをもとに

sort(dates.begin(), dates.end(), [](const MyDate& d1, const MyDate& d2)-> bool {return d1.getValue() < d2.getValue(); });

でソートを昇順に行っています。

上のケースは、M月D日というものでしたが、

Y年M月D日

といったケースにも応用できます。その場合はgetValue関数を

Y*12*31 + M*31+D

と設定してソートすればOKです。(12は月の最大値.)

ただし、値が大きくなってくると、オーバーフローに気をつける必要がありそうです。

C++で驚いた話

最近C++をさわりはじめたのですが、

すこし驚いたことがあったので、メモを残しておきたいと思います。

少し驚いたコード

vector<int> v = { 1,2,3 };
for (int i = 0; i < v.size() - 4; i++) {
 cout << v[i] << endl;
 if (i == 2) break;
}

実行結果.

1
2
3

何が意外だったのか

v.size()は3なので、v.size()-4 は -1 になるように見えます。

したがって、for文の中は実行されず、何も表示されないはずです。

しかし、実行結果を見ると、中身が実行されているので

何かおかしい

となったわけです。

理由は簡単で、

v.size() は size_t 型(unsigned long long ) になってしまい、

v.size()-4 もsize_t 型になって、 size_t 型の世界の -1 と解釈されたせいです。

size_t 型の世界で -1 は、負の数を扱うことができないので、

エラーになっても良さそうですが、そうはならず、

size_t 型が扱える一番大きな数として認識されてしまうようです。

(実験した環境では2^{64}-1になりました。)

つまり、結果的に上のコードは

vector<int> v = { 1,2,3 };
for (int i = 0; i < 18446744073709551615; i++) {
 cout << v[i] << endl;
 if (i == 2) break;
}

と解釈されて実行されたということになります。

偶然ですが、この話は、まさに10adicの世界で

\[-1 = \cdots 99999 \]

が成り立つというお話ですね。 

日常生活でp-adic的な現象に出会うことはあまりないかもしれませんが、

プログラムの世界だとこのようにp-adicの世界が見え隠れすることがあるようです。

追記

上記のようなことがあるので、size_t 型から引き算するときは充分注意しなければならないようです。

そして問題は「どのように回避すべきか」ということになりますが、次のように事前に int にキャストすれば良いようです。

vector v = { 1,2,3 };
for (int i = 0; i < static_cast<int>(v.size()) - 4; i++) {
	cout << v[i] << endl;
	if (i == 2) break;
}

実行結果. (何も表示されない.)