ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池实现文件复制操作

2021-10-12 19:31:26  阅读:117  来源: 互联网

标签:文件 task thread void 复制 线程 file pool


项目:使用线程池实现大目录拷贝

       创建一个线程的综合资源,让需要执行的任务挂载在线程池中。如果线程池有空闲的线程,就可以安排线程去执行任务,如果线程池没有空闲的线程,就安排任务等待,直到有线程空闲出来。

 

线程池步骤:

  1. 初始化线程池资源
  2. 向线程池中加入线程
  3. 向线程池中添加任务               ---如果有空闲的线程,空闲线程可以自动获取任务并运行
  4. 销毁线程池。

源码分享:thread_pool.c

/*
    设计思路:
        1、预处理板块:构建任务结点结构体、线程池结构体,文件路径结构体
        2、void handler(void *arg)	//解除死锁,防止死锁
        3、void *routine(void *arg)//提取调用任务链表的任务结点
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  //添加任务
        7、void *copyfile(void * arg)  //复制文件函数
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
        9、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            if-判断输入命令是否规范规范
            初始化线程池
            创建对象文件,存储源文件和复制文件路径
            判断文件属性,执行拷贝操作    
            销毁线程池

        }




*/




#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <stdbool.h>
#include <string.h>
#include <errno.h>

//********************预处理板块**************************
#define MAX_WAITING_TASKS	1000  //等待任务最大数目
#define MAX_ACTIVE_THREADS	200	  //最大线程数

//任务结构体
typedef struct task
{
    void *(*task)(void *arg);//定义任务函数指针以及参数
	void *arg;			 	 //函数参数(指针传参)
	struct task *next;       //结点内当然有指向下一结点的指针
    /*
         任务链表的结点内包含三个元素:
            1、函数指针,指向任务函数(属于data)
            2、函数所对应的参数,这里用指针传参
            3、结点内指向下一结点的指针 next
     */
}task;

typedef struct thread_pool
{
    /* data */
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务
	unsigned active_threads;	//执行线程
	
	bool shutdown;				//一个线程池销毁开关
}thread_pool;

typedef struct file_path
{
    /* 用于存储文件路径的结构体 */
    char source_file[4096];  //字符数组,用于存储源文件路径
    char copy_file[4096];    //字符数组,用于存放复制后的文件路径
}file_path;


//********************预处理板块**************************


//解除死锁,防止死锁     handler函数会被routine调用
void handler(void *arg)	
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

//提取调用任务链表的任务结点
void *routine(void *arg)    //这里的arg是task结构体的一项参数
{
    thread_pool *pool = (thread_pool *)arg;    //
    task *p;      //定义一个任务结点

    while (1)
    {
        /* code */
        pthread_cleanup_push(handler, (void *)&pool->lock);
        //调用hand函数,防止死锁

        //做一个判断,当没有任务,并且pool没有销毁时,执行休眠操作
        if (pool->waiting_tasks == 0 && !pool->shutdown)
        {
            pthread_cond_wait(&pool->cond, &pool->lock); //休眠
        }

        //判断是否需要要关闭线程
        if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);	//先执行解锁操作
			pthread_exit(NULL);	//退出线程
		}
         
		p = pool->task_list->next;			//从线程池中任务链表取出(复制)一个任务结点给P
		pool->task_list->next = p->next;	//然后被取出的任务被其他任务覆盖
		pool->waiting_tasks--;				//线程池中等待的任务减少一项

        pthread_mutex_unlock(&pool->lock);	//一次任务取出成功,解除互斥锁资源,方便下一次调用任务结点使用
		pthread_cleanup_pop(0);             //
 
//注意!

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //强制性的阻塞 任何进程都不能取消,我想老老实实把这个线程完成,其他的都先排队
		(p->task)(p->arg);		
					
									//他运行的是add_task( pool, copfile, tmpfile);	
									//传过来的参数 
									//p->task 等于copyfile
									//p->arg 等于  tmpfile
		
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);//关闭强制阻塞
		
		free(p->arg);							//释放在检索目录时 在内存开辟的空间
		free(p);								//释放掉完成任务的节点
	}
 
	pthread_exit(NULL);

 //注意!没有这一段,copyfile无法执行


}

//初始化线程池
bool pool_init(thread_pool *pool, unsigned int threads_number)
{
/*  线程池结构体元素
    pthread_mutex_t lock;		//互斥锁
	pthread_cond_t  cond;		//条件变量  跟互斥锁是搭配使用
	
    task *task_list;		    //一个任务节点
	pthread_t *tids;			//线程号指针变量
 
	unsigned waiting_tasks;		//等待任务   任务数量
	unsigned active_threads;	//执行线程   线程数量
	
	bool shutdown;				//一个线程池销毁开关
*/

    pthread_mutex_init(&pool->lock,NULL);   //线程池互斥锁初始化,这是个初始化函数
    pthread_cond_init(&pool->cond,NULL);   //线程池条件变量初始化,这是个初始化函数


//先分配内存
    pool->shutdown = false;  //线程池销毁为假,即不销毁(刚申请一个线程池,初始化肯定不能销毁呀!)
    pool->task_list = malloc(sizeof(task));  //线程池中的任务链表初始化分配内存空间
    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//给进程分配空间

    if (pool->task_list == NULL || pool->tids == NULL)
    {
        perror("任务链表 或 线程分配 内存失败!");
        return false;
    }

//在把数量清零
    pool->task_list->next = NULL; //只有一个节点,还未连城任务链表
    pool->waiting_tasks = 0;   //暂时没有等待中的任务,要等后面添加任务结点
    pool->active_threads = threads_number;   //thread_number是参数,是传入的线程数量,赋值过来

    int i;
    for (i = 0; i < pool->active_threads; i++)
    {
        if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)  
		//返回值为0表示创建成功,返回不为0表示创建失败
		{
			perror("线程创建失败");//根据传入的线程数,逐一创建线程,存入tid[]
			return false;
		}
    }
    return true;
}




//添加线程
int add_thread(thread_pool *pool, unsigned additional_threads)		
{
    if(additional_threads == 0)     //传入添加的线程数
    {
	    return 0;         //没有添加线程,啥都干不了,只能退出
    }
	unsigned total_threads = pool->active_threads + additional_threads;
    //计算先从总数 = 线程池中的线程 + 新添加的线程

	int i, actual_increment = 0;
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
	{
		if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
		{
			perror("添加线程失败!");
			if(actual_increment == 0)
            {
				return -1;
            }
			break;  //创建失败的话,直接break,不再执行后面的++
		}//创建失败

		actual_increment++; //实际上添加成功的线程数
	}
 
	pool->active_threads += actual_increment;
	return actual_increment;

}


//添加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)
{
	
    // pool->task_list = malloc(sizeof(task));  //之前初始化的是线程池中的链表结点内存
	struct task *new_task = malloc(sizeof(task));  // 任务结点new_task分配内存

	if(new_task == NULL)
	{
		perror("任务结点分配内存失败!");
		return false;
	}

    //初始化结点参数
	new_task->task = task;   
	new_task->arg = arg;
	new_task->next = NULL;
	
	pthread_mutex_lock(&pool->lock);   //上锁
	
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)  //判断任务是否超出上限,必要的操作
	{
		pthread_mutex_unlock(&pool->lock);
 
		fprintf(stderr, "任务过多!\n");
		free(new_task);  //释放任务空间
		return false;
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)  //从任务结点取到不为空,就是取到任务
    {
        tmp = tmp->next;  //把这个任务取出来,拿到tmp上
    }

	tmp->next = new_task;
	pool->waiting_tasks++;
 
 
	pthread_mutex_unlock(&pool->lock);   //释放锁资源
	pthread_cond_signal(&pool->cond);	//唤醒一个休眠的线程

	return true;
}


//复制文件函数
void *copyfile(void * arg)
{

/*
    把要打开的文件路径放入source_file变量下(路径)
    打开源文件--------可读   判断打开是否成功
    创建目标文件,即复制文件    判断打开是否结束
    利用read函数和write函数,实现文件读写
    关闭打开的两个文件
*/

	file_path *dofile = (file_path *)arg; //arg是文件参数
	struct stat file_stat; //这个结构体来自#include <sys/stat.h>	

	stat(dofile->source_file, &file_stat);
    //通过文件名 获取文件的路径把他存放到结构体的sourc_file里面
	int source_fd,copy_fd;					
	source_fd = open(dofile->source_file,O_RDONLY);//打开源路径,只读模式

	if(source_fd == -1 )
	{
		printf("打开文件 %s\n 失败!\n",dofile->source_file);
        //打开文件失败
		return NULL;
	}
	
	copy_fd = open(dofile->copy_file,O_CREAT | O_TRUNC | O_RDWR,file_stat.st_mode);
	
	if( copy_fd == -1)
	{
		printf("打开文件 %s 失败!\n",dofile->copy_file);
		return NULL;
	}
 
	int nread;
	char buf[100];
	while((nread = read(source_fd,buf,100)) > 0)  //读取源文件的内容
	{
		if( write(copy_fd,buf,nread) == -1)	  //把读到的全部写进目标文件
		{
			break;
		}
	}
	printf("读取文件: %s\n",dofile->source_file);
	printf("写入文件: %s\n",dofile->copy_file);

	close(source_fd);  //关闭源文件
	close(copy_fd);    //关闭复制文件
	return NULL;

}

//复制目录函数
int copydir( file_path *dofile,thread_pool *pool)
{
	struct stat file_stat;				
	stat(dofile->source_file,&file_stat); //获取文件的属性
	mkdir(dofile->copy_file,file_stat.st_mode); //以源目录的类型和目录来创建一个目录
 
	DIR *sourcedir = opendir(dofile->source_file); //打开源目录
	struct dirent *dp;
	
	
	
	while( (dp = readdir(sourcedir))!=NULL )    //获取文件夹内文件的信息
	{
		
	
		if(dp->d_name[0] == '.') //如果文件为. 或者 .. 则跳过
		{						 
			continue;
		}
		//对本目录下的所有文件进行拷贝操作
		file_path *tmpfile = malloc(sizeof(file_path)); //为文件结构体开辟内存空间
		
		memset(tmpfile,0,sizeof(file_path));				//对内存清零 
		
		sprintf(tmpfile->source_file,"%s/%s",dofile->source_file,dp->d_name);//拼凑源文件路径
		sprintf(tmpfile->copy_file,"%s/%s",dofile->copy_file,dp->d_name);//拼凑目标文件路径
 
		struct stat tmpstat;
		stat(tmpfile->source_file,&tmpstat);	     			
 
		if(S_ISREG(tmpstat.st_mode))						//如果为普通文件,则拷贝
		{
			printf("tmpfile->source_file = %s\n",tmpfile->source_file);
			printf("tmpfile->copy_file = %s\n", tmpfile->copy_file);
			printf("\n");
			add_task( pool, copyfile, tmpfile);			//把复制的任务丢到任务链表
		}
		else if(S_ISDIR(tmpstat.st_mode))//如果为目录,则递归
		{
			copydir(tmpfile,pool);
		}
 
	}
	return 0;
}


//摧毁线程池
bool destroy_pool(thread_pool *pool)	
{
	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);
	
	sleep(1);//等待空闲线程响应完毕

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("copy tids[%d] error: %s\n",i, strerror(errno));
		}
		else
			printf("[%u] is copyed\n", (unsigned)pool->tids[i]);
		
	}
 
	free(pool->task_list);
	free(pool->tids);
	free(pool);
 
	return true;
}











//函数1:初始化线程池   pool_init(pool,100);


int main(int argc, char const *argv[])
{

    //第一步:   if-判断输入命令是否规范规范
    if(argc != 3)
    {
        printf("你的命令格式错误!\n");
        printf("请按右侧格式run: ./%s 源文件路径  生成文件路径 \n",argv[0]);
        return -1; //命令异常退出
    }


    //第二步:   初始化线程池
    thread_pool *pool = malloc(sizeof(thread_pool));//定义线程池的poor指针,初始化内存空间
    /*调用线程初始化函数------pool_init()*/
    pool_init(pool,100);          //调用函数初始化线程,分配内存(初始化线程函数会调用routine)


    //第三步:   创建对象文件,存储源文件和复制文件路径
    file_path dofile;     //创建一个路径实例dofile,存储源文件和复制文件的路径
    strcpy(dofile.source_file,argv[1]);   //strcpy是函数   #include <string.h>
    strcpy(dofile.copy_file,argv[2]);    // char *strcpy(char *dest, const char *src);

    struct stat copestat;//这个结构体来自#include <sys/stat.h>
    stat(dofile.source_file,&copestat);  //stat是函数,将dofile.source_file的信息,获取到&copestat
    /*
        #include<sys/stat.h>
        int stat(const char *restrict pathname,struct stat *restrict buf);
        int fstat(int fields,struct stat *buf);
        int lstat(const char *restrict pathname,struct stat *restrict buf);
    */


    //第四步:   判断文件属性,执行拷贝操作    
    if(S_ISREG(copestat.st_mode))//如果为普通文件,则拷贝
	{
		copyfile(&dofile);  //直接拷贝,不用添加到任务链表
		
	}
	else if(S_ISDIR(copestat.st_mode))//如果为目录,则递归
	{
		copydir(&dofile,pool);
	}


    //第五步:   销毁线程池
    destroy_pool(pool);    //销毁pool   destroy破坏
    return 0;
}


/*


    详细设计步骤:
        1、预处理板块:构建
				任务结点结构体、
				线程池结构体,
				文件路径结构体
        
        2、void handler(void *arg)	//解除死锁,防止死锁
				pthread_mutex_unlock((pthread_mutex_t *)arg);
				
        3、void *routine(void *arg)//提取调用任务链表的任务结点  被pool_init和add_thread调用过
				传入pool  定义一个任务结点p(task结构体的实例)
				while循环
				{
					首先调用线程清理函数,清理掉残留的线程,解放锁资源
					当没有任务情况下,线程执行休眠操作还是直接销毁(由shutdown决定)
					当有任务情况下,提取任务结点(拿出来给p,任务结点数量减少),任务取出后,解放对应锁资源
					
					
					
				}
				
        
        4、bool pool_init(thread_pool *pool, unsigned int threads_number)//初始化线程池
				(传入pool)
				加锁,条件变量
				给pool的各成员变量初始化分配内存
						判断是否成功
				pool各成员变量初始化赋值为空
				for循环逐一创建线程:(返回为0表示创建成功,不为0表示创建失败)
		(注意:thread:线程标识符     attr:线程属性设置      start_routine:线程函数起始地址   arg:传递参数)
				
        5、int add_thread(thread_pool *pool, unsigned additional_threads)//添加线程
				创建线程,增加线程数,计算可用线程
        
        6、bool add_task(thread_pool *pool, void *(*task)(void *arg) , void *arg)  		//添加任务
				先定于一个new_task指针,申请内存,初始化结构体成员变量
							内存申请成功?
				给互斥锁资源
				判断等待任务结点是否超过上限?
				定义一个tmp结点指针指向任务链表		
				把任务结点拿出来,组织起来,形成等待任务链表
				任务链表串完了,解除互斥锁哦并且唤醒一个进程
				
        7、void *copyfile(void * arg)  //复制文件函数
				把要打开的文件路径放入source_file变量下(路径)
				打开源文件--------可读   判断打开是否成功
				创建目标文件,即复制文件    判断打开是否结束
				利用read函数和write函数,实现文件读写
				关闭打开的两个文件
        
        8、int copydir( file_path *dofile,thread_pool *pool) //复制目录函数
				普通文件	调用add_task函数复制
				目录文件	mkdir创建目录(tmpfile内的文件夹命名)  如果是目录不停递归
				
        10、bool destroy_pool(thread_pool *pool)	 //摧毁线程池

        main()
        {
            第一步:if-判断输入命令是否规范规范(三个字符串命令:./%s 源文件路径  生成文件路径)
            第二步:初始化线程池
					  thread_pool *pool = malloc(sizeof(thread_pool));
					  //定义线程池的poor指针,初始化内存空间
					  
					  //调用线程初始化函数------pool_init()
					  pool_init(pool,100);          
					  //调用函数初始化线程,分配内存(初始化线程函数会调用routine)
							

            第三步:创建对象文件,存储源文件和复制文件路径
						创建一个dofile,存储源文件和复制文件路径
								(dofile是file_path结构体的一个实例)dofile.source 源文件路径
						(copeatat:提取文件队列,作为判断文件是普通文件还是目录文件的参数)
						
            第四步:判断文件属性,执行拷贝操作
						使用st_mode判断文件类型
								执行copyfile和copydir操作(复制文件和目录)
            第五步:销毁线程池
						直接调用destroy_pool函数销毁线程池
        }

*/

标签:文件,task,thread,void,复制,线程,file,pool
来源: https://www.cnblogs.com/zlxxc/p/15399253.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有