Open Journal Systems  3.3.0
EachPromise.php
1 <?php
2 namespace GuzzleHttp\Promise;
3 
/**
 * Iterates over a collection of promises (or plain values), invoking optional
 * side-effect callbacks as each one settles, and exposes a single aggregate
 * promise that is resolved with null once everything has been processed.
 */
class EachPromise implements PromisorInterface
{
    /**
     * @var array|null Promises currently in flight, keyed by an internal
     *                 counter (NOT by the iterator key, which may repeat).
     */
    private $pending = [];

    /**
     * @var int Next key to use when storing a promise in $pending. Iterator
     *          keys are not guaranteed to be unique or scalar, so a private
     *          counter is used to index pending promises instead.
     */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null Iterator produced by iter_for() from the constructor argument. */
    private $iterable;

    /** @var callable|int|null Concurrency limit, or a callable returning one. */
    private $concurrency;

    /** @var callable|null Invoked with ($value, $key, $aggregate) on fulfillment. */
    private $onFulfilled;

    /** @var callable|null Invoked with ($reason, $key, $aggregate) on rejection. */
    private $onRejected;

    /** @var Promise|null Aggregate promise, created lazily by promise(). */
    private $aggregate;

    /** @var bool Re-entrancy guard around advancing the iterator. */
    private $mutex;

    /**
     * Recognised configuration keys:
     *
     * - fulfilled:   callable invoked as fn($value, $key, $aggregate) when a
     *                promise from the iterable is fulfilled.
     * - rejected:    callable invoked as fn($reason, $key, $aggregate) when a
     *                promise from the iterable is rejected.
     * - concurrency: integer limit on the number of pending promises, or a
     *                callable that receives the current pending count and
     *                returns that limit. When unset/falsy, every item of the
     *                iterable is added as pending at once.
     *
     * @param mixed $iterable Promises or values to iterate (passed to iter_for()).
     * @param array $config   Configuration options (see above).
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = iter_for($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it and starting iteration on
     * the first call. Subsequent calls return the same instance.
     *
     * Any exception thrown while rewinding the iterator or queueing the
     * first promises rejects the aggregate instead of propagating.
     *
     * @return Promise
     */
    public function promise()
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            // PHP 7+: covers both Error and Exception.
            $this->aggregate->reject($e);
        } catch (\Exception $e) {
            // PHP 5: \Throwable does not exist, so this branch does the work.
            $this->aggregate->reject($e);
        }

        return $this->aggregate;
    }

    /**
     * Creates the aggregate promise and registers the cleanup handler that
     * drops all references once the aggregate settles.
     */
    private function createPromise()
    {
        $this->mutex = false;
        $this->aggregate = new Promise(function () {
            // Nothing in flight and nothing left to pull: resolve right away.
            if ($this->checkIfFinished()) {
                return;
            }

            reset($this->pending);

            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                next($this->pending);
                $promise->wait();
                if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function () {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Pulls promises from the iterator into the pending list, honouring the
     * configured concurrency limit (if any).
     */
    private function refillPending()
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator());
            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? call_user_func($this->concurrency, count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending());
    }

    /**
     * Wraps the iterator's current value in a promise and tracks it as
     * pending.
     *
     * @return bool False when the iterator is gone or exhausted, true when a
     *              promise was added.
     */
    private function addPending()
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = promise_for($this->iterable->current());
        $key = $this->iterable->key();

        // Iterator keys may repeat (e.g. generators using "yield from") or be
        // non-scalar, so they cannot safely index $pending: a duplicate would
        // overwrite a promise that is still in flight. Use a private counter
        // for bookkeeping and hand the original key to the callbacks.
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key) {
                if ($this->onFulfilled) {
                    call_user_func(
                        $this->onFulfilled,
                        $value,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key) {
                if ($this->onRejected) {
                    call_user_func(
                        $this->onRejected,
                        $reason,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterator forward by one element.
     *
     * An exception thrown by the iterator rejects the aggregate promise.
     *
     * @return bool True when the iterator was advanced; false when it is
     *              already being advanced (re-entrant call) or it threw.
     */
    private function advanceIterator()
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();
            $this->mutex = false;
            return true;
        } catch (\Throwable $e) {
            // PHP 7+: covers both Error and Exception.
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        } catch (\Exception $e) {
            // PHP 5: \Throwable does not exist, so this branch does the work.
            $this->aggregate->reject($e);
            $this->mutex = false;
            return false;
        }
    }

    /**
     * Called after a pending promise settles: removes it from the pending
     * list and tries to queue more work.
     *
     * @param int $idx Internal key of the settled promise within $pending.
     */
    private function step($idx)
    {
        // If the promise was already resolved, then ignore this step.
        if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise to recursively invoke the provided iterator, which
        // cause a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate with null when no promise is pending and the
     * iterator has no further elements.
     *
     * @return bool True if the aggregate was resolved by this call.
     */
    private function checkIfFinished()
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);
            return true;
        }

        return false;
    }
}
GuzzleHttp\Promise\promise_for
promise_for($value)
Definition: guzzlehttp/promises/src/functions.php:66
GuzzleHttp\Promise\EachPromise\promise
promise()
Definition: EachPromise.php:86
GuzzleHttp\Promise\EachPromise\__construct
__construct($iterable, array $config=[])
Definition: EachPromise.php:69
GuzzleHttp\Promise\Promise\then
then(callable $onFulfilled=null, callable $onRejected=null)
Definition: guzzlehttp/promises/src/Promise.php:30
GuzzleHttp\Promise\iter_for
iter_for($value)
Definition: guzzlehttp/promises/src/functions.php:122
Seboettg\Collection\count
count()
Definition: ArrayListTrait.php:253
GuzzleHttp\Promise\PromiseInterface\PENDING
const PENDING
Definition: PromiseInterface.php:15
GuzzleHttp\Promise\PromisorInterface
Definition: PromisorInterface.php:7
GuzzleHttp\Promise\EachPromise
Definition: EachPromise.php:8
GuzzleHttp\Promise\Promise
Definition: guzzlehttp/promises/src/Promise.php:9
GuzzleHttp\Promise
Definition: AggregateException.php:2