https://leetcode.com/problems/two-sum/description/
func twoSum(nums []int, target int) []int {
dic := make(map[int]int, len(nums))
for i, v := range nums {
counterpart := target - v
if exist, ok := dic[counterpart]; exist >= 0 && ok {
return []int{i, exist}
} else {
dic[v] = i
}
}
return []int{}
}
点评:
kafka的核心诉求是解决海量高吞吐的log日志消息存储、读写、推送/订阅、处理问题。
在kafka出现以前,传统的企业级消息中间件存在一些问题,并不能很好地解决这个问题。比如:
- 用tcp的消息确认机制来保障消息送达。
- 消息的原子写入
- 消息的消费确认
这些功能固然好,然而对于log类的消息数据来说却不是必选项。和交易这样的场景不同,在很多时候,丢一两条log有时候并不是什么大不了的事情。提供这些强大保障功能的同时也意味着吞吐、性能的牺牲,比如确认机制要求一个tcp roundtrip。另外,这些系统在水平扩展上做的并不是很好。
kafka由producer、consumer、broker三部分组成。kafka的存储是以segment文件为单位的,一个segment文件存满了就创建一个新的。每次producer产生了一条消息,只需要往最后一个segment文件的末尾append一条记录即可,然后记录对应的offset。这样做提升了写消息的性能,比起mysql的写来说效率提升了2倍以上(mysql至少要写一次记录、写一次索引)。为了进一步提升写入性能,kafka只有当消息达到一定数量的时候才会刷盘。为了避免未落盘的消息被消费后,消息落盘失败而丢失的情况,kafka只有落盘的消息才可见可被读取。
为了缩减消息的长度,kafka消息没有所谓的message-id,只维护了offset。
消费者接收消息必须是顺序的,也就是说,当读到offset x的时候,x之前的所有消息都被接收到了。
kafka还利用文件系统底层提供的page cache来避免缓存消息而造成的性能开销,这样GC不会有太大的负担。
此外,kafka优化了消费者从网络获取数据的开销。一般来说,从本地文件往远程socket发送文件包括以下几个步骤:
- 从文件读取到系统page cache
- 从page cache拷贝到应用层
- 从应用层拷贝到内核层缓存
- 把数据从内核层缓存发送到socket
这个过程包括4次拷贝、2次系统调用。而利用sendfile API可以直接把数据从文件通道发送到socket通道,这样可以减少2次拷贝和一次系统调用(2、3)。
kafka提高并发和吞吐,方便水平扩展的基础是partition。
一个partition对应一个consumer group,一个consumer group由至少一个消费者组成。
一个partition里面的一条消息最多只能被一个consumer group里的一个消费者消费。如果我们允许多个消费者同时消费一个partition的消息,那么我们就必须维护状态来追踪哪个消费者消费了哪个消息。而我们只需要让partition的数量比消费者多即可避免这个开销。
为了避免broker单点故障,kafka引入了zookeeper来维护broker集群信息。
- 当broker启动,会把broker的信息注册到zookeeper:地址端口、topics、broker上存放的partitions。
- 当消费者启动,会把消费者的信息注册到zookeeper:所属的consumer group、订阅的topics。
- consumer group在zookeeper注册如下信息:owner、offset。
- owner:在consumer所订阅的partition下面增加consumer-id,表示partition属于该consumer。
- offset:每个partition下面会保存当前partition被消费的offset值。
kafka只保证至少一次触达。
利用神器alfred的workflow,可以帮助提升效率。比如:
- 全局唤起alfred之后输入#+任何命令行指令即可以唤起Terminal并执行;
- 利用workflow自定义
md
命令,使得md xxx
就可以搜索名字为xxx的md文件并用Typora App打开等等。
当数据库大到一定程度的时候,查询会变慢。作为一个传统数据库比如MySQL,这个时候就需要分表、水平扩展。并且没有办法进行跨节点的join或者分布式事务。
而另外一些NoSQL,比如HBase, MongoDB等,虽然能很好地水平扩展,不用分库分表了,却又不支持人见人爱的SQL以及一致性事务。而以 Google Spanner (https://thenewstack.io/google-cloud-spanner-view-field/ ) 和F1为代表的NewSQL,不仅能够很好地水平扩展,还能支持ACID。那么,怎么才能做到呢?
整个TiDB包括几个核心部分:MySQL协议、分布式事务、Raft、本地存储。
TiDB有一个协议层,兼容了MySQL协议。MySQL协议下面是TiKV。那么MySQL和TiKV是如何关联起来的呢?每一个TiDB实例对应多个TiDB Server,每个TiDB Server有一个关系型数据库schema,描述表、列、索引。TiDB把表结构和KV存储做了一个映射。
举个例子,比如:
INSERT INTO t(id, name, address) values (1, "Jack", "Sunnyvale")
与Mysql的schema及表结构不同,TiDB会把这条记录转变成KV的形式,key=t_r1,value={jack,Sunnyvale}。
如果除了主键之外还有其他列做索引,那么除了row-id到row-content的映射之外,还有一个index-id到row-id的映射。
为了提升数据的安全性、可用性以及性能,存储被分为若干个region,每一个region都包含一个区段的key值,比如[A-H), [H-N), [O-T), [U-Z)。这样整体的写吞吐就能得到提升。而每一个region对应一个Raft组,也就意味着每一个region都有几个副本。这样每个region的数据都能保持强一致性。
另外,TiDB servers没有数据分片、也无状态。任何一个数据中心的任何一台TiDB Server都可以访问到所有数据。
TiDB分布式事务的实现是受Percolator的启发。我们看看Google Percolator基于MVCC(Multi-Version Concurrency Control 多版本并发控制)的事务是怎么做到分布式事务ACID的。
由于Percolator是基于Bigtable的,所以数据结构直接使用了Bigtable的Tablet。
每个Tablet可以想象成大的key-value map,按照key排序。每一个row相当于一个key-value对,每个row有个key,value则对应几个关键列,timestamp,data,lock,write,对应的是该列的生成时间,数据,是否上锁,写入的数据版本(时间戳)。
如:
{
"Will" : {
"balance:data" : {
1517543541533: "$10"
},
"balance:lock" : {
1517543541533: "primary-lock"
}
"balance:write": {
1517543541533: "data@timestamp=1517543541533"
}
},
"Jean" : {
"balance:data" : {
1517543541534: "$12"
},
"balance:lock" : {
1517543541534: "[email protected]"
}
"balance:write": {
1517543541534: "data@timestamp=1517543541534"
}
}
}
接下来我们来通过例子说明原理。
比如will有10元,jean有2元。will向jean转账7元。
0 初始状态。
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Will | 1 | $10 | ||
Will | 2 | data@time=1 |
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Jean | 1 | $2 | ||
Jean | 2 | data@time=1 |
1 Will上主行锁,并且更新数据。
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Will | 1 | $10 | ||
Will | 2 | data@time=1 | ||
Will | 3 | $3 | lock |
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Jean | 1 | $2 | ||
Jean | 2 | data@time=1 |
2 Jean上次行锁,并且更新数据。
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Will | 1 | $10 | ||
Will | 2 | data@time=1 | ||
Will | 3 | $3 | lock |
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Jean | 1 | $2 | ||
Jean | 2 | data@time=1 | ||
Jean | 3 | $9 | [email protected] |
3 Will检查是否有主行锁,如果没锁则失败;有锁则写入新值,并清理之前的主行锁。从这一刻开始,后续的读Will行操作都将看到新的值$3。
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Will | 1 | $10 | ||
Will | 2 | data@time=1 | ||
Will | 3 | $3 | ||
Will | 4 | data@time=3 |
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Jean | 1 | $2 | ||
Jean | 2 | data@time=1 | ||
Jean | 3 | $9 | [email protected] |
4 Jean行写入新值,并清理次行锁
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Will | 1 | $10 | ||
Will | 2 | data@time=1 | ||
Will | 3 | $3 | ||
Will | 4 | data@time=3 |
row-key | timestamp | balance:data | balance:lock | balance:write |
---|---|---|---|---|
Jean | 1 | $2 | ||
Jean | 2 | data@time=1 | ||
Jean | 3 | $9 | ||
Jean | 4 | data@time=3 |
整个事务分成2阶段。
第一阶段,上锁,记录新数据。如果事务看到有任何其他write列记录的时间戳大于它的时间戳,该事务就取消。如果事务看到有锁存在,不管什么时间戳,也会中止。这里有可能出现一种情况:lock存在,但是时间戳小于当前事务时间戳(之前事务还没提交完),但是因为可能性不大,所以不另外考虑。
第二阶段,去掉锁的同时写新数据。
我们来看看异常情况:
如果客户端在提交的时候挂了,那么锁就有可能一直留在那,如何清理和处理读写呢?
这个清理的动作交给后续的读写操作来处理。如果后来的事务看到锁,它怎么判断这个锁要不要被清理呢?这很难。假设事务A先发生,事务B后发生。那么事务B看到A在某列上的锁时,非常难判断它是挂掉了导致lock没有清理遗留在那里还是事务A准备提交并清除lock。
因此,我们能做的就是避免A和B对锁的竞争。即:
- 要么A检测到锁,commit并释放锁
- 要么B的读操作检测到锁,清理锁。如果是B的写操作,直接返回失败。此时A如果提交就会因为拿不到锁而中止,提交失败,重试即可。
如果在第二阶段primary提交清除了lock,但是次行锁没清除,那么后续的事务在读操作会试图roll forward,把没完成的部分完成,即write并清除lock。
Raft和RocksDB的部分读者有兴趣自行了解。