Open Journal Systems  3.3.0
CurlMulti.php
1 <?php
2 
3 namespace Guzzle\Http\Curl;
4 
12 
17 {
/** @var mixed Value returned by curl_multi_init() in the constructor */
protected $multiHandle;

/** @var array Requests queued on this object, in the order they were added */
protected $requests;

/** @var \SplObjectStorage Maps each queued request to the CurlHandle wrapper created for it */
protected $handles;

/** @var array Maps the integer cast of a cURL easy handle back to the request that owns it */
protected $resourceHash;

/** @var array Collected failures; each entry is array('request' => ..., 'exception' => ...) */
protected $exceptions = array();

/** @var array Requests that finished without an exception being recorded */
protected $successful = array();

/** @var array Maps a CURLM_* error code to array(constant name, human readable description) */
protected $multiErrors = array(
    CURLM_BAD_HANDLE => array(
        'CURLM_BAD_HANDLE',
        'The passed-in handle is not a valid CURLM handle.'
    ),
    CURLM_BAD_EASY_HANDLE => array(
        'CURLM_BAD_EASY_HANDLE',
        "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle."
    ),
    CURLM_OUT_OF_MEMORY => array(
        'CURLM_OUT_OF_MEMORY',
        'You are doomed.'
    ),
    CURLM_INTERNAL_ERROR => array(
        'CURLM_INTERNAL_ERROR',
        'This can only be returned if libcurl bugs. Please report it to us!'
    )
);

/** @var float Select timeout supplied to the constructor (presumably seconds, as passed to curl_multi_select) */
protected $selectTimeout;
46 
/**
 * Create the cURL multi handle and initialise the internal bookkeeping.
 *
 * @param float $selectTimeout Select timeout stored on the instance
 *
 * @throws CurlException when curl_multi_init() returns false
 */
public function __construct($selectTimeout = 1.0)
{
    $this->multiHandle = curl_multi_init();
    $this->selectTimeout = $selectTimeout;

    // @codeCoverageIgnoreStart
    if (false === $this->multiHandle) {
        throw new CurlException('Unable to create multi handle');
    }
    // @codeCoverageIgnoreEnd

    // Start from empty request/handle/result collections
    $this->reset();
}
58 
/**
 * Close the multi handle when this object is destroyed, if it is still a
 * live resource.
 */
public function __destruct()
{
    if (!is_resource($this->multiHandle)) {
        return;
    }

    curl_multi_close($this->multiHandle);
}
65 
/**
 * Queue a request, prepare it for transfer and announce the addition.
 *
 * @param RequestInterface $request Request to add
 *
 * @return self
 */
public function add(RequestInterface $request)
{
    $this->requests[] = $request;

    // Prepare the request right away: when it is added while other
    // transfers are already running, send() will not be called for it.
    $this->beforeSend($request);

    $this->dispatch(self::ADD_REQUEST, array('request' => $request));

    return $this;
}
76 
/**
 * Get every request currently queued on this object.
 *
 * @return array
 */
public function all()
{
    return $this->requests;
}
81 
/**
 * Detach a request from the multi handle and drop it from the queue.
 *
 * The cURL handle (if any) is always released; the REMOVE_REQUEST event is
 * only dispatched when the request was actually queued.
 *
 * @param RequestInterface $request Request to remove
 *
 * @return bool true when the request was queued and has been removed
 */
public function remove(RequestInterface $request)
{
    $this->removeHandle($request);

    $position = array_search($request, $this->requests, true);
    if ($position === false) {
        return false;
    }

    $removed = $this->requests[$position];
    unset($this->requests[$position]);
    // Re-index so the queue stays a gapless list
    $this->requests = array_values($this->requests);
    $this->dispatch(self::REMOVE_REQUEST, array('request' => $removed));

    return true;
}
95 
/**
 * Remove every queued request and clear all internal collections.
 *
 * @param bool $hard Accepted for interface compatibility; this
 *                   implementation does not read it
 */
public function reset($hard = false)
{
    // Go through remove() so handles are released and events are dispatched.
    // The guard matters on first use, when the queue has not been
    // initialised yet.
    $queued = $this->requests;
    if ($queued) {
        foreach ($queued as $queuedRequest) {
            $this->remove($queuedRequest);
        }
    }

    $this->handles = new \SplObjectStorage();
    $this->successful = array();
    $this->exceptions = array();
    $this->resourceHash = array();
    $this->requests = array();
}
108 
/**
 * Transfer every queued request, then report any failures.
 *
 * @throws MultiTransferException when at least one request failed
 */
public function send()
{
    $this->perform();

    // reset() wipes the collected results, so snapshot them first
    $exceptions = $this->exceptions;
    $successful = $this->successful;
    $this->reset();

    if ($exceptions) {
        $this->throwMultiException($exceptions, $successful);
    }
}
120 
/**
 * Get the number of requests currently queued.
 *
 * @return int
 */
public function count()
{
    $queued = $this->requests;

    return count($queued);
}
125 
/**
 * Bundle the collected failures and successes into a single
 * MultiTransferException and throw it.
 *
 * @param array $exceptions Entries of the form array('request' => ..., 'exception' => ...)
 * @param array $successful Requests that completed successfully
 *
 * @throws MultiTransferException always
 */
protected function throwMultiException(array $exceptions, array $successful)
{
    $aggregate = new MultiTransferException('Errors during multi transfer');

    // Drain the failure list front to back
    while ($failure = array_shift($exceptions)) {
        $aggregate->addFailedRequestWithException($failure['request'], $failure['exception']);
    }

    // A request already recorded on the exception is not added as successful
    foreach ($successful as $completed) {
        if ($aggregate->containsRequest($completed)) {
            continue;
        }
        $aggregate->addSuccessfulRequest($completed);
    }

    throw $aggregate;
}
150 
/**
 * Move a request into the transfer state and attach a cURL handle for it.
 *
 * If the state change does not end in STATE_TRANSFER the request is removed
 * from the queue; it is recorded as successful when it ended up complete.
 * Any exception raised along the way is queued rather than thrown.
 *
 * @param RequestInterface $request Request to prepare
 */
protected function beforeSend(RequestInterface $request)
{
    try {
        $state = $request->setState(RequestInterface::STATE_TRANSFER);

        if ($state == RequestInterface::STATE_TRANSFER) {
            $this->addHandle($request);
            return;
        }

        // The request decided just before transfer that it does not need
        // to be sent (e.g. CachePlugin)
        $this->remove($request);
        if ($state == RequestInterface::STATE_COMPLETE) {
            $this->successful[] = $request;
        }
    } catch (\Exception $e) {
        // Keep the failure so it can be reported when the batch is sent
        $this->removeErroredRequest($request, $e);
    }
}
176 
/**
 * Create a cURL handle for the request and register it on the multi handle.
 *
 * @param RequestInterface $request Request to attach
 *
 * @throws CurlException when curl_multi_add_handle() reports an error code
 */
private function addHandle(RequestInterface $request)
{
    $wrapper = $this->createCurlHandle($request);
    $resultCode = curl_multi_add_handle($this->multiHandle, $wrapper->getHandle());

    $this->checkCurlResult($resultCode);
}
184 
/**
 * Build a CurlHandle wrapper for a request and index it both by request
 * and by the integer cast of the underlying cURL handle.
 *
 * @param RequestInterface $request Request to create a handle for
 *
 * @return CurlHandle
 */
protected function createCurlHandle(RequestInterface $request)
{
    $curlHandle = CurlHandle::factory($request);

    // request => wrapper
    $this->handles[$request] = $curlHandle;

    // (int) handle => request, used to resolve curl_multi_info_read() messages
    $resourceId = (int) $curlHandle->getHandle();
    $this->resourceHash[$resourceId] = $request;

    return $curlHandle;
}
200 
/**
 * Run transfers until no requests remain queued.
 *
 * On every pass each request's dispatcher receives a POLLING_REQUEST event.
 * When every polled request carries the BLOCKING param the loop only sleeps
 * briefly; otherwise the cURL handles are executed.
 */
protected function perform()
{
    $event = new Event(array('curl_multi' => $this));

    while ($this->requests) {
        $polledCount = 0;
        $blockedCount = 0;

        // Let every request know it is being polled
        foreach ($this->requests as $request) {
            $polledCount++;
            $event['request'] = $request;
            $request->getEventDispatcher()->dispatch(self::POLLING_REQUEST, $event);

            // Only the presence of the blocking key matters, not its value
            if ($request->getParams()->hasKey(self::BLOCKING)) {
                $blockedCount++;
            }
        }

        if ($blockedCount != $polledCount) {
            $this->executeHandles();
        } else {
            // Nothing is pending a select call; sleep instead of spinning the CPU
            usleep(500);
        }
    }
}
228 
/**
 * Drive the multi handle until no transfers remain active.
 *
 * The first select uses a very short timeout; every later select uses the
 * timeout configured on this object, so the loop does not busy-poll.
 *
 * @throws CurlException when curl_multi_exec() reports an error code
 */
private function executeHandles()
{
    // The first curl_multi_select often times out no matter what, but is usually required for fast transfers
    $selectTimeout = 0.001;
    $active = false;
    do {
        while (($mrc = curl_multi_exec($this->multiHandle, $active)) == CURLM_CALL_MULTI_PERFORM);
        $this->checkCurlResult($mrc);
        $this->processMessages();
        if ($active && curl_multi_select($this->multiHandle, $selectTimeout) === -1) {
            // Perform a usleep if a select returns -1: https://bugs.php.net/bug.php?id=61141
            usleep(150);
        }
        // After the warm-up select, wait with the configured timeout
        $selectTimeout = $this->selectTimeout;
    } while ($active);
}
248 
/**
 * Drain the multi handle's message queue and process each finished transfer.
 *
 * A transfer that processes without throwing is recorded as successful; an
 * exception causes the request to be recorded as errored instead.
 */
private function processMessages()
{
    while ($message = curl_multi_info_read($this->multiHandle)) {
        // Resolve the owning request from the integer cast of the easy handle
        $resourceId = (int) $message['handle'];
        $request = $this->resourceHash[$resourceId];

        try {
            $this->processResponse($request, $this->handles[$request], $message);
            $this->successful[] = $request;
        } catch (\Exception $e) {
            $this->removeErroredRequest($request, $e);
        }
    }
}
264 
/**
 * Record a failure for a request, remove the request and dispatch a
 * MULTI_EXCEPTION event.
 *
 * @param RequestInterface $request Request that failed
 * @param \Exception       $e       Exception that was encountered
 */
protected function removeErroredRequest(RequestInterface $request, \Exception $e = null)
{
    $failure = array('request' => $request, 'exception' => $e);
    $this->exceptions[] = $failure;

    $this->remove($request);

    $context = array('exception' => $e, 'all_exceptions' => $this->exceptions);
    $this->dispatch(self::MULTI_EXCEPTION, $context);
}
277 
/**
 * Handle a finished transfer reported by curl_multi_info_read().
 *
 * Without a cURL error, the request is moved to STATE_COMPLETE once a
 * response is confirmed to be set. With a cURL error, the request is moved
 * to STATE_ERROR and the error is thrown unless the state change resulted
 * in a different state.
 *
 * @param RequestInterface $request Request that finished
 * @param CurlHandle       $handle  Handle wrapper used for the transfer
 * @param array            $curl    Message returned by curl_multi_info_read()
 *
 * @throws CurlException when a cURL error occurred and was not handled
 */
protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
{
    // Set the transfer stats on the response
    $handle->updateRequestFromTransfer($request);
    // Check if a cURL exception occurred, and if so, notify things
    $curlException = $this->isCurlException($request, $handle, $curl);

    // Always remove completed curl handles. They can be added back again
    // via events if needed (e.g. ExponentialBackoffPlugin)
    $this->removeHandle($request);

    if (!$curlException) {
        if ($this->validateResponseWasSet($request)) {
            // setState() takes the target state first and the context second
            $state = $request->setState(
                RequestInterface::STATE_COMPLETE,
                array('handle' => $handle)
            );
            // Only remove the request if it wasn't resent as a result of
            // the state change
            if ($state != RequestInterface::STATE_TRANSFER) {
                $this->remove($request);
            }
        }
        return;
    }

    // Set the state of the request to an error
    $state = $request->setState(RequestInterface::STATE_ERROR, array('exception' => $curlException));
    // Allow things to ignore the error if possible
    if ($state != RequestInterface::STATE_TRANSFER) {
        $this->remove($request);
    }

    // The error was not handled, so fail
    if ($state == RequestInterface::STATE_ERROR) {
        throw $curlException;
    }
}
326 
/**
 * Detach and close the cURL handle associated with a request, if any.
 *
 * @param RequestInterface $request Request whose handle should be released
 */
protected function removeHandle(RequestInterface $request)
{
    if (!isset($this->handles[$request])) {
        return;
    }

    $wrapper = $this->handles[$request];
    curl_multi_remove_handle($this->multiHandle, $wrapper->getHandle());

    // Drop both lookup entries before closing the handle
    unset($this->handles[$request]);
    unset($this->resourceHash[(int) $wrapper->getHandle()]);

    $wrapper->close();
}
342 
/**
 * Turn the result code of a finished transfer into a CurlException.
 *
 * @param RequestInterface $request Request that finished
 * @param CurlHandle       $handle  Handle wrapper used for the transfer
 * @param array            $curl    Message returned by curl_multi_info_read()
 *
 * @return CurlException|bool false when the result code signals no error,
 *                            otherwise the populated exception
 */
private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl)
{
    $resultCode = $curl['result'];
    if ($resultCode == CURLM_OK || $resultCode == CURLM_CALL_MULTI_PERFORM) {
        return false;
    }

    // Store the code on the handle; the error number and message below are read back from it
    $handle->setErrorNo($resultCode);

    $message = sprintf(
        '[curl] %s: %s [url] %s',
        $handle->getErrorNo(),
        $handle->getError(),
        $handle->getUrl()
    );

    $exception = new CurlException($message);
    $exception->setCurlHandle($handle)
        ->setRequest($request)
        ->setCurlInfo($handle->getInfo())
        ->setError($handle->getError(), $handle->getErrorNo());

    return $exception;
}
368 
/**
 * Throw when a curl_multi_* call returned an error code.
 *
 * @param int $code Return code of a curl_multi_* call
 *
 * @throws CurlException for any code other than CURLM_OK and
 *                       CURLM_CALL_MULTI_PERFORM
 */
private function checkCurlResult($code)
{
    if ($code == CURLM_OK || $code == CURLM_CALL_MULTI_PERFORM) {
        return;
    }

    // Use the descriptive text when the code is a known multi error
    if (isset($this->multiErrors[$code])) {
        $message = "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}";
    } else {
        $message = 'Unexpected cURL error: ' . $code;
    }

    throw new CurlException($message);
}
384 
/**
 * Make sure a finished transfer actually produced a response.
 *
 * When no response is set, a request whose body can be rewound is re-queued
 * for another attempt; any other request is recorded as errored.
 *
 * @param RequestInterface $request Request to check
 *
 * @return bool true when a response is set, false otherwise
 */
private function validateResponseWasSet(RequestInterface $request)
{
    if ($request->getResponse()) {
        return true;
    }

    $body = null;
    if ($request instanceof EntityEnclosingRequestInterface) {
        $body = $request->getBody();
    }

    if ($body && $body->isSeekable() && $body->seek(0)) {
        // The body was rewound: put the request back in the batch so it is
        // retried automatically.
        $this->remove($request);
        $this->requests[] = $request;
        $this->addHandle($request);

        return false;
    }

    if ($body) {
        // Nothing we can do with this. Sorry!
        $failure = new RequestException(
            'The connection was unexpectedly closed. The request would'
            . ' have been retried, but attempting to rewind the'
            . ' request body failed.'
        );
    } else {
        $failure = new RequestException(
            'No response was received for a request with no body. This'
            . ' could mean that you are saturating your network.'
        );
    }

    $failure->setRequest($request);
    $this->removeErroredRequest($request, $failure);

    return false;
}
423 }
Guzzle\Http\Curl\CurlMulti\count
count()
Definition: CurlMulti.php:145
Guzzle\Http\Curl\CurlMulti\$successful
$successful
Definition: CurlMulti.php:52
Guzzle\Http\Curl\CurlMulti\$multiErrors
$multiErrors
Definition: CurlMulti.php:58
Guzzle\Http\Exception\CurlException
Definition: CurlException.php:10
Guzzle\Http\Message\RequestInterface
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Http/Message/RequestInterface.php:16
Guzzle\Http\Exception\RequestException
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Http/Exception/RequestException.php:11
Guzzle\Http\Curl\CurlMulti\throwMultiException
throwMultiException(array $exceptions, array $successful)
Definition: CurlMulti.php:157
Guzzle\Http\Curl
Definition: CurlHandle.php:3
Guzzle\Http\Message\RequestInterface\STATE_COMPLETE
const STATE_COMPLETE
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Http/Message/RequestInterface.php:19
Guzzle\Http\Curl\CurlMulti\$handles
$handles
Definition: CurlMulti.php:34
Guzzle\Http\Message\EntityEnclosingRequestInterface
Definition: EntityEnclosingRequestInterface.php:12
Guzzle\Http\Curl\CurlMulti\all
all()
Definition: CurlMulti.php:101
Guzzle\Http\Message\RequestInterface\STATE_TRANSFER
const STATE_TRANSFER
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Http/Message/RequestInterface.php:20
Guzzle\Http\Curl\CurlMulti\add
add(RequestInterface $request)
Definition: CurlMulti.php:90
Guzzle\Http\Exception\MultiTransferException
Definition: MultiTransferException.php:11
Guzzle\Http\Curl\CurlMulti\removeHandle
removeHandle(RequestInterface $request)
Definition: CurlMulti.php:356
Guzzle\Http\Curl\CurlMulti\processResponse
processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
Definition: CurlMulti.php:311
Guzzle\Http\Curl\CurlMulti\send
send()
Definition: CurlMulti.php:133
Guzzle\Common\Event
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Common/Event.php:10
Guzzle\Http\Curl\CurlMulti\$selectTimeout
$selectTimeout
Definition: CurlMulti.php:69
Guzzle\Http\Curl\CurlMulti\removeErroredRequest
removeErroredRequest(RequestInterface $request, \Exception $e=null)
Definition: CurlMulti.php:295
Guzzle\Http\Curl\CurlMulti\__construct
__construct($selectTimeout=1.0)
Definition: CurlMulti.php:71
Guzzle\Http\Message\RequestInterface\setState
setState($state, array $context=array())
Guzzle\Http\Curl\CurlMultiInterface
Definition: CurlMultiInterface.php:12
Guzzle\Http\Curl\CurlMulti
Definition: CurlMulti.php:16
Guzzle\Common\AbstractHasDispatcher\dispatch
dispatch($eventName, array $context=array())
Definition: AbstractHasDispatcher.php:41
Guzzle\Http\Curl\CurlHandle
Definition: CurlHandle.php:16
Guzzle\Http\Curl\CurlMulti\$requests
$requests
Definition: CurlMulti.php:28
Guzzle\Http\Message\RequestInterface\STATE_ERROR
const STATE_ERROR
Definition: lib/vendor/guzzle/guzzle/src/Guzzle/Http/Message/RequestInterface.php:21
Guzzle\Common\AbstractHasDispatcher
Definition: AbstractHasDispatcher.php:12
Guzzle\Http\Curl\CurlMulti\$resourceHash
$resourceHash
Definition: CurlMulti.php:40
Guzzle\Http\Curl\CurlMulti\reset
reset($hard=false)
Definition: CurlMulti.php:120
Guzzle\Http\Curl\CurlMulti\perform
perform()
Definition: CurlMulti.php:228
Guzzle\Http\Curl\CurlMulti\createCurlHandle
createCurlHandle(RequestInterface $request)
Definition: CurlMulti.php:216
Guzzle\Http\Curl\CurlMulti\$exceptions
$exceptions
Definition: CurlMulti.php:46
Guzzle\Http\Curl\CurlMulti\__destruct
__destruct()
Definition: CurlMulti.php:83
Guzzle\Http\Curl\CurlMulti\beforeSend
beforeSend(RequestInterface $request)
Definition: CurlMulti.php:181
Guzzle\Http\Curl\CurlMulti\$multiHandle
$multiHandle
Definition: CurlMulti.php:22
Guzzle\Http\Curl\CurlHandle\factory
static factory(RequestInterface $request)
Definition: CurlHandle.php:52