Menu

未来模式(多线程模式)

未来模式

客户端提供一个「未来」或「异步」的模式。这允许批量处理请求(并行发送到群集),这会对性能和吞吐量产生巨大影响。

PHP 基本上是单线程的, 但是 libcurl 库提供了「多接口」的功能。 这允许像 PHP这样的语言通过提供一批要处理的请求来获得并发。 批处理由底层多线程 libcurl 库并发执行,然后将相应返回给 PHP。

在单线程环境中,执行 n 请求的时间,是这些执行 n 请求延迟的总和。 使用多接口,执行 n 请求的时间是最慢请求的延迟 (假设有足够的句柄可用于并行执行所有请求)。

此外,多接口允许同时向不同主机发出请求,这意味着 Elasticsearch-PHP 客户端可以更有效的利用整个集群。

使用 Future 模式

虽然使用这种模式相对简单,但它在代码中引入了跟多的职责。为了开启 future 模式,在客户端选项中添加 future 参数,并将值设置为 lazy

$client = ClientBuilder::create()->build();

$params = [
    'index' => 'test',
    'type' => 'test',
    'id' => 1,
    'client' => [
        'future' => 'lazy'
    ]
];

$future = $client->get($params);

这将会返回一个 future 对象,而不是一个真正的响应数据。future 对象是待处理对象,它看起来就像占位符。你可以把 future 对象当做普通对象在代码中使用。当你需要响应数据,你可以通过解析 future 对象获得。如果 future 对象已被解析(由于其它操作),可以立即使用响应数据。如果 future 对象没有被解析完成,那么解析动作将会产生阻塞,直到解析完成。

事实上,这意味着你可以通过设置 future: lazy 键值对来构造一个批量请求队列,而返回的 future 对象直到解析完成,程序才会继续执行。无论什么时候,所有的请求以并行的方式发送到集群,以异步的方式返回给 curl。

这听起来很复杂,但由于 RingPHP 的 FutureArray 接口,使其变得很简单,这使 future 对象看起来像一个简单的关联数组。例如:

$client = ClientBuilder::create()->build();

$params = [
    'index' => 'test',
    'type' => 'test',
    'id' => 1,
    'client' => [
        'future' => 'lazy'
    ]
];

$future = $client->get($params);

$doc = $future['_source'];    // 此调用将产生阻塞并迫使 future 被解析

像普通返回一样,以关联数组的形式与 Future 模式交互,会导致 Future 模式解析出特定值(进而解析出所有待处理的请求和值)。可用模式如下:

$client = ClientBuilder::create()->build();
$futures = [];

for ($i = 0; $i < 1000; $i++) {
    $params = [
        'index' => 'test',
        'type' => 'test',
        'id' => $i,
        'client' => [
            'future' => 'lazy'
        ]
    ];

    $futures[] = $client->get($params);     //请求入队列
}

foreach ($futures as $future) {
    // 访问Future模式的值,必要时会触发解析
    echo $future['_source'];
}

队列中的请求将会并行执行,并在执行之后给请求对应的 $future 变量赋值。默认每次批量执行100个请求。

如果你希望强制 Future 模式解析出返回值,但又不需要在当下使用,你可以调用 wait() 方法强制 Future 模式解析该请求:

$client = ClientBuilder::create()->build();
$futures = [];

for ($i = 0; $i < 1000; $i++) {
    $params = [
        'index' => 'test',
        'type' => 'test',
        'id' => $i,
        'client' => [
            'future' => 'lazy'
        ]
    ];

    $futures[] = $client->get($params);     //请求入队列
}

//wait() 方法强制 Future 模式执行底层请求
$futures[999]->wait();

修改批次大小

默认的批次大小是100,也就是说每当有100个请求在排队,客户端就会开始处理 Future 对象(比如初始化一个 curl_multi 调用)。 批次的大小可以按照你的喜好来修改。 通过设置 Handler 的 max_handles 来修改。

$handlerParams = [
    'max_handles' => 500
];

$defaultHandler = ClientBuilder::defaultHandler($handlerParams);

$client = ClientBuilder::create()
            ->setHandler($defaultHandler)
            ->build();

上面的设置会让客户端每500个请求进行一次批量处理。要注意的是,不论这个批次是否被填满,当开始处理一个 Future 对象时会导致整个批次被都处理。比如,只有499个请求在这个批次中...但是最后一个 Future 对象的直接处理(也就是获取结果)会导致整个批次都被直接处理(可能会等待比较久,也可以用这个特性来触发批处理):

$handlerParams = [
    'max_handles' => 500
];

$defaultHandler = ClientBuilder::defaultHandler($handlerParams);

$client = ClientBuilder::create()
            ->setHandler($defaultHandler)
            ->build();

$futures = [];

for ($i = 0; $i < 499; $i++) {
    $params = [
        'index' => 'test',
        'type' => 'test',
        'id' => $i,
        'client' => [
            'future' => 'lazy'
        ]
    ];

    $futures[] = $client->get($params);     // 构建一个请求队列
}

// 处理了最后一个 Future 对象,前面的也都会被处理
$body = $future[499]['body'];

混沌的分批

一个批处理中可以包含各种类型的请求。比如,可以在一个批处理中加入 Get 请求、Index 请求和 Search 请求,这依然能被正常处理。

$client = ClientBuilder::create()->build();
$futures = [];

$params = [
    'index' => 'test',
    'type' => 'test',
    'id' => 1,
    'client' => [
        'future' => 'lazy'
    ]
];

$futures['getRequest'] = $client->get($params);     // 第一个请求 Get

$params = [
    'index' => 'test',
    'type' => 'test',
    'id' => 2,
    'body' => [
        'field' => 'value'
    ],
    'client' => [
        'future' => 'lazy'
    ]
];

$futures['indexRequest'] = $client->index($params);       // 第二个请求 Index

$params = [
    'index' => 'test',
    'type' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'field' => 'value'
            ]
        ]
    ],
    'client' => [
        'future' => 'lazy'
    ]
];

$futures['searchRequest'] = $client->search($params);      // 第三个请求 Search

// 处理 Future,也就是获取其中一个结果,整个批次都会被处理...这里会阻塞,直到批处理请求完成才会返回
$searchResults = $futures['searchRequest']['hits'];

// 这里会立即返回值,因为已经在前面批处理中完成了请求
$doc = $futures['getRequest']['_source'];

Future 模式注意事项

在使用 future 模式时,有几点需要注意。最重要也最明显的是:需要自己处理将要遇到的问题。这点通常比较细微,但有时会引入意想不到的并发症。

例如,如果你手动使用 wait(),你可能需要调用 wait() 多次,如果存在重试的话。这是因为每次重试都会引入另一层包裹的 futures,并且每个都需要被解决掉才能得到最终结果。

但是,如果通过 ArrayInterface 访问值,则不需要这样做 (例如: $response['hits']['hits']),因为 FutureArrayInterface 将自动地并且完全地解析 future 提供的值。

另外一个需要注意的是,某些 APIs 将失去 「助手」功能。 例如: "exists" APIs (如: $client→exists()$client→indices()→exists$client→indices→templateExists() 等) 通常在正常操作下返回 true 或 false。

在运行 future 模式时,future 的延展取决于你的应用,这就意味着客户端无法再检查响应并返回简单的 true / false。相反地,你将看到 Elasticsearch 原始返回并须要采取适当的措施。

这也适用于 ping()

本文章首发在 Laravel China 社区
上一篇 下一篇
讨论数量: 0
发起讨论


暂无话题~