[源码]spicedb: 源码阅读之第一篇(热点缓存)

Hotspot Caching

由于 2/8 原则的存在,笼统来说,系统中百分之二十的数据占据了全部访问流量的百分之八十(其实还有更极端的 1/99 现象,百分之一的数据占据了流量的百分之九十九),因此热点数据的处理技术一直都是低时延和高可用系统的前沿研究对象,而 Hotspot caching(热点缓存)就是专门用于降低热点数据被高频访问时的延迟。

SpiceDB 处理热点数据的方法主要由两部分组成:

  • 在 node 上使用本地缓存将查询结果保存起来,但每个 node 都只缓存了在本 node 上计算的结果(也就是说不会对不同的 node 同步缓存数据)
  • 使用一致性哈希算法,请求到达时先查询本地缓存,查询失败后会根据请求体的参数计算哈希值,然后往对应的 node 发送请求,一致性哈希算法的原理参考Reference 里的[1][2][3]。此处简单描述一下原理:

在上图中,要缓存的值为白色节点,将这些要散列的值映射到一个圆环上(计算哈希值,然后将哈希值与圆环本身的角度做关联),然后加入三个服务器节点(A/B/C:同样计算三个服务器节点的哈希值并与圆环本身的角度做关联),此时定义一个规则将两者关联起来:每个 key 分配到逆时针方向(或顺时针)上离它最近的服务器,由此得到了下面的映射关系:

从编程的角度来看,要做的是保存一个服务器值的有序列表(可以是角度或数字列表),然后遍历此列表(或使用二分查找)以找到第一个值大于或等于检索的key的hash值的服务器,然后从该服务器取出key对应的value[1]为了确保 kv 能够均匀分配到服务器上,将每个真实的 server 虚拟出多个服务器节点出来,譬如 A 服务器性能更强,就虚拟出 10 个节点,B 服务器性能弱就虚拟 5 个节点,此时就能实现负载的调节,在 spiceDb 中,权重的参数名为 replicationFactor。

Cache Entry

在 SpiceDB 中,一条 cache entry 的格式大概是这样的:<object>#<relation>@<user>@<snapshot timestamp> → <result>,result 有两种表示:PERMISSIONSHIP_HAS_PERMISSION(拥有权限)/ PERMISSIONSHIP_NO_PERMISSION(没有权限)。举例两个cache entry:

  • document:doc1#reader@user:francesca@12345 → PERMISSIONSHIP_HAS_PERMISSION
  • document:doc1#owner@user:francesca@12345 → PERMISSIONSHIP_NO_PERMISSION

请求落到了 server 之后,如果没有命中 snapshot timestamp,则不能直接使用缓存里的数据,而 snapshot timestamp 的选择策略又与 consistency level 参数有关(参考这里)。

gRPC 负载均衡

gRPC 的负载均衡是基于每次调用,而不是基于连接的,因为 gRPC 客户端会与所有的 server 都预先建立好连接。并且 gRPC 采取的是客户端负载均衡,由客户端在每次发起请求时根据策略选择连接。原理如下

过程:

  1. gRPC 客户端启动的时候会先请求 name resolver 服务器,获取服务端名解析出来的全部 ip 列表,还有服务端的服务配置信息(主要用于指定服务端的负载均衡策略和其他属性信息)
  2. 客户端实例化负载均衡策略,并向其传递服务端的配置信息,还有 ip 地址列表和其他属性
  3. 客户端的负载均衡实例会与resolver 返回的全部 ip 都建立连接,同时会监视所有连接的连接状态,以便及时重连
  4. 负载均衡实例会在客户端发起 rpc 连接时根据策略选取对应的连接来发送请求

gRPC 内置了服务治理功能, 支持自定义 Resolver 来实现自定义的服务发现机制,自定义 Balancer 来实现自定义的负载均衡策略:

  • Resolver 是解析器,用于从注册中心实时获取当前服务端的 ip:port 列表,同步发送给 Balancer
  • Balancer 是平衡器,主要有两个作用
    • 接收 Resolver 发来的服务端 ip:port 列表,同时与所有服务端建立并维护长连接状态(使用长连接可以避免每次 rpc 调用时创建新连接的开销)
    • 当客户端发起 rpc 调用时,按照负载均衡算法从连接池中选择一个连接进行 rpc 调用

SpiceDB 的负载均衡

下图是 spiceDB 处理一个权限验证请求的过程:

自定义 Resolver

SpiceDB 使用自定义的名称解析器 kuberesolver 来自动发现上线的 node,如果是在 Kubernets 上运行 SpiceDB,kuberresolver 就会使用 kubernetes API 来发现和监控服务的 IP 地址(需要在 server 的启动参数里添加如下参数):

--dispatch-upstream-addr=kubernetes:///spicedb.default:50053

就可以在 service 为”spicedb”,namespace 为”default”下找到所有的 node 实例。当客户端通过 Dial 方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其 Build() 方法构建自定义 kubeResolver。

代码执行流程如下:

func NewServeCommand(programName string, config *server.Config) *cobra.Command {
return &cobra.Command{
Use: "serve",
Short: "serve the permissions database",
Long: "A database that stores, computes, and validates application permissions",
PreRunE: server.DefaultPreRunE(programName),
// RunE 绑定的函数是真正要运行的函数
RunE: termination.PublishError(func(cmd *cobra.Command, args []string) error {
server, err := config.Complete(cmd.Context())
if err != nil {
return err
}
signalctx := SignalContextWithGracePeriod(
context.Background(),
config.ShutdownGracePeriod,
)
return server.Run(signalctx)
}),
Example: server.ServeExample(programName),
}
}

func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
...
// 初始化调度器
dispatcher, err = combineddispatch.NewDispatcher(
combineddispatch.UpstreamAddr(c.DispatchUpstreamAddr),
combineddispatch.UpstreamCAPath(c.DispatchUpstreamCAPath),
combineddispatch.GrpcPresharedKey(dispatchPresharedKey),
// hashingConfigJSON 的默认值:{"loadBalancingConfig":[{"consistent-hashring":{"replicationFactor":100,"spread":1}}]}
combineddispatch.GrpcDialOpts(
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(hashringConfigJSON),
),
combineddispatch.MetricsEnabled(c.DispatchClientMetricsEnabled),
combineddispatch.PrometheusSubsystem(c.DispatchClientMetricsPrefix),
combineddispatch.Cache(cc),
combineddispatch.ConcurrencyLimits(concurrencyLimits),
)
...
}

func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
...
// 在 server 的启动参数里指定了 --dispatch-upstream-addr 后,会在这里初始化resolver 实例。只有在该参数设置为:kubernetes:///spicedb.default:50053 时才会调用自定义的 kuberResolver,如果设置为ip:port(如:localhost:500053),就会使用默认的 resolver:passthroughResolver
if opts.upstreamAddr != "" {
if opts.upstreamCAPath != "" {
customCertOpt, err := grpcutil.WithCustomCerts(grpcutil.VerifyCA, opts.upstreamCAPath)
if err != nil {
return nil, err
}
opts.grpcDialOpts = append(opts.grpcDialOpts, customCertOpt)
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithBearerToken(opts.grpcPresharedKey))
} else {
opts.grpcDialOpts = append(opts.grpcDialOpts, grpcutil.WithInsecureBearerToken(opts.grpcPresharedKey))
opts.grpcDialOpts = append(opts.grpcDialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

opts.grpcDialOpts = append(opts.grpcDialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor("s2")))

// gRPC 连接 upstreamAddr
conn, err := grpc.Dial(opts.upstreamAddr, opts.grpcDialOpts...)
if err != nil {
return nil, err
}
redispatch = remote.NewClusterDispatcher(v1.NewDispatchServiceClient(conn), conn, remote.ClusterDispatcherConfig{
KeyHandler: &keys.CanonicalKeyHandler{},
DispatchOverallTimeout: opts.remoteDispatchTimeout,
})
}
...
}

// gRPC 内部经过多个调用链:grpc.Dial -> grpc.DialContext -> grpc.newCCResolverWrapper ->
// 最终调用到了自定义的 resolver
func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if b.k8sClient == nil {
if cl, err := NewInClusterK8sClient(); err == nil {
b.k8sClient = cl
} else {
return nil, err
}
}
ti, err := parseResolverTarget(target)
if err != nil {
return nil, err
}
if ti.serviceNamespace == "" {
ti.serviceNamespace = getCurrentNamespaceOrDefault()
}
ctx, cancel := context.WithCancel(context.Background())
r := &kResolver{
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
k8sClient: b.k8sClient,
t: time.NewTimer(defaultFreq),
freq: defaultFreq,

endpoints: endpointsForTarget.WithLabelValues(ti.String()),
addresses: addressesForTarget.WithLabelValues(ti.String()),
}
// 在一个 goroutinue 里持续监控所有注册到 k8s 的 endpoint
go until(func() {
r.wg.Add(1)
err := r.watch()
if err != nil && err != io.EOF {
grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err)
}
}, time.Second, ctx.Done())
return r, nil

}

func NewInClusterK8sClient() (K8sClient, error) {
// 在 pod 里获取环境变量,最终构造成 k8s 内部 api 的调用地址
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, fmt.Errorf("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
}
token, err := ioutil.ReadFile(serviceAccountToken)
if err != nil {
return nil, err
}
ca, err := ioutil.ReadFile(serviceAccountCACert)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(ca)
transport := &http.Transport{TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS10,
RootCAs: certPool,
}}
httpClient := &http.Client{Transport: transport, Timeout: time.Nanosecond * 0}

client := &k8sClient{
// k8s 内部 api 的 url 地址
host: "https://" + net.JoinHostPort(host, port),
token: string(token),
httpClient: httpClient,
}
...
}

// 获取 SpiceDB 的可用 node,其实就是访问 k8s 提供的服务注册 api 实现的
func getEndpoints(client K8sClient, namespace, targetName string) (Endpoints, error) {
u, err := url.Parse(fmt.Sprintf("%s/api/v1/namespaces/%s/endpoints/%s",
client.Host(), namespace, targetName))
if err != nil {
return Endpoints{}, err
}
req, err := client.GetRequest(u.String())
if err != nil {
return Endpoints{}, err
}
resp, err := client.Do(req)
if err != nil {
return Endpoints{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return Endpoints{}, fmt.Errorf("invalid response code %d for service %s in namespace %s", resp.StatusCode, targetName, namespace)
}
result := Endpoints{}
err = json.NewDecoder(resp.Body).Decode(&result)
return result, err
}

---------------------------------------------------
// 监控 endpoint

func (k *kResolver) watch() error {
defer k.wg.Done()
// watch endpoints lists existing endpoints at start
sw, err := watchEndpoints(k.ctx, k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
if err != nil {
return err
}
for {
select {
case <-k.ctx.Done():
return nil
// 兜底策略:定期每 30 分钟获取一次全部 endpoint 的实例信息,检查是否有新的 endpoint。resolve()函数内部也是调用了 handle()函数
case <-k.t.C:
k.resolve()
case <-k.rn:
k.resolve()
// 发现新的 endpoint
case up, hasMore := <-sw.ResultChan():
if hasMore {
k.handle(up.Object)
} else {
return nil
}
}
}
}

// 持续监控所有注册到 k8s 的 endpoint
func watchEndpoints(ctx context.Context, client K8sClient, namespace, targetName string) (watchInterface, error) {
u, err := url.Parse(fmt.Sprintf("%s/api/v1/watch/namespaces/%s/endpoints/%s",
client.Host(), namespace, targetName))
if err != nil {
return nil, err
}
req, err := client.GetRequest(u.String())
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
return nil, fmt.Errorf("invalid response code %d for service %s in namespace %s", resp.StatusCode, targetName, namespace)
}
return newStreamWatcher(resp.Body), nil
}

总结调用流程:

  • 客户端启动时,通过 kuberesolver.RegisterInCluster() 注册自定义的 kubeResolver
  • 在cobra 的 Command(serve)中通过 grpc.Dial(opts.upstreamAddr, opts.grpcDialOpts…) 来初始化自定义的kubeResolver
    • grpc.DialContext()方法内部解析 URI(kubernetes:///spicedb.default:50053),解析到协议类型为 kubernetes,因此匹配到了自定义的kubeResolver,调用kubeBuilder.Build()方法构建 kubeResolver,同时开启 goroutinue,通过此 resolver 更新被调用服务(spicedb.default)对应的实例列表(所有注册到该服务的server node)
  • grpc 底层 LB 库会对每个服务实例创建一个 subConnection,最终根据自定义的负载均衡策略,在每次发起 gRPC 调用时选择合适的 subConnection 处理请求

参考其他人做的流程图(把图中的nsResolver替换成kubeResolver即可):

自定义 Balancer

自定义的 resolver 解析到了所有 spiceDB node 的真实地址列表后,Balancer 负责控制客户端和这些服务端的地址之一建立连接(只会建立一个正常的连接)并使用该连接处理所有 rpc 请求。

通过 kubersolver,grpc-go 客户端就可以找到所有SpiceDB 的 node,SpiceDB 自定义了一个负载均衡器来支持一致性哈希算法,该算法会聚合request 里的不同参数(如下所示)再计算哈希值:

cr/<req.Metadata.AtRevision>@<req.ResourceRelation>@<req.ResourceIds>@<req.Subject>@<req.ResultsSetting>

计算 hash key 的算法

// checkRequestToKey converts a check request into a cache key based on the relation
func checkRequestToKey(req *v1.DispatchCheckRequest, option dispatchCacheKeyHashComputeOption) DispatchCacheKey {
return dispatchCacheKeyHash(checkViaRelationPrefix, req.Metadata.AtRevision, option,
hashableRelationReference{req.ResourceRelation},
hashableIds(req.ResourceIds),
hashableOnr{req.Subject},
hashableResultSetting(req.ResultsSetting),
)
}

回顾上面的图,dispatchServiceClient.DispatchCheck 函数通过 gRPC 调用其他 node 的时候,底层实际上调用了grpc.(*pickerWrapper).pick() 函数,spiceDB 自定义的 Balancer 实现接口:balancer.Picker:

//每次 rpc 调用时,返回 chosen 对应的连接
func (p *consistentHashringPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// 存储在 context 中的 key(CtxKey)用于计算哈希值,这个 key 就是上面所说的 request 参数被聚合后的结果
key := info.Ctx.Value(CtxKey).([]byte)
// FindN 函数用于从哈希环里选择节点
members, err := p.hashring.FindN(key, p.spread)
if err != nil {
return balancer.PickResult{}, err
}

index := 0
// 如果 spread 大于 1,会选择一个随机数
if p.spread > 1 {
// TODO: should look into other options for this to avoid locking; we mostly use spread 1 so it's not urgent
// rand is not safe for concurrent use
p.Lock()
index = p.rand.Intn(int(p.spread))
p.Unlock()
}

chosen := members[index].(subConnMember)

return balancer.PickResult{
SubConn: chosen.SubConn,
}, nil
}

// 从哈希环里选择 num 个虚拟节点返回
func (h *Hashring) FindN(key []byte, num uint8) ([]Member, error) {
h.RLock()
defer h.RUnlock()

if int(num) > len(h.nodes) {
return nil, ErrNotEnoughMembers
}

keyHash := h.hasher(key)
// 哈希环可以展开成一个首尾相连的数组,此处用二分法查找
vnodeIndex := sort.Search(len(h.virtualNodes), func(i int) bool {
return h.virtualNodes[i].hashvalue >= keyHash
})

alreadyFoundNodeKeys := map[string]struct{}{}
foundNodes := make([]Member, 0, num)
for i := 0; i < len(h.virtualNodes) && len(foundNodes) < int(num); i++ {
// 连续选择 num 个虚拟节点
boundedIndex := (i + vnodeIndex) % len(h.virtualNodes)
candidate := h.virtualNodes[boundedIndex]
if _, ok := alreadyFoundNodeKeys[candidate.members.nodeKey]; !ok {
foundNodes = append(foundNodes, candidate.members.member)
alreadyFoundNodeKeys[candidate.members.nodeKey] = struct{}{}
}
}

return foundNodes, nil
}

哈希环的处理


// 发现新的 node 时,往哈希环里添加缓存节点
func (h *Hashring) Add(member Member) error {
// nodeKeyString 的值:ip+port
nodeKeyString := member.Key()

h.Lock()
defer h.Unlock()

if _, ok := h.nodes[nodeKeyString]; ok {
// already have node, bail
return ErrMemberAlreadyExists
}

nodeHash := h.hasher([]byte(nodeKeyString))

newNodeRecord := nodeRecord{
nodeHash,
nodeKeyString,
member,
nil,
}

// virtualNodeBuffer是一个 10 bit 的数组,高 8 位存储了节点的哈希 key,最后 2 位存储了对应的虚拟节点索引值
virtualNodeBuffer := make([]byte, 10)
binary.LittleEndian.PutUint64(virtualNodeBuffer, nodeHash)

for i := uint16(0); i < h.replicationFactor; i++ {
binary.LittleEndian.PutUint16(virtualNodeBuffer[8:], i)
// 组装成 10 位的数组后再计算一次哈希值
virtualNodeHash := h.hasher(virtualNodeBuffer)

virtualNode := virtualNode{
virtualNodeHash,
newNodeRecord,
}

newNodeRecord.virtualNodes = append(newNodeRecord.virtualNodes, virtualNode)
h.virtualNodes = append(h.virtualNodes, virtualNode)
}
// 将所有的虚拟节点排序
sort.Sort(h.virtualNodes)

// Add the node to our map of nodes
h.nodes[nodeKeyString] = newNodeRecord

return nil
}

// 删除服务节点对应的所有虚拟节点
func (h *Hashring) Remove(member Member) error {
nodeKeyString := member.Key()

h.Lock()
defer h.Unlock()

foundNode, ok := h.nodes[nodeKeyString]
if !ok {
// don't have the node, bail
return ErrMemberNotFound
}

indexesToRemove := make([]int, 0, h.replicationFactor)
for _, vnode := range foundNode.virtualNodes {
vnode := vnode
vnodeIndex := sort.Search(len(h.virtualNodes), func(i int) bool {
return !h.virtualNodes[i].less(vnode)
})
if vnodeIndex >= len(h.virtualNodes) {
return spiceerrors.MustBugf("unable to find vnode to remove: %020d:%020d:%s", vnode.hashvalue, vnode.members.hashvalue, vnode.members.nodeKey)
}

indexesToRemove = append(indexesToRemove, vnodeIndex)
}

sort.Slice(indexesToRemove, func(i, j int) bool {
// NOTE: this is a reverse sort!
return indexesToRemove[j] < indexesToRemove[i]
})

if len(indexesToRemove) != int(h.replicationFactor) {
return spiceerrors.MustBugf("found wrong number of vnodes to remove: %d != %d", len(indexesToRemove), h.replicationFactor)
}

// 将要删除的元素都放到数组的末尾
for i, indexToRemove := range indexesToRemove {
// Swap this index for a later one
h.virtualNodes[indexToRemove] = h.virtualNodes[len(h.virtualNodes)-1-i]
}

// Truncate and sort the nodelist
h.virtualNodes = h.virtualNodes[:len(h.virtualNodes)-len(indexesToRemove)]
sort.Sort(h.virtualNodes)

// Remove the node from our map
delete(h.nodes, nodeKeyString)

return nil
}

总结

一个权限检查请求的查找过程如下:

  • 根据要查找的对象计算哈希值
  • 哈希值写入 gRPC 的 context 中发起 gRPC 调用
  • gRPC 根据该哈希值查找哈希环里 >= 该值的一个虚拟节点
  • 从虚拟节点里找到对应的真实server node,gRPC 使用该连接对 node 发起请求
  • spiceDB node 接受请求后同样使用哈希值查找本身的 cache
  • cache 没有命中的话就查找数据库,然后将结果 set 进 cache 中(ttl 为 20s),在 cache 里的 key 就是该对象的哈希值

注意:

  • 哈希环存储的是所有虚拟节点的哈希值,并不存储 cache 里的哈希值
  • 不管 spiceDB 的 node 是增加还是减少,其实都不会迁移 cache 里的值
  • 如果新增了一个 node,就会在哈希环里虚拟出多个虚拟节点。此时 node 的 cache 是空的,所有落到该 node 的请求都必须查询数据库后才能写入 cache,如果其他 node 已经缓存了要查询的结果也没用,只等等到 ttl 过期后淘汰。删除也同理

Others

在 Zanzibar 中,除了使用热点缓存技术之外,还使用了其他缓存技术,包括:relationship cache(将一个热点 object 相关的全部 relation tuple 提前加载到内存中);Leopard Indexing System(用于持续更新用户权限的非结构化数据,可以支持权限集合的快速计算)。SpiceDB 目前对这两种技术的实现还停留在 proposal 阶段。

Reference

[1] 一致性哈希(Consistent hashing)算法
[2] CS168: The Modern Algorithmic Toolbox Lecture #1: Introduction and Consistent Hashing
[3] Introducing Consistent Hashing
[4] grpc进阶篇之resolver
[5] gRPC Name Resolver 原理及实践

文章作者: kylinlin
文章链接: https://kylinlingh.github.io/2023/06/28/%E6%BA%90%E7%A0%81-spicedb-%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E4%B9%8B%E7%AC%AC%E4%B8%80%E7%AF%87-%E7%83%AD%E7%82%B9%E7%BC%93%E5%AD%98/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Water&Melon's Blog