19.9 使用代理缓存

URLStore 已经成为了有效的 RPC 服务,现在可以创建另一种代表 RPC 客户端的类型,它会转发请求到 RPC 服务器,我们称它为 ProxyStore

  1. type ProxyStore struct {
  2. client *rpc.Client
  3. }

一个 RPC 客户端必须使用 DialHTTP() 方法连接到服务器,所以我们把这句加入 NewProxyStore() 函数,它用于创建 ProxyStore 对象。

  1. func NewProxyStore(addr string) *ProxyStore {
  2. client, err := rpc.DialHTTP("tcp", addr)
  3. if err != nil {
  4. log.Println("Error constructing ProxyStore:", err)
  5. }
  6. return &ProxyStore{client: client}
  7. }

ProxyStoreGet()Put() 方法,它们利用 RPC 客户端的 Call() 方法,将请求直接传递给服务器:

  1. func (s *ProxyStore) Get(key, url *string) error {
  2. return s.client.Call("Store.Get", key, url)
  3. }
  4. func (s *ProxyStore) Put(url, key *string) error {
  5. return s.client.Call("Store.Put", url, key)
  6. }

带缓存的 ProxyStore

可是,如果 slave 进程只是简单地代理所有的工作到 master 节点,不会得到任何增益!我们打算用 slave 节点来应对 Get() 请求。要做到这点,它们必须有 URLStoremap 的一份副本(缓存)。因此我们对 ProxyStore 的定义进行扩展,将 URLStore 包含在其中:

  1. type ProxyStore struct {
  2. urls *URLStore
  3. client *rpc.Client
  4. }

NewProxyStore() 也必须做修改:

  1. func NewProxyStore(addr string) *ProxyStore {
  2. client, err := rpc.DialHTTP("tcp", addr)
  3. if err != nil {
  4. log.Println("ProxyStore:", err)
  5. }
  6. return &ProxyStore{urls: NewURLStore(""), client: client}
  7. }

还必须修改 NewURLStore() 以便给出空文件名时,不会尝试从磁盘写入或读取文件:

  1. func NewURLStore(filename string) *URLStore {
  2. s := &URLStore{urls: make(map[string]string)}
  3. if filename != "" {
  4. s.save = make(chan record, saveQueueLength)
  5. if err := s.load(filename); err != nil {
  6. log.Println("Error loading URLStore: ", err)
  7. }
  8. go s.saveLoop(filename)
  9. }
  10. return s
  11. }

ProxyStoreGet() 方法需要扩展:它应该首先检查缓存中是否有对应的键。如果有,Get() 返回已缓存的结果。否则,应该发起 RPC 调用,然后用返回结果更新其本地缓存:

  1. func (s *ProxyStore) Get(key, url *string) error {
  2. if err := s.urls.Get(key, url); err == nil { // url found in local map
  3. return nil
  4. }
  5. // url not found in local map, make rpc-call:
  6. if err := s.client.Call("Store.Get", key, url); err != nil {
  7. return err
  8. }
  9. s.urls.Set(key, url)
  10. return nil
  11. }

同样地,Put() 方法仅当成功完成了远程 RPC Put() 调用,才更新本地缓存:

  1. func (s *ProxyStore) Put(url, key *string) error {
  2. if err := s.client.Call("Store.Put", url, key); err != nil {
  3. return err
  4. }
  5. s.urls.Set(key, url)
  6. return nil
  7. }

汇总

slave 节点使用 ProxyStore,只有 master 使用 URLStore。有鉴于创造它们的方式,它们看上去十分一致:两者都实现了相同签名的 Get()Put() 方法,因此我们可以指定一个 Store 接口来概括它们的行为:

  1. type Store interface {
  2. Put(url, key *string) error
  3. Get(key, url *string) error
  4. }

现在全局变量 store 可以成为 Store 类型:

  1. var store Store

最后,我们改写 main() 函数以便程序只作为 master 或 slave 启动(我们只能这么做,因为现在 store 是 Store 接口类型!)。

为此我们添加一个没有默认值的新命令行标志 masterAddr

  1. var masterAddr = flag.String("master", "", "RPC master address")

如果给出 master 地址,就启动一个 slave 进程并创建新的 ProxyStore;否则启动 master 进程并创建新的 URLStore

  1. func main() {
  2. flag.Parse()
  3. if *masterAddr != "" { // we are a slave
  4. store = NewProxyStore(*masterAddr)
  5. } else { // we are the master
  6. store = NewURLStore(*dataFile)
  7. }
  8. ...
  9. }

这样,我们已启用了 ProxyStore 作为 web 前端,以代替 URLStore

其余的前端代码继续和之前一样地工作,它们不必在意 Store 接口。只有 master 进程会写数据文件。

现在可以加载一个 master 节点和数个 slave 节点,对 slave 进行压力测试。

编译这个版本 4 或直接使用现有的可执行程序。

要进行测试,首先在命令行用以下命令启动 master 节点:

  1. ./goto -http=:8081 -rpc=true # (Windows 平台用 goto 代替 ./goto)

这里提供了 2 个标志:master 监听 8081 端口,已启用 RPC。

slave 节点用以下命令启动:

  1. ./goto -master=127.0.0.1:8081

它获取到 master 的地址,并在 8080 端口接受客户端请求。

在源码目录下已包含了以下 shell 脚本 demo.sh,用来在类 Unix 系统下自动启动程序:

  1. #!/bin/sh
  2. gomake
  3. ./goto -http=:8081 -rpc=true &
  4. master_pid=$!
  5. sleep 1
  6. ./goto -master=127.0.0.1:8081 &
  7. slave_pid=$!
  8. echo "Running master on :8081, slave on :8080."
  9. echo "Visit: http://localhost:8080/add"
  10. echo "Press enter to shut down"
  11. read
  12. kill $master_pid
  13. kill $slave_pid

要在 Windows 下测试,启动 MINGW shell 并启动 master,然后每个 slave 都要单独启动新的 MINGW shell 并启动 slave 进程。

链接