Monday, March 25, 2019

Distributed ID generation

X. DB
https://tech.meituan.com/2017/04/21/mt-leaf.html
以MySQL举例,利用给字段设置auto_increment_increment和auto_increment_offset来保证ID自增,每次业务使用下列SQL读写MySQL得到ID号。

begin;
REPLACE INTO Tickets64 (stub) VALUES ('a');
SELECT LAST_INSERT_ID();
commit;
优点:
  • 非常简单,利用现有数据库系统的功能实现,成本小,有DBA专业维护。
  • ID号单调自增,可以实现一些对ID有特殊要求的业务。
缺点:
  • 强依赖DB,当DB异常时整个系统不可用,属于致命问题。配置主从复制可以尽可能的增加可用性,但是数据一致性在特殊情况下难以保证。主从切换时的不一致可能会导致重复发号。
  • ID发号性能瓶颈限制在单台MySQL的读写性能。
对于MySQL性能问题,可用如下方案解决:在分布式系统中我们可以多部署几台机器,每台机器设置不同的初始值,且步长和机器数相等。比如有两台机器。设置步长step为2,TicketServer1的初始值为1(1,3,5,7,9,11…)、TicketServer2的初始值为2(2,4,6,8,10…)。这是Flickr团队在2010年撰文介绍的一种主键生成策略(Ticket Servers: Distributed Unique Primary Keys on the Cheap )。如下所示,为了实现上述方案分别设置两台机器对应的参数,TicketServer1从1开始发号,TicketServer2从2开始发号,两台机器每次发号之后都递增2。
TicketServer1:
auto-increment-increment = 2
auto-increment-offset = 1

TicketServer2:
auto-increment-increment = 2
auto-increment-offset = 2
这种架构貌似能够满足性能的需求,但有以下几个缺点:
  • 系统水平扩展比较困难,比如定义好了步长和机器台数之后,如果要添加机器该怎么做?假设现在只有一台机器发号是1,2,3,4,5(步长是1),这个时候需要扩容机器一台。可以这样做:把第二台机器的初始值设置得比第一台超过很多,比如14(假设在扩容时间之内第一台不可能发到14),同时设置步长为2,那么这台机器下发的号码都是14以后的偶数。然后摘掉第一台,把ID值保留为奇数,比如7,然后修改第一台的步长为2。让它符合我们定义的号段标准,对于这个例子来说就是让第一台以后只能产生奇数。扩容方案看起来复杂吗?貌似还好,现在想象一下如果我们线上有100台机器,这个时候要扩容该怎么做?简直是噩梦。所以系统水平扩展方案复杂难以实现。
  • ID没有了单调递增的特性,只能趋势递增,这个缺点对于一般业务需求不是很重要,可以容忍。
  • 数据库压力还是很大,每次获取ID都得读写一次数据库,只能靠堆机器来提高性能。

Leaf-segment数据库方案

第一种Leaf-segment方案,在使用数据库的方案上,做了如下改变: - 原方案每次获取ID都得读写一次数据库,造成数据库压力大。改为利用proxy server批量获取,每次获取一个segment(step决定大小)号段的值。用完之后再去数据库获取新的号段,可以大大的减轻数据库的压力。 - 各个业务不同的发号需求用biz_tag字段来区分,每个biz-tag的ID获取相互隔离,互不影响。如果以后有性能需求需要对数据库扩容,不需要上述描述的复杂的扩容操作,只需要对biz_tag分库分表就行。
数据库表设计如下:
+-------------+--------------+------+-----+-------------------+-----------------------------+
| Field       | Type         | Null | Key | Default           | Extra                       |
+-------------+--------------+------+-----+-------------------+-----------------------------+
| biz_tag     | varchar(128) | NO   | PRI |                   |                             |
| max_id      | bigint(20)   | NO   |     | 1                 |                             |
| step        | int(11)      | NO   |     | NULL              |                             |
| desc        | varchar(256) | YES  |     | NULL              |                             |
| update_time | timestamp    | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+--------------+------+-----+-------------------+-----------------------------+
重要字段说明:biz_tag用来区分业务,max_id表示该biz_tag目前所被分配的ID号段的最大值,step表示每次分配的号段长度。原来获取ID每次都需要写数据库,现在只需要把step设置得足够大,比如1000。那么只有当1000个号被消耗完了之后才会去重新读写一次数据库。读写数据库的频率从1减小到了1/step.

Leaf-segment数据库方案

第一种Leaf-segment方案,在使用数据库的方案上,做了如下改变: - 原方案每次获取ID都得读写一次数据库,造成数据库压力大。改为利用proxy server批量获取,每次获取一个segment(step决定大小)号段的值。用完之后再去数据库获取新的号段,可以大大的减轻数据库的压力。 - 各个业务不同的发号需求用biz_tag字段来区分,每个biz-tag的ID获取相互隔离,互不影响。如果以后有性能需求需要对数据库扩容,不需要上述描述的复杂的扩容操作,只需要对biz_tag分库分表就行。
数据库表设计如下:
+-------------+--------------+------+-----+-------------------+-----------------------------+
| Field       | Type         | Null | Key | Default           | Extra                       |
+-------------+--------------+------+-----+-------------------+-----------------------------+
| biz_tag     | varchar(128) | NO   | PRI |                   |                             |
| max_id      | bigint(20)   | NO   |     | 1                 |                             |
| step        | int(11)      | NO   |     | NULL              |                             |
| desc        | varchar(256) | YES  |     | NULL              |                             |
| update_time | timestamp    | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+--------------+------+-----+-------------------+-----------------------------+
重要字段说明:biz_tag用来区分业务,max_id表示该biz_tag目前所被分配的ID号段的最大值,step表示每次分配的号段长度。原来获取ID每次都需要写数据库,现在只需要把step设置得足够大,比如1000。那么只有当1000个号被消耗完了之后才会去重新读写一次数据库。读写数据库的频率从1减小到了1/step,大致架构如下图所示:
image
image

test_tag在第一台Leaf机器上是1~1000的号段,当这个号段用完时,会去加载另一个长度为step=1000的号段,假设另外两台号段都没有更新,这个时候第一台机器新加载的号段就应该是3001~4000。同时数据库对应的biz_tag这条数据的max_id会从3000被更新成4000,更新号段的SQL语句如下:
Begin
UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit
这种模式有以下优缺点:
优点:
  • Leaf服务可以很方便的线性扩展,性能完全能够支撑大多数业务场景。
  • ID号码是趋势递增的8byte的64位数字,满足上述数据库存储的主键要求。
  • 容灾性高:Leaf服务内部有号段缓存,即使DB宕机,短时间内Leaf仍能正常对外提供服务。
  • 可以自定义max_id的大小,非常方便业务从原有的ID方式上迁移过来。
缺点:
  • ID号码不够随机,能够泄露发号数量的信息,不太安全。
  • TP999数据波动大,当号段使用完之后还是会hang在更新数据库的I/O上,tg999数据会出现偶尔的尖刺。
  • DB宕机会造成整个系统不可用。

双buffer优化

对于第二个缺点,Leaf-segment做了一些优化,简单的说就是:
Leaf 取号段的时机是在号段消耗完的时候进行的,也就意味着号段临界点的ID下发时间取决于下一次从DB取回号段的时间,并且在这期间进来的请求也会因为DB号段没有取回来,导致线程阻塞。如果请求DB的网络和DB的性能稳定,这种情况对系统的影响是不大的,但是假如取DB的时候网络发生抖动,或者DB发生慢查询就会导致整个系统的响应时间变慢。
为此,我们希望DB取号段的过程能够做到无阻塞,不需要在DB取号段的时候阻塞请求线程,即当号段消费到某个点时就异步的把下一个号段加载到内存中。而不需要等到号段用尽的时候才去更新号段。这样做就可以很大程度上的降低系统的TP999指标。详细实现如下图所示:
image
image

采用双buffer的方式,Leaf服务内部有两个号段缓存区segment。当前号段已下发10%时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。
  • 每个biz-tag都有消费速度监控,通常推荐segment长度设置为服务高峰期发号QPS的600倍(10分钟),这样即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。
  • 每次请求来临时都会判断下个号段的状态,从而更新此号段,所以偶尔的网络抖动不会影响下个号段的更新。

Leaf高可用容灾

对于第三点“DB可用性”问题,我们目前采用一主两从的方式,同时分机房部署,Master和Slave之间采用半同步方式[5]同步数据。同时使用公司Atlas数据库中间件(已开源,改名为DBProxy)做主从切换。当然这种方案在一些情况会退化成异步模式,甚至在非常极端情况下仍然会造成数据不一致的情况,但是出现的概率非常小。如果你的系统要保证100%的数据强一致,可以选择使用“类Paxos算法”实现的强一致MySQL方案,如MySQL 5.7前段时间刚刚GA的MySQL Group Replication。但是运维成本和精力都会相应的增加,根据实际情况选型即可。

同时Leaf服务分IDC部署,内部的服务化框架是“MTthrift RPC”。服务调用的时候,根据负载均衡算法会优先调用同机房的Leaf服务。在该IDC内Leaf服务不可用的时候才会选择其他机房的Leaf服务。同时服务治理平台OCTO还提供了针对服务的过载保护、一键截流、动态流量分配等对服务的保护措施。
Leaf-segment方案可以生成趋势递增的ID,同时ID号是可计算的,不适用于订单ID生成场景,比如竞对在两天中午12点分别下单,通过订单id号相减就能大致计算出公司一天的订单量,这个是不能忍受的。面对这一问题,我们提供了 Leaf-snowflake方案。
image
image

Leaf-snowflake方案完全沿用snowflake方案的bit位设计,即是“1+41+10+12”的方式组装ID号。对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。Leaf服务规模较大,动手配置成本太高。所以使用Zookeeper持久顺序节点的特性自动对snowflake节点配置wokerID。Leaf-snowflake是按照下面几个步骤启动的:
  1. 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
  2. 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
  3. 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。
image
image

弱依赖ZooKeeper

除了每次会去ZK拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,恰好机器出现问题需要重启时,能保证服务能够正常启动。这样做到了对三方组件的弱依赖。一定程度上提高了SLA
同时Leaf服务分IDC部署,内部的服务化框架是“MTthrift RPC”。服务调用的时候,根据负载均衡算法会优先调用同机房的Leaf服务。在该IDC内Leaf服务不可用的时候才会选择其他机房的Leaf服务。同时服务治理平台OCTO还提供了针对服务的过载保护、一键截流、动态流量分配等对服务的保护措施。
Leaf-segment方案可以生成趋势递增的ID,同时ID号是可计算的,不适用于订单ID生成场景,比如竞对在两天中午12点分别下单,通过订单id号相减就能大致计算出公司一天的订单量,这个是不能忍受的。面对这一问题,我们提供了 Leaf-snowflake方案。
image
image

Leaf-snowflake方案完全沿用snowflake方案的bit位设计,即是“1+41+10+12”的方式组装ID号。对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。Leaf服务规模较大,动手配置成本太高。所以使用Zookeeper持久顺序节点的特性自动对snowflake节点配置wokerID。Leaf-snowflake是按照下面几个步骤启动的:
  1. 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
  2. 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
  3. 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。
image
image

弱依赖ZooKeeper

除了每次会去ZK拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,恰好机器出现问题需要重启时,能保证服务能够正常启动。这样做到了对三方组件的弱依赖。一定程度上提高了SLA

解决时钟问题

因为这种方案依赖时间,如果机器的时钟发生了回拨,那么就会有可能生成重复的ID号,需要解决时钟回退的问题。
image
image

参见上图整个启动流程图,服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:
  1. 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
  2. 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
  3. 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
  4. 否则认为本机系统时间发生大步长偏移,启动失败并报警。
  5. 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。
由于强依赖时钟,对时间的要求比较敏感,在机器工作时NTP同步也会造成秒级别的回退,建议可以直接关闭NTP同步。要么在时钟回拨的时候直接不提供服务直接返回ERROR_CODE,等时钟追上即可。或者做一层重试,然后上报报警系统,更或者是发现有时钟回拨之后自动摘除本身节点并报警,如下:
 //发生了回拨,此刻时间小于上次发号时间
 if (timestamp < lastTimestamp) {
       
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                 //时间偏差大小小于5ms,则等待两倍时间
                    wait(offset << 1);//wait
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                       //还是小于,抛异常并上报
                        throwClockBackwardsEx(timestamp);
                      }    
                } catch (InterruptedException e) {  
                   throw  e;
                }
            } else {
                //throw
                throwClockBackwardsEx(timestamp);
            }
        }
 //分配ID       
        
从上线情况来看,在2017年闰秒出现那一次出现过部分机器回拨,由于Leaf-snowflake的策略保证,成功避免了对业务造成的影响。
https://www.cnblogs.com/haoxinyue/p/5208136.html
当使用数据库来生成ID性能不够要求的时候,我们可以尝试使用Redis来生成ID。这主要依赖于Redis是单线程的,所以也可以用生成全局唯一的ID。可以用Redis的原子操作 INCR和INCRBY来实现。
可以使用Redis集群来获取更高的吞吐量。假如一个集群中有5台Redis。可以初始化每台Redis的值分别是1,2,3,4,5,然后步长都是5。各个Redis生成的ID为:
A:1,6,11,16,21
B:2,7,12,17,22
C:3,8,13,18,23
D:4,9,14,19,24
E:5,10,15,20,25
这个,随便负载到哪个机确定好,未来很难做修改。但是3-5台服务器基本能够满足器上,都可以获得不同的ID。但是步长和初始值一定需要事先需要了。使用Redis集群也可以方式单点故障的问题。
另外,比较适合使用Redis来生成每天从0开始的流水号。比如订单号=日期+当日自增长号。可以每天在Redis中生成一个Key,使用INCR进行累加。

优点:
1)不依赖于数据库,灵活方便,且性能优于数据库。
2)数字ID天然排序,对分页或者需要排序的结果很有帮助。
缺点:
1)如果系统中没有Redis,还需要引入新的组件,增加系统复杂度。
2)需要编码和配置的工作量比较大。

SnowFlake
snowflake算法可以根据自身项目的需要进行一定的修改。比如估算未来的数据中心个数,每个数据中心的机器数以及统一毫秒可以能的并发数来调整在算法中所需要的bit数。
优点:
1)不依赖于数据库,灵活方便,且性能优于数据库。
2)ID按照时间在单机上是递增的。
缺点:
1)在单机上是递增的,但是由于涉及到分布式环境,每台机器上的时钟不可能完全同步,也许有时候也会出现不是全局递增的情况。

6. 利用zookeeper生成唯一ID

zookeeper主要通过其znode数据版本来生成序列号,可以生成32位和64位的数据版本号,客户端可以使用这个版本号来作为唯一的序列号。
很少会使用zookeeper来生成唯一ID。主要是由于需要依赖zookeeper,并且是多步调用API,如果在竞争较大的情况下,需要考虑使用分布式锁。因此,性能在高并发的分布式环境下,也不甚理想。
 
7. MongoDB的ObjectId
MongoDB的ObjectId和snowflake算法类似。它设计成轻量型的,不同的机器都能用全局唯一的同种方法方便地生成它。MongoDB 从一开始就设计用来作为分布式数据库,处理多个节点是一个核心要求。使其在分片环境中要容易生成得多。
其格式如下:

前4 个字节是从标准纪元开始的时间戳,单位为秒。时间戳,与随后的5 个字节组合起来,提供了秒级别的唯一性。由于时间戳在前,这意味着ObjectId 大致会按照插入的顺序排列。这对于某些方面很有用,如将其作为索引提高效率。这4 个字节也隐含了文档创建的时间。绝大多数客户端类库都会公开一个方法从ObjectId 获取这个信息。 
接下来的3 字节是所在主机的唯一标识符。通常是机器主机名的散列值。这样就可以确保不同主机生成不同的ObjectId,不产生冲突。 
为了确保在同一台机器上并发的多个进程产生的ObjectId 是唯一的,接下来的两字节来自产生ObjectId 的进程标识符(PID)。 
前9 字节保证了同一秒钟不同机器不同进程产生的ObjectId 是唯一的。后3 字节就是一个自动增加的计数器,确保相同进程同一秒产生的ObjectId 也是不一样的。同一秒钟最多允许每个进程拥有2563(16 777 216)个不同的ObjectId。


X. Zookeeper
https://blog.csdn.net/fanrenxiang/article/details/83015216
我们利用zookeeper的持久顺序节点特性,多个客户端同时创建同一节点,zk保证了能有序的创建,创建成功并返回的path类似于/root/generateid0000000001酱紫的,可以看到是顺序有规律的,能较好的解决这个问题,缺点是,会依赖于zk。

利用ZK的Znode数据版本如下面的代码,每次都不获取期望版本号也就是每次都会成功,那么每次都会返回最新的版本号:
Zookeeper这个方案用得较少,严重依赖Zookeeper集群,并且性能不是很高,所以不予推荐。



X. Redis

Assuming you're looking to generate sequential ids, you can use Redis and the INCR command without worrying about race conditions. Since Redis is (mostly) single threaded, you are assured that every request will get it's own unique id from it.
Furthermore, you don't need to check the id key's existence/initialize it because Redis will do that for you (i.e. if you INCR a non-existent key, it will be first created and set to 0 automatically).
https://redis.io/commands/INCRBY
INCRBY mykey 5
https://www.callicoder.com/distributed-unique-id-sequence-number-generator/

MongoDB’s ObjectId

MongoDB’s ObjectIDs are 12-byte (96-bit) hexadecimal numbers that are made up of -
  • a 4-byte epoch timestamp in seconds,
  • a 3-byte machine identifier,
  • a 2-byte process id, and
  • a 3-byte counter, starting with a random value.
This is smaller than the earlier 128-bit UUID. But again the size is relatively longer than what we normally have in a single MySQL auto-increment field (a 64-bit bigint value).

Database Ticket Servers

This approach uses a centralized database server to generate unique incrementing IDs. It’s like a centralized auto-increment. This approach is used by Flickr.
The problem with this approach is that the ticket server can become a write bottleneck. Moreover, you introduce one more component in your infrastructure that you need to manage and scale.

Twitter Snowflake

Twitter snowflake is a dedicated network service for generating 64-bit unique IDs at high scale. The IDs generated by this service are roughly time sortable.
The IDs are made up of the following components:
  • Epoch timestamp in millisecond precision - 41 bits (gives us 69 years with a custom epoch)
  • Configured machine id - 10 bits (gives us up to 1024 machines)
  • Sequence number - 12 bits (A local counter per machine that rolls over every 4096)
They have reserved 1-bit for future purpose. Since the IDs use timestamp as the first component, they are sortable.
The IDs generated by twitter snowflake fits in 64-bits and are time sortable, which is great. That’s what we want.

The above generator uses the system’s MAC address to create a unique identifier for the Node. You can also supply a NodeID to the sequence generator. That will guarantee uniqueness.


https://blog.intenthq.com/blog/distributed-id-generation-with-redis-and-lua
An ID can be many things, but surely its most important definition is that of “uniqueness”

  • Ordered: They may be ordered to allow comparison of items stamped with IDs.
  • Meaningful: They may embed certain information about the point at which the ID was generated, for example the time (however inexact).
  • Distributed: They may be generated by a machine without consulting any other machine (ie. they must be truly distributed).
  • Compact: They may be represented in a specific format for space or performance reasons. Normally, you’d want to represent this in as compact a format as possible but you often trade-off longevity to do this (UNIX timestamps are a great example of compact albeit finite time representation in action 1).


So called “time oracles” are dedicated servers normally equipped with an atomic clock or GPS device whose sole purpose is for keeping time in as precise a manner as possible. Time oracle servers are used frequently in environments where super accurate time is important (such as trading platforms and financial applications), but they are prohibitively expensive and not a viable solution for those working in an environment where deploying custom hardware is impossible (think cloud environments).
To combat this, software such as NTP is available, specifically written to ameliorate this problem for those who cannot access a dedicated time oracle. The idea is basically that large organisations donate access to their time oracle servers by adding them to a global pool of NTP servers which the masses can use to adjust their clock against. This is, unfortunately, not without its problems. Its an error prone process, as it involves network hops and jitter, and is thus susceptible to all kinds of inaccuracies that the underlying algorithms do their best to abate but cannot avoid. It’s also really important to note that NTP is, by default, allowed to move time backwards to compensate for clock drift, something you must disable if you want to ensure any ID generation based on time can never generate duplicates.


Redis fitted our needs well as it allowed us to run arbitrary scripts using Lua; however, you could use any service that allows this 7. The ability to run Lua scripts is important in the case of Redis, as it otherwise lacks the ability to conditionally run commands in a transaction.



Sunday, March 24, 2019

MySQL - System Design

https://www.quora.com/In-scaling-Memcache-at-Facebook-4-1-Regional-Invalidations-is-there-an-example-of-how-McSqueal-actually-derives-the-keys-to-be-deleted-based-on-an-SQL-statement
Facebook has a technology called wormhole that captures row changes from the MySQL binary logs and publishes them on a message bus.  These messages can be used for invalidation.
https://code.fb.com/core-data/wormhole-pub-sub-system-moving-data-through-space-and-time/
At a high level, Wormhole propagates changes issued in one system to all systems that need to reflect those changes – within and across data centers.
One application of Wormhole is to connect our user databases (UDBs) with other services so they can operate based on the most current data. Here are three examples of systems that receive updates via Wormhole:
  1. Caches – Refills and invalidation messages need to be sent to each cache so they stay in sync with their local database and consistent with each other.
  2. Services – Services such as Graph Search that build specialized social indexes need to remain current and receive updates to the underlying data.
  3. Data warehouse – The Data warehouses and bulk processing systems (Hadoop, Hbase, Hive, Oracle) receive streaming, real-time updates instead of relying on periodic database dumps.
The Wormhole system has three primary components:
  1. Producer – Services and web tier use the Wormhole producer library to embed messages that get written to the binary log of the UDBs.
  2. Publisher – The Wormhole publisher tails the binary log, extracts the embedded messages, and makes them available as real-time streams that can be subscribed to.
  3. Consumer – Each interested consumer subscribes to relevant updates.
In order to satisfy a wide variety of use-cases, Wormhole has the following properties:
  1. Data partitioning: On the databases, the user data is partitioned, or sharded, across a large number of machines. Updates are ordered within a shard but not necessarily across shards. This partitioning also isolates failures, allowing the rest of the system to keep working even when one or more storage machines have failed. Wormhole maintains a separate publish-subscribe stream per shard–think parallel wormholes in space.
  2. Rewind in time: To deal with different failures (network, software, and hardware), services need to be able to go back to an earlier data checkpoint and start applying updates from that point onward. Wormhole supports check-pointing, state management, and a rewind feature.
Thanks to all of these properties, we are able to run Wormhole at a huge scale. For example, on the UDB deployment alone, Wormhole processes over 1 trillion messages every day (significantly more than 10 million messages every second). Like any system at Facebook’s scale, Wormhole is engineered to deal with failure of individual components, integrate with monitoring systems, perform automatic remediation, enable capacity planning, automate provisioning and adapt to sudden changes in usage pattern


Memcached - Product Design

https://pdos.csail.mit.edu/6.824/notes/l-memcached.txt
how do FB apps use mc?
  read:
    v = get(k) (computes hash(k) to choose mc server)
    if v is nil {
      v = fetch from DB
      set(k, v)
    }
  write:
    v = new value
    send k,v to DB
    delete(k)
  application determines relationship of mc to DB
    mc doesn't know anything about DB
  FB uses mc as a "look-aside" cache
    real data is in the DB
    cached value (if any) should be same as DB

  look-aside is much trickier than it looks -- consistency
    paper is trying to integrate mutually-oblivious storage layers
  cache is critical:
    not really about reducing user-visible delay
    mostly about surviving huge load!
    cache misses and failures can create intolerable DB load
  they can tolerate modest staleness: no freshness guarantee
  stale data nevertheless a big headache
    want to avoid unbounded staleness (e.g. missing a delete() entirely)
    want read-your-own-writes
    each performance fix brings a new source of staleness
  huge "fan-out" => parallel fetch, in-cast congestion

will partition or replication yield most mc throughput?
  partition: server i, key k -> mc server hash(k)
  replicate: server i, key k -> mc server hash(i)
  partition is more memory efficient (one copy of each k/v)
  partition works well if no key is very popular
  partition forces each web server to talk to many mc servers (overhead)
  replication works better if a few keys are very popular

performance and regions (Section 5)

Q: what is the point of regions -- multiple complete replicas?
   lower RTT to users (east coast, west coast)
   parallel reads of popular data due to replication
   (note DB replicas help only read performance, no write performance)
   maybe hot replica for main site failure?

Q: why not partition users over regions?
   i.e. why not east-coast users' data in east-coast region, &c
   social net -> not much locality
   very different from e.g. e-mail


Q: why OK performance despite all writes forced to go to the master region?
   writes would need to be sent to all regions anyway -- replicas
   users probably wait for round-trip to update DB in master region
     only 100ms, not so bad
   users do not wait for all effects of writes to finish
     i.e. for all stale cached values to be deleted
   
performance within a region (Section 4)

multiple mc clusters *within* each region
  cluster == complete set of mc cache servers
    i.e. a replica, at least of cached data

why multiple clusters per region?
  why not add more and more mc servers to a single cluster?
  1. adding mc servers to cluster doesn't help single popular keys
     replicating (one copy per cluster) does help
  2. more mcs in cluster -> each client req talks to more servers
     and more in-cast congestion at requesting web servers
     client requests fetch 20 to 500 keys! over many mc servers
     MUST request them in parallel (otherwise total latency too large)
     so all replies come back at the same time
     network switches, NIC run out of buffers
  3. hard to build network for single big cluster
     uniform client/server access
     so cross-section b/w must be large -- expensive
     two clusters -> 1/2 the cross-section b/w

but -- replicating is a waste of RAM for less-popular items
  "regional pool" shared by all clusters
  unpopular objects (no need for many copies)
  decided by *type* of object
  frees RAM to replicate more popular objects

bringing up new mc cluster was a serious performance problem
  new cluster has 0% hit rate
  if clients use it, will generate big spike in DB load
    if ordinarily 1% miss rate, and (let's say) 2 clusters,
      adding "cold" third cluster will causes misses for 33% of ops.
    i.e. 30x spike in DB load!
  thus the clients of new cluster first get() from existing cluster (4.3)
    and set() into new cluster
    basically lazy copy of existing cluster to new cluster
  better 2x load on existing cluster than 30x load on DB

important practical networking problems:
  n^2 TCP connections is too much state
    thus UDP for client get()s
  UDP is not reliable or ordered
    thus TCP for client set()s
    and mcrouter to reduce n in n^2
  small request per packet is not efficient (for TCP or UDP)
    per-packet overhead (interrupt &c) is too high
    thus mcrouter batches many requests into each packet
    
mc server failure?
  can't have DB servers handle the misses -- too much load
  can't shift load to one other mc server -- too much
  can't re-partition all data -- time consuming
  Gutter -- pool of idle servers, clients only use after mc server fails

The Question:
  why don't clients send invalidates to Gutter servers?
  my guess: would double delete() traffic
    and send too many delete()s to small gutter pool
    since any key might be in the gutter pool

thundering herd
  one client updates DB and delete()s a key
  lots of clients get() but miss
    they all fetch from DB
    they all set()
  not good: needless DB load
  mc gives just the first missing client a "lease"
    lease = permission to refresh from DB
    mc tells others "try get() again in a few milliseconds"
  effect: only one client reads the DB and does set()
    others re-try get() later and hopefully hit

let's talk about consistency now

the big truth
  hard to get both consistency (== freshness) and performance
  performance for reads = many copies
  many copies = hard to keep them equal

what is their consistency goal?
  *not* read sees latest write
    since not guaranteed across clusters
  more like "not more than a few seconds stale"
    i.e. eventual
  *and* writers see their own writes
    read-your-own-writes is a big driving force

first, how are DB replicas kept consistent across regions?
  one region is master
  master DBs distribute log of updates to DBs in slave regions
  slave DBs apply
  slave DBs are complete replicas (not caches)
  DB replication delay can be considerable (many seconds)

how do we feel about the consistency of the DB replication scheme?
  good: eventual consistency, b/c single ordered write stream
  bad: longish replication delay -> stale reads

how do they keep mc content consistent w/ DB content?
  1. DBs send invalidates (delete()s) to all mc servers that might cache
  2. writing client also invalidates mc in local cluster
     for read-your-writes

why did they have consistency problems in mc?
  client code to copy DB to mc wasn't atomic:
    1. writes: DB update ... mc delete()
    2. read miss: DB read ... mc set()
  so *concurrent* clients had races

what were the races and fixes?

Race 1:
  k not in cache
  C1 get(k), misses
  C1 v = read k from DB
    C2 updates k in DB
    C2 and DB delete(k) -- does nothing
  C1 set(k, v)
  now mc has stale data, delete(k) has already happened
  will stay stale indefinitely, until key is next written
  solved with leases -- C1 gets a lease, but C2's delete() invalidates lease,
    so mc ignores C1's set
    key still missing, so next reader will refresh it from DB

Race 2:
  during cold cluster warm-up
  remember clients try get() in warm cluster, copy to cold cluster
  k starts with value v1
  C1 updates k to v2 in DB
  C1 delete(k) -- in cold cluster
  C2 get(k), miss -- in cold cluster
  C2 v1 = get(k) from warm cluster, hits
  C2 set(k, v1) into cold cluster
  now mc has stale v1, but delete() has already happened
    will stay stale indefinitely, until key is next written
  solved with two-second hold-off, just used on cold clusters
    after C1 delete(), cold ignores set()s for two seconds
    by then, delete() will propagate via DB to warm cluster

Race 3:
  k starts with value v1
  C1 is in a slave region
  C1 updates k=v2 in master DB
  C1 delete(k) -- local region
  C1 get(k), miss
  C1 read local DB  -- sees v1, not v2!
  later, v2 arrives from master DB
  solved by "remote mark"
    C1 delete() marks key "remote"
    get()/miss yields "remote"
      tells C1 to read from *master* region
    "remote" cleared when new data arrives from master region

Q: aren't all these problems caused by clients copying DB data to mc?
   why not instead have DB send new values to mc, so clients only read mc?
     then there would be no racing client updates &c, just ordered writes
A:
  1. DB doesn't generally know how to compute values for mc
     generally client app code computes them from DB results,
       i.e. mc content is often not simply a literal DB record
  2. would increase read-your-own writes delay
  3. DB doesn't know what's cached, would end up sending lots
     of values for keys that aren't cached

PNUTS does take this alternate approach of master-updates-all-copies

FB/mc lessons for storage system designers?
  cache is vital to throughput survival, not just a latency tweak
  need flexible tools for controlling partition vs replication
  need better ideas for integrating storage layers with consistency

https://www.quora.com/How-does-the-lease-token-solve-the-stale-sets-problem-in-Facebooks-memcached-servers


However, this will not happen because if the memcached server has recently given a lease token out, it will not give out another. In other words, B does not get a lease token in this scenario. Instead what B receives is a hot miss result which tells client B that another client (namely client A) is already headed to the DB for the value, and that client B will have to try again later.

So, leases work in practice because only one client is headed to the DB at any given time for a particular value in the cache.

Furthermore, when a new delete comes in, memcached will know that the next lease-set (the one from client A) is already stale, so it will accept the value (because it's newer) but it will mark it as stale and the next client to ask for that value will get a new lease token, and clients after that will once again get hot miss results.

Note also that hot miss results include the latest stale value, so a client can make the decision to go forth with slightly stale data, or try again (via exponential backoff) to get the most up-to-date data possible.
Scaling Memcache at Facebook
https://www.usenix.org/system/files/conference/nsdi13/nsdi13-final170_update.pdf
https://www.youtube.com/watch?v=6phA3IAcEJ8&t=67s
https://medium.com/@shagun/scaling-memcache-at-facebook-1ba77d71c082
https://zhuanlan.zhihu.com/p/20734038
https://zhuanlan.zhihu.com/p/20761071
https://zhuanlan.zhihu.com/p/20827183
FB的数据中心分布在不同地方,互相之间数据需要保持一致性,这部分就是讲在数据库和Memcache层面保证数据正确性的。
首先,底层数据存储用的是MySQL(跟开源版本不完全一样,有一些Facebook自己的强化),采用Master-Slave Replication(主从复制)架构,其中一个Regional作为Master Region,接受所有的Write,然后通过MySQL本身的Replication把数据更新广播到其他Slave Region。这是数据库层面,简单直接。
那么Memcache层面呢?如果是Master Region的Write,那好办,完全照搬之前的流程:
1. Web Server往数据库发更新,然后invalidate local(同一个Cluster的) cache
2. 同一个Cluster的Web Server遇上cache miss会从数据库拿新的数据然后往cache写
但是如果是非Master Region,就不能照搬了,因为更新是往Master Region发,而这个更新可能要很久才会被广播到当前的Region。一旦local cache被清掉,别人来读发现cache miss了,但是新的数据还没传到本Region的数据库,这时候别人就会读本地数据里那个实际上已经过期的值往cache里写了。这么一写,这个陈旧数据就可能不知道会存活到何年何月了。
要避免这种情况发生,就得保证本地cache被清掉了,但是本地数据库还没有最新数据的时候,要到Master Region去拿正确的数据。所以步骤如下。假如一个Web Server要更新一个key k:
1. 先在某个地方放一个标志Rk。这个标志的含义就是我更新数据了,但是这个更新数据还没到我这个Region的数据库。
2. 把k和Rk一起放在SQL里发给Master Region
3. 清掉本Cluster的k
4. Master Region收到更新后会把同样的更新广播到本地Region
5. 本地Region收到后负责把其他Cluster的k,和之前的标志Rk删掉
然后别人的读如果发生在3之后,5之前,它就会看到Rk存在,于是它就不会去读本地数据库去拿数据,而是直接访问Master Region的数据库,这样就能很大程度上保证读到的数据不是过期的。
这就是大概的思路。至于实现细节,Rk是放在Regional Pool中的,被整个Regional共享(读到这里真是感觉人生如此巧妙,需要某个东西的时候恰好可以拿现成的东西来用,这种感觉真是太爽了)。
论文里还提到一个历史细节:在需要扩张到多个Region之前,其实清空cache过期数据的工作其实完全是靠Web Server来干的。前面说过Web Server跨Cluster干这种事情已经很蛋疼了,还要跨Region就太不实际了(而且还容易发生Race condition),于是才把这部分逻辑放在后端。这个故事告诉我们系统是逐渐演变的,没有一步到位的完美系统。

D. Single Server Improvement

这部分讲的是如何提高单个服务器的性能。

D.1 基本优化

基本的优化有三个
1) 允许Hashtable自动扩张(否则冲突太多的时候,look-up time会趋向O(n))
2) 从单线程变成多线程,通过全局锁来保护数据结构
3) 每个线程使用单独的端口通信
然后在此基础上
1) 把全局锁改成粒度更小的锁,使得throughput提高
2) 使用UDP而不是TCP来通信,使得throughput大致提升了10%(论文做了两个实验,分别是8%和13%)
D.2 Slab Allocator
Slab Allocator是Memcached的内存管理模块。它的算法大致是:
1. 定义不同的内存块大小(slab class),从64 bytes等比数列一直到1M (1.07 factor)
2. 每个slab class会对应一堆预先分配的内存块
3. 每当需要放什么东西的时候,它会找对应大小的slab class,然后从这个class对应的内存块里找空闲的,然后把数据塞进去。找不到就清掉没人用的内存块,再把数据塞进去腾出来的空间。
论文提出的修改是:动态的重新分配给每个slab class的内存。比如原来是64bytes一直都有100块内存,1M一直都有10块内存,都是固定的。但是,随着workload的改变,一台Server可能早上要存的东西都是64 bytes以下的,晚上要存的东西都是0.99M的。理想状况是,早上我们把1M的那些内存块对应的内存分给64bytes,晚上把64bytes的内存块对应的内存分给1M。这就是大致的算法思路。
具体实现则是,结合之前的例子,我发现64 bytes这个class老是在腾空间给新数据,而且被清掉的数据块的历史,比其他slab class最没人用的数据块(least recently used)还要短20%,那么我就知道64 bytes这个class内存不够了,我就从别的slab class里找最最最least recently used的内存块,把它resize一下分给64 bytes这个急切需要用内存的class。
D.3 Transient Item Cache
Memcached在清cache的时候是很拖延症的——不到满了都不清。在大部分情况下,这是没问题的。但是,有一部分key,只在很短的一段时间之内有用,之后根本不会被用到,但它们还是会在cache呆着,占着内存(请想象一个人在厕所赖着不出来直到下一个人来的情况),直到它们到了eviction队列尾端。
其实问题不大,但就是看它们不爽,那么怎么清掉呢?方法是对于expiration time比较短的key,把他们放到另外一个数据结构了,然后过一段时间check一下,有过期的就主动把它删掉,即使内存还没满。
D.4 Software Upgrade
我们每更新一次软件都要重启Server,然后要等一段时间cache才能满上数据。解决办法是更新前把cached的数据放到某个地方(System V Shared Memory Region,我也不知道是啥。。。),然后这部分数据之后就可以接着用了。

https://medium.com/@shagun/scaling-memcache-at-facebook-1ba77d71c082
In Facebook’s context, users consume much more content than they create. So the workload is read intensive and caching help to reduce the workload.MORE ON IT. Facebook uses Memcache Clusters where each Memcache instance is a demand-filled, look-aside cache. This means if a client requests data from Memcache and data is not available, the client would fetch the data from the database and would populate the cache for further requests. 

In case of get requests, UDP performs better than TCP and geterrors are treated as cache miss though insertion is not performed. This design choice seems practical as only .25% of requests fail due to late/ dropped or out of order packets. Though the response size is very small, the variation is quite large with mean = 954 bytes and median = 135 bytes. Set and delete operations are still performed over TCP (for reliability) though the connections are coalesced to improve efficiency.

Within a cluster, data is distributed across hundreds of servers through consistent hashing. A very high request rate combined with large fanout leads to an all to all communication between Memcache servers and clients and even a single server can become a bottleneck for many requests. Clients construct a DAG representing the dependency between data so that more independent requests are fired concurrently. Facebook also provides a standalone proxy called mcrouter that acts as an interface to Memcache server interface and routes the requests/replies to/from other servers. Along with these, flow control mechanisms in the form of sliding window mechanism are provided to limit incast congestion.

Lease

Leases are used to address stale sets (when web server writes a stale value in the cache) and thundering herds (when a key undergoes heavy read and write operations). When a client experiences a cache-miss, Memcache gives it a lease (a 64-bit token bound to the requested key). This lease is verified by Memcache when client tries to set the value. If Memcache receives a delete request for the key, the lease is invalidated and the value can not be set. To mitigate thundering herds, Memcache returns a token only once every 10 seconds per key. If a read request comes within 10 seconds of a token issue, the client is notified to retry after a short time, by which the updated value is expected to be set in the cache. In situations where returning stale data is not much problem, the client does not have to wait and stale data (at most 10 second old) is returned

Memcache Pools

Since different workloads can have different access patterns and requirements, Memcache clusters are partitioned into pools. The default pool is called wildcard and then there are separate pools for keys that can not reside in the wildcard pool. A small pool can be provisioned for keys for which cache miss is not expensive. Data is replicated within a pool when request rate is quite high and data can easily fit into a few machines. In Facebook’s context, replication seems to work better than sharding though it has the additional overhead of maintaining consistency.
In case a few Memcache servers fail, a small set of machines, called gutters, take over. In case of more widespread failures, requests are diverted to alternate clusters.

Cold cluster Warmup

When a new cluster is brought online, it takes time to get populated and initially the cache hit rate is very low. So a technique called Cold Cluster Warmup is employed where a new cold cluster is populated with data from a warm cluster instead of the database cluster. This way the cold cluster comes to full capacity within few hours. But additional care is needed to account for race conditions. One example could be: a client in cold cluster makes an update and before this update reaches the warm cluster, another request for the same key is made by the cold cluster then the item in the cold cache would be indefinitely inconsistent. To avoid this, Memcache rejects add operations for 2 seconds (called holdoff time)once a delete operation is taken place. So if a value is updated in a cold cluster and a subsequent request is made within 2 seconds, the add operation would be rejected indicating that the data has been updated. 2 seconds is chosen as most updates seems to propagate within this time.

Across Region consistency

Clusters comprising a storage cluster and several front end clusters are deployed in different regions. Only one of the regions contains the master database and rest act as replicas. This adds the challenge of synchronisation.
Writes from a master region send invalidations only within the master region. Sending invalidations outside may lead to a race situation where deletes reach before data is replicated. Facebook uses mcsequal daemon helps to avoid that at the cost of serving stale data for some time.
Writes from a non-master region are handled differently. Suppose a user updates his data from a non-master region with a large replication lag. A cache refill from a replica database is allowed only after the replication stream has caught up. A remote marker mechanism is used to minimise the probability if reading stale data. The presence of a marker indicates that data in the replica is stale and the query is redirected to the master region. When a webserver updates a key k, it sets a remote marker rk in the region, performs the write to the master database having key k and deletes k in the local cluster. When it tries to read k next time, it will experience a cache miss, will check if rk exists and will redirect its query to the master cluster. Had rk not been set, the query would have gone to the local cluster itself. Here latency is introduced to make sure most updated data is read.

Single Server Improvements

Facebook introduced many improvements for Memcache servers running as single instances as well. This includes extending Memcache to support automatic expansion of the hash table without the look-up time drifting to O(n), making the server multi-threaded using a global lock and giving each thread its own UDP port to reduce contention.
Memcache uses an Adaptive Slab Allocator which organizes memory into slab classes — pre-allocated, uniformly sized chunks of memory. Items are stored in smallest possible slab class which can fit the data. Slab sizes start at 64 bytes and reach up to 1 Mb. Each slab class maintains a free-list of available chunks and requests more memory in 1MB in case the free-list is empty. If no more free memory can be allocated, eviction takes place in Least Recently Used (LR) fashion. The allocator periodically rebalances slab assignments to keep up with the workload. Memory is freed in less used slabs and given to more heavily used slabs.
Most of the items are evicted lazily from the memory though some are evicted as soon as they are expired. Short lived items are placed into a circular buffer of linked lists (indexed by seconds until expiration) — called Transient Item Cache — based on the expiration time of the item. Every second, all of the items in the bucket at the head of the buffer are evicted and the head advances by one. By adding a short expiration time to heavily used set of keys whose items with short useful lifespans, the proportion of Memcache pool used by this key family was reduced from 6% to 0.3% without affecting the hit rate.




Lessons Learnt

Memcache’s design decisions are driven by data and not just intuition. Facebooks seems to have experimented with a lot of configurations before arriving on decisions like using UDP for read operations and choosing the value for parameters like holdoff time. This is how it should be — data-driven decisions. In the entire process of developing Memcache, Facebook focused on optimizations which affect a good number of users and usecases instead of optimizing for each possbile use case.
Facebook separated caching layer from persistence layer which makes it easier to mould the 2 layers individually. By handling the cache insertion logic in the application itself, Facebook made it easier to plug Memcache with different data stores. By modeling the probability of reading the stale data as a parameter, they allowed the latency and hit rate on the persistence store to be configurable to some extent thus broadening the scope for Memcache. Facebook breaks down a single page load request into multiple requests, thereby allowing for different kind of stale data tolerance for each component. For example, the servers serving the profile picture can cache content longer than the servers serving messages. This helps to lower the response latency and reduce the load on the data layer.
Facebook’s Memcache is one of the many solutions aimed to solve a rather common problem — a scalable, distributed key-value store. Amazon’s Dynamosolves is well for a write-intensive workload for small data sizes and Facebook solves it for a read intensive workload. Other contenders in the list are Cassandra, Voldemort, Redis, Twitter’s Memcache and more. The sheer number of possible, well known solutions suggests that there are many dimensions to the problem, some of which are yet unknown and that we still do not have a single solution that can be used for all use cases.


Every time a new cache box is added to Memcache infrastructure, it has empty cache. This box is called 'cold'. Every request to this box will end up in 'miss', so clients will spend lots of resources to fill the cache. In the worst case this will affect performance of the entire site, because on every 'miss' on the cold box client would spend much more time to actually get the data.
Mcrouter offers a way to 'warm up' cold cache boxes without impact on performance. The idea is simple: when we have a cold box and receive a 'miss', try to get a value from 'warm 'box (one with filled cache) and set it back to cold box.
Here is the setup for this logic:
 {
   "pools": {
     "cold": { "servers": [ /* cold hosts */ ] },
     "warm": { "servers": [ /* warm hosts */ ] }
   },
   "route": {
     "type": "WarmUpRoute",
     "cold": "PoolRoute|cold",
     "warm": "PoolRoute|warm"
   }
 }
Explanation: All sets and deletes go to the "cold" route handle. Gets are attempted on the "cold" route handle and in case of a miss, data is fetched from the "warm" route handle (where the request is likely to result in a cache hit). If "warm" returns an hit, the response is then forwarded to the client and an asynchronous request updates the value in the "cold" route handle.
Since any client that wants to talk to memcached can already speak the standard ASCII memcached protocol, we use that as the common API and enter the picture silently. To a client, mcrouter looks like a memcached server. To a server, mcrouter looks like a normal memcached client. But mcrouter’s feature-rich configurability makes it more than a simple proxy.
Some features of mcrouter are listed below. In the following, a “destination” is a memcached host (or some other cache service that understands the memcached protocol) and “pool” is a set of destinations configured for some workload — e.g., a sharded pool with a specified hashing function, or a replicated pool with multiple copies of the data on separate hosts. Finally, pools can be organized into multiple clusters.
  • Connection pooling: Multiple clients can connect to a single mcrouter instance and share the outgoing connections, reducing the number of open connections to memcached instances.
  • Multiple hashing schemes: Mcrouter provides a proven consistent hashing algorithm (furc_hash) that allows distribution of keys across many memcached instances. Hostname hashing is useful for selecting a unique replica per client. There are a number of other hashes useful in specialized applications.
  • Prefix routing: Mcrouter can route keys according to common key prefixes. For example, you can send all keys starting with “foo” to one pool, “bar” prefix to another pool, and everything else to a “wildcard” pool. This is a simple way to separate different workloads.
  • Replicated pools: A replicated pool has the same data on multiple hosts. Writes are replicated to all hosts in the pool, while reads are routed to a single replica chosen separately for each client. This could be done either due to per-host packet limitations where a sharded pool would not be able to handle the read rate; or for increased availability of the data (one replica going down doesn’t affect availability due to automatic failover).


    • Cold cache warm up: Mcrouter can smooth the performance impact of starting a brand new empty cache host or set of hosts (as large as an entire cluster) by automatically refilling it from a designated “warm” cache.


      • Broadcast operations: By adding a special prefix to a key in a request, it’s easy to replicate the same request into multiple pools and/or clusters.
      • Reliable delete stream: In a demand-filled look-aside cache, it’s important to ensure all deletes are eventually delivered to guarantee consistency. Mcrouter supports logging delete commands to disk in cases when the destination is not accessible (due to a network outage or other failure). A separate process then replays those deletes asynchronously. This is done transparently to the client — the original delete command is always reported as successful.


        • Quality of service: Mcrouter allows throttling the rate of any type of request (e.g., get/set/delete) at any level (per-host, per-pool, per-cluster), rejecting requests over a specified limit. We also support rate limit requests to slow delivery.
        • Large values: Mcrouter can automatically split/re-stitch large values that would not normally fit in a memcached slab.


          • Multi-level caches: Mcrouter supports local/remote cache setup, where values would be looked up locally first and automatically set in a local cache from remote after fetching.

          • Production traffic shadowing: When testing new cache hardware, we found it extremely useful to be able to route a complete copy of production traffic from clients. Mcrouter supports flexible shadowing configuration. It’s possible to shadow test a different pool size (re-hashing the key space), shadow only a fraction of the key space, or vary shadowing settings dynamically at runtime.


            • Online reconfiguration: Mcrouter monitors its configuration files and automatically reloads them on any file change; this loading and parsing is done on a background thread and new requests are routed according to the new configuration as soon as it’s ready. There’s no extra latency from client’s point of view.


              • Flexible routing: Configuration is specified as a graph of small routing modules called “route handles,” which share a common interface (route a request and return a reply) and which can be composed freely. Route handles are easy to understand, create, and test individually, allowing for arbitrarily complex logic when used together. For example: An “all-sync” route handle will be set up with multiple child route handles (which themselves could be arbitrary route handles). It will pass a request to all of its children and wait for all of the replies to come back before returning one of these replies. Other examples include, among many others, “all-async” (send to all but don’t wait for replies), “all-majority” (for consensus polling), and “failover” (send to every child in order until an non-error reply is returned). Expanding a pool can be done quickly by using a “cold cache warmup” route handle on the pool (with the old set of servers as the warm pool). Moving this handle handle up the stack will allow for an entire cluster to be warmed up from a warm cluster.
              • Destination health monitoring and automatic failover: Mcrouter keeps track of the health status of each destination. If mcrouter marks a destination as unresponsive, it will fail over incoming requests to an alternate destination automatically (fast failover) without attempting to send them to the original destination. At the same time health check requests will be sent in the background, and as soon as a health check is successful, mcrouter will revert to using the original destination. We distinguish between “soft errors” (e.g., data timeouts) that are allowed to happen a few times in a row and “hard errors” (e.g., connection refused) that cause a host to be marked unresponsive immediately. Needless to say, all of this is completely transparent to the client.


                https://www.quora.com/In-scaling-Memcache-at-Facebook-4-1-Regional-Invalidations-is-there-an-example-of-how-McSqueal-actually-derives-the-keys-to-be-deleted-based-on-an-SQL-statement
                I think you are asking about how memcached gets invalidated when SQL changes rows in the database.  Facebook has a technology called wormhole that captures row changes from the MySQL binary logs and publishes them on a message bus.  These messages can be used for invalidation.

                https://zhuanlan.zhihu.com/p/20734038
                A.1.1 第一个优化是把数据请求一批一批,每一批并发来做。Facebook内部的异步编程使得Web Server能够建立起数据的依赖关系,得到一个DAG(有向无环图),然后没有依赖关系的数据就可以作为一批(batch)并发请求。每个batch的大小平均而言是24个。
                A.1.2 客户端-服务器端通信
                不同memcached服务器之间并不通信,一切工作都在客户端进行。客户端主要的工作是,给定一个key,它要知道应该到哪个memcached server去取数据。这个过程称为routing。客户端routing的逻辑,可以作为一个库直接嵌入到Web Server里面,也可以作为单独的程序运行。
                客户端和服务器端通信的时候,UDP和TCP都用。在get请求的时候,用的是UDP,这样就不用像TCP那样还要握手建立连接。如果客户端发现丢包了,直接作为错误报出,而并不会尝试找回丢失的包。这时Web Server会把这当作cache miss处理。使用UDP把latency的平均值降低了20%左右。另一方面为了可靠起见,update和delete都采用TCP。
                如果在网路中的包太多,就会发生Incast Congestion的问题(可以理解为,network有很多switch,router啥的,一旦一次性发一堆包,这些包同时到达switch,这些switch就会忙不过来)。应对这个问题就是不要让大量包在同一时间发送出去,在客户端限制每次发出去的包的数量(具体实现就是客户端弄个队列)。每次发送的包的数量称为“Window size”。这个值太小的话,发送太慢,自然延迟会变高;这个值太大,发送的包太多把network switch搞崩溃了,就可能发生比如丢包之类的情况,可能被当作cache miss,这样延迟也会变高。所以这个值需要调。
                A.2.1 租约(Lease)。租约解决的问题有两个:
                1. Stale set,就是一个client拿着过期的数据往memcache里塞
                2. Thundering Herd Problem.
                租约的实现细节如下:每次cache miss,memcache会给客户端返回一个token,告诉它我这没有数据,你去取数据,取完拿这个token来更新我这的数据。如果有多个客户端同时读,同时cache miss,那么它们就会同时收到各自的token。但是,在接受数据更新的时候,比如A,B先后拿到一个token,A拿Token更新完之后,B再拿Token过来更新就不认了。如果系统收到了一个delete请求,那么啥token都不管用了。
                再进一步,我们可以限制发token的频率,比如每10秒最多发一个。然后对于那些拿不到token的,既然有别人去拿数据去了,何必让它们再去拿?就让它们等待一小段时间再重试。大部分时候,更新后的数据在几毫秒后就来了,它们再重试就是cache hit了。
                这个设计还是很精妙的。
                A.2.2 过期数据(Stale values)
                如果应用程序层可以忍受稍微过期一点的数据,针对这点可以进一步降低系统负载。当一个key-value被删除的时候(delete请求或者cache爆棚清空间了),它被放倒一个临时的数据结构里,会再续上比较短的一段时间。当有请求进来的时候会返回这个数据并标记为“Stale”。对于Facebook大部分应用场景而言,Stale Value是可以忍受的。
                A.2.3 Memcache池(memcache pool)
                在不同应用场景之下,对memcache的读取规律(access pattern)是可能大为不同的。比如有些访问率高,有些访问率低;cache miss有时无所谓,有时候很昂贵(比如计算一次时间特别久);有时上一周用到的key跟这一周用到的key基本一样(low-churn),有时差别很大(high-churn)。
                拿最后一个例子来说(low-churn vs. high-churn),服务器A使用的key万年不变,就在那么一小堆里来来去去;服务器B使用的key基本什么都有可能。如果它们使用同一个Pool,那么服务器B因为经常换新key,就会把A的key给频繁踢掉,而理想情况下,A使用的pool应该基本上就不用更新key的。这样就会造成很大的性能浪费。
                所以,针对不同的需求,设立不同的pool是很有必要的。
                A.2.4 数据复制(Replication)
                最后一个降低负载的手段是在每个Pool内进行Replication。想象如下情景:我们有一台memcache机器,可以支持500K/s的请求率(request rate)。现在我们想要支持1M/s的请求,怎么办?加一台机器,然后把数据平分,两台机器一人一半?但是,客户端在请求的时候,一般是多个key作为一个batch发出来。假如原来是每个request要读100个key,那么现在就是分成两个request,每个request读50个key,然后同时扔给两个机器。由于每台机器接受请求的数量还是受到500K/s的限制,那么总的request rate还是500K/s。
                那如何破?就是加一台机器,然后把数据完整复制到那上面。现在客户端还是一次发一个有100个key的读,但是有50%发到老机器,50%发到新机器。这样就能支持500K/s * 2的请求率了。
                A.3 应对系统崩溃
                系统崩溃有大有小。对于大规模的服务器失联,一般就直接把对应的cluster直接取下来,把web server的请求重定向到别的cluster的memcache。这种情况不常见,应对方法也比较简单粗暴。
                对于小规模的outage,就需要注意。一方面小规模事故不足以让我们把整个cluster取缔掉,但是放任不管,也容易造成连锁反应:cache miss太多,导致Web Server全部直接访问后端服务,后端服务顶不住,挂了,或者后端数据库顶不住,也挂了,然后全站就挂了,句号。论文里提到的应对的办法就是设立一堆专门的应急memcache服务器(Gutter)。一旦一个get请求没有任何响应,客户端默认那个server挂了,转而请求Gutter。这个优化带来的影响是:
                In practice, this sys- tem reduces the rate of client-visible failures by 99% and converts 10%–25% of failures into hits each day
                https://zhuanlan.zhihu.com/p/20761071
                B. Region Replication
                (Region感觉不好翻译,类似于data center的概念吧)
                前面我们谈到把一堆Web Server和一堆Memcached作为一个Cluster。但是一个Cluster是不能无限Scale up的,因为每个Cluster里面都是多对多的Connection,也就是说是N^2的connection上限,这样N越大,就会:
                1. Hotkey 访问量越来越大,最终搞得有hotkey的那个server hold不住
                2. 网路堵塞各种丢包
                3. ...
                所以,论文提到建立多个Cluster (为了区分,后面把这种每个里面包含一定数量的Web Server和memcached的cluster称为Frontend Cluster),把它们和一个Storage Cluster组合起来,组成一个Region。所有Frontend Cluster共享同一个后端服务。
                这样相当于是用空间换时间:同一个key在每一个Frontend Cluster都可能有一个Copy,这样会带来consistency的问题(后面会讲怎么解决),但是这样能够降低latency和提高availability。
                B.1 Regional Invalidation
                这里要解决的问题是:Cluster A的某个Server修改了后端数据库里面的值,如何把这个值被修改了的消息传播到所有的Frontend Cluster,好让它们把本地memcached对应的旧的数据清掉?
                一种简单的方式是让那个做出修改的Web Server负责通知同一个Region里所有Cluster的所有memcached,但基于我们上面说到的种种理由,这样performance会很差,而且还容易由于routing的配置出错,所以这种操作只能现在在同一个Cluster里。
                那么对于其他Cluster,解决办法是让Storage Cluster来负责把消息广播出去。Storage layer采用的是Mysql,而Mysql对于数据更新是有日志的。第一步首先在日志内容里加上对应的memcached key,然后设立一个进程监听日志,发现有数据更新,就解析出对应的key,然后把消息广播给所有的memcached。
                这里有一点要注意的是我们不能让数据库直接跟memcached通信,原因包括但不限于:
                1. 这样通信连接太多
                2. 这样还得把routing逻辑放到后端逻辑里
                所以每个cluster里面有专门的server,运行mcrouter进程(还记得前面说过routing逻辑可以作为库函数嵌入,也可以作为单独的进程运行吧)。广播消息会发送给这些进程,再由它们负责传给本cluster的memcached。
                B.2 Regional Pool
                每个Frontend Cluster都有自己的memcached pools,我们姑且把它们称作cluster pool吧。与之相对的Regional Pool顾名思义就是被所有cluster共享的memcached pool。为什么要设立这样的pool呢?
                主要原因是,有一些数据,访问频率低,本身占内存还多,这样的数据放到每个cluster里复制一份,要占用很多额外的内存。所以把它们放到Regional Pool里面就可以减少内存占用。虽然Regional Pool的latency会相对更高(因为要穿越cluster的边界),但是由于它们访问频率不高,所以也就显得不那么有所谓了。
                论文提到目前是靠人的经验来觉得什么东西放Regional Pool的,不知道现在是不是做到自动化了。
                B.3 Cold Cluster Warmup
                当我们起一个新的cluster,或者把cluster拿去维护,等等等等之类的,这个cluster的cache基本是没东西的,所以基本很大概率是各种cache miss,然后要等很久才能填得比较满,而且这样也会给后端服务带来很大压力。
                解决办法嘛,很容易就能想到,允许这个cold cluster在cache miss的时候,把别的“warm cluster”(就是cache有比较多数据的cluster)当作storage cluster去那边读数据,这样原来需要几天时间才能完成的warm up在几个小时之内就能完成。
                但是这样又带来了新的问题。想象下面这个情景:Cluster A某个Server往Storage里更新了一份数据,后端在完成数据更新后会把数据过期的消息发送到其他的Cluster。同时Cluster A里某个Server读这份数据的时候发现cache miss,然后从Cluster B里读;如果恰好Cluster B此时还没有收到数据过期的消息(因为消息传递也是要时间的),这个读会成功返回,然后这个Server拿到事实上已经过期的数据后会往Cluster A的memcached里写。这样子就相当于Cluster A的memcached就存放了一份过期的数据,而且这份过期的数据可能被保留很长甚至无限长的时间。
                解决办法是:memcached支持一个功能,在对一个key进行delete操作之后锁住这个key一段时间不允许进行add操作。通过在cold cluster里设置这段hold-off时间为大于0的值(2秒),在上面的场景中,由于前面那个更新的Server是对本地memcached进行了delete操作的,第二个server拿着过期数据往里写的时候就会报错,然后它就知道这个数据在Storage里面有新值,就会去读最新的值再往里写。理论上过期数据还是可能出现的,但是可能性大大减低了。
                http://www.shuatiblog.com/blog/2015/01/24/distributed-caching-memcached/
                Memcached is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of database calls, API calls, or page rendering.
                Memcached servers are indeed independent of each other. Memcached server is just an efficient key-value store implemented as in-memory hash table.

                • It cannot cache large object
                What makes memcached distributed is the client, which in most implementations can connect to a pool of servers. Typical implementations use consistent hashing, which means that when you add or remove server to/from a pool of N servers, you only have to remap 1/N keys.
                Typically keys are not duplicated on various hosts, as memcached is not meant to be persistent store and gives no guarantees that your value will persist (for example when running out of assigned memory, memcached server drops least recently used (LRU) elements). Thus it’s assumed that your application should handle missing keys.
                https://miafish.wordpress.com/2015/03/06/memcached/
                Here is where consistent hashing is advantageous: Suppose server 2 will be removed from the memcache server pool. What would happen when we want to fetch key k1? Nothing strange would happen. We plot k1 still on the same position in the continuum, and the first server-dot it will hit is still s1.
                However, when fetching k3, which is stored on s2, it would miss the s2-dot (since it has been removed), and will be moved to server s3. Same goes for k2, which moves from s2 to s1.
                In fact, the more server-dots we place onto the continuum, the less key-misses we get in case a server gets removed (or added). A good number would be around 100 to 200 dots, since more dots would result in a slower lookup on the continuum (this has to be a very fast process of course). The more servers you add, the better the consistent hashing will perform.
                Instead of almost 100% of the key-movements you have when using a standard modulus algorithm, the consistent hashing algorithm would maybe invalidate 10-25% of your keys (these numbers drop down quickly the more servers you use) which means the pressure on your backend system (like the database) will be much less than it would when using modulus.
                Pros:
                reduces database load.
                perfect for websites with high database load
                significantly reduces the number of retrieval requests to database
                cuts down the I/O access
                Cons:
                not a persistent data store
                not a database
                not application specific
                can not cache large object(key size = 255 chars, max value = 1MB)
                https://www.adayinthelifeof.nl/2011/02/06/memcache-internals/
                Internally, all objects have a “counter”. This counter holds a timestamp. Every time a new object is created, that counter will be set to the current time. When an object gets FETCHED, it will reset that counter to the current time as well. As soon as memcache needs to “evict” an object to make room for newer objects, it will find the lowest counter. That is the object that isn’t fetched or is fetched the longest time ago (and probably isn’t needed that much, otherwise the counter would be closed to the current timestamp).
                In effect this creates a simple system that uses the cache very efficient. If it isn’t used, it’s kicked out of the system.

                But high performance systems like memcache can get into trouble working that way. The reason is that malloc() and free() functions are not really optimized for such kind of programs. Memory gets fragmented easily which means a lot of memory will get spilled

                in order to combat this “malloc()” problem, memcache does its own memory management by default. Memcache’s memory manager will allocate the maximum amount of memory from the operating system that you have set (for instance, 64Mb, but probably more) through one malloc() call. From that point on, it will use its own memory manager system called the slab allocator.

                Slab allocation
                When memcache starts, it partitions its allocated memory into smaller parts called pages. 
                Each page is 1Mb large (coincidentally, the maximum size that an object can have you can store in memcache). 
                Each of those pages can be assigned to a slab-class, or can be unassigned (being a free page).

                A slab-class decides how large the objects can be that are stored inside that particular page. Each page that is designated to a particular slab-class will be divided into smaller parts called chunks. 

                The chunks in each slab have the same size so there cannot be 2 different sized chunks inside the same page.

                For instance, there could be a page with 64byte chunks (slab class 1), a page with 128byte chunks (slab class 2) and so on, until we get the largest slab with only 1 chunk (the 1MB chunk).

                There can be multiple pages for each slab-class, but as soon as a page is assigned a slab-class (and thus, split up in chunks), it cannot be changed to another slab-class.

                The smallest chunk-size starts at 80 bytes and increases with a factor of 1.25 (rounded up until the next  power of 2). So the second smallest chunksize would be 100 etc. You can actually find it out by issuing the “-vv” flag when starting memcache. 

                You can also set the factor (-f) and the initial chunk-size (-s), but unless you really know what you are doing, don’t change the initial values.

                 Memcache will initially create 1 page per slab-class and the rest of the pages will be free (which even slab class needs a page, gets a page)

                Now that memcache has partitioned the memory, it can add data to the slabs. as soon as a complete page if full (all chunks in the page are filled) and we need to add another piece of data, it will fetch a new free page, assign it to the specified slab-class, partition it into chunks and gets the first available chunk to store the data. 

                But as soon as there are no more pages left that we can designate to our slab-class, it will use the LRU-algorithm to evict one of the existing chunks to make room. This means that when we need a 128byte chunk, it will evict a 128byte chunk, even though there might be a 256byte chunk that is even older. Each slab-class has its own LRU.

                 Consistent hashing
                $server_id = hashfunc($key) % $servercount;


                The trouble with this system: as soon as $servercount (the number of servers) change, almost 100% of all keys will change server as well.