3.数据容器

未匹配的标注

原文链接-map
原文链接-切片数组

3.1 数组与切片

因为切片(slice)比数组更好用,也更安全, Go 推荐使用 slice 而不是数组。下面比较了 slice 和数组的区别,也研究了 slice 的一些特有性质。

3.1.1 数组和切片有何异同

Go 语言中的切片(slice)结构的本质是对数组的封装,它描述一个数组的片段。无论是数组还是切片,都可以通过下标来访问单个元素。

数组是定长的,长度定义好之后,不能再更改。在 Go 语言中,数组是不常见的,因为其长度是类型的一部分,限制了它的表达能力,比如 [3]int[4]int 就是不同的类型。而切片则非常灵活,它可以动态地扩容,且切片的类型和长度无关。例如:

func main() {
    arr1 := [1]int{1}
    arr2 := [2]int{1, 2}
    if arr1 == arr2 {
        fmt.Println("equal type")
    }
}

尝试运行,报编译错误:

./test.go:16:10: invalid operation: arr1 == arr2 (mismatched types [1]int and [2]int)

因为两个数组的长度不同,根本就不是同一类型,因此不能进行比较。 数组是一片连续的内存,切片实际上是一个结构体,包含三个字段: 长度、容量、底层数组。

// src/runtime/slice.go
type slice struct {
    array unsafe.Pointer  // 元素指针
    len int  // 长度
    cap int  // 容量
}

切片的数据结果如图所示:

切片数据结构

注意,底层数组可以被多个切片同时指向,因此对一个切片的元素进行操作有可能影响到其他切片。

3.1.2 切片如何被截取

截取也是一种比较常见的创建 slice 的方法,可以从数组或者 slice 直接截取,需要指定起、止索引位置。

基于已有 slice 创建新 slice 对象,被称为 reslice。新 slice 和老 slice 共用底层数组,新老 slice 对底层数组的更改都会影响到彼此。基于数组创建的新 slice 也是同样的效果: 对数组或 slice 元素做的更改都会影响到彼此。

值得注意的是,新老 slice 或者新 slice 老数组互相影响的前提是两者共用底层数组,如果因为执行 append 操作使得新 slice 或老 slice 底层数组扩容,移动到了新的位置,两者就不会相互影响了。所以,问题的关键在于两者是否会共用底层数组。

截取操作采用如下方式:

data := [...]int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
slice := data[2:4:6] // data[low, high, max]

data 使用 3 个索引值,截取出新的 slice。这里 data 可以是数组或者 slicelow 是最低索引值,这里是闭区间,也就是说第一个元素是 data 位于 low 索引处的元素; 而 highmax 则是开区间,表示最后一个元素只能是索引 high-1 处的元素,而最大容量则只能是索引 max-1 处的元素。

要求: max >= high >= low

high == low 时,新 slice 为空。

还有一点,highmax 必须在老数组或者老 slice 的容量(cap)范围内。

来看一个例子: 运行下面的代码,输出是什么?

package  main

import  "fmt"

func main() {
    slice := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    s1 := [2:5]
    s2 := s1[2:6:7]
    s2 =  append(s2, 100)
    s2 =  append(s2, 200)
    s1[2] =20
    fmt.Println(s1)
    fmt.Println(s2)
    fmt.Println(slice)
}

运行此程序,得到如下输出:

[2 3 20]
[4 5 6 7 100 200]
[0 1 2 3 20 5 6 7 100 9]

得到这样结果的原因是:

s1slice 索引 2(闭区间)到索引 5(开区间,元素真正取到索引 4),长度为 3,容量默认到数组结尾,为 8。

s2s1 的索引 2(闭区间)到索引 6(开区间,元素真正取到索引 5),容量到索引 7(开区间,真正到索引 6),为 5。slices1s2 的关系如图所示。

slice初始时的状态

注意, slices1s2 三者的元素指向同一个底层数组。接着,向 s2 尾部追加一个元素 100:

s2 =  append(s2, 100)

此时, s2 容量刚好够,直接追加。不过,这会修改原始数组对应位置的元素。这一改动,数组和 s1 都可以看得到,如下图所示:

append 100

再次向 s2 追加元素 200:

s2 =  append(s2, 200)

此时,s2 的容量不够用,需要进行扩容。于是,s2 “另起炉灶”,将原来的元素复制到新的位置,扩大自己的容量。并且为了应对未来可能的 append 带来的再一次扩容,s2 会在此次扩容的时候多留一些 buffer,将新的容量将扩大为原始容量的 2 倍,也就是 10。slices1s2 的关系如图所示:

append 200

注意,s2 此时的底层数组元素和 slices1 已经没有关系了。最后,修改 s1 索引为 2 位置的元素:

s1[2] =  20

这次操作只会影响原始数组相应位置的元素,影响不到 s2 了,它已经“远走高飞”了,如图所示:

`s1[2]=20`

最后执行打印操作,打印 s1 的时候,只会打印出 s1 长度以内的元素。所以,只会打印出 3 个元素,虽然它的底层数组不止 3 个元素。

3.1.3 切片的容量是怎样增长的

一般都是在向切片追加了元素之后,由于容量不足,才会引起扩容。向切片追加元素调用的是 append 函数。append 函数的原型如下:

// src/builtin/builtin.go
func append(slice []Type, elems  ...Type) []Type

Append 函数的参数长度可变,因此可以追加多个值到 slice 中,还可以在切片后面追加“…”符号直接传入 slice,即追加切片里所有的元素。

slice = append(slice, elem1, elem2)
slice = append(slice, anotherSlice...)

Append 函数的返回值是一个新的切片,Go 语言的编译器不允许调用了 append 函数后不使用返回值。所以下面的用法是错的,不能通过编译:

append(slice, elem1, elem2)
append(slice, anotherSlice...)

使用 append 函数可以向 slice 追加元素,实际上是往底层数组相应的位置放置要追加的元素。 但是底层数组的长度是固定的,如果索引 len-1 所指向的元素已经是底层数组的最后一个元素,那就不能再继续放置新的元素了。

这时,slice 会整体迁移到新的位置,并且新底层数组的长度也会增加,使得可以继续放置新增的元素。同时,为了应对未来可能再次发生的 append 操作,新的底层数组的长度,也就是新 slice 的容量需要预留一定的 buffer。否则,每次添加元素的时候,都会发生迁移,成本太高。

slice 预留的 buffer 大小是有一定规律的。注意,下面这些说法是不准确的:

说法1:当原 slice 容量小于 1024 的时候,新 slice 容量变成原来的 2 倍;

说法2:当原 slcie 容量超过 1024,新 slice 容量变成原来的 1.25 倍。

在1.18版本更新后,slice 的扩容策略变为了:当原slice容量(oldcap)小于256的时候,新slice(newcap)容量为原来的2倍;原slice容量超过256,新slice容量newcap = oldcap+(oldcap+3*256)/4

为了说明切片的孔融规律,首先通过下面的程序来验证一下扩容的行为:


package  main
import  "fmt"
func main() {
    s := make([]int, 0)
    oldCap :=  cap(s)
    for i :=  0; i <  2048; i++ {
        s =  append(s, i)
        newCap :=  cap(s)
        if newCap != oldCap {
            fmt.Printf("[%d -> %4d] cap = %-4d | after append %-4d cap = %-4d\n", 0, i-1, oldCap, i, newCap)
            oldCap = newCap
        }    
    }
}

创建了一个空的 slice,然后,在一个循环里不断往里面 append 新的元素。然后记录容量的变化,并且每当容量发生变化的时候,记录下老的容量,以及添加完元素之后的容量,同时记下此时 slice 里的元素。这样,就可以观察,新老 slice 的容量变化情况,从而找出规律。

运行结果(1.18版本之前):

[0 ->  -1] cap = 0 |  after  append  0  cap  =  1
[0 ->  0] cap = 1 |  after  append  1  cap  =  2
[0 ->  1] cap = 2 |  after  append  2  cap  =  4
[0 ->  3] cap = 4 |  after  append  4  cap  =  8
[0 ->  7] cap = 8 |  after  append  8  cap  =  16
[0 ->  15] cap = 16 |  after  append  16  cap  =  32
[0 ->  31] cap = 32 |  after  append  32  cap  =  64
[0 ->  63] cap = 64 |  after  append  64  cap  =  128
[0 ->  127] cap = 128 |  after  append  128  cap  =  256
[0 ->  255] cap = 256 |  after  append  256  cap  =  512
[0 ->  511] cap = 512 |  after  append  512  cap  =  1024
[0 ->  1023] cap = 1024 |  after  append  1024  cap  =  1280
[0 ->  1279] cap = 1280 |  after  append  1280  cap  =  1696
[0 ->  1695] cap = 1696 |  after  append  1696  cap  =  2304

运行结果(1.18版本):

[0 ->  -1] cap = 0 |  after  append  0  cap  =  1
[0 ->  0] cap = 1 |  after  append  1  cap  =  2
[0 ->  1] cap = 2 |  after  append  2  cap  =  4
[0 ->  3] cap = 4 |  after  append  4  cap  =  8
[0 ->  7] cap = 8 |  after  append  8  cap  =  16
[0 ->  15] cap = 16 |  after  append  16  cap  =  32
[0 ->  31] cap = 32 |  after  append  32  cap  =  64
[0 ->  63] cap = 64 |  after  append  64  cap  =  128
[0 ->  127] cap = 128 |  after  append  128  cap  =  256
[0 ->  255] cap = 256 |  after  append  256  cap  =  512
[0 ->  511] cap = 512 |  after  append  512  cap  =  848
[0 ->  847] cap = 848 |  after  append  848  cap  =  1280
[0 ->  1279] cap = 1280 |  after  append  1280  cap  =  1792
[0 ->  1791] cap = 1792 |  after  append  1792  cap  =  2560

根据上面的结果我们可以看出在1.18版本之前:

在原来的slice容量oldcap小于1024的时候,新 slice 的容量newcap的确是oldcap的2倍。

但是,当oldcap大于等于 1024 的时候,情况就有变化了。当向 slice 中添加元素 1280 的时候,原来的slice 的容量为 1280,之后newcap变成了 1696,两者并不是 1.25 倍的关系(1696/1280=1.325)。添加完 1696 后,新的容量 2304 当然也不是 16961.25 倍。

1.18版本之后:

在原来的slice 容量oldcap小于256的时候,新 slice 的容量newcap的确是oldcap 的2倍。

但是,当oldcap容量大于等于 256 的时候,情况就有变化了。当向 slice 中添加元素 512 的时候,老 slice 的容量为 512,之后变成了 848,两者并没有符合newcap = oldcap+(oldcap+3*256)/4 的策略(512+(512+3*256)/4)=832。添加完 848 后,新的容量 1280 当然也不是按照之前策略所计算出的的1252

难道现在网上各种文章中的扩容策略并不是正确的吗。我们直接搬出源码:源码面前,了无秘密。

从前面汇编代码我们也看到了,向 slice 追加元素的时候,若容量不够,会调用 growslice 函数,所以我们直接看它的代码。

golang版本1.9.5

// go 1.9.5 src/runtime/slice.go:82
func growslice(et  *_type, old  slice, cap  int) slice {
    // ……
    newcap := old.cap
    doublecap := newcap + newcap
    if cap > doublecap {
        newcap = cap
    } else {
        if old.len <  1024 {
            newcap = doublecap
        } else {
            for newcap < cap {
                newcap += newcap /  4
            }
        }
    }
    // ……
    capmem =  roundupsize(uintptr(newcap) * ptrSize)
    newcap =  int(capmem / ptrSize)
}

golang版本1.18

// go 1.18 src/runtime/slice.go:178
func growslice(et  *_type, old  slice, cap  int) slice {
    // ……
    newcap := old.cap
    doublecap := newcap + newcap
    if cap > doublecap {
        newcap = cap
    } else {
        const threshold =  256
        if old.cap < threshold {
            newcap = doublecap
        } else {
        for  0  < newcap && newcap < cap {
            // Transition from growing 2x for small slices
            // to growing 1.25x for large slices. This formula
            // gives a smooth-ish transition between the two.
            newcap += (newcap +  3*threshold) /  4
        }
            if newcap <=  0 {
                newcap = cap
            }
        }
    }
    // ……
    capmem =  roundupsize(uintptr(newcap) * ptrSize)
    newcap =  int(capmem / ptrSize)
}

看到了吗?如果只看前半部分,现在网上各种文章里说的 newcap 的规律是对的。现实是,后半部分还对 newcap 作了一个内存对齐,这个和内存分配策略相关。进行内存对齐之后,新 slice 的容量是要 大于等于 按照前半部分生成的newcap

之后,向 Go 内存管理器申请内存,将老 slice 中的数据复制过去,并且将 append 的元素添加到新的底层数组中。

最后,向 growslice 函数调用者返回一个新的 slice,这个 slice 的长度并没有变化,而容量却增大了。

【引申1】

来看一个例子,来源于这里

package  main

import  "fmt"

func  main() {
    s := []int{5}
    s =  append(s, 7)
    s =  append(s, 9)
    x :=  append(s, 11)
    y :=  append(s, 12)
    fmt.Println(s, x, y)
}
代码 切片对应状态
s := []int{5} s 只有一个元素,[5]
s = append(s, 7) s 扩容,容量变为2,[5, 7]
s = append(s, 9) s 扩容,容量变为4,[5, 7, 9]。注意,这时 s 长度是3,只有3个元素
x := append(s, 11) 由于 s 的底层数组仍然有空间,因此并不会扩容。这样,底层数组就变成了 [5, 7, 9, 11]。注意,此时 s = [5, 7, 9],容量为4;x = [5, 7, 9, 11],容量为4。这里 s 不变
y := append(s, 12) 这里还是在 s 元素的尾部追加元素,由于 s 的长度为3,容量为4,所以直接在底层数组索引为3的地方填上12。结果:s = [5, 7, 9],y = [5, 7, 9, 12],x = [5, 7, 9, 12]xy 的长度均为4,容量也均为4

所以最后程序的执行结果是:

[5  7  9] [5  7  9  12] [5  7  9  12]

这里要注意的是,append函数执行完后,返回的是一个全新的 slice,并且对传入的 slice 并不影响。

【引申2】

关于 append,我们最后来看一个例子,来源于 Golang Slice的扩容规则

package  main

import  "fmt"

func  main() {
    s := []int{1,2}
    s =  append(s,4,5,6)
    fmt.Printf("len=%d, cap=%d",len(s),cap(s))
}

运行结果是:

len=5, cap=6

如果按网上各种文章中总结的那样:小于原 slice 长度小于 1024 的时候,容量每次增加 1 倍。添加元素 4 的时候,容量变为4;添加元素 5 的时候不变;添加元素 6 的时候容量增加 1 倍,变成 8。

那上面代码的运行结果就是:

len=5, cap=8

这是错误的!我们来仔细看看,为什么会这样,再次搬出代码:

// go 1.9.5 src/runtime/slice.go:82
func  growslice(et  *_type, old  slice, cap  int) slice {
    // ……
    newcap := old.cap
    doublecap := newcap + newcap
    if cap > doublecap {
        newcap = cap
    } else {
        // ……
    }
    // ……
    capmem =  roundupsize(uintptr(newcap) * ptrSize)
    newcap =  int(capmem / ptrSize)
}

这个函数的参数依次是 元素的类型,老的 slice,新 slice 最小求的容量

例子中 s 原来只有 2 个元素,lencap 都为 2,append 了三个元素后,长度变为 5,容量最小要变成 5,即调用 growslice 函数时,传入的第三个参数应该为 5。即 cap=5。而一方面,doublecap 是原 slice容量的 2 倍,等于 4。满足第一个 if 条件,所以 newcap 变成了 5。

接着调用了 roundupsize 函数,传入 40。(代码中ptrSize是指一个指针的大小,在64位机上是8)

我们再看内存对齐,搬出 roundupsize 函数的代码:


// src/runtime/msize.go:13
func  roundupsize(size  uintptr) uintptr {
    if size < _MaxSmallSize {
        if size <= smallSizeMax-8 {
            return  uintptr(class_to_size[size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]])
        } else {
            //……
        }
    }
    //……
}
const _MaxSmallSize =  32768
const smallSizeMax =  1024
const smallSizeDiv =  8

很明显,我们最终将返回这个式子的结果:

class_to_size[size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]]

这是 Go 源码中有关内存分配的两个 sliceclass_to_size通过 spanClass获取 span划分的 object大小。而 size_to_class8 表示通过 size 获取它的 spanClass

var size_to_class8 = [smallSizeMax/smallSizeDiv +  1]uint8{0, 1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 17, 17, 18, 18, 19, 19, 19, 19, 20, 20, 20, 20, 21, 21, 21, 21, 22, 22, 22, 22, 23, 23, 23, 23, 24, 24, 24, 24, 25, 25, 25, 25, 26, 26, 26, 26, 27, 27, 27, 27, 27, 27, 27, 27, 28, 28, 28, 28, 28, 28, 28, 28, 29, 29, 29, 29, 29, 29, 29, 29, 30, 30, 30, 30, 30, 30, 30, 30, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 31, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32}

var class_to_size = [_NumSizeClasses]uint16{0, 8, 16, 24, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 288, 320, 352, 384, 416, 448, 480, 512, 576, 640, 704, 768, 896, 1024, 1152, 1280, 1408, 1536, 1792, 2048, 2304, 2688, 3072, 3200, 3456, 4096, 4864, 5376, 6144, 6528, 6784, 6912, 8192, 9472, 9728, 10240, 10880, 12288, 13568, 14336, 16384, 18432, 19072, 20480, 21760, 24576, 27264, 28672, 32768}

我们传进去的 size 等于 40。所以 (size+smallSizeDiv-1)/smallSizeDiv = 5;获取 size_to_class8 数组中索引为 5 的元素为 5;获取 class_to_size 中索引为 5 的元素为 48

最终,新的 slice 的容量为 6

newcap =  int(capmem / ptrSize) // 6

至于,上面的两个魔法数组的由来,就不展开了。

【引申2】

向一个nil的slice添加元素会发生什么?为什么?

其实 nil slice 或者 empty slice 都是可以通过调用 append 函数来获得底层数组的扩容。最终都是调用 mallocgc 来向 Go 的内存管理器申请到一块内存,然后再赋给原来的nil sliceempty slice,然后摇身一变,成为“真正”的 slice 了。

3.1.4 切片作为函数参数

前面我们说到,slice 其实是一个结构体,包含了三个成员:len, cap, array。分别表示切片长度,容量,底层数据的地址。

slice 作为函数参数时,就是一个普通的结构体。其实很好理解:若直接传 slice,在调用者看来,实参 slice 并不会被函数中的操作改变;若传的是 slice 的指针,在调用者看来,是会被改变原 slice 的。

值得注意的是,不管传的是 slice 还是 slice 指针,如果改变了 slice 底层数组的数据,会反应到实参 slice 的底层数据。为什么能改变底层数组的数据?很好理解:底层数据在 slice 结构体里是一个指针,尽管 slice 结构体自身不会被改变,也就是说底层数据地址不会被改变。 但是通过指向底层数据的指针,可以改变切片的底层数据,没有问题。

通过 slicearray 字段就可以拿到数组的地址。在代码里,是直接通过类似 s[i]=10 这种操作改变 slice 底层数组元素值。

另外,值得注意的是,Go 语言的函数参数传递,只有值传递,没有引用传递。

来看一个代码片段:


package  main

func  main() {
    s := []int{1, 1, 1}
    f(s)
    fmt.Println(s)
}

func  f(s []int) {
    // i只是一个副本,不能改变s中元素的值
    /*for _, i := range s {
        i++
    }
    */
    for i :=  range s {
        s[i] +=  1
    }
}

运行一下,程序输出:

[2  2 2]

果真改变了原始 slice 的底层数据。这里传递的是一个 slice 的副本,在 f 函数中,s 只是 main 函数中 s 的一个拷贝。在f 函数内部,对 s 的作用并不会改变外层 main 函数的 s

要想真的改变外层 slice,只有将返回的新的 slice 赋值到原始 slice,或者向函数传递一个指向 slice 的指针。我们再来看一个例子:

package  main

import  "fmt"

func  myAppend(s []int) []int {
    // 这里 s 虽然改变了,但并不会影响外层函数的 s
    s =  append(s, 100)
    return s
}

func  myAppendPtr(s  *[]int) {
    // 会改变外层 s 本身
    *s =  append(*s, 100)
    return
}

func  main() {
    s := []int{1, 1, 1}
    newS :=  myAppend(s)
    fmt.Println(s)
    fmt.Println(newS)
    s = newS
    myAppendPtr(&s)
    fmt.Println(s)
}

运行结果:

[1  1 1]
[1  1  1 100]
[1  1  1  100 100]

myAppend 函数里,虽然改变了 s,但它只是一个值传递,并不会影响外层的 s,因此第一行打印出来的结果仍然是 [1 1 1]

newS 是一个新的 slice,它是基于 s 得到的。因此它打印的是追加了一个 100 之后的结果: [1 1 1 100]

最后,将 newS 赋值给了 ss 这时才真正变成了一个新的slice。之后,再给 myAppendPtr 函数传入一个 s 指针,这回它真的被改变了:[1 1 1 100 100]

3.1.5 内建函数 makenew 的区别是什么

首先 makenew 均是 Go 语言的内置的用来分配内存的函数。但是使用的类型不同:前者使用于 slicemapchannel等引用类型;后者适用于 int 型、数组、结构体等值类型。

其次,两者的函数形式及调用形式不同,函数形式如下:

func  make(t  Type, size  ...IntegerType) Type
func  new(Type) *Type

前者返回一个值,后者返回一个指针。

使用上, make 返回初始化之后的类型的引用, new 会为类型的新值分配已置零的内存空间,并返回指针。例如:

s :=  make([]int, 0, 10) // 使用 make 创建一个长度为0,容量为10的切片
a =  new(int) // 使用 new 分配一个零值的 int 型
*a =  5

3.2 散列表 map

3.2.1 map 是什么

维基百科里这样定义 map

In computer science, an associative array, map, symbol table, or dictionary is an abstract data type composed of a collection of (key, value) pairs, such that each possible key appears at most once in the collection.

简单说明一下:在计算机科学里,被称为相关数组、map、符号表或者字典,是由一组 <key, value> 对组成的抽象数据结构,,并且同一个 key 只会出现一次。

有两个关键点:map 是由 key-value 对组成的;key 只会出现一次。

map 相关的操作主要是:

  1. 增加一个 k-v 对 —— Add or insert
  2. 删除一个 k-v 对 —— Remove or delete
  3. 修改某个 k 对应的 v —— Reassign
  4. 查询某个 k 对应的 v —— Lookup
    简单说就是最基本的 增删查改

map 的设计也被称为 “The dictionary problem”,它的任务是设计一种数据结构用来维护一个集合的数据,并且可以同时对集合进行增删查改的操作。最主要的数据结构有两种:哈希查找表(Hash table)搜索树(Search tree)

哈希查找表用一个哈希函数将 key 分配到不同的桶(bucket,也就是数组的不同 index)。这样,开销主要在哈希函数的计算以及数组的常数访问时间。在很多场景下,哈希查找表的性能很高。

哈希查找表一般会存在“碰撞”的问题,就是说不同的 key 被哈希到了同一个 bucket。一般有两种应对方法:链表法开放地址法链表法将一个 bucket 实现成一个链表,落在同一个 bucket 中的 key 都会插入这个链表。开放地址法则是碰撞发生后,通过一定的规律,在数组的后面挑选“空位”,用来放置新的 key

搜索树法一般采用自平衡搜索树,包括:AVL 树,红黑树。面试时经常会被问到,甚至被要求手写红黑树代码,很多时候,面试官自己都写不上来,非常过分。

自平衡搜索树法的最差搜索效率是 O(logN),而哈希查找表最差是 O(N)。当然,哈希查找表的平均查找效率是 O(1),如果哈希函数设计的很好,最坏的情况基本不会出现。还有一点,遍历自平衡搜索树,返回的 key 序列,一般会按照从小到大的顺序;而哈希查找表则是乱序的。

前面说了 map 实现的几种方案,Go 语言采用的是哈希查找表,并且使用链表解决哈希冲突。

接下来我们要探索 map 的核心原理,一窥它的内部结构。

map 内存模型

在源码中,表示 map 的结构体是hmap,它是 hashmap 的“缩写”:


// A header for a Go map.
type  hmap  struct {
    // 元素个数,调用 len(map) 时,直接返回此值
    count int
    // 状态标识符,比如正在被写,buckets和oldbuckets正在被遍历或扩容
    flags uint8
    // buckets 的对数 log_2
    B uint8
    // overflow 的 bucket 近似数
    noverflow uint16
    // 计算 key 的哈希的时候会传入哈希函数
    hash0 uint32
    // 指向 buckets 数组,大小为 2^B
    // 如果元素个数为0,就为 nil
    buckets unsafe.Pointer
    // 等量扩容的时候,buckets 长度和 oldbuckets 相等
    // 双倍扩容的时候,buckets 长度会是 oldbuckets 的两倍
    oldbuckets unsafe.Pointer
    // 指示扩容进度,小于此地址的 buckets 迁移完成
    nevacuate uintptr
    // 用于gc,指向所有的溢出桶,正常桶里面某个bmap存满了,会使用这里面的内存空间存放键值对
    extra *mapextra  // optional fields
}

说明一下,Bbuckets 数组的长度的对数,也就是说 buckets 数组的长度就是 2^Bbucket 里面存储了 keyvalue,后面会再讲。

buckets 是一个指针,最终它指向的是一个结构体:

type  bmap  struct {
    tophash [bucketCnt]uint8
}

但这只是表面(src/runtime/hashmap.go)的结构,编译期间会给它加料,动态地创建一个新的结构:


type  bmap  struct {
    // 存放key哈希值的高8位,用于决定kv键值对放在桶内的哪个位置
    topbits [8]uint8
    // 以下属性,编译时动态生成,在源码中不存在
    keys [8]keytype  // 存放key的数组
    values [8]valuetype  // 存放value的数组
    pad uintptr  // 用于对齐内存
    overflow uintptr  // 指向下一个桶,即溢出桶,拉链法
}

bmap 就是我们常说的“桶”,桶里面会最多装 8 个 key,这些 key 之所以会落入同一个桶,是因为它们经过哈希计算后,哈希结果是“一类”的。在桶内,又会根据 key 计算出来的 hash 值的高 8 位来决定 key 到底落入桶内的哪个位置(一个桶内最多有8个位置)。

来一个整体的图:

map 结构图

  • map的内存模型中,其实总共就三种结构,hmap,bmap,mapextra

  • hmap表示整个map,bmap表示hmap中的一个桶,map底层其实是由很多个桶组成的

  • 当一个桶存满之后,指向的下一个桶,就叫做溢出桶,溢出桶就是拉链法的具体表现

  • mapextra表示所有的溢出桶,之所以还要重新的指向,目的是为了用于gc,避免gc时扫描整个map,仅扫描所有溢出桶就足够了

  • 桶结构的很多字段得在编译时才会动态生成,比如key和values等

  • 桶结构中,之所以所有的key放一起,所有的value放一起,而不是key/value一对对的一起存放,目的便是在某些情况下可以省去pad字段,节省内存空间

  • golang中的map使用的内存是不会收缩的,只会越用越多。

mapkeyvalue 都不是指针,并且 size 都小于 128 字节的情况下,会把 bmap 标记为不含指针,这样可以避免 gc 时扫描整个 hmap。但是,我们看 bmap 其实有一个 overflow 的字段,是指针类型的,破坏了 bmap 不含指针的设想,这时会把 overflow 移动到 extra 字段来。

type  mapextra  struct {
    // overflow[0] contains overflow buckets for hmap.buckets.
    // overflow[1] contains overflow buckets for hmap.oldbuckets.
    overflow [2]*[]*bmap
    // nextOverflow 包含空闲的 overflow bucket,这是预分配的 bucket
    nextOverflow *bmap
}

bmap 是存放 k-v 的地方,我们把视角拉近,仔细看 bmap 的内部组成。

bmap 结构

上图就是 bucket 的内存模型,HOB Hash 指的就是 top hash。 注意到 keyvalue 是各自放在一起的,并不是 key/value/key/value/... 这样的形式。源码里说明这样的好处是在某些情况下可以省略掉 padding 字段,节省内存空间。

例如,有这样一个类型的 map

map[int64]int8

如果按照 key/value/key/value/... 这样的模式存储,那在每一个 key/value 对之后都要额外 padding 7 个字节;而将所有的 keyvalue 分别绑定到一起,这种形式 key/key/.../value/value/...,则只需要在最后添加 padding

每个 bucket 设计成最多只能放 8key-value 对,如果有第 9 个 key-value 落入当前的 bucket,那就需要再构建一个 bucket ,通过 overflow 指针连接起来。

创建 map

从语法层面上来说,创建 map 很简单:

ageMp :=  make(map[string]int)
// 指定 map 长度
ageMp :=  make(map[string]int, 8)
// ageMp 为 nil,不能向其添加元素,会直接panic
var ageMp map[string]int

通过汇编语言可以看到,实际上底层调用的是 makemap 函数,主要做的工作就是初始化 hmap 结构体的各种字段,例如计算 B 的大小,设置哈希种子 hash0 等等。


func  makemap(t  *maptype, hint  int64, h  *hmap, bucket  unsafe.Pointer) *hmap {

    // 省略各种条件检查...
    // 找到一个 B,使得 map 的装载因子在正常范围内
    B :=  uint8(0)
    for ; overLoadFactor(hint, B); B++ {
    }
    // 初始化 hash table
    // 如果 B 等于 0,那么 buckets 就会在赋值的时候再分配
    // 如果长度比较大,分配内存会花费长一点
    buckets := bucket
    var extra *mapextra
    if B !=  0 {
        var nextOverflow *bmap
        buckets, nextOverflow =  makeBucketArray(t, B)
        if nextOverflow !=  nil {
            extra =  new(mapextra)
            extra.nextOverflow = nextOverflow
        }
    }
    // 初始化 hamp
    if h ==  nil {
        h = (*hmap)(newobject(t.hmap))
    }
    h.count =  0
    h.B = B
    h.extra = extra
    h.flags =  0
    h.hash0 =  fastrand()
    h.buckets = buckets
    h.oldbuckets =  nil
    h.nevacuate =  0
    h.noverflow =  0
    return h
}

【引申一】slice 和 map 分别作为参数时有什么区别?

注意,这个函数返回的结果:*hmap,它是一个指针,而我们之前讲过的 makeslice 函数返回的是 Slice 结构体:

func  makeslice(et  *_type, len, cap  int) slice

回顾一下 slice 的结构体定义:


// runtime/slice.go
type  slice  struct {
    array unsafe.Pointer  // 元素指针
    len int  // 长度
    cap int  // 容量
}

结构体内部包含底层的数据指针。

makemapmakeslice 的区别,带来一个不同点:当 mapslice 作为函数参数时,在函数参数内部对 map 的操作会影响 map 自身;而对 slice 却不会(之前讲 slice 的文章里有讲过)。

主要原因:一个是指针(*hmap),一个是结构体(slice)。Go 语言中的函数传参都是值传递,在函数内部,参数会被 copy 到本地。*hmap指针 copy 完之后,仍然指向同一个 map,因此函数内部对 map 的操作会影响实参。而 slicecopy 后,会成为一个新的 slice,对它进行的操作不会影响到实参。

哈希函数

map 的一个关键点在于,哈希函数的选择。在程序启动时,会检测 cpu 是否支持 aes,如果支持,则使用 aes hash,否则使用 memhash。这是在函数 alginit() 中完成,位于路径:src/runtime/alg.go 下。

hash 函数,有加密型和非加密型。

加密型的一般用于加密数据、数字摘要等,典型代表就是 md5sha1sha256aes256 这种;

非加密型的一般就是查找。在 map 的应用场景中,用的是查找。

选择 hash 函数主要考察的是两点:性能、碰撞概率。

Key 定位过程

key 经过哈希计算后得到哈希值,共 64 个 bit 位(64位机,32位机就不讨论了,现在主流都是64位机),计算它到底要落在哪个桶时,只会用到最后 Bbit 位。还记得前面提到过的 B 吗?如果 B = 5,那么桶的数量,也就是 buckets 数组的长度是 2^5 = 32

例如,现在有一个 key 经过哈希函数计算后,得到的哈希结果是:

10010111 | 00001111011011001000111100101010001001011001010101001010

用最后的 5bit 位,也就是 01010,值为 10,也就是 10 号桶。这个操作实际上就是取余操作,但是取余开销太大,所以代码实现上用的位操作代替。

再用哈希值的高 8 位,找到此 keybucket 中的位置,这是在寻找已有的 key。最开始桶内还没有 key,新加入的 key 会找到第一个空位,放入。

buckets 编号就是桶编号,当两个不同的 key 落在同一个桶中,也就是发生了哈希冲突。冲突的解决手段是用链表法:在 bucket 中,从前往后找到第一个空位。这样,在查找某个 key 时,先找到对应的桶,再去遍历 bucket 中的 key

如下图:

map key 查找过程

上图中,假定 B = 5,所以 bucket 总数就是 2^5 = 32。首先计算出待查找 key 的哈希,使用低 500110,找到对应的 6bucket,使用高 810010111,对应十进制 151,在 6bucket 中寻找 tophash 值(HOB hash)为 151key,找到了 2 号槽位,这样整个查找过程就结束了。

如果在 bucket 中没找到,并且 overflow 不为空,还要继续去 overflow bucket 中寻找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket

我们来看下源码吧,哈哈!通过汇编语言可以看到,查找某个 key 的底层函数是 mapaccess 系列函数,函数的作用类似,区别在下一节会讲到。这里我们直接看 mapaccess1 函数:


func  mapaccess1(t  *maptype, h  *hmap, key  unsafe.Pointer) unsafe.Pointer {
    // ……
    // 如果 h 什么都没有,返回零值
    if h ==  nil  || h.count ==  0 {
        return unsafe.Pointer(&zeroVal[0])
    }

    // 写和读冲突
    if h.flags&hashWriting !=  0 {
        throw("concurrent map read and map write")
    }
    // 不同类型 key 使用的 hash 算法在编译期确定
    alg := t.key.alg
    // 计算哈希值,并且加入 hash0 引入随机性
    hash := alg.hash(key, uintptr(h.hash0))

    // 比如 B=5,那 m 就是31,二进制是全 1
    // 求 bucket num 时,将 hash 与 m 相与,
    // 达到 bucket num 由 hash 的低 8 位决定的效果
    m :=  uintptr(1)<<h.B -  1

    // b 就是 bucket 的地址
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))

    // oldbuckets 不为 nil,说明发生了扩容
    if c := h.oldbuckets; c !=  nil {
        // 如果不是同 size 扩容(看后面扩容的内容)
        // 对应条件 1 的解决方案
        if  !h.sameSizeGrow() {
        // 新 bucket 数量是老的 2 倍
            m >>=  1
        }

        // 求出 key 在老的 map 中的 bucket 位置
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        // 如果 oldb 没有搬迁到新的 bucket
        // 那就在老的 bucket 中寻找
        if  !evacuated(oldb) {
            b = oldb
        }
    }

    // 计算出高 8 位的 hash
    // 相当于右移 56 位,只取高8位
    top :=  uint8(hash >> (sys.PtrSize*8  -  8))
    // 增加一个 minTopHash
    if top < minTopHash {
        top += minTopHash
    }
    for {
        // 遍历 bucket 的 8 个位置
        for i :=  uintptr(0); i < bucketCnt; i++ {
            // tophash 不匹配,继续
            if b.tophash[i] != top {
                continue
            }
            // tophash 匹配,定位到 key 的位置
            k :=  add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // key 是指针
            if t.indirectkey {
                // 解引用
                k =  *((*unsafe.Pointer)(k))
            }
            // 如果 key 相等
            if alg.equal(key, k) {
            // 定位到 value 的位置
                v :=  add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
            // value 解引用
            if t.indirectvalue {
                v =  *((*unsafe.Pointer)(v))
            }
            return v
            }
        }
        // bucket 找完(还没找到),继续到 overflow bucket 里找
        b = b.overflow(t)
        // overflow bucket 也找完了,说明没有目标 key
        // 返回零值
        if b ==  nil {
            return unsafe.Pointer(&zeroVal[0])
        }
    }
}

函数返回 h[key] 的指针,如果 h 中没有此 key,那就会返回一个 key 相应类型的零值,不会返回 nil

代码整体比较直接,没什么难懂的地方。跟着上面的注释一步步理解就好了。

这里,说一下定位 keyvalue 的方法以及整个循环的写法。

// key 定位公式
k :=  add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// value 定位公式
v :=  add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

bbmap 的地址,这里 bmap 还是源码里定义的结构体,只包含一个 tophash 数组,经编译器扩充之后的结构体才包含 keyvalueoverflow 这些字段。dataOffsetkey 相对于 bmap 起始地址的偏移:

dataOffset = unsafe.Offsetof(struct {
            b bmap
            v int64
        }{}.v)

因此 bucketkey 的起始地址就是 unsafe.Pointer(b)+dataOffset。第 ikey 的地址就要在此基础上跨过 ikey 的大小;而我们又知道,value 的地址是在所有 key 之后,因此第 ivalue 的地址还需要加上所有 key 的偏移。理解了这些,上面 keyvalue 的定位公式就很好理解了。

再说整个大循环的写法,最外层是一个无限循环,通过

b = b.overflow(t)

遍历所有的 bucket,这相当于是一个 bucket 链表。

当定位到一个具体的 bucket 时,里层循环就是遍历这个 bucket 里所有的 cell,或者说所有的槽位,也就是 bucketCnt=8 个槽位。整个循环过程:

循环定位key

再说一下 minTopHash,当一个 celltophash 值小于 minTopHash 时,标志这个 cell 的迁移状态。因为这个状态值是放在 tophash 数组里,为了和正常的哈希值区分开,会给 key 计算出来的哈希值一个增量:minTopHash。这样就能区分正常的 top hash 值和表示状态的哈希值。

下面的这几种状态就表征了 bucket 的情况:

// 空的 cell,也是初始时 bucket 的状态
empty =  0
// 空的 cell,表示 cell 已经被迁移到新的 bucket
evacuatedEmpty =  1
// key,value 已经搬迁完毕,但是 key 都在新 bucket 前半部分,
// 后面扩容部分会再讲到。
evacuatedX =  2
// 同上,key 在后半部分
evacuatedY =  3
// tophash 的最小正常值
minTopHash =  4

源码里判断这个 bucket 是否已经搬迁完毕,用到的函数:

func  evacuated(b  *bmap) bool {
    h := b.tophash[0]
    return h > empty && h < minTopHash
}

只取了 tophash 数组的第一个值,判断它是否在 0-4 之间。对比上面的常量,当 top hashevacuatedEmptyevacuatedXevacuatedY 这三个值之一,说明此 bucket 中的 key 全部被搬迁到了新 bucket

3.2.2 如何实现两种 get 操作

Go 语言中读取 map 有两种语法:带 comma 和 不带 comma。当要查询的 key 不在 map 里,带 comma 的用法会返回一个 bool 型变量提示 key 是否在 map 中;而不带 comma 的语句则会返回一个 key 对应 value 类型的零值。如果 valueint 型就会返回 0,如果 valuestring 类型,就会返回空字符串。


package  main

import  "fmt"

func  main() {

ageMap :=  make(map[string]int)

ageMap["qcrao"] =  18

// 不带 comma 用法

age1 := ageMap["stefno"]

fmt.Println(age1)

// 带 comma 用法

age2, ok := ageMap["stefno"]

fmt.Println(age2, ok)

}

运行结果:


0

0  false

以前一直觉得好神奇,怎么实现的?这其实是编译器在背后做的工作:分析代码后,将两种语法对应到底层两个不同的函数。


// src/runtime/hashmap.go

func  mapaccess1(t  *maptype, h  *hmap, key  unsafe.Pointer) unsafe.Pointer

func  mapaccess2(t  *maptype, h  *hmap, key  unsafe.Pointer) (unsafe.Pointer, bool)

源码里,函数命名不拘小节,直接带上后缀 1,2,完全不理会《代码大全》里的那一套命名的做法。从上面两个函数的声明也可以看出差别了,mapaccess2 函数返回值多了一个 bool 型变量,两者的代码也是完全一样的,只是在返回值后面多加了一个 false 或者 true

另外,根据 key 的不同类型,编译器还会将查找、插入、删除的函数用更具体的函数替换,以优化效率:

key 类型 查找
uint32 mapaccess1_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint32 mapaccess2_fast32(t *maptype, h *hmap, key uint32) (unsafe.Pointer, bool)
uint64 mapaccess1_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
uint64 mapaccess2_fast64(t *maptype, h *hmap, key uint64) (unsafe.Pointer, bool)
string mapaccess1_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer
string mapaccess2_faststr(t *maptype, h *hmap, ky string) (unsafe.Pointer, bool)

这些函数的参数类型直接是具体的 uint32unt64string,在函数内部由于提前知晓了 key 的类型,所以内存布局是很清楚的,因此能节省很多操作,提高效率。

上面这些函数都是在文件 src/runtime/hashmap_fast.go 里。

3.2.3 遍历过程

本来 map 的遍历过程比较简单:遍历所有的 bucket 以及它后面挂的 overflow bucket,然后挨个遍历 bucket 中的所有 cell。每个 bucket 中包含 8 个 cell,从有 keycell 中取出 keyvalue,这个过程就完成了。

但是,现实并没有这么简单。还记得前面讲过的扩容过程吗?扩容过程不是一个原子的操作,它每次最多只搬运 2 个 bucket,所以如果触发了扩容操作,那么在很长时间里,map 的状态都是处于一个中间态:有些 bucket 已经搬迁到新家,而有些 bucket 还待在老地方。

因此,遍历如果发生在扩容的过程中,就会涉及到遍历新老 bucket 的过程,这是难点所在。

我先写一个简单的代码样例,假装不知道遍历过程具体调用的是什么函数:

package  main

import  "fmt"

func main() {
    ageMp :=  make(map[string]int)
    ageMp["qcrao"] =  18
    for name, age :=  range ageMp {
        fmt.Println(name, age)
    }
}

执行命令:

go  tool  compile  -S  main.go

得到汇编命令。这里就不逐行讲解了,可以去看之前的几篇文章,说得很详细。

关键的几行汇编代码如下:

//  ......
0x0124  00292 (test16.go:9) CALL runtime.mapiterinit(SB)

//  ......
0x01fb  00507 (test16.go:9) CALL runtime.mapiternext(SB)
0x0200  00512 (test16.go:9) MOVQ ""..autotmp_4+160(SP), AX
0x0208  00520 (test16.go:9) TESTQ AX, AX
0x020b  00523 (test16.go:9) JNE 302
//  ......

这样,关于 map 迭代,底层的函数调用关系一目了然。先是调用 mapiterinit 函数初始化迭代器,然后循环调用 mapiternext 函数进行 map 迭代。

map 遍历

迭代器的结构体定义:

type hiter  struct {
    // key 指针
    key unsafe.Pointer
    // value 指针
    value unsafe.Pointer
    // map 类型,包含如 key size 大小等
    t *maptype
    // map header
    h *hmap
    // 初始化时指向的 bucket
    buckets unsafe.Pointer
    // 当前遍历到的 bmap
    bptr *bmap
    overflow [2]*[]*bmap
    // 起始遍历的 bucket 编号
    startBucket uintptr
    // 遍历开始时 cell 的编号(每个 bucket 中有 8 个 cell)
    offset uint8
    // 是否从头遍历了
    wrapped bool
    // B 的大小
    B uint8
    // 指示当前 cell 序号
    i uint8
    // 指向当前的 bucket
    bucket uintptr
    // 因为扩容,需要检查的 bucket
    checkBucket uintptr
}

mapiterinit 就是对 hiter 结构体里的字段进行初始化赋值操作。

前面已经提到过,即使是对一个写死的 map 进行遍历,每次出来的结果也是无序的。下面我们就可以近距离地观察他们的实现了。

// 生成随机数 r
r :=  uintptr(fastrand())
if h.B >  31-bucketCntBits {
    r +=  uintptr(fastrand()) <<  31
}
// 从哪个 bucket 开始遍历
it.startBucket = r & (uintptr(1)<<h.B -  1)
// 从 bucket 的哪个 cell 开始遍历
it.offset =  uint8(r >> h.B & (bucketCnt -  1))

例如,B = 2,那 uintptr(1)<<h.B - 1 结果就是 3,低 8 位为 0000 0011,将 r 与之相与,就可以得到一个 03 的 bucket 序号;bucketCnt - 1 等于 7,低 8 位为 0000 0111,将 r 右移 2 位后,与 7 相与,就可以得到一个 `07号的cell`。

于是,在 mapiternext 函数中就会从 it.startBucketit.offset 号的 cell 开始遍历,取出其中的 keyvalue,直到又回到起点 bucket,完成遍历过程。

源码部分比较好看懂,尤其是理解了前面注释的几段代码后,再看这部分代码就没什么压力了。所以,接下来,我将通过图形化的方式讲解整个遍历过程,希望能够清晰易懂。

假设我们有下图所示的一个 map,起始时 B = 1,有两个 bucket,后来触发了扩容(这里不要深究扩容条件,只是一个设定),B 变成 2。并且, 1 号 bucket 中的内容搬迁到了新的 bucket,1 号裂变成 1 号和 3 号;0 号 bucket 暂未搬迁。老的 bucket 挂在在 *oldbuckets 指针上面,新的 bucket 则挂在 *buckets 指针上面。

map 扩容

这时,我们对此 map 进行遍历。假设经过初始化后,startBucket = 3offset = 2。于是,遍历的起点将是 3 号 bucket 的 2 号 cell,下面这张图就是开始遍历时的状态:

map 遍历过程

标红的表示起始位置,bucket 遍历顺序为:3 -> 0 -> 1 -> 2

因为 3 号 bucket 对应老的 1 号 bucket,因此先检查老 1 号 bucket 是否已经被搬迁过。判断方法就是:

func evacuated(b  *bmap) bool {
    h := b.tophash[0]
    return h > empty && h < minTopHash
}

如果 b.tophash[0] 的值在标志值范围内,即在 (0,4) 区间里,说明已经被搬迁过了。

empty =  0
evacuatedEmpty =  1
evacuatedX =  2
evacuatedY =  3
minTopHash =  4

在本例中,老 1 号 bucket 已经被搬迁过了。所以它的 tophash[0]值在 (0,4) 范围内,因此只用遍历新的 3 号 bucket

依次遍历 3 号 bucketcell,这时候会找到第一个非空的 key:元素 e。到这里,mapiternext 函数返回,这时我们的遍历结果仅有一个元素:

元素e

由于返回的 key 不为空,所以会继续调用 mapiternext 函数。

继续从上次遍历到的地方往后遍历,从新 3 号 overflow bucket 中找到了元素 f 和 元素 g

遍历结果集也因此壮大:

map遍历

新 3 号 bucket 遍历完之后,回到了新 0 号 bucket。0 号 bucket 对应老的 0 号 bucket,经检查,老 0 号 bucket 并未搬迁,因此对新 0 号 bucket 的遍历就改为遍历老 0 号 bucket。那是不是把老 0 号 bucket 中的所有 key 都取出来呢?

并没有这么简单,回忆一下,老 0 号 bucket 在搬迁后将裂变成 2 个 bucket:新 0 号、新 2 号。而我们此时正在遍历的只是新 0 号 bucket(注意,遍历都是遍历的 *bucket 指针,也就是所谓的新 buckets)。所以,我们只会取出老 0 号 bucket 中那些在裂变之后,分配到新 0 号 bucket 中的那些 key

因此,lowbits == 00 的将进入遍历结果集:

map 遍历

和之前的流程一样,继续遍历新 1 号 bucket,发现老 1 号 bucket 已经搬迁,只用遍历新 1 号 bucket 中现有的元素就可以了。结果集变成:

map遍历

继续遍历新 2 号 bucket,它来自老 0 号 bucket,因此需要在老 0 号 bucket 中那些会裂变到新 2 号 bucket 中的 key,也就是 lowbit == 10 的那些 key

这样,遍历结果集变成:

map遍历

最后,继续遍历到新 3 号 bucket 时,发现所有的 bucket 都已经遍历完毕,整个迭代过程执行完毕。

顺便说一下,如果碰到 keymath.NaN() 这种的,处理方式类似。核心还是要看它被分裂后具体落入哪个 bucket。只不过只用看它 top hash 的最低位。如果 top hash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part。据此决定是否取出 key,放到遍历结果集里。

map 遍历的核心在于理解 2 倍扩容时,老 bucket 会分裂到 2 个新 bucket 中去。而遍历操作,会按照新 bucket 的序号顺序进行,碰到老 bucket 未搬迁的情况时,要在老 bucket中找到将来要搬迁到新 bucket 来的 key

3.2.4 赋值过程

通过汇编语言可以看到,向 map 中插入或者修改 key,最终调用的是 mapassign 函数。

实际上插入或修改 key 的语法是一样的,只不过前者操作的 keymap 中不存在,而后者操作的 key 存在 map 中。

mapassign 有一个系列的函数,根据 key 类型的不同,编译器会将其优化为相应的“快速函数”。

key 类型 插入
uint32 mapassign_fast32(t *maptype, h *hmap, key uint32) unsafe.Pointer
uint64 mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer
string mapassign_faststr(t *maptype, h *hmap, ky string) unsafe.Pointer

我们只用研究最一般的赋值函数 mapassign

整体来看,流程非常得简单:对 key 计算 hash 值,根据 hash 值按照之前的流程,找到要赋值的位置(可能是插入新 key,也可能是更新老 key),对相应位置进行赋值。

源码大体和之前讲的类似,核心还是一个双层循环,外层遍历 bucket 和它的 overflow bucket,内层遍历整个 bucket 的各个 cell。限于篇幅,这部分代码的注释我也不展示了,有兴趣的可以去看,保证理解了这篇文章内容后,能够看懂。

我这里会针对这个过程提几点重要的。

函数首先会检查 map 的标志位 flags。如果 flags 的写标志位此时被置 1 了,说明有其他协程在执行“写”操作,进而导致程序 panic。这也说明了 map 对协程是不安全的。

通过前文我们知道扩容是渐进式的,如果 map 处在扩容的过程中,那么当 key 定位到了某个 bucket 后,需要确保这个 bucket 对应的老 bucket 完成了迁移过程。即老 bucket 里的 key 都要迁移到新的 bucket 中来(分裂到 2 个新 bucket),才能在新的 bucket 中进行插入或者更新的操作。

上面说的操作是在函数靠前的位置进行的,只有进行完了这个搬迁操作后,我们才能放心地在新 bucket 里定位 key 要安置的地址,再进行之后的操作。

现在到了定位 key 应该放置的位置了,所谓找准自己的位置很重要。准备两个指针,一个(inserti)指向 keyhash 值在 tophash 数组所处的位置,另一个(insertk)指向 cell 的位置(也就是 key 最终放置的地址),当然,对应 value 的位置就很容易定位出来了。这三者实际上都是关联的,在 tophash 数组中的索引位置决定了 key 在整个 bucket 中的位置(共 8 个 key),而 value 的位置需要“跨过” 8 个 key 的长度。

在循环的过程中,insertiinsertk 分别指向第一个找到的空闲的 cell。如果之后在 map 没有找到 key 的存在,也就是说原来 map 中没有此 key,这意味着插入新 key。那最终 key 的安置地址就是第一次发现的“空位”(tophashempty)。

如果这个 bucket 的 8 个 key 都已经放置满了,那在跳出循环后,发现 insertiinsertk 都是空,这时候需要在 bucket 后面挂上 overflow bucket。当然,也有可能是在 overflow bucket 后面再挂上一个 overflow bucket。这就说明,太多 key hash 到了此 bucket

在正式安置 key 之前,还要检查 map 的状态,看它是否需要进行扩容。如果满足扩容的条件,就主动触发一次扩容操作。

这之后,整个之前的查找定位 key 的过程,还得再重新走一次。因为扩容之后,key 的分布都发生了变化。

最后,会更新 map 相关的值,如果是插入新 keymap 的元素数量字段 count 值会加 1;在函数之初设置的 hashWriting 写标志出会清零。

另外,有一个重要的点要说一下。前面说的找到 key 的位置,进行赋值操作,实际上并不准确。我们看 mapassign 函数的原型就知道,函数并没有传入 value 值,所以赋值操作是什么时候执行的呢?

func  mapassign(t  *maptype, h  *hmap, key  unsafe.Pointer) unsafe.Pointer

答案还得从汇编语言中寻找。我直接揭晓答案,有兴趣可以私下去研究一下。mapassign 函数返回的指针就是指向的 key 所对应的 value 值位置,有了地址,就很好操作赋值了。

3.2.5 删除过程

写操作底层的执行函数是 mapdelete

func  mapdelete(t  *maptype, h  *hmap, key  unsafe.Pointer)

根据 key 类型的不同,删除操作会被优化成更具体的函数:

key 类型 删除
uint32 mapdelete_fast32(t *maptype, h *hmap, key uint32)
uint64 mapdelete_fast64(t *maptype, h *hmap, key uint64)
string mapdelete_faststr(t *maptype, h *hmap, ky string)

当然,我们只关心 mapdelete 函数。它首先会检查 h.flags 标志,如果发现写标位是 1,直接 panic,因为这表明有其他协程同时在进行写操作。

计算 key 的哈希,找到落入的 bucket。检查此 map 如果正在扩容的过程中,直接触发一次搬迁操作。

删除操作同样是两层循环,核心还是找到 key 的具体位置。寻找过程都是类似的,在 bucket 中挨个 cell 寻找。

找到对应位置后,对 key 或者 value 进行“清零”操作:

// 对 key 清零
if t.indirectkey {
    *(*unsafe.Pointer)(k) =  nil
} else {
    typedmemclr(t.key, k)
}

// 对 value 清零
if t.indirectvalue {
    *(*unsafe.Pointer)(v) =  nil
} else {
    typedmemclr(t.elem, v)
}

最后,将 count 值减 1,将对应位置的 tophash 值置成 Empty

这块源码同样比较简单,感兴起直接去看代码。

3.2.6 扩容过程

使用哈希表的目的就是要快速查找到目标 key,然而,随着向 map 中添加的 key 越来越多,key 发生碰撞的概率也越来越大。bucket 中的 8 个 cell 会被逐渐塞满,查找、插入、删除 key 的效率也会越来越低。最理想的情况是一个 bucket 只装一个 key,这样,就能达到 O(1) 的效率,但这样空间消耗太大,用空间换时间的代价太高。

Go 语言采用一个 bucket 里装载 8 个 key,定位到某个 bucket 后,还需要再定位到具体的 key,这实际上又用了时间换空间。

当然,这样做,要有一个度,不然所有的 key 都落在了同一个 bucket 里,直接退化成了链表,各种操作的效率直接降为 O(n),是不行的。

因此,需要有一个指标来衡量前面描述的情况,这就是装载因子Go 源码里这样定义 装载因子

loadFactor := count / (2^B) // 元素个数与桶数的比值

count 就是 map 的元素个数,2^B 表示 bucket 数量。

再来说触发 map 扩容的时机:在向 map 插入新 key 的时候,会进行条件检测,符合下面这 2 个条件,就会触发扩容:

  1. 装载因子超过阈值,源码里定义的阈值是 6.5。

  2. overflowbucket 数量过多:当 B 小于 15,也就是 bucket 总数 2^B 小于 2^15 时,如果 overflowbucket 数量超过 2^B;当 B >= 15,也就是 bucket 总数 2^B 大于等于 2^15,如果 overflowbucket 数量超过 2^15

通过汇编语言可以找到赋值操作对应源码中的函数是 mapassign,对应扩容条件的源码如下:

// src/runtime/hashmap.go/mapassign
// 触发扩容时机
if  !h.growing() && (overLoadFactor(int64(h.count), h.B) ||  tooManyOverflowBuckets(h.noverflow, h.B)) {
    hashGrow(t, h)
}

// 装载因子超过 6.5
func  overLoadFactor(count  int64, B  uint8) bool {
    return count >= bucketCnt &&  float32(count) >= loadFactor*float32((uint64(1)<<B))
}
// overflow buckets 太多
func  tooManyOverflowBuckets(noverflow  uint16, B  uint8) bool {
    if B <  16 {
        return noverflow >=  uint16(1)<<B
    }
    return noverflow >=  1<<15
}

解释一下:

第 1 点:我们知道,每个 bucket 有 8 个空位,在没有溢出,且所有的桶都装满了的情况下,装载因子算出来的结果是 8。因此当装载因子超过 6.5 时,表明很多 bucket 都快要装满了,查找效率和插入效率都变低了。在这个时候进行扩容是有必要的。

第 2 点:是对第 1 点的补充。就是说在装载因子比较小的情况下,这时候 map 的查找和插入效率也很低,而第 1 点识别不出来这种情况。表面现象就是计算装载因子的分子比较小,即 map 里元素总数少,但是 bucket 数量多(真实分配的 bucket 数量多,包括大量的 overflow bucket)。

不难想像造成这种情况的原因:不停地插入、删除元素。先插入很多元素,导致创建了很多 bucket,但是装载因子达不到第 1 点的临界值,未触发扩容来缓解这种情况。之后,删除元素降低元素总数量,再插入很多元素,导致创建很多的 overflow bucket,但就是不会触犯第 1 点的规定,你能拿我怎么办?overflow bucket 数量太多,导致 key 会很分散,查找插入效率低得吓人,因此出台第 2 点规定。这就像是一座空城,房子很多,但是住户很少,都分散了,找起人来很困难。

对于命中条件 1,2 的限制,都会发生扩容。但是扩容的策略并不相同,毕竟两种条件应对的场景不同。

对于条件 1,元素太多,而 bucket 数量太少,很简单:将 B 加 1,bucket 最大数量(2^B)直接变成原来 bucket 数量的 2 倍。于是,就有新老 bucket 了。注意,这时候元素都在老 bucket 里,还没迁移到新的 bucket 来。而且,新 bucket 只是最大数量变为原来最大数量(2^B)的 2 倍(2^B * 2)。

对于条件 2,其实元素没那么多,但是 overflow bucket 数特别多,说明很多 bucket 都没装满。解决办法就是开辟一个新 bucket 空间,将老 bucket 中的元素移动到新 bucket,使得同一个 bucket 中的 key 排列地更紧密。这样,原来,在 overflow bucket 中的 key 可以移动到 bucket 中来。结果是节省空间,提高 bucket 利用率,map 的查找和插入效率自然就会提升。

对于条件 2 的解决方案,曹大的博客里还提出了一个极端的情况:如果插入 mapkey 哈希都一样,就会落到同一个 bucket 里,超过 8 个就会产生 overflow bucket,结果也会造成 overflow bucket 数过多。移动元素其实解决不了问题,因为这时整个哈希表已经退化成了一个链表,操作效率变成了 O(n)

再来看一下扩容具体是怎么做的。由于 map 扩容需要将原有的 key/value 重新搬迁到新的内存地址,如果有大量的 key/value 需要搬迁,会非常影响性能。因此 Go map 的扩容采取了一种称为“渐进式”地方式,原有的 key 并不会一次性搬迁完毕,每次最多只会搬迁 2 个 bucket

上面说的 hashGrow() 函数实际上并没有真正地“搬迁”,它只是分配好了新的 buckets,并将老的 buckets 挂到了 oldbuckets 字段上。真正搬迁 buckets 的动作在 growWork() 函数中,而调用 growWork() 函数的动作是在 mapassignmapdelete 函数中。也就是插入或修改、删除 key 的时候,都会尝试进行搬迁 buckets 的工作。先检查 oldbuckets 是否搬迁完毕,具体来说就是检查 oldbuckets 是否为 nil

我们先看 hashGrow() 函数所做的工作,再来看具体的搬迁 buckets 是如何进行的。

func  hashGrow(t  *maptype, h  *hmap) {
    // B+1 相当于是原来 2 倍的空间
    bigger :=  uint8(1)
    // 对应条件 2
    if  !overLoadFactor(int64(h.count), h.B) {
        // 进行等量的内存扩容,所以 B 不变
        bigger =  0
        h.flags |= sameSizeGrow
    }
    // 将老 buckets 挂到 buckets 上
    oldbuckets := h.buckets
    // 申请新的 buckets 空间
    newbuckets, nextOverflow :=  makeBucketArray(t, h.B+bigger)
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator !=  0 {
        flags |= oldIterator
    }
    // 提交 grow 的动作
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    // 搬迁进度为 0
    h.nevacuate =  0
    // overflow buckets 数为 0
    h.noverflow =  0
    // ……
}

主要是申请到了新的 buckets 空间,把相关的标志位都进行了处理:例如标志 nevacuate 被置为 0, 表示当前搬迁进度为 0。

值得一说的是对 h.flags 的处理:

flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator !=  0 {
    flags |= oldIterator
}

这里得先说下运算符:&^。这叫按位置 0运算符。例如:

x =  01010011
y =  01010100
z = x &^ y =  00000011

如果 y bit 位为 1,那么结果 z 对应 bit 位就为 0,否则 z 对应 bit 位就和 x 对应 bit 位的值相同。

所以上面那段对 flags 一顿操作的代码的意思是:先把 h.flagsiteratoroldIterator 对应位清 0,然后如果发现 iterator 位为 1,那就把它转接到 oldIterator 位,使得 oldIterator 标志位变成 1。潜台词就是:buckets 现在挂到了 oldBuckets 名下了,对应的标志位也转接过去吧。

几个标志位如下:

// 可能有迭代器使用 buckets
iterator =  1
// 可能有迭代器使用 oldbuckets
oldIterator =  2
// 有协程正在向 map 中写入 key
hashWriting =  4
// 等量扩容(对应条件 2)
sameSizeGrow =  8

再来看看真正执行搬迁工作的 growWork() 函数。

func  growWork(t  *maptype, h  *hmap, bucket  uintptr) {
    // 确认搬迁老的 bucket 对应正在使用的 bucket
    evacuate(t, h, bucket&h.oldbucketmask())
    // 再搬迁一个 bucket,以加快搬迁进程
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

h.growing() 函数非常简单:

func (h *hmap) growing() bool {
    return h.oldbuckets !=  nil
}

如果 oldbuckets 不为空,说明还没有搬迁完毕,还得继续搬。

bucket&h.oldbucketmask() 这行代码,如源码注释里说的,是为了确认搬迁的 bucket 是我们正在使用的 bucketoldbucketmask() 函数返回扩容前的 mapbucketmask

所谓的 bucketmask,作用就是将 key 计算出来的哈希值与 bucketmask 相与,得到的结果就是 key 应该落入的桶。比如 B = 5,那么 bucketmask 的低 5 位是 11111,其余位是 0hash 值与其相与的意思是,只有 hash 值的低 5 位决策 key 到底落入哪个 bucket

接下来,我们集中所有的精力在搬迁的关键函数 evacuate。源码贴在下面,不要紧张,我会加上大面积的注释,通过注释绝对是能看懂的。之后,我会再对搬迁过程作详细说明。

源码如下:

func evacuate(t  *maptype, h  *hmap, oldbucket  uintptr) {
    // 定位老的 bucket 地址
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    // 结果是 2^B,如 B = 5,结果为32
    newbit := h.noldbuckets()
    // key 的哈希函数
    alg := t.key.alg
    // 如果 b 没有被搬迁过
    if  !evacuated(b) {
        var (
            // 表示bucket 移动的目标地址
            x, y *bmap
            // 指向 x,y 中的 key/val
            xi, yi int
            // 指向 x,y 中的 key
            xk, yk unsafe.Pointer
            // 指向 x,y 中的 value
            xv, yv unsafe.Pointer
        )
    // 默认是等 size 扩容,前后 bucket 序号不变
    // 使用 x 来进行搬迁
        x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        xi =  0
        xk =  add(unsafe.Pointer(x), dataOffset)
        xv =  add(xk, bucketCnt*uintptr(t.keysize))
        // 如果不是等 size 扩容,前后 bucket 序号有变
    // 使用 y 来进行搬迁
        if  !h.sameSizeGrow() {
        // y 代表的 bucket 序号增加了 2^B
            y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            yi =  0
            yk =  add(unsafe.Pointer(y), dataOffset)
            yv =  add(yk, bucketCnt*uintptr(t.keysize))
        }
    // 遍历所有的 bucket,包括 overflow buckets
    // b 是老的 bucket 地址
        for ; b !=  nil; b = b.overflow(t) {
            k :=  add(unsafe.Pointer(b), dataOffset)
            v :=  add(k, bucketCnt*uintptr(t.keysize))
            // 遍历 bucket 中的所有 cell
            for i :=  0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
            // 当前 cell 的 top hash 值
            top := b.tophash[i]
    // 如果 cell 为空,即没有 key
            if top == empty {
                // 那就标志它被"搬迁"过
                b.tophash[i] = evacuatedEmpty
                // 继续下个 cell
                continue
            }
            // 正常不会出现这种情况
            // 未被搬迁的 cell 只可能是 empty 或是
            // 正常的 top hash(大于 minTopHash)
            if top < minTopHash {
                throw("bad map state")
            }
            k2 := k
            // 如果 key 是指针,则解引用
            if t.indirectkey {
                k2 =  *((*unsafe.Pointer)(k2))
            }
            // 默认使用 X,等量扩容
            useX :=  true
            // 如果不是等量扩容
            if  !h.sameSizeGrow() {
                // 计算 hash 值,和 key 第一次写入时一样
                hash := alg.hash(k2, uintptr(h.hash0))
                // 如果有协程正在遍历 map
                if h.flags&iterator !=  0 {
                    // 如果出现 相同的 key 值,算出来的 hash 值不同
                    if  !t.reflexivekey &&  !alg.equal(k2, k2) {
                    // 只有在 float 变量的 NaN() 情况下会出现
                        if top&1  !=  0 {
                            // 第 B 位置 1
                            hash |= newbit
                        } else {
                            // 第 B 位置 0
                            hash &^= newbit
                        }
                        // 取高 8 位作为 top hash 值
                        top =  uint8(hash >> (sys.PtrSize*8  -  8))
                        if top < minTopHash {
                            top += minTopHash
                        }
                    }
                }
                // 取决于新哈希值的 oldB+1 位是 0 还是 1
                // 详细看后面的文章
                useX = hash&newbit ==  0
            }
            // 如果 key 搬到 X 部分
            if useX {
                // 标志老的 cell 的 top hash 值,表示搬移到 X 部分
                b.tophash[i] = evacuatedX
                // 如果 xi 等于 8,说明要溢出了
                if xi == bucketCnt {
                    // 新建一个 bucket
                    newx := h.newoverflow(t, x)
                    x = newx
                    // xi 从 0 开始计数
                    xi =  0
                    // xk 表示 key 要移动到的位置
                    xk =  add(unsafe.Pointer(x), dataOffset)
                    // xv 表示 value 要移动到的位置
                    xv =  add(xk, bucketCnt*uintptr(t.keysize))
                }
                // 设置 top hash 值
                x.tophash[xi] = top
                // key 是指针
                if t.indirectkey {
                    // 将原 key(是指针)复制到新位置
                    *(*unsafe.Pointer)(xk) = k2 // copy pointer
                } else {
                    // 将原 key(是值)复制到新位置
                    typedmemmove(t.key, xk, k) // copy value
                }
                // value 是指针,操作同 key
                if t.indirectvalue {
                    *(*unsafe.Pointer)(xv) =  *(*unsafe.Pointer)(v)
                } else {
                    typedmemmove(t.elem, xv, v)
                }
                // 定位到下一个 cell
                xi++
                xk =  add(xk, uintptr(t.keysize))
                xv =  add(xv, uintptr(t.valuesize))
            } else { // key 搬到 Y 部分,操作同 X 部分
                // ……
                // 省略了这部分,操作和 X 部分相同
            }
        }
    }
    // 如果没有协程在使用老的 buckets,就把老 buckets 清除掉,帮助gc
    if h.flags&oldIterator ==  0 {
        b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
        // 只清除bucket 的 key,value 部分,保留 top hash 部分,指示搬迁状态
        if t.bucket.kind&kindNoPointers ==  0 {
            memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
        } else {
            memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
        }
    }
    }
    // 更新搬迁进度
    // 如果此次搬迁的 bucket 等于当前进度
    if oldbucket == h.nevacuate {
    // 进度加 1
    h.nevacuate = oldbucket +  1
    // Experiments suggest that 1024 is overkill by at least an order of magnitude.
    // Put it in there as a safeguard anyway, to ensure O(1) behavior.
    // 尝试往后看 1024 个 bucket
    stop := h.nevacuate +  1024
    if stop > newbit {
    stop = newbit
    }
    // 寻找没有搬迁的 bucket
    for h.nevacuate != stop &&  bucketEvacuated(t, h, h.nevacuate) {
    h.nevacuate++
    }
    // 现在 h.nevacuate 之前的 bucket 都被搬迁完毕
    // 所有的 buckets 搬迁完毕
    if h.nevacuate == newbit {
        // 清除老的 buckets
        h.oldbuckets =  nil
        // 清除老的 overflow bucket
        // 回忆一下:[0] 表示当前 overflow bucket
        // [1] 表示 old overflow bucket
        if h.extra !=  nil {
            h.extra.overflow[1] =  nil
        }
        // 清除正在扩容的标志位
        h.flags &^= sameSizeGrow
        }
    }
}

evacuate 函数的代码注释非常清晰,对着代码和注释是很容易看懂整个的搬迁过程的,耐心点。

搬迁的目的就是将老的 buckets 搬迁到新的 buckets。而通过前面的说明我们知道,应对条件 1,新的 buckets 数量是之前的一倍,应对条件 2,新的 buckets 数量和之前相等。

对于条件 2,从老的 buckets 搬迁到新的 buckets,由于 bucktes 数量不变,因此可以按序号来搬,比如原来在 0 号 bucktes,到新的地方后,仍然放在 0 号 buckets

对于条件 1,就没这么简单了。要重新计算 key 的哈希,才能决定它到底落在哪个 bucket。例如,原来 B = 5,计算出 key 的哈希后,只用看它的低 5 位,就能决定它落在哪个 bucket。扩容后,B 变成了 6,因此需要多看一位,它的低 6 位决定 key 落在哪个 bucket。这称为 rehash

map rehash

因此,某个 key 在搬迁前后 bucket 序号可能和原来相等,也可能是相比原来加上 2^B(原来的 B 值),取决于 hash 值 第 6 bit 位是 0 还是 1。

再明确一个问题:如果扩容后,B 增加了 1,意味着 buckets 总数是原来的 2 倍,原来 1 号的桶“裂变”到两个桶。

例如,原始 B = 2,1号 bucket 中有 2 个 key 的哈希值低 3 位分别为:010,110。由于原来 B = 2,所以低 2 位 10 决定它们落在 2 号桶,现在 B 变成 3,所以 010110 分别落入 2、6 号桶。

bucket split

理解了这个,后面讲 map 迭代的时候会用到。

再来讲搬迁函数中的几个关键点:

evacuate 函数每次只完成一个 bucket 的搬迁工作,因此要遍历完此 bucket 的所有的 cell,将有值的 cell copy 到新的地方。bucket 还会链接 overflow bucket,它们同样需要搬迁。因此会有 2 层循环,外层遍历 bucketoverflow bucket,内层遍历 bucket 的所有 cell。这样的循环在 map 的源码里到处都是,要理解透了。

源码里提到 X, Y part,其实就是我们说的如果是扩容到原来的 2 倍,桶的数量是原来的 2 倍,前一半桶被称为 X part,后一半桶被称为 Y part。一个 bucket 中的 key 可能会分裂落到 2 个桶,一个位于 X part,一个位于 Y part。所以在搬迁一个 cell 之前,需要知道这个 cell 中的 key 是落到哪个 Part。很简单,重新计算 cellkeyhash,并向前“多看”一位,决定落入哪个 Part,这个前面也说得很详细了。

有一个特殊情况是:有一种 key,每次对它计算 hash,得到的结果都不一样。这个 key 就是 math.NaN() 的结果,它的含义是 not a number,类型是 float64。当它作为 mapkey,在搬迁的时候,会遇到一个问题:再次计算它的哈希值和它当初插入 map 时的计算出来的哈希值不一样!

你可能想到了,这样带来的一个后果是,这个 key 是永远不会被 Get 操作获取的!当我使用 m[math.NaN()] 语句的时候,是查不出来结果的。这个 key 只有在遍历整个 map 的时候,才有机会现身。所以,可以向一个 map 插入任意数量的 math.NaN() 作为 key。

当搬迁碰到 math.NaN()key 时,只通过 tophash 的最低位决定分配到 X part 还是 Y part(如果扩容后是原来 buckets 数量的 2 倍)。如果 tophash 的最低位是 0 ,分配到 X part;如果是 1 ,则分配到 Y part

这是通过 tophash 值与新算出来的哈希值进行运算得到的:

if top&1  !=  0 {
    // top hash 最低位为 1
    // 新算出来的 hash 值的 B 位置 1
    hash |= newbit
} else {
    // 新算出来的 hash 值的 B 位置 0
    hash &^= newbit
}
// hash 值的 B 位为 0,则搬迁到 x part
// 当 B = 5时,newbit = 32,二进制低 6 位为 10 0000
useX = hash&newbit ==  0

其实这样的 key 我随便搬迁到哪个 bucket 都行,当然,还是要搬迁到上面裂变那张图中的两个 bucket 中去。但这样做是有好处的,在后面讲 map 迭代的时候会再详细解释,暂时知道是这样分配的就行。

确定了要搬迁到的目标 bucket 后,搬迁操作就比较好进行了。将源 key/valuecopy 到目的地相应的位置。

设置 key 在原始 bucketstophashevacuatedX 或是 evacuatedY,表示已经搬迁到了新 mapx part 或是 y part。新 maptophash 则正常取 key 哈希值的高 8 位。

下面通过图来宏观地看一下扩容前后的变化。

扩容前,B = 2,共有 4 个 bucketslowbits 表示 hash 值的低位。假设我们不关注其他 buckets 情况,专注在 2 号 bucket。并且假设 overflow 太多,触发了等量扩容(对应于前面的条件 2)。

扩容前

扩容完成后,overflow bucket 消失了,key 都集中到了一个 bucket,更为紧凑了,提高了查找的效率。

same size 扩容

假设触发了 2 倍的扩容,那么扩容完成后,老 buckets 中的 key 分裂到了 2 个 新的 bucket。一个在 x part,一个在 ypart。依据是 hashlowbits。新 map0-3 称为 x part4-7 称为 y part

2倍扩容

注意,上面的两张图忽略了其他 buckets 的搬迁情况,表示所有的 bucket 都搬迁完毕后的情形。实际上,我们知道,搬迁是一个“渐进”的过程,并不会一下子就全部搬迁完毕。所以在搬迁过程中,oldbuckets 指针还会指向原来老的 []bmap,并且已经搬迁完毕的 keytophash 值会是一个状态值,表示 key 的搬迁去向。

3.2.7 key 为什么是无序的

map 在扩容后,会发生 key 的搬迁,原来落在同一个 bucket 中的 key,搬迁后,有些 key 就要远走高飞了(bucket 序号加上了 2^B)。而遍历的过程,就是按顺序遍历 bucket,同时按顺序遍历 bucket 中的 key。搬迁后,key 的位置发生了重大的变化,有些 key 飞上高枝,有些 key 则原地不动。这样,遍历 map 的结果就不可能按原来的顺序了。

当然,如果我就一个 hard codemap,我也不会向 map 进行插入删除的操作,按理说每次遍历这样的 map 都会返回一个固定顺序的 key/value 序列吧。的确是这样,但是 Go 杜绝了这种做法,因为这样会给新手程序员带来误解,以为这是一定会发生的事情,在某些情况下,可能会酿成大错。

当然,Go 做得更绝,当我们在遍历 map 时,并不是固定地从 0 号 bucket 开始遍历,每次都是从一个随机值序号的 bucket 开始遍历,并且是从这个 bucket 的一个随机序号的 cell 开始遍历。这样,即使你是一个写死的 map,仅仅只是遍历它,也不太可能会返回一个固定序列的 key/value 对了。

多说一句,“迭代 map 的结果是无序的”这个特性是从 go 1.0 开始加入的。

3.2.8 float 类型可以作为 map 的 key 吗

从语法上看,是可以的。Go 语言中只要是可比较的类型都可以作为 key。除开 slicemapfunctions 这几种类型,其他类型都是 OK 的。具体包括:布尔值、数字、字符串、指针、通道、接口类型、结构体、只包含上述类型的数组。这些类型的共同特征是支持 ==!= 操作符,k1 == k2 时,可认为 k1k2 是同一个 key。如果是结构体,只有 hash 后的值相等以及字面值相等,才被认为是相同的 key。很多字面值相等的,hash出来的值不一定相等,比如引用。

顺便说一句,任何类型都可以作为 value,包括 map 类型。

来看个例子:

func main() {
    m :=  make(map[float64]int)
    m[1.4] =  1
    m[2.4] =  2
    m[math.NaN()] =  3
    m[math.NaN()] =  3
    for k, v :=  range m {
        fmt.Printf("[%v, %d] ", k, v)
    }
    fmt.Printf("\nk: %v, v: %d\n", math.NaN(), m[math.NaN()])
    fmt.Printf("k: %v, v: %d\n", 2.400000000001, m[2.400000000001])
    fmt.Printf("k: %v, v: %d\n", 2.4000000000000000000000001, m[2.4000000000000000000000001])
    fmt.Println(math.NaN() == math.NaN())
}

程序的输出:

[2.4, 2] [NaN, 3] [NaN, 3] [1.4, 1]
k:  NaN,  v:  0
k:  2.400000000001,  v:  0
k:  2.4,  v:  2
false

例子中定义了一个 key 类型是 float 型的 map,并向其中插入了 4 个 key:1.4, 2.4, NANNAN

打印的时候也打印出了 4 个 key,如果你知道 NAN != NAN,也就不奇怪了。因为他们比较的结果不相等,自然,在 map 看来就是两个不同的 key 了。

接着,我们查询了几个 key,发现 NAN 不存在,2.400000000001 也不存在,而 2.4000000000000000000000001 却存在。

有点诡异,不是吗?

接着,我通过汇编发现了如下的事实:

当用 float64 作为 key 的时候,先要将其转成 uint64 类型,再插入 key 中。

具体是通过 Float64frombits 函数完成:

// Float64frombits returns the floating point number corresponding
// the IEEE 754 binary representation b.
func  Float64frombits(b  uint64) float64 { return  *(*float64)(unsafe.Pointer(&b)) }

也就是将浮点数表示成 IEEE 754 规定的格式。如赋值语句:

0x00bd 00189 (test18.go:9) LEAQ "".statictmp_0(SB), DX
0x00c4 00196 (test18.go:9) MOVQ DX, 16(SP)
0x00c9 00201 (test18.go:9) PCDATA $0, $2
0x00c9 00201 (test18.go:9) CALL runtime.mapassign(SB)

"".statictmp_0(SB) 变量是这样的:

"".statictmp_0 SRODATA size=8
0x0000 33 33 33 33 33 33 03 40
"".statictmp_1 SRODATA size=8
0x0000 ff 3b 33 33 33 33 03 40
"".statictmp_2 SRODATA size=8
0x0000 33 33 33 33 33 33 03 40

我们再来输出点东西:


package main
import (
    "fmt"
    "math"
)

func main() {
    m :=  make(map[float64]int)
    m[2.4] =  2
    fmt.Println(math.Float64bits(2.4))
    fmt.Println(math.Float64bits(2.400000000001))
    fmt.Println(math.Float64bits(2.4000000000000000000000001))
}
4612586738352862003
4612586738352864255
4612586738352862003

转成十六进制为:

0x4003333333333333
0x4003333333333BFF
0x4003333333333333

和前面的 "".statictmp_0 比较一下,很清晰了吧。2.42.4000000000000000000000001 经过 math.Float64bits() 函数转换后的结果是一样的。自然,二者在 map 看来,就是同一个 key 了。

再来看一下 NAN(not a number)

// NaN returns an IEEE 754 ``not-a-number'' value.
func  NaN() float64 { return  Float64frombits(uvnan) }

uvnan 的定义为:

uvnan =  0x7FF8000000000001

NAN() 直接调用 Float64frombits,传入写死的 const 型变量 0x7FF8000000000001,得到 NAN 型值。既然,NAN 是从一个常量解析得来的,为什么插入 map 时,会被认为是不同的 key

这是由类型的哈希函数决定的,例如,对于 64 位的浮点数,它的哈希函数如下:

func  f64hash(p  unsafe.Pointer, h  uintptr) uintptr {
    f :=  *(*float64)(p)
    switch {
    case f ==  0:
        return c1 * (c0 ^ h) // +0, -0
    case f != f:
        return c1 * (c0 ^ h ^  uintptr(fastrand())) // any kind of NaN
    default:
        return  memhash(p, h, 8)
    }
}

第二个 casef != f 就是针对 NAN,这里会再加一个随机数。

这样,所有的谜题都解开了。

由于 NAN 的特性:

NAN  !=  NAN
hash(NAN) != hash(NAN)

因此向 map 中查找的 keyNAN 时,什么也查不到;如果向其中增加了 4 次 NAN,遍历会得到 4 个 NAN

最后说结论:float 型可以作为 key,但是由于精度的问题,会导致一些诡异的问题,慎用之。


关于当 key 是引用类型时,判断两个 key 是否相等,需要 hash 后的值相等并且 key 的字面量相等。由 @WuMingyu 补充的例子:

func TestT(t  *testing.T) {
    type S struct {
        ID int
    }
    s1 :=  S{ID: 1}
    s2 :=  S{ID: 1}
    var h =  map[*S]int{}
    h[&s1] =  1
    t.Log(h[&s1])
    t.Log(h[&s2])
    t.Log(s1 == s2)
}

test output:

=== RUN  TestT
---  PASS:  TestT (0.00s)
    endpoint_test.go:74:  1
    endpoint_test.go:75:  0
    endpoint_test.go:76:  true
PASS
Process  finished  with  exit  code  0

3.2.9 可以边遍历边删除吗

map 并不是一个线程安全的数据结构。同时读写一个 map 是未定义的行为,如果被检测到,会直接 panic

上面说的是发生在多个协程同时读写同一个 map 的情况下。 如果在同一个协程内边遍历边删除,并不会检测到同时读写,理论上是可以这样做的。但是,遍历的结果就可能不会是相同的了,有可能结果遍历结果集中包含了删除的 key,也有可能不包含,这取决于删除 key 的时间:是在遍历到 key 所在的 bucket 时刻前或者后。

一般而言,这可以通过读写锁来解决:sync.RWMutex

读之前调用 RLock() 函数,读完之后调用 RUnlock() 函数解锁;写之前调用 Lock() 函数,写完之后,调用 Unlock() 解锁。

另外,sync.Map 是线程安全的 map,也可以使用。

3.2.10 可以对 map 的元素取地址吗

无法对 mapkeyvalue 进行取址。以下代码不能通过编译:

package main

import "fmt"

func  main() {
    m :=  make(map[string]int)
    fmt.Println(&m["qcrao"])
}

编译报错:

./main.go:8:14:  cannot  take  the  address  of  m["qcrao"]

如果通过其他 hack 的方式,例如 unsafe.Pointer 等获取到了 keyvalue 的地址,也不能长期持有,因为一旦发生扩容,keyvalue 的位置就会改变,之前保存的地址也就失效了。

3.2.11 如何比较两个 map 相等

map 深度相等的条件:

  1. 都为 nil
  2. 非空、长度相等,指向同一个 map 实体对象
  3. 相应的 key 指向的 value “深度”相等

直接将使用 map1 == map2是错误的。这种写法只能比较 map 是否为 nil

package main

import "fmt"

func main() {
    var m map[string]int
    var n map[string]int
    fmt.Println(m ==  nil)
    fmt.Println(n ==  nil)
    // 不能通过编译
    //fmt.Println(m == n)
}

输出结果:

true
true

因此只能是遍历 map 的每个元素,比较元素是否都是深度相等。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~