CPUのSIMDを使うクラスIntrinsicsとNumerics、どちらが速いのかを、要素数1千万のbyte型配列から最小値と最大値を求める時間で比較してみた
今回の方法では結果は差が出なかった。シングルスレッド、マルチスレッドともに同じ速さ。Intrinsicsはポインタを使ったりするから速いかなと期待していたので、ちょっと残念
大小判定をSIMDを使わないで普通にifと不等号や、MathクラスのMinMaxで行ったときと比較
SIMD使うとシングルスレッドでも20倍速く処理できる。Parallel.ForEachでマルチスレッド化すると26倍速。使用CPUは4コア8スレッドだからマルチスレッド化で8倍の160倍速、とは行かないまでも物理コア数の4倍で80倍速ともいかず26倍速止まりなのは、マルチスレッド化する処理と、それぞれのスレッドから返ってきた結果を集計する処理が、SIMDを使った大小判定処理に比べて重いのかもねえ、逆に言うとそれだけSIMDでの処理が速い
計測用アプリ
ダウンロード先はギットハブ
ファイル名:20200225_Intrinsics_MinMax.zip
github.com
製作と計測環境
計測1回目
2回め
3回目
この3回計測の平均で比較した
分散を見ると誤差は少なそう?MultiLoopの2つ目は誤差が大きいけど、これは処理があっという間に終わるからこうなのかも
テストに使ったbyte型配列
MyArrayって名前でフィールドに配置しておいて、アプリ起動時に値を入れる
配列作成して値を入れる
入れる値は250と100に2種類
298~299行目、配列からSpan<T>を作って、Fillメソッドを使うとすべての要素が指定した値になる、Spanってイマイチ使いどころがわかんないけどこれは便利ねえ
302行目、配列の最後の要素だけ100にする、これで最小値は100、最大値は250って答えが返ってくれば、正しい処理がされているってことになる
要素数10,000,001っていう半端
SIMDでの処理は決まった個数づつの処理で、Vector256byte型だと32個づつ処理になる。もし要素数が1千万だと10000000/32=312500で割り切れてしまうけど、毎回32で割り切れるような要素数とは限らないのを考えて1個足した。要素数が32で割り切れなかった場合は、余りになった要素をVectorじゃなくて普通に処理することになる、これを確かめるために最後の値だけ100
追加using
Intrinsics.X86.Avx2クラスのMinとMaxメソッド
これで大小判定
シングルスレッドで
65行目、lastIndexは10000001-(10000001%32)=10000000、この要素数までSIMDで処理は74行目まで
75~78行目はVector256から配列のポインタに戻す処理?あんまりわかってない
81行目のForで結果の配列ポインタからさらに大小判定
86行目のForは割り切れなかった余りの要素、10000001番めの大小判定処理
↑のマルチスレッド化
Parallel.ForEachでマルチスレッド化
99行目、配列の区分けはPartitionerクラスを使ってスレッド数はCPUのスレッド数に設定
97行目と131,132行目、それぞれのスレッドから返ってきた結果を集計する処理は、排他処理する必要があるのでSystem.Collections.Concurrent.ConcurrentBagクラスを使用
Partitionerで配列の区分け
CPUスレッド数と同じ8分割にしようと思って、1区画のサイズ指定を
配列の長さ/CPUスレッド数
にしていたんだけど、これだと割り切れなかったときは+1の9区画になるのね、当然だけど中を覗いてみるまで気づかなかったwでも最後の区画の要素数は1個だからいいか…
Numericsのシングルスレッド
これは以前のと全く同じはず、Intrinsicsと違ってポインタを使わないからラク
↑のマルチスレッド化
Intrinsicsのマルチスレッド化と全く同じ方法
普通のシングルスレッド
ifと不等号で判定している。MathクラスのMinやMaxでも全く同じ速度だったのは以前の記事で判明、書くのは楽だけど遅い
↑のマルチスレッド化
マルチスレッド化の方法はIntrinsicsやNumericsのときと同じで、Parallel.ForでPartitionerとConcurrentBagを使っている。これで4倍速はCPUの物理コア数分なので、16コアCPUなら16倍速になってSIMDとの差も縮まったりするのかしら
マルチスレッド化する場所というか処理の内容で効率が変わる
1000回ループで計測していたけど、この1000回ループをマルチスレッドにして、シングルスレッドのメソッドを実行すると効率が良くなる
Intrinsicsでは20だったのが85倍、Numericsでも26だったのが70倍と、コア数分速くなった
時間計測
上側は1000回ループの中でメソッドを回している
下側は80倍速が出るほうで、1000回ループ自体をマルチスレッドにしている。処理する配列がいっぱいある場合はこれが使えるけど、そういう状況はあんまり思いつかない
ループ回数を10万回にしてCPU使用率を見てみる
配列をマルチスレッド処理した場合のCPU使用率は60%
配列の外のループ回数自体をマルチスレッド処理した場合は100%使い切っていた、やっぱりこっちのほうが効率がいいねえ。気分も良くなる
Forループの中の簡単な処理は4つにすると効率がいい?
これはシングルスレッドのTest5を少し変えたもので、240行目のForの中の処理を2つから4つに変更しただけ。これだけで少し速くなって
10.6秒だったのが6.754秒になった
じゃあ4つから8個にしたらもっと速くなる?と思って試したら逆に遅くなったので、これはループの回数が減ったから速くなったんじゃなくて、別の理由があるってことになる。どこかでForループの中の単純な処理は4個にすると、効率が良くなるとか見たことがあるからそれなのかも、なんかコンパイラの最適化がどうとかいうのだったかな。
同じようにマルチスレッドのTest6も4つ処理にしてみたんだけど、これは2.772秒が2.688秒と変化がなかった、でも速くする方法ってのは色々あるんだねえ
<Window xClass="_20200225_Intrinsics_MinMax.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlnsx="http://schemas.microsoft.com/winfx/2006/xaml"
xmlnsd="http://schemas.microsoft.com/expression/blend/2008"
xmlnsmc="http://schemas.openxmlformats.org/markup-compatibility/2006"
xmlnslocal="clr-namespace:_20200225_Intrinsics_MinMax"
mcIgnorable="d"
Title="MainWindow" Height="400" Width="614">
<Grid>
<StackPanel>
<StackPanelResources>
<Style TargetType="StackPanel">
<Setter Property="Margin" Value="2"/>
</Style>
<Style TargetType="Button">
<Setter Property="Width" Value="60"/>
</Style>
<Style TargetType="TextBlock">
<Setter Property="Margin" Value="2,0"/>
</Style>
</StackPanelResources>
<TextBlock xName="MyTextBlock" Text="text" HorizontalAlignment="Center" FontSize="20"/>
<TextBlock xName="MyTextBlockVectorCount" Text="vectorCount" HorizontalAlignment="Center"/>
<TextBlock xName="MyTextBlockCpuThreadCount" Text="threadCount" HorizontalAlignment="Center"/>
<StackPanel Orientation="Horizontal" HorizontalAlignment="Center">
<Button xName="ButtonAll" Content="一斉テスト" Margin="20,0" Width="120"/>
<TextBlock xName="TbAll" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button1" Content="test1"/>
<TextBlock xName="Tb1" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button2" Content="test2"/>
<TextBlock xName="Tb2" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button3" Content="test3"/>
<TextBlock xName="Tb3" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button4" Content="test4"/>
<TextBlock xName="Tb4" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button5" Content="test5"/>
<TextBlock xName="Tb5" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button6" Content="test6"/>
<TextBlock xName="Tb6" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button7" Content="test7"/>
<TextBlock xName="Tb7" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button8" Content="test8"/>
<TextBlock xName="Tb8" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button9" Content="test9"/>
<TextBlock xName="Tb9" Text="time"/>
</StackPanel>
<StackPanel Orientation="Horizontal">
<Button xName="Button10" Content="test10"/>
<TextBlock xName="Tb10" Text="time"/>
</StackPanel>
</StackPanel>
</Grid>
</Window>
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;
using System.Numerics;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace _20200225_Intrinsics_MinMax
{
<summary>
</summary>
public partial class MainWindow : Window
{
private byte[] MyArray;
private const int LOOP_COUNT = 1000;
private const int ELEMENT_COUNT = 10_000_001;
public MainWindow()
{
InitializeComponent();
MyInitialize();
MyTextBlock.Text = $"byte型配列要素数{ELEMENT_COUNT.ToString("N0")} から最小値と最大値を {LOOP_COUNT}回求める";
MyTextBlockVectorCount.Text = $"Vector256<byte>.Count={Vector256<byte>.Count} Vector<byte>.Count={Vector<byte>.Count}";
MyTextBlockCpuThreadCount.Text = $"CPUスレッド数:{Environment.ProcessorCount}";
ButtonAll.Click += (s, e) => MyExeAll();
Button1.Click += (s, e) => MyExe(Test1_MinMax_IntrinsicsVector, Tb1, MyArray);
Button2.Click += (s, e) => MyExe(Test2_MinMax_IntrinsicsVector_Multi, Tb2, MyArray);
Button3.Click += (s, e) => MyExe(Test3_MinMax_NumericsVector, Tb3, MyArray);
Button4.Click += (s, e) => MyExe(Test4_MinMax_NumericsVector_Multi, Tb4, MyArray);
Button5.Click += (s, e) => MyExeMultiLoop(Test1_MinMax_IntrinsicsVector, Tb5, MyArray);
Button6.Click += (s, e) => MyExeMultiLoop(Test3_MinMax_NumericsVector, Tb6, MyArray);
Button7.Click += (s, e) => MyExe(Test5_MinMax, Tb7, MyArray);
Button8.Click += (s, e) => MyExe(Test6_MinMax_Multi, Tb8, MyArray);
Button9.Click += (s, e) => MyExe(Test7_MinMaxKai, Tb9, MyArray);
Button10.Click += (s, e) => MyExe(Test8_MinMax_Multi_Kai, Tb10, MyArray);
}
private unsafe (byte min, byte max) Test1_MinMax_IntrinsicsVector(byte[] vs)
{
var vMin = Vector256.Create(byte.MaxValue);
var vMax = Vector256<byte>.Zero;
int simdLength = Vector256<byte>.Count;
int lastIndex = vs.Length - (vs.Length % simdLength);
fixed (byte* p = vs)
{
for (int i = 0; i < lastIndex; i += simdLength)
{
var vv = Avx.LoadVector256(p + i);
vMin = Avx2.Min(vMin, vv);
vMax = Avx2.Max(vMax, vv);
}
}
byte* tMin = stackalloc byte[simdLength];
byte* tMax = stackalloc byte[simdLength];
Avx.Store(tMin, vMin);
Avx.Store(tMax, vMax);
byte min = byte.MaxValue;
byte max = byte.MinValue;
for (int k = 0; k < simdLength; k++)
{
if (min > tMin[k]) min = tMin[k];
if (max < tMax[k]) max = tMax[k];
}
for (int i = lastIndex; i < vs.Length; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
return (min, max);
}
private unsafe (byte min, byte max) Test2_MinMax_IntrinsicsVector_Multi(byte[] vs)
{
var bag = new ConcurrentBag<byte>();
int simdLength = Vector256<byte>.Count;
var partition = Partitioner.Create(0, vs.Length, vs.Length / Environment.ProcessorCount);
Parallel.ForEach(partition,
(range) =>
{
var vMin = Vector256.Create(byte.MaxValue);
var vMax = Vector256<byte>.Zero;
int lastIndex = range.Item2 - (range.Item2 - range.Item1) % simdLength;
fixed (byte* p = vs)
{
for (int i = range.Item1; i < lastIndex; i += simdLength)
{
var vv = Avx.LoadVector256(p + i);
vMin = Avx2.Min(vMin, vv);
vMax = Avx2.Max(vMax, vv);
}
}
byte* tMin = stackalloc byte[simdLength];
byte* tMax = stackalloc byte[simdLength];
Avx.Store(tMin, vMin);
Avx.Store(tMax, vMax);
byte min = byte.MaxValue;
byte max = byte.MinValue;
for (int k = 0; k < simdLength; k++)
{
if (min > tMin[k]) min = tMin[k];
if (max < tMax[k]) max = tMax[k];
}
for (int i = lastIndex; i < range.Item2; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
bag.Add(min);
bag.Add(max);
});
return (bag.Min(), bag.Max());
}
private (byte min, byte max) Test3_MinMax_NumericsVector(byte[] vs)
{
int simdLength = Vector<byte>.Count;
int lastIndex = vs.Length - simdLength;
var vMin = new Vector<byte>(byte.MaxValue);
var vMax = new Vector<byte>(byte.MinValue);
for (int i = 0; i < lastIndex; i += simdLength)
{
var v = new Vector<byte>(vs, i);
vMin = System.Numerics.Vector.Min(v, vMin);
vMax = System.Numerics.Vector.Max(v, vMax);
}
byte min = byte.MaxValue;
byte max = byte.MinValue;
for (int i = 0; i < simdLength; i++)
{
if (min > vMin[i]) min = vMin[i];
if (max < vMax[i]) max = vMax[i];
}
for (int i = lastIndex; i < vs.Length; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
return (min, max);
}
private (byte min, byte max) Test4_MinMax_NumericsVector_Multi(byte[] vs)
{
var bag = new ConcurrentBag<byte>();
var rangeSize = Partitioner.Create(0, vs.Length, vs.Length / Environment.ProcessorCount);
Parallel.ForEach(rangeSize,
(range) =>
{
int simdLength = Vector<byte>.Count;
int lastIndex = range.Item2 - (range.Item2 - range.Item1) % simdLength;
var vMin = new Vector<byte>(byte.MaxValue);
var vMax = new Vector<byte>(byte.MinValue);
for (int i = range.Item1; i < lastIndex; i += simdLength)
{
var v = new Vector<byte>(vs, i);
vMin = System.Numerics.Vector.Min(v, vMin);
vMax = System.Numerics.Vector.Max(v, vMax);
}
byte min = byte.MaxValue;
byte max = byte.MinValue;
for (int i = 0; i < simdLength; i++)
{
if (min > vMin[i]) min = vMin[i];
if (max < vMax[i]) max = vMax[i];
}
for (int i = lastIndex; i < range.Item2; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
bag.Add(min);
bag.Add(max);
});
return (bag.Min(), bag.Max());
}
private (byte min, byte max) Test5_MinMax(byte[] vs)
{
var min = byte.MaxValue;
var max = byte.MinValue;
for (int i = 0; i < vs.Length; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
return (min, max);
}
private (byte min, byte max) Test6_MinMax_Multi(byte[] vs)
{
var bag = new ConcurrentBag<byte>();
Parallel.ForEach(
Partitioner.Create(0, vs.Length, vs.Length / Environment.ProcessorCount),
(range) =>
{
var min = byte.MaxValue;
var max = byte.MinValue;
for (int i = range.Item1; i < range.Item2; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
bag.Add(min);
bag.Add(max);
});
return (bag.Min(), bag.Max());
}
private (byte min, byte max) Test7_MinMaxKai(byte[] vs)
{
var min = byte.MaxValue;
var max = byte.MinValue;
int lastIndex = vs.Length - vs.Length % 2;
for (int i = 0; i < lastIndex; i += 2)
{
if (min > vs[i]) min = vs[i];
if (min > vs[i + 1]) min = vs[i + 1];
if (max < vs[i]) max = vs[i];
if (max < vs[i + 1]) max = vs[i + 1];
}
for (int i = lastIndex; i < vs.Length; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
return (min, max);
}
private (byte min, byte max) Test8_MinMax_Multi_Kai(byte[] vs)
{
var bag = new ConcurrentBag<byte>();
Parallel.ForEach(
Partitioner.Create(0, vs.Length, vs.Length / Environment.ProcessorCount),
(range) =>
{
var min = byte.MaxValue;
var max = byte.MinValue;
int lastIndex = range.Item2 - (range.Item2 - range.Item1) % 2;
for (int i = range.Item1; i < lastIndex; i += 2)
{
if (min > vs[i]) min = vs[i];
if (min > vs[i + 1]) min = vs[i + 1];
if (max < vs[i]) max = vs[i];
if (max < vs[i + 1]) max = vs[i + 1];
}
for (int i = lastIndex; i < range.Item2; i++)
{
if (min > vs[i]) min = vs[i];
if (max < vs[i]) max = vs[i];
}
bag.Add(min);
bag.Add(max);
});
return (bag.Min(), bag.Max());
}
private void MyInitialize()
{
MyArray = new byte[ELEMENT_COUNT];
var span = new Span<byte>(MyArray);
span.Fill(250);
MyArray[ELEMENT_COUNT - 1] = 100;
}
#region 時間計測
private void MyExe(Func<byte[], (byte, byte)> func, TextBlock tb, byte[] vs)
{
(byte, byte) minmax = (0, 0);
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < LOOP_COUNT; i++)
{
minmax = func(vs);
}
sw.Stop();
this.Dispatcher.Invoke(() => tb.Text = $"処理時間:{sw.Elapsed.TotalSeconds.ToString("000.000")}秒 {minmax} | {func.Method.Name}");
}
private void MyExeMultiLoop(Func<byte[], (byte, byte)> func, TextBlock tb, byte[] vs)
{
(byte, byte) minmax = (0, 0);
var sw = new Stopwatch();
sw.Start();
Parallel.For(0, LOOP_COUNT,
new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
x => minmax = func(vs));
sw.Stop();
this.Dispatcher.Invoke(() => tb.Text = $"処理時間:{sw.Elapsed.TotalSeconds.ToString("000.000")}秒 {minmax} | {func.Method.Name}(LoopMulti)");
}
private async void MyExeAll()
{
var sw = new Stopwatch();
sw.Start();
this.IsEnabled = false;
await Task.Run(() => MyExe(Test1_MinMax_IntrinsicsVector, Tb1, MyArray));
await Task.Run(() => MyExe(Test2_MinMax_IntrinsicsVector_Multi, Tb2, MyArray));
await Task.Run(() => MyExe(Test3_MinMax_NumericsVector, Tb3, MyArray));
await Task.Run(() => MyExe(Test4_MinMax_NumericsVector_Multi, Tb4, MyArray));
await Task.Run(() => MyExeMultiLoop(Test1_MinMax_IntrinsicsVector, Tb5, MyArray));
await Task.Run(() => MyExeMultiLoop(Test3_MinMax_NumericsVector, Tb6, MyArray));
await Task.Run(() => MyExe(Test5_MinMax, Tb7, MyArray));
await Task.Run(() => MyExe(Test6_MinMax_Multi, Tb8, MyArray));
await Task.Run(() => MyExe(Test7_MinMaxKai, Tb9, MyArray));
await Task.Run(() => MyExe(Test8_MinMax_Multi_Kai, Tb10, MyArray));
this.IsEnabled = true;
sw.Stop();
TbAll.Text = $"処理時間:{sw.Elapsed.TotalSeconds.ToString("000.000")}秒";
}
#endregion 時間計測
}
}
関連記事
次回は2日後
前回は2日前
1ヶ月前
このときはint型配列だったせいか5~10倍速だった、int型は同時に8個処理、byte型は32個