文档控制
# 文档冲突
当使用index API更新文档,一次性读取原始文档,做我们的修改,然后重新索引整个文档。最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失
很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到Elasticsearch中并使其可被搜索。也许两个人同时更改相同的文档的几率很小。或对于我们的业务来说偶尔丢失更改并不是很严重的问题
但有时丢失了一个变更就是非常严重的。如使用Elasticsearch 存储网上商城商品库存的数量,每次卖一个商品的时候,在 Elasticsearch 中将库存数量减少。有一天,管理层决定做一次促销。突然地,一秒要卖好几个商品。假设有两个web程序并行运行,每一个都同时处理所有商品的销售,如果有数据更新的丢失,则会导致数据的不一致性
web_1 对stock_count所做的更改已经丢失,因为 web_2不知道它的 stock_count的拷贝已经过期。结果会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望
变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失
- 悲观并发控制
- 在关系型数据库中广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突
- 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改
- 乐观并发控制
- Elasticsearch中使用的这种方法
- 假定冲突是不可能发生的,并且不会阻塞正在尝试的操作
- 如果源数据在读写当中被修改,更新将会失败
- 应用程序接下来将决定该如何解决冲突
- 如可以重试更新、使用新的数据、或者将相关情况报告给用户
# 乐观锁并发控制
Elasticsearch 特点
- 分布式:当文档创建、更新或删除时,新版本的文档必须复制到集群中的其他节点
- 异步和并发:复制请求被并行发送,并且到达目的地时也许顺序是乱的
需要一种方法确保文档的旧版本不会覆盖新的版本
- 通过指定想要修改文档的version号来达到这个目的,乐观锁机制
- index , GET和DELETE请求时,指出每个文档都有一个_version(版本号),当文档被修改时版本号递增
- 使用version号来确保变更以正确顺序得到执行;如果旧版本的文档在新版本之后到达,它可以被简单的忽略
- 利用version号来确保应用中相互冲突的变更不会导致数据丢失;如果该版本不是当前版本号,请求将会失败
注意:老的版本es使用version,但是新版本不支持了,会报下面的错误,提示用if_seq _no和if _primary_term
# 示例
# 创建文档
PUT http://localhost:9200/shopping2/_create/1001
body
{}
response
{
"_index": "shopping2",
"_type": "_doc",
"_id": "1001",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 更新文档
POST http://localhost:9200/shopping2/_update/1001
body
{
"doc":{
"title":"华为手机"
}
}
2
3
4
5
response
{
"_index": "shopping2",
"_type": "_doc",
"_id": "1001",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 旧版本防止冲突更新方法
POST http://localhost:9200/shopping2/_update/1001?version=1
body
{
"doc":{
"title":"华为手机2"
}
}
2
3
4
5
response
{
"error": {
"root_cause": [
{
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
}
],
"type": "action_request_validation_exception",
"reason": "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
},
"status": 400
}
2
3
4
5
6
7
8
9
10
11
12
13
# 新版本防止冲突更新方法
- if_seq_no 是更新记录的 _seq_no 当前值
- if_primary_term 是更新记录的 _primary_term 当前值
POST http://localhost:9200/shopping2/_update/1001?if_seq_no=1&if_primary_term=1
body
{
"doc":{
"title":"华为手机3"
}
}
2
3
4
5
response
{
"_index": "shopping2",
"_type": "_doc",
"_id": "1001",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 外部系统版本控制
一个常见的设置是使用其它数据库作为主要的数据存储,使用Elasticsearch做数据检索,这意味着主数据库的所有更改发生时都需要被复制到Elasticsearch,如果多个进程负责这一数据同步,可能遇到类似于之前描述的并发问题
若主数据库已经有了版本号,或一个能作为版本号的字段值比如timestamp,可以在 Elasticsearch 中通过增加 version_type=extermal到查询字符串的方式重用这些相同的版本号
- 版本号必须是大于零的整数,且小于9.2E+18
- 一个Java中 long类型的正值
外部版本号的处理方式和内部版本号的处理方式有些不同,Elasticsearch不是检查当前_version
和请求中指定的版本号是否相同,而是检查当前_version
是否小于指定的版本号。如果请求成功,外部的版本号作为文档的新_version
进行存储
注意:外部版本号不仅在索引和删除请求是可以指定,而且在创建新文档时也可以指定
# 示例
使用外部版本号更新当前文档,当前文档version=1
POST http://localhost:9200/shopping2/_doc/1001?version=300&version_type=external
body
{
"title":"华为手机6"
}
2
3
response
_version
已经替换为外部版本号
{
"_index": "shopping2",
"_type": "_doc",
"_id": "1001",
"_version": 300,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 3,
"_primary_term": 1
}
2
3
4
5
6
7
8
9
10
11
12
13
14