/**
 * Normalises the pool configuration and wraps the supplied requests in a
 * generator that turns each entry into a promise.
 *
 * The parameter list is not visible in this excerpt; the body reads
 * $client, $requests and $config.
 */
35 public function __construct(
// 'pool_size' takes precedence: when present its value is copied into
// 'concurrency'.
41 if (isset($config[
'pool_size'])) {
42 $config[
'concurrency'] = $config[
'pool_size'];
// Neither key supplied: fall back to a concurrency of 25.
43 } elseif (!isset($config[
'concurrency'])) {
44 $config[
'concurrency'] = 25;
// Pull the 'options' entry out into $opts and remove it from $config.
// NOTE(review): the value $opts takes when 'options' is absent is not
// visible in this excerpt.
47 if (isset($config[
'options'])) {
48 $opts = $config[
'options'];
49 unset($config[
'options']);
// Convert $requests to something foreach-able via iter_for().
// NOTE(review): newer guzzlehttp/promises releases are believed to deprecate
// iter_for() in favour of Create::iterFor() - verify against the installed
// version.
54 $iterable = \GuzzleHttp\Promise\iter_for($requests);
// Rebind $requests to a generator function; it yields one value per entry
// under the entry's original key.
55 $requests =
function () use ($iterable, $client, $opts) {
56 foreach ($iterable as $key => $rfn) {
// First branch: hand the entry to the client together with $opts.
// NOTE(review): the condition guarding this branch is not visible here;
// presumably an instanceof check for a request object - confirm.
58 yield $key => $client->
sendAsync($rfn, $opts);
// Callable entries are invoked with $opts and their return value is yielded.
59 } elseif (is_callable($rfn)) {
60 yield $key => $rfn($opts);
// Remaining case (enclosing branch not visible): reject the entry.
// NOTE(review): PSR-7 declares these interfaces as
// Psr\Http\Message\RequestInterface and Psr\Http\Message\ResponseInterface;
// the names in the message below do not match - confirm and correct in a
// separate change.
62 throw new \InvalidArgumentException(
'Each value yielded by '
63 .
'the iterator must be a Psr7\Http\Message\RequestInterface '
64 .
'or a callable that returns a promise that fulfills '
65 .
'with a Psr7\Message\Http\ResponseInterface object.');
/**
 * Returns the result of calling promise() on the object held in $this->each.
 *
 * NOTE(review): the type of $this->each is not visible in this excerpt.
 */
78 public function promise()
80 return $this->
each->promise();
/**
 * Builds a pool from $client, $requests and $options and blocks until the
 * pool's promise has been waited on.
 *
 * The parameter list, the initialisation of $res and the return statement
 * are not visible in this excerpt.
 */
100 public static function batch(
// Register or wrap the 'fulfilled' and 'rejected' callbacks in $options;
// $res is passed by reference to cmpCallback().
106 self::cmpCallback($options,
'fulfilled', $res);
107 self::cmpCallback($options,
'rejected', $res);
// `new static` uses late static binding, so a subclass calling batch()
// instantiates itself.
108 $pool =
new static($client, $requests, $options);
// Synchronously wait for the pool's promise.
109 $pool->promise()->wait();
/**
 * Ensures $options[$name] holds a closure that has by-reference access to
 * $results.
 *
 * @param array  $options Callback options; modified in place.
 * @param string $name    Key of the callback to install or wrap (called with
 *                        'fulfilled' and 'rejected' in this excerpt).
 * @param array  $results Array captured by reference by the installed closure.
 *
 * NOTE(review): the closure bodies are not visible in this excerpt, so what
 * they write into $results cannot be confirmed from here.
 */
120 private static function cmpCallback(array &$options, $name, array &$results)
// No callback supplied under this name: install a closure taking ($v, $k).
122 if (!isset($options[$name])) {
123 $options[$name] =
function ($v, $k) use (&$results) {
// Presumably the else branch (its opening is not visible): keep the existing
// callback in $currentFn and replace it with a closure that also captures it.
127 $currentFn = $options[$name];
128 $options[$name] =
function ($v, $k) use (&$results, $currentFn) {