ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Rust常用并发示例代码

2022-09-04 16:34:43  阅读:247  来源: 互联网

标签:std thread 示例 并发 let ThreadId println 线程 Rust


记录几个常用的并发用法:


1、如何让线程只创建1次

先看一段熟悉的java代码:

void method1() {
    new Thread(() -> {
        while (true) {
            System.out.println(String.format("thread-id:%s,timestamp:%d",
                    Thread.currentThread().getId(), System.currentTimeMillis()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }).start();
}

如果method1()被多次调用,就会创建多个线程,如果希望不管调用多少次,只能有1个线程,在不使用线程池的前提下,有1个简单的办法:

AtomicBoolean flag = new AtomicBoolean(false);

void method1() {
    //AtomicBoolean保证线程安全,getAndSet是1个原子操作,method1只有第1次执行时,才能if判断才能通过
    if (!flag.getAndSet(true)) {
        new Thread(() -> {
            while (true) {
                System.out.println(String.format("thread-id:%s,timestamp:%d",
                        Thread.currentThread().getId(), System.currentTimeMillis()));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

在rust中也可以套用这个思路,完整代码如下:


use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

//声明1个全局静态变量(AtomicBool能保证线程安全)
static FLAG: AtomicBool = AtomicBool::new(false);

fn method1() {
    //fetch_update类似java中的AtomicBoolean.getAndSet
    if FLAG
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true))
        .unwrap()
    {
        std::thread::spawn(move || loop {
            println!(
                "thread-id:{:?},timestamp:{}",
                thread::current().id(),
                timestamp()
            );
            thread::sleep(Duration::from_millis(1000));
        });
    }
}

//辅助方法,获取系统时间戳(不用太关注这个方法)
fn timestamp() -> i64 {
    let start = SystemTime::now();
    let since_the_epoch = start
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let ms = since_the_epoch.as_secs() as i64 * 1000
        + (since_the_epoch.subsec_nanos() as f64 / 1_000_000.0) as i64;
    ms
}
fn main() {
    //调用2次
    method1();
    method1();

    //用1个死循环,防止main线束(仅演示用)
    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}

输出:

thread-id:ThreadId(2),timestamp:1662265684621
thread-id:ThreadId(2),timestamp:1662265685623
thread-id:ThreadId(2),timestamp:1662265686627
thread-id:ThreadId(2),timestamp:1662265687628
thread-id:ThreadId(2),timestamp:1662265688630
...

从输出的线程id上看,2次method1()只创建了1个线程


2、如何让线程执行完再继续

fn main() {
    let mut thread_list = Vec::<thread::JoinHandle<()>>::new();
    for _i in 0..5 {
        let t = thread::spawn(|| {
            for n in 1..3 {
                println!("{:?}, n:{}", thread::current().id(), n);
                thread::sleep_ms(5);
            }
        });
        thread_list.push(t);
    }
    //运行后会发现,大概率只有下面这行会输出,因为main已经提前线束了,上面的线程没机会执行,就被顺带着被干掉了
    println!("main thread");
}

上面这段代码,如果希望在main主线程结束前,让所有创建出来的子线程执行完,可以使用join方法

fn main() {
    let mut thread_list = Vec::<thread::JoinHandle<()>>::new();
    for _i in 0..5 {
        let t = thread::spawn(|| {
            for n in 1..3 {
                println!("{:?}, n:{}", thread::current().id(), n);
                thread::sleep_ms(5);
            }
        });
        thread_list.push(t);
    }
 
    //将所有线程join,强制执行完后,才继续
    for t in thread_list {
        t.join().unwrap();
    }
 
    println!("main thread");
}

3、线程互斥锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    //声明1个互斥锁Mutex,注意在多线程中使用时,必须套一层Arc
    let flag = Arc::new(Mutex::new(false));
    let mut handlers = vec![];
    for _ in 0..10 {
        let flag = Arc::clone(&flag);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            //只有1个线程会lock成功
            let mut b = flag.lock().unwrap();
            if !*b {
                //抢到锁的,把标志位改成true,其它线程就没机会执行println
                *b = true;
                println!("sub\t=>\t{:?}", thread::current().id());
            }
        });
        handlers.push(handle);
    }
    for h in handlers {
        h.join().unwrap();
    }
    println!("main\t=>\t{:?}", thread::current().id());
}

上面的效果,9个子线程中,只会有1个抢到锁,并输出println,输出类似下面这样:

sub     =>      ThreadId(2)
main    =>      ThreadId(1)

4、线程之间发送数据

use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    let t1 = send_something(sender);
    let t2 = receive_something(receiver);
 
    t1.join().unwrap();
    t2.join().unwrap();
}
 
/**
 * 线程发送消息测试
 */
fn send_something(tx: Sender<String>) -> JoinHandle<()> {
    thread::spawn(move || {
        //模拟先做其它业务处理
        thread::sleep(Duration::from_millis(100));
 
        let msg_list = vec![
            String::from("a"),
            String::from("b"),
            String::from("c"),
            //约定:\n是数据的结束符
            String::from("\n"),
        ];
 
        //发送一堆消息
        for msg in msg_list {
            tx.send(msg).unwrap();
        }
    })
}
 
/**
 * 线程收消息
 */
fn receive_something(rx: Receiver<String>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        //try_recv 不会阻塞
        let s = rx.try_recv();
        if s.is_ok() {
            let msg = s.unwrap();
            if msg == "\n" {
                //约定:收到\n表示后面没数据了,可以退出
                println!("end!");
                break;
            } else {
                println!("got msg:{}", msg);
            }
        }
        //模拟没数据时干点其它事情
        println!("do another thing!");
        thread::sleep(Duration::from_millis(100));
    })
}

输出:

do another thing!
do another thing!
got msg:a
do another thing!
got msg:b
do another thing!
got msg:c
do another thing!
end!

5、线程池示例

先要引用threadpool的依赖

[dependencies]
threadpool="1.8.1"

然后就可以使用了:

use std::thread;
use std::time::Duration;
use threadpool::ThreadPool;

fn main() {
    let n_workers = 3;
    //创建1个名为test-pool的线程池
    let pool = ThreadPool::with_name(String::from("test-pool"), n_workers);

    for _ in 0..10 {
        pool.execute(|| {
            println!(
                "{:?},{:?}",
                thread::current().id(),
                thread::current().name()
            );
            thread::sleep(Duration::from_millis(100));
        });
    }

    //待线程池中的所有任务都执行完
    pool.join();
}

输出:

ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")

6、指定线程名称

use std::thread;

fn main() {
    let t1 = thread::Builder::new()
        //子线程命名
        .name(format!("my-thread"))
        .spawn(|| {
            //打印子线程的id和name
            println!(
                "{:?},{:?}",
                thread::current().id(),
                thread::current().name()
            );
        })
        .unwrap();
    t1.join().unwrap();

    //打印主线程的id和name
    println!(
        "{:?},{:?}",
        thread::current().id(),
        thread::current().name()
    );
}

输出:

ThreadId(2),Some("my-thread")
ThreadId(1),Some("main")

7、如何暂停/恢复线程运行

use std::time::Duration;

use std::thread;

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let t = thread::spawn(move || loop {
        //获取当前时间的秒数
        let seconds = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        println!("{}", seconds);
        //每当秒数为5的倍数,就把自己暂停,同时对外发消息pause
        if seconds % 5 == 0 {
            tx.send("pause").unwrap();
            println!("\nwill be parked !!!");
            //将自己暂停
            thread::park();
        }
        thread::sleep(Duration::from_millis(1000));
    });

    //不断收消息,发现是pause后,过3秒将线程t解封
    loop {
        let flag = rx.recv();
        if flag.is_ok() && flag.unwrap() == "pause" {
            thread::sleep(Duration::from_millis(3000));
            //解封t
            t.thread().unpark();
            println!("unparked !!!\n");
        }
    }
}

这样就实现了一个简易版的ScheudleThread,可以周期性的运行,运行效果:

1662278909
1662278910

will be parked !!!
unparked !!!

1662278914
1662278915

will be parked !!!
unparked !!!

1662278919
1662278920
...

参考文章:

Rust语言圣经-多线程并发编程

标签:std,thread,示例,并发,let,ThreadId,println,线程,Rust
来源: https://www.cnblogs.com/yjmyzz/p/16654831.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有