Dynamic pool dispatch
池调度的实现
需要: 1. 知道总进程/线程数, 2. 增加任务的api 3. 队列
网上的实现c++ : https://zhuanlan.zhihu.com/p/95819747
队列的一种实现
OpenMP 动态线程池调度
不知道 #pragma omp parallel for num_threads(ndata) schedule(dynamic)行不行
这个动态调度,和openmp的线程池的概念,让我感觉应该是有线程动态调度池的概念的,因为只要有个for子句加任务的api。但是for指令在进行并行执行之前,就需要”静态“的知道任务该如何划分。
for和sections指令的”缺陷“:无法根据运行时的环境动态的进行任务划分,必须是预先能知道的任务划分的情况。
所以OpenMP3.0提供task指令,主要适用于不规则的循环迭代和递归的函数调用。OpenMP遇到了task之后,就会使用当前的线程或者延迟一会后使用其他的线程来执行task定义的任务。
#pragma omp parallel num_threads(2)
{
#pragma omp single
{
for(int i = 0;i < N; i=i+a[i])
{
#pragma omp task
task(a[i]);
}
}
}
#pragma omp single
{
i = 0;
while (i < p.n)
{
for (; i < p.n; ++i)
{
#pragma omp task
DoSomething(p, i);
}
#pragma omp taskwait
#pragma omp flush
}
}
int count(1);
#pragma omp parallel num_threads(64)
{
#pragma omp single
{
int c = 0;
while(c < count)
{
for( ; c < count; c++ )
{
#pragma omp task{
for( int n = 0; n < 4; n++ )
{
int x = xvec[c] + dx4[n];
int y = yvec[c] + dy4[n];
if( (x >= 0 && x < width) && (y >= 0 && y < height) )
{
int nindex = y*width + x;
if( 0 > nlabels[nindex] && labels[oindex] == labels[nindex] )
{
xvec[count] = x;
yvec[count] = y;
nlabels[nindex] = label;
count++;
}
}
}
}
}
#pragma omp taskwait
#pragma omp flush
}
}
}
MPI 动态进程池调度
python的多进程里有动态进程管理
池调度的存在意义
我感觉,意义在于对于完全不相关的,或者没有顺序关系的任务,可以用池调度来并行。
C++与OpenMP配合的for子句最简线程池
实现每个线程执行完全不同的任务
#include <iostream>
#include <functional>
#include <vector>
using namespace std;
void fun (int a, int b)
{
cout<< "fun exec :"<< a << '+' << b << '=' << a + b <<endl;
}
class C{
private:
float m_c = 2.0f;
public:
void mp( float d)
{
cout<<"c::mp exec :"<< m_c << 'x' << d << '=' << m_c * d <<endl;
}
};
int main(int argc, char * argv[])
{
const int task_groups = 5;
C c [task_groups];
vector<function<void (void) > > tasks;
for (int i=0;i<task_groups;++i)
{
tasks.push_back(bind( fun , 10, i * 10 ) );
tasks.push_back(bind( &C::mp , &c[i], i*2.0f ) );
tasks.push_back(bind(
[=] (void) {cout << "lambada :" <<i << endl; }
) );
}
size_t sz = tasks.size();
#pragma omp parallel for
for (size_t i=0;i<sz;++i)
{
tasks[i]();
}
return 0;
}
输出:
fun exec :10+0=10
c::mp exec :2x0=0
lambada :0
fun exec :10+10=20
c::mp exec :2x2=4
lambada :1
fun exec :10+20=30
c::mp exec :2x4=8
lambada :2
fun exec :10+30=40
c::mp exec :2x6=12
lambada :3
fun exec :10+40=50
c::mp exec :2x8=16
lambada :4
当然可以根据 num_threads 和 omp_get_thread_num()实现不同线程执行完全不同类型任务
#pragma omp parallel num_threads(2)
{
int i = omp_get_thread_num();
if (i == 0){
do_long(data1, sub_threads);
}
if (i == 1 || omp_get_num_threads() != 2){
do_long(data2, sub_threads);
}
}
void do_long(int threads) {
#pragma omp parallel for num_threads(threads)
for(...) {
// do proccessing
}
}
int main(){
omp_set_nested(1);
int threads = 8;
int sub_threads = (threads + 1) / 2;
#pragma omp parallel num_threads(2)
{
int i = omp_get_thread_num();
if (i == 0){
do_long(data1, sub_threads);
}
if (i == 1 || omp_get_num_threads() != 2){
do_long(data2, sub_threads);
}
}
return 0;
}
需要进一步的研究学习
openmp 对不同的子句的关系种类没弄清。
遇到的问题
暂无
开题缘由、总结、反思、吐槽~~
对于for循环次数增加的情况,这么处理呢。
OpenMP由于是fork/join结构,fork的线程数可以一开始设置,但是for循环任务总数是一开始固定的吗?还是可以中途增加,
参考文献
https://www.it1352.com/359097.html
https://blog.csdn.net/gengshenghong/article/details/7004594