Open Journal Systems  3.3.0
Pool.php
1 <?php
2 namespace GuzzleHttp;
3 
8 
/**
 * Sends the requests produced by an iterable through a client, delegating
 * the scheduling of the resulting promises to an EachPromise.
 *
 * Recognised configuration keys:
 * - concurrency: forwarded to EachPromise; defaults to 25 when neither
 *   "concurrency" nor "pool_size" is provided.
 * - pool_size: legacy alias, copied over "concurrency" when present.
 * - options: request options handed to ClientInterface::sendAsync() and to
 *   every callable element; removed from the config before it reaches
 *   EachPromise.
 * - any other key (for example "fulfilled" / "rejected") is passed through to
 *   EachPromise untouched.
 */
class Pool implements PromisorInterface
{
    /** @var EachPromise Aggregate built from the generator of request promises. */
    private $each;

    /**
     * @param ClientInterface $client   Client used to send RequestInterface elements.
     * @param mixed           $requests Anything accepted by \GuzzleHttp\Promise\iter_for()
     *                                  (presumably an array or an Iterator - confirm against
     *                                  the promises library). Each element must be a
     *                                  RequestInterface or a callable that takes the request
     *                                  options array and returns a promise.
     * @param array           $config   Configuration, see the class description.
     *
     * Note: elements are validated lazily. The InvalidArgumentException for an
     * unsupported element is raised from inside the generator, i.e. when that
     * element is reached during iteration, not while the constructor runs.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $config = []
    ) {
        // Backwards compatibility: "pool_size" is the legacy name of "concurrency".
        if (isset($config['pool_size'])) {
            $config['concurrency'] = $config['pool_size'];
        } elseif (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // Request options belong to the client / callables, not to EachPromise,
        // so they are stripped from the config that is forwarded below.
        if (isset($config['options'])) {
            $opts = $config['options'];
            unset($config['options']);
        } else {
            $opts = [];
        }

        $iterable = \GuzzleHttp\Promise\iter_for($requests);
        $requests = function () use ($iterable, $client, $opts) {
            foreach ($iterable as $key => $rfn) {
                if ($rfn instanceof RequestInterface) {
                    yield $key => $client->sendAsync($rfn, $opts);
                } elseif (is_callable($rfn)) {
                    yield $key => $rfn($opts);
                } else {
                    // The PSR-7 interfaces live in the Psr\Http\Message namespace.
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be a Psr\Http\Message\RequestInterface '
                        . 'or a callable that returns a promise that fulfills '
                        . 'with a Psr\Http\Message\ResponseInterface object.');
                }
            }
        };

        $this->each = new EachPromise($requests(), $config);
    }

    /**
     * Returns the promise exposed by the underlying EachPromise.
     *
     * @return mixed Whatever EachPromise::promise() returns (presumably a
     *               PromiseInterface - confirm against the promises library).
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Runs a pool over the given requests, waits on its promise and returns
     * the collected outcomes.
     *
     * "fulfilled" and "rejected" callbacks supplied in $options are still
     * invoked (with the value and the key); they are wrapped so that the value
     * is additionally recorded in the result array.
     *
     * @param ClientInterface $client   Client used to send the requests.
     * @param mixed           $requests Requests to send, see the constructor.
     * @param array           $options  Pool configuration, see the class description.
     *
     * @return array Values passed to the fulfilled / rejected callbacks, indexed
     *               by the key of the originating element and sorted by key.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $res = [];
        self::cmpCallback($options, 'fulfilled', $res);
        self::cmpCallback($options, 'rejected', $res);
        $pool = new static($client, $requests, $options);
        $pool->promise()->wait();
        // Callbacks may fire in any order; return the results ordered by key.
        ksort($res);

        return $res;
    }

    /**
     * Installs (or wraps) the callback stored under $name so that every value
     * it receives is also written to $results under the same key.
     *
     * @param array  $options Pool options, modified in place.
     * @param string $name    Option key of the callback ("fulfilled" or "rejected").
     * @param array  $results Result accumulator, captured by reference.
     */
    private static function cmpCallback(array &$options, $name, array &$results)
    {
        if (!isset($options[$name])) {
            $options[$name] = function ($v, $k) use (&$results) {
                $results[$k] = $v;
            };
        } else {
            $currentFn = $options[$name];
            $options[$name] = function ($v, $k) use (&$results, $currentFn) {
                // Run the caller's callback first, then record the value.
                $currentFn($v, $k);
                $results[$k] = $v;
            };
        }
    }
}
GuzzleHttp\Pool\batch
static batch(ClientInterface $client, $requests, array $options=[])
Definition: Pool.php:103
GuzzleHttp
Definition: vendor/guzzlehttp/guzzle/src/Client.php:2
GuzzleHttp\Pool\promise
promise()
Definition: Pool.php:81
GuzzleHttp\Promise\PromiseInterface
Definition: PromiseInterface.php:13
Psr\Http\Message\RequestInterface
Definition: vendor/psr/http-message/src/RequestInterface.php:24
GuzzleHttp\ClientInterface
Definition: vendor/guzzlehttp/guzzle/src/ClientInterface.php:13
GuzzleHttp\Pool
Definition: Pool.php:20
GuzzleHttp\Promise\each
each( $iterable, callable $onFulfilled=null, callable $onRejected=null)
Definition: guzzlehttp/promises/src/functions.php:346
GuzzleHttp\Promise\PromisorInterface
Definition: PromisorInterface.php:7
GuzzleHttp\Promise\EachPromise
Definition: EachPromise.php:8
GuzzleHttp\Pool\__construct
__construct(ClientInterface $client, $requests, array $config=[])
Definition: Pool.php:38