一.channel的几种关闭方式?
这篇文章完美的回答了这个问题 https://www.jianshu.com/p/d24dfbb33781 (膜拜大佬了 ~)
简单总结一下(一定要遵循的原则:1.不要在关闭已经关闭的通道2.不要给已经关闭的通道继续发送)
1.最简单的一种方式实现一个函数用来检查通道关闭了没有
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
这种函数靠select,default实现。我们可以将通道传入函数通过case来判断是否是接收通道是就说明通道已经关闭返回true如果不是就返回false说明通道没有关闭。但是这种方法只能检查一时的状态如果你的代码中调用了类似功能的方法修改了通道的状态,那么你相信这次结果必然会产生,关闭已经关闭的通道,给已经关闭的通道发送值的错误。
2.使用defer和recover来使你的数据安全的发送到通道
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// the return result can be altered
// in a defer function call
closed = true
}
}()
ch <- value // panic if ch is closed
return false // <=> closed = false; return
}
首先这个方式符合不在接受端关闭的原则。所以我们实现的是一个安全的发送端。如果通道没有关闭数据会安全的发送进去,并返回false。如果通道关闭那么引发一次panic,recover恢复panic并且返回true告诉你通道关闭了。
3.安全的关闭通道
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
// assume ch != nil here.
close(ch) // panic if ch is closed
return true
}
可以使用这个方法关闭通道,如果通道没有关闭就关闭通道并且返回true,如果通道关闭了那么就会引发panic,触发recover返回false。
4.使用sync.Once来关闭channel
type MyChannel struct {
C chan T
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func(){
close(mc.C)
})
}
因为Once的关系,通道只能被关闭一次,这样就避免了多次关闭的问题。
5.使用Mutex避免多次关闭channel
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
mc.mutex.Unlock()
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
通关变量closed来控制锁的获取权限。
6.在一个发送端多个接收端的情况下,先用Waitgroup的Add数来控制通道的大小,然后一个发送端开始发送数据,并且设置好停止发送的条件,并在不想发送的时候停止关闭通道。多个接收端开始接收,并且每一次接收完毕就waitgroup.Done。这样做就可以确保全部接受,并且符合不在接收端关闭通道的原则。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
7.这种是多个发送端一个接收端的情况,用一个信号通道来控制停止发送请求,由于我们不能让接收者来关闭通道,所以我们可以关闭一个额外的通道来达到停止发送的目的。由于发送者有多个所以不应该在发送端直接关闭data通道,而是在接收端接收完毕通过信号通道通知发送端停止发送。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
8.多个发送者多个接收者的情况
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。 (因为这个问题我当时确实不知道怎么回答所以写的详细点,当时我的想法就是关闭通道不就是close()还能有其他的?/(ㄒoㄒ)/)
二.go垃圾回收
三.go内存管理 内存分配的大致策略 申请一块较大的地址空间(虚拟内存),用于内存分配及管理(golang:spans+bitmap+arena->512M+16G+512G) 当空间不足时,向系统申请一块较大的内存,如100KB或者1MB 申请到的内存块按特定的size,被分割成多种小块内存(golang:_NumSizeClasses = 67),并用链表管理起来 创建对象时,按照对象大小,从空闲链表中查找到最适合的内存块 销毁对象时,将对应的内存块返还空闲链表中以复用 空闲内存达到阈值时,返还操作系统 Go内存管理基于tcmalloc,使用连续虚拟地址,以页(8k)为单位、多级缓存进行管理; 在分配内存时,需要对size进行对齐处理,根据best-fit找到合适的mspan,对未用完的内存还会拆分成其他大小的mspan继续使用 在new一个object时(忽略逃逸分析),根据object的size做不同的分配策略: 极小对象(size<16byte)直接在当前P的mcache上的tiny缓存上分配; 小对象(16byte <= size <= 32k)在当前P的mcache上对应slot的空闲列表中分配,无空闲列表则会继续向mcentral申请(还是没有则向mheap申请); 大对象(size>32k)直接通过mheap申请,如果mheap也没有了就去操作系统申请。
span是内存管理的基本单位由多个页组成,它使得内存分配更加细致。
type mspan struct {
next *mspan // next span in list, or nil if none
prev *mspan // previous span in list, or nil if none
startAddr uintptr // address of first byte of span aka s.base()
npages uintptr // number of pages in span
nelems uintptr // number of object in the span.
allocBits *gcBits
gcmarkBits *gcBits
allocCount uint16 // number of allocated objects
spanclass spanClass // size class and noscan (uint8)
elemsize uintptr // computed from sizeclass or from npages
}
现在我们了解到go用span来分配内存,那么在哪里用span?我们都知道每个p都有mcache,通过mcache管理每个g需要的内存。
type mcache struct {
tiny uintptr
tinyoffset uintptr
alloc [numSpanClasses]*mspan // spans to allocate from, indexed by spanClass
}
numSpanClasses = _NumSizeClasses << 1
_NumSizeClasses = 67
从结构体来看,前两个字段用于极小对象的分配。alloc是一个mspan数组,长度是1>>67,说明每种size class有2组元素。第一组span对象中包含了指针,叫做scan,表示需要gc scan;第二组没有指针,叫做noscan。提高gc scan性能。mcache初始化没有span,g先从central动态申请,并缓存在cache。
central
type mcentral struct {
lock mutex
spanclass spanClass
nonempty mSpanList // list of spans with a free object, ie a nonempty free list
empty mSpanList // list of spans with no free objects (or cached in an mcache)
// nmalloc is the cumulative count of objects allocated from
// this mcentral, assuming all spans in mcaches are
// fully-allocated. Written atomically, read under STW.
nmalloc uint64
}
lock:多个g并发从central申请span,所以需要lock,保证一致性。
spanclass:每个mcentral管理着一组有相同size class的span列表。
nonompty:还有内存可用的span列表。
empty:没有内存可用的span列表。
nmalloc:累计分配的对象个数。
线程从central获取span的步骤
1.加锁。
2.从nonemptylie列表获取一个可用的span,并将其从链表中删除。
3.将取出的sapn放入empty链表。
4.将span返回给线程。
5.解锁。
6.线程将该span缓存进cache。
线程将span归还步骤
1.加锁
2.将span从empty列表中删除
3.将span加入nonempty列表
4.解锁
heap
central只管理特定的size class span,所以必然有一个人更上层的数据结构,管理所有的sizeclass central,这就是heap。
type mheap struct {
lock mutex
spans []*mspan
// Malloc stats.
largealloc uint64 // bytes allocated for large objects
nlargealloc uint64 // number of large object allocations
largefree uint64 // bytes freed for large objects (>maxsmallsize)
nlargefree uint64 // number of frees for large objects (>maxsmallsize)
// range of addresses we might see in the heap
bitmap uintptr // Points to one byte past the end of the bitmap
bitmap_mapped uintptr
arena_start uintptr
arena_used uintptr // Set with setArenaUsed.
arena_alloc uintptr
arena_end uintptr
arena_reserved bool
central [numSpanClasses]struct {
mcentral mcentral
pad [sys.CacheLineSize - unsafe.Sizeof(mcentral{})%sys.CacheLineSize]byte
}
}
spans:映射span->page
large:大对象>32k
bitmap:gc
arena:arena区相关信息,pages,堆区
central:通过size class管理span,每种size class对应两个centarl.
四.go调度器
pmg(也就是系统线程到goroutine的映射关系,当时一慌居然忘了可恶)
五.闭包