0%

不做任何过滤的搜索

一旦将一些数据摄取到 Elasticsearch 索引之后,你就可以通过 /{index}/_search 来发起搜索请求。要访问全套搜索功能,请使用 Elasticsearch Query DSL 在请求正文中指定搜索条件。您可以在请求 URI 中指定搜索的索引的名称。

例如,以下请求检索 bank 按账号排序的索引中的所有文档:

1
2
3
4
5
6
7
GET /bank/_search
{
"query": { "match_all": {} },
"sort": {
{ "account_number": "asc" }
}
}

PHP 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$params = [
'index' => 'bank',
'body' => [
'query' => [
'match_all' => (object)[
],
],
'sort' => [
[
'account_number' => 'asc',
],
],
],
];
$response = $client->search($params);

'match_all' 后面如果是空数组,需要加上 (object) 转换成空对象。

默认情况下,返回值的 hits 字段包含了符合搜索条件的前十条文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
"took" : 63,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value": 1000,
"relation": "eq"
},
"max_score" : null,
"hits" : [ {
"_index" : "bank",
"_id" : "0",
"sort": [0],
"_score" : null,
"_source" : {"account_number":0,"balance":16623,"firstname":"Bradshaw","lastname":"Mckenzie","age":29,"gender":"F","address":"244 Columbus Place","employer":"Euron","email":"bradshawmckenzie@euron.com","city":"Hobucken","state":"CO"}
}, {
"_index" : "bank",
"_id" : "1",
"sort": [1],
"_score" : null,
"_source" : {"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}
}, ...
]
}
}

响应结果也提供了关于搜索请求的一些其他信息:

  • took: 单位为 ms,Elasticsearch 处理这个请求用了多久
  • timed_out: 搜索请求是否超时
  • _shards: 搜索了多少个分片,以及成功、失败或跳过了多少个分片
  • max_score: 找到的最相关文件的分数
  • hits.total.value: 找到了多少个匹配的文档
  • hits.sort: 文档的排序(不按相关性得分排序时)
  • hits._score: 文档的相关性得分(使用时不适用 match_all

每个请求是独立的:Elasticsearch 在请求中不维护任何状态信息。要翻阅搜索结果,请在您的请求中指定 fromsize 参数。

例如,以下请求获取了第 10 到 19 条文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$params = [
'index' => 'bank',
'body' => [
'query' => [
'match_all' => (object)[
],
],
'sort' => [
[
'account_number' => 'asc',
],
],
'from' => 10,
'size' => 10,
],
];

$response = $client->search($params);

根据字段匹配搜索

不精确匹配

既然已经了解了如何提交基本的搜索请求,则可以开始构建更有趣的 match_all 查询。

要在字段中搜索特定术语,可以使用 match 查询。例如,以下请求搜索该 address 字段以查找地址包含 milllane 的客户:

1
2
3
4
5
6
7
8
9
10
11
12
$params = [
'index' => 'bank',
'body' => [
'query' => [
'match' => [
'address' => 'mill lane',
],
],
],
];

$response = $client->search($params);

精确匹配

如果要执行词组搜索而不是匹配单个词,请使用 match_phrase 代替 match。例如,以下请求仅匹配包含短语 mill lane 的地址:

1
2
3
4
5
6
7
8
9
10
11
12
$params = [
'index' => 'bank',
'body' => [
'query' => [
'match_phrase' => [
'address' => 'mill lane',
],
],
],
];

$response = $client->search($params);

要想构造更复杂的查询,可以使用 bool 查询来组合多个查询条件。你可以根据需要(必须匹配),期望(应该匹配)或不期望(必须不匹配)指定条件。

例如,以下请求在 bank 索引中搜索属于 40 岁客户的账户,但不包括住在爱达荷州(ID)的任何人:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$params = [
'index' => 'bank',
'body' => [
'query' => [
'bool' => [
'must' => [
'match' => [
'age' => '40',
],
],
'must_not' => [
[
'match' => [
'state' => 'ID',
],
],
],
],
],
],
];
$response = $client->search($params);

bool 查询中的每个 mustshouldmust_not 元素成为查询子句。文档满足每个 mustshould 的标准的程度有助于文档的相关性得分。分数越高,文档就越符合你的搜索条件。默认情况下,Elasticsearch 返回按这些相关性分数排名的文档。

must_not 子句中的条件被视为过滤器。它影响文件是否包含在结果中,但不会影响文件的评分方式。您还可以显式指定任意过滤器,以基于结构化数据包括或排除文档。

例如,以下请求使用范围过滤器将结果限制为余额在 2000 美元到 30000 美元(含)之间的账户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$params = [
'index' => 'bank',
'body' => [
'query' => [
'bool' => [
'must' => [
'match_all' => [
],
],
'filter' => [
'range' => [
'balance' => [
'gte' => 20000,
'lte' => 30000,
],
],
],
],
],
],
];

$response = $client->search($params);

索引单个文档

我们可以使用一个 PUT 请求来索引单个文档:

1
2
3
4
PUT /customer/_doc/1
{
"name": "John Doe"
}
  • 使用 PUT 方法
  • customer: 索引名称
  • 1: 唯一的文档 ID
  • body: 文档内容,可以包含一个或多个键值对

如果索引不存在,则 Elasticsearch 会新建一个,然后存储该文档。

因为这是一个新的文档,所以响应结果显示文档的版本为 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"_index" : "customer",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 2,
"failed" : 0
},
"_seq_no" : 26,
"_primary_term" : 4
}

获取单个文档

我们可以通过一个 GET 请求来获取刚刚保存进索引的文档:

1
GET /customer/_doc/1

这里的 1 是文档唯一 ID。

响应结果如下,_source 是原始文档内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"_index": "customer",
"_type": "_doc",
"_id": "1",
"_version": 2,
"_seq_no": 1,
"_primary_term": 1,
"found": true,
"_source": {
"h": [
"health",
"index",
"docs.count"
]
}
}

使用 bulk API 批量索引文档

如果你有很多要索引的文档,则可以使用 bulk API 批量提交。使用批量处理批处理文档操作比单独提交请求要快得多,因为它可以最大程度地减少网络往返次数。

导入测试数据

https://www.elastic.co/guide/en/elasticsearch/reference/master/getting-started-index.html#getting-started-index

1、获取下载链接

命令行直接下载会非常慢,这个只有依靠代理去解决了,我们可以从 vagrant box add 的命令输出中拿到 box 的下载链接, 然后去浏览器通过代理来下载。

2、下载之后,新建一个 metadata.json 文件,内容如下:

1
2
3
4
5
6
7
8
9
10
{
"name": "laravel/homestead",
"versions": [{
"version": "9.5.0",
"providers": [{
"name": "virtualbox",
"url": "file://C:/Users/ruby/Downloads/homestead.box"
}]
}]
}

这里的 C:/Users/ruby/Downloads/homestead.box 是下载的 box 保存的路径。

这里的版本号是我们 homestead up 的时候获取到的版本。

3、最后一步

1
homestead up

环境:RabbitMQ + Lumen 5.5,消费者处理逻辑是在处理完消息之后 ack。

Worker 源码路径 。

今天突然发现一个 qa 环境的一个队列消息累积了几百万,最终发现是因为消费消息的 Worker 进程没有处理完消息就退出了处理,而且没有任何的记录。看源码发现 worker 里面有个 kill 函数,里面执行了 exit() 函数,我们都知道,这个函数是退出进程的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Kill the process.
*
* @param int $status
* @return void
*/
public function kill($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping);

if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}

exit($status);
}

退出其实问题不大,但是这个退出的逻辑并没有标记这个 job 失败,这就导致下一次 Worker 进程启动的时候,继续拿到这个消息处理,然后处理到一定时候,又退出,如此无限循环。

Worker exit 的原因

我们可以看看 Worker 里面所有 exit 的调用,其实有两个地方:

  1. stopIfNecessary,kill 和 stop 函数最终都是 exit。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit) {
$this->kill();
}

if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
}
}

shouldQuit 是在一些连接断开的时候被设置为 true,又或者用户发送了 SIGTERM 信号给 Worker。

  1. registerTimeoutHandler,超时处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Register the worker timeout handler (PHP 7.1+).
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if ($this->supportsAsyncSignals()) {
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});

pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
}

所以总结一下有以下几个原因:

  • 数据库连接或其他连接断开
  • Worker 接收到 SIGTERM 信号
  • Worker 处理完一个 Job 之后发现 Worker 占用的内存超出了指定的内存
  • 用户执行了队列重启命令
  • Worker 执行时间超出了 timeout

exit 的影响

1、如果使用的是 redis

如果这个消息需要消费者处理的时间大于指定的 timeout,会导致消息没处理完就丢失。

2、如果使用的是 RabbitMQ,并且关闭了 AutoAck

Worker 进程没处理完就退出,然后消息还在队列中,下次启动 Worker 的时候继续消费这个消息,导致无限的重复消费。

解决方案

  1. 评估一下 Job 的最大运行时间,设置一个合适的 timeout,这个是必须的。
  2. 可以监听 WorkerStopping 事件,记录 Worker 异常退出的日志,但是需要注意的是,正常退出也会 fire 这个事件。所以有可能没有办法根据 log 来判断是否是异常退出(超时)。
  3. 这是 5.8 以下版本的 bug,我们可以升级到 5.8 以上的版本,在新版本中超时也会记录为失败,而不是单纯地退出。

这是个 bug

我们通过上面的代码也可以发现,其实 Worker 的超时回调其实并没有多少实际的处理,dispatch 一个 WorkerStopping 事件然后就 exit 了。但是我们有可能并没有监听这个事件,这就导致了 Worker 存在 timeout 过短的问题难以被及时发现。

其实这个 5.8 版本以下的 bug,在 5.8 以上的版本中这个已经修复了,超时的时候,Job 会被标记为超时,超过重试次数就被记录为失败的 job。

源码可在 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L137 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Register the worker timeout handler.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () use ($job, $options) {
if ($job) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job)
);
}

$this->kill(1);
});

pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}

其他问题

  1. Job 里面写死了 timeout 属性,会以这个 timeout 为准。

在调试过程中,发现明明 php artisan queue:work --timeout= 这里设置的 timeout 足够大了,但是 Worker 还是和原来一样退出了。在 RabbitMQ 的控制台发现消息里面有记录 Job 的 timeout 属性,然后 Worker 里面在判断到如果 job 里面有 timeout 属性的时候,就不会再使用命令行传递的 timeout。

1
2
3
4
5
6
7
8
9
10
11
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}

2020-05-15 更新:Laravel 5.5 以及以上版本本身已经有一个方法实现了下面要实现的功能,有一个方法是 chunkById(),这个就是记录上次的最大 id,然后下次查询的时候从这个 id 开始查询。

有时候我们可能需要查询某一个表的全部数据做一些处理,这个时候有一个可行的方法就是直接调用模型的 get() 方法,又或者调用 chunk() 方法。

但是这两种方案在处理大表的时候都不好,首先是 get(),会导致 PHP 占用内存过大,而 chunk() 方法实际上就是一个分页的封装,最终的查询语句是 LIMIT offset, count;

chunk() 也是个人之前一直使用的方法,但是在表越来越大之后,发现有比较严重的性能问题,越到后面的页查询就越慢。

为什么 MySQL 分页慢?

我们可以 explain 一下分页 sql,我们会发现扫描行数等于 limit offset, count 里面的 offset,这和 MySQL 的分页机制有关:

MySQL 在执行 limit offset, count 语句的时候,需要把第一条数据到 offset 的那一条数据扫描一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql> EXPLAIN SELECT * FROM users ORDER BY id DESC LIMIT 10000, 20\G
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: users
partitions: NULL
type: index
possible_keys: NULL
key: PRIMARY
key_len: 4
ref: NULL
rows: 10020 # 扫描了 10020 行
filtered: 100.00
Extra: NULL
1 row in set, 1 warning (0.03 sec)

LIMIT 10000, 20 意味着 MySQL 已经读取了 10020 条数据,并且丢弃了前 10000 行,然后返回接下来的 20 行。

参考链接: Efficient Pagination Using MySQL

使用 chunk 有什么问题?

现在我们知道了,MySQL 在分页的时候是从第一条数到 offset 的,也就是说,我们的 offset 越往后,需要扫描的行就越多。

在我们需要遍历全表数据的这种场景下,MySQL 就需要不断地扫描之前扫描过的数据,这样会导致重复扫描非常多。

我们都知道扫描一遍只需要 O(n) 的时间,但是由于 MySQL 的这种机制加上 chunk,会直接导致时间复杂度增加为 O(n²) ,在我们数据量越多的时候,速度下降得就越快。

解决方法

1. 记录上一次的最大 id(推荐使用)

在 MySQL 中的 InnoDB 引擎,主键索引字段是一个聚簇索引,存在 B+ 树的叶子节点层,是有序的。

我们可以利用这个特点,将上一次的最大 id (主键)记录下来,假设是 lastId,然后下一次查询的时候,加上 where id > lastId,这个时候我们的 limit 语句也要改一下,改成 limit count,就可以了,因为我们告诉了 MySQL offset 是什么。这样 MySQL 就不用做一些重复的扫描操作了。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 我们需要按 id 顺序去遍历
$builder = app(\Modules\Product\Models\Sku::class)->orderBy('id');

$lastId = 0;
while (true) {
/** @var \Illuminate\Database\Eloquent\Collection $skus */
$skus = app(\Modules\Product\Models\Sku::class)
->where('id', '>', $lastId)
->orderBy('id')
->limit(100)
->get();

if ($skus->count() > 0) {
$lastId = $skus->max('id');
}

// 最后一页了
if ($skus->count() < 100) {
break;
}
}

测试结果:在 33w 表的情况下,chunk() 需要 390s,而按上述方法只需要 22s。

2. 利用 MYSQL_ATTR_USE_BUFFERED_QUERY

在 PDO 里面有一个常量 MYSQL_ATTR_USE_BUFFERED_QUERY,是用来告诉 MySQL 是否使用查询缓存的。

Laravel 里面提供了一个 cursor() 方法,但是实际查询的时候是先获取所有结果再往下处理的,并不是预期那样获取一条之后返回。可参考 Using Cursor on large number of results causing memory issues。这个方法想要做的事情的确和我们的想法契合,但是由于 PDO 的 MYSQL_ATTR_USE_BUFFERED_QUERY 默认值为 true。所以导致实际表现并不是我们想要的。

但是 Laravel 也提供了方法让我们去手动设置这个属性:

1
2
3
4
5
6
7
8
$builder = app(\Modules\Product\Models\Sku::class);
// 获取底层 pdo 对象,然后设置 \PDO::MYSQL_ATTR_USE_BUFFERED_QUERY 为 false
$builder->getConnection()
->getPdo()
->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);

foreach ($builder->cursor() as $item) {
}

这种解决方法有一定的问题,可参考上面提到的那个 laravel 的 issue。

总结

Laravel 扫描全表的时候可以记录上次 get() 的最大 id,下次从这个 id 起扫描,又或者利用 pdo 的 MYSQL_ATTR_USE_BUFFERED_QUERY 属性来单条获取。