当前位置:网站首页>[go] common concurrency model [generic version]
[go] common concurrency model [generic version]
2022-04-23 08:08:00 【CRAJA】
A summary of the patterns from the book "Concurrency in Go", rewritten with generics.
package common
import "sync"
// Bridge flattens a channel of channels into a single channel.
//
// It takes one channel at a time from chanStream, forwards every value of
// that channel to the returned channel, and only then takes the next one, so
// values are delivered in the order the inner channels were received.
// The returned channel is closed when chanStream is closed or done is closed.
func Bridge(done <-chan interface{}, chanStream <-chan <-chan any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for {
			// Pick up the next inner channel, unless cancelled first.
			var current <-chan any
			select {
			case <-done:
				return
			case next, open := <-chanStream:
				if !open {
					return
				}
				current = next
			}
			// Drain the inner channel completely before moving on.
			for item := range OrDone(done, current) {
				select {
				case out <- item:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}
// Tee duplicates the values read from in onto two returned channels.
//
// Each value is delivered to both outputs before the next value is read from
// in, so a slow consumer on either output holds back the other. Both outputs
// are closed when in is exhausted or done is closed.
func Tee(done <-chan interface{}, in <-chan any) (_, _ <-chan any) {
	first := make(chan any)
	second := make(chan any)
	go func() {
		defer close(first)
		defer close(second)
		for item := range OrDone(done, in) {
			// Per-value copies: once a side has received the value its copy is
			// set to nil, and a send on a nil channel blocks forever, so the
			// second pass can only pick the side that is still pending.
			left, right := first, second
			for sent := 0; sent < 2; sent++ {
				select {
				case left <- item:
					left = nil
				case right <- item:
					right = nil
				case <-done:
					return
				}
			}
		}
	}()
	return first, second
}
// OrDone wraps c so that reading from it can be cancelled through done.
//
// Values received from c are forwarded to the returned channel, which is
// closed as soon as c is closed or done is closed. A value that has already
// been received from c when done fires may be dropped.
func OrDone(done <-chan interface{}, c <-chan any) <-chan any {
	valStream := make(chan any)
	go func() {
		defer close(valStream)
		for {
			// Give cancellation priority: select picks randomly among ready
			// cases, so without this check a closed done could lose against a
			// ready c and further values would still be consumed.
			select {
			case <-done:
				return
			default:
			}
			select {
			case <-done:
				return
			case v, ok := <-c:
				if !ok {
					return
				}
				select {
				case valStream <- v:
				case <-done:
					// Stop right away instead of looping back to read c again.
					return
				}
			}
		}
	}()
	return valStream
}
// MyInteger is a constraint matching any type whose underlying type is
// int, int32 or int64.
type MyInteger interface {
	~int64 | ~int32 | ~int
}

// MyFloat is a constraint matching any type whose underlying type is
// float32 or float64.
type MyFloat interface {
	~float32 | ~float64
}

// Number is the union of MyInteger and MyFloat; its members support the
// arithmetic operators +, -, * and /.
type Number interface {
	MyInteger | MyFloat
}
// FanIn merges the values of all the given channels into one channel.
//
// One goroutine per input forwards its values to the returned channel; the
// relative order of values coming from different inputs is unspecified. The
// returned channel is closed once every input is closed, or once done is
// closed, whichever happens first.
func FanIn(done <-chan interface{}, channels []<-chan any) <-chan any {
	var wg sync.WaitGroup
	multiplexedStream := make(chan any)

	multiplex := func(c <-chan any) {
		defer wg.Done()
		for {
			// Wait for either a value or cancellation; a plain range over c
			// would keep this goroutine (and the output) alive for as long as
			// an idle input stays open, even after done is closed.
			var v any
			select {
			case <-done:
				return
			case in, ok := <-c:
				if !ok {
					return
				}
				v = in
			}
			select {
			case <-done:
				return
			case multiplexedStream <- v:
			}
		}
	}

	// Register all workers before starting any, so Wait cannot return early.
	wg.Add(len(channels))
	for _, c := range channels {
		go multiplex(c)
	}

	// Close the output only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(multiplexedStream)
	}()
	return multiplexedStream
}
// Multiply is a pipeline stage that emits every value of valueStream
// multiplied by multiplier.
//
// The returned channel is closed when valueStream is closed or done is
// closed.
func Multiply[V Number](done <-chan interface{}, valueStream <-chan V, multiplier V) <-chan V {
	results := make(chan V)
	go func() {
		defer close(results)
		for {
			// Receive with cancellation, so the stage also stops while the
			// upstream is idle rather than only when it is closed.
			var v V
			select {
			case <-done:
				return
			case in, ok := <-valueStream:
				if !ok {
					return
				}
				v = in
			}
			select {
			case <-done:
				return
			case results <- v * multiplier:
			}
		}
	}()
	return results
}
// Add is a pipeline stage that emits every value of valueStream increased by
// additive.
//
// The returned channel is closed when valueStream is closed or done is
// closed.
func Add[V Number](done <-chan interface{}, valueStream <-chan V, additive V) <-chan V {
	results := make(chan V)
	go func() {
		defer close(results)
		for {
			// Receive with cancellation, so the stage also stops while the
			// upstream is idle rather than only when it is closed.
			var v V
			select {
			case <-done:
				return
			case in, ok := <-valueStream:
				if !ok {
					return
				}
				v = in
			}
			select {
			case <-done:
				return
			case results <- v + additive:
			}
		}
	}()
	return results
}
// ToType is a pipeline stage that type-asserts every value of valueStream to
// T and emits the result.
//
// The assertion is unchecked: a value whose dynamic type is not T (including
// a nil value) makes the forwarding goroutine panic. The returned channel is
// closed when valueStream is closed or done is closed.
func ToType[T any](done <-chan interface{}, valueStream <-chan interface{}) <-chan T {
	typedStream := make(chan T)
	go func() {
		defer close(typedStream)
		for {
			// Receive with cancellation, so the stage also stops while the
			// upstream is idle rather than only when it is closed.
			var v interface{}
			select {
			case <-done:
				return
			case in, ok := <-valueStream:
				if !ok {
					return
				}
				v = in
			}
			select {
			case <-done:
				return
			case typedStream <- v.(T):
			}
		}
	}()
	return typedStream
}
// PrimeFinder is a pipeline stage that emits only the prime numbers found in
// intStream, in the order they were received.
//
// Values below 2 are never prime. Primality is checked by trial division up
// to the square root of the value. The returned channel is closed when
// intStream is closed or done is closed.
func PrimeFinder[T MyInteger](done <-chan interface{}, intStream <-chan T) <-chan interface{} {
	results := make(chan interface{})
	go func() {
		defer close(results)
		for {
			var v T
			select {
			case <-done:
				return
			case in, ok := <-intStream:
				if !ok {
					return
				}
				v = in
			}

			prime := v >= 2
			// i <= v/i is the overflow-safe form of i*i <= v; the bound is
			// inclusive so that perfect squares (4, 9, 25, ...) are rejected.
			for i := T(2); prime && i <= v/i; i++ {
				if v%i == 0 {
					prime = false
				}
			}
			if !prime {
				continue
			}

			// Guard the send so a missing consumer cannot block cancellation.
			select {
			case <-done:
				return
			case results <- v:
			}
		}
	}()
	return results
}
// Take forwards at most num values from valueStream and then closes the
// returned channel.
//
// The returned channel is closed early if valueStream is closed before num
// values were received, or if done is closed.
func Take(done <-chan interface{}, valueStream <-chan any, num int) <-chan any {
	results := make(chan any)
	go func() {
		defer close(results)
		for i := 0; i < num; i++ {
			// Receive and send are separate selects: a nested
			// `results <- <-valueStream` would block on the receive before done
			// could be observed, and would emit nil for a closed valueStream.
			var v any
			select {
			case <-done:
				return
			case in, ok := <-valueStream:
				if !ok {
					return
				}
				v = in
			}
			select {
			case <-done:
				return
			case results <- v:
			}
		}
	}()
	return results
}
// RepeatFn is a generator that calls fn over and over and emits each result
// on the returned channel until done is closed, at which point the channel
// is closed.
//
// fn is invoked before each send is attempted, so it may be called one more
// time than the number of values actually delivered.
func RepeatFn(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for {
			next := fn()
			select {
			case out <- next:
			case <-done:
				return
			}
		}
	}()
	return out
}
// Repeat is a generator that emits the given values in order, cycling back
// to the first one after the last, until done is closed; the returned
// channel is then closed.
//
// When called without values there is nothing to emit, so the returned
// channel is closed immediately.
func Repeat(done <-chan interface{}, values ...any) <-chan any {
	valueStream := make(chan any)
	go func() {
		defer close(valueStream)
		// Without this guard the outer loop would spin forever around an empty
		// inner loop and never reach the select that observes done.
		if len(values) == 0 {
			return
		}
		for {
			for _, v := range values {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}
	}()
	return valueStream
}
The full code is in the repository.
Many of these places do not actually need generics. Generics are only recommended when you would otherwise write essentially identical code for many different types. I recommend reading the official Go generics tutorial; I think Go's generics are fine, and not ugly.
版权声明
本文为[CRAJA]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230645168316.html
边栏推荐
猜你喜欢

DVWA靶场练习

分布式服务治理Nacos

The displayed amount of ABAP ALV is inconsistent with the exported amount

几种智能机器人室内定位方法对比

Internal network security attack and defense: a practical guide to penetration testing (8): Authority maintenance analysis and defense

Upload labs range practice

How to import Excel data in SQL server, 2019 Edition

Principle of sentinel integrating Nacos to update data dynamically

三星,再次“西征”

Ctf-misc summary
随机推荐
雲計算技能大賽 -- openstack私有雲環境 第一部分
3C装配中的机械臂运动规划
SAP self created table log function is enabled
惨了,搞坏了领导的机密文件,吐血分享备份文件的代码技巧
C 输出一种二维数组,特点如下。
Buctf MISC brossage
Feign源码分析
Intranet penetration series: pingtunnel of Intranet tunnel
Ribbon启动流程
一篇文章看懂变量提升(hoisting)
[go]常见的并发模型[泛型版]
在MATLAB中快速画圆(给出圆心坐标和半径就能直接画的那种)
Alibaba sentinel learning QA
3C裝配中的機械臂運動規劃
Chapter VII asset impairment
简述CPU
yum源仓库本地搭建的两种方法
智能名片小程序名片详情页功能实现关键代码
RAID0和RAID5的创建和模拟RAID5工作原理
Research on software security based on NLP (I)