Open Monograph Press  3.3.0
CurlMultiHandler.php
1 <?php
2 namespace GuzzleHttp\Handler;
3 
4 use GuzzleHttp\Promise as P;
8 
19 {
21  private $factory;
22  private $selectTimeout;
23  private $active;
24  private $handles = [];
25  private $delays = [];
26  private $options = [];
27 
39  public function __construct(array $options = [])
40  {
41  $this->factory = isset($options['handle_factory'])
42  ? $options['handle_factory'] : new CurlFactory(50);
43 
44  if (isset($options['select_timeout'])) {
45  $this->selectTimeout = $options['select_timeout'];
46  } elseif ($selectTimeout = getenv('GUZZLE_CURL_SELECT_TIMEOUT')) {
47  $this->selectTimeout = $selectTimeout;
48  } else {
49  $this->selectTimeout = 1;
50  }
51 
52  $this->options = isset($options['options']) ? $options['options'] : [];
53  }
54 
55  public function __get($name)
56  {
57  if ($name === '_mh') {
58  $this->_mh = curl_multi_init();
59 
60  foreach ($this->options as $option => $value) {
61  // A warning is raised in case of a wrong option.
62  curl_multi_setopt($this->_mh, $option, $value);
63  }
64 
65  // Further calls to _mh will return the value directly, without entering the
66  // __get() method at all.
67  return $this->_mh;
68  }
69 
70  throw new \BadMethodCallException();
71  }
72 
73  public function __destruct()
74  {
75  if (isset($this->_mh)) {
76  curl_multi_close($this->_mh);
77  unset($this->_mh);
78  }
79  }
80 
81  public function __invoke(RequestInterface $request, array $options)
82  {
83  $easy = $this->factory->create($request, $options);
84  $id = (int) $easy->handle;
85 
86  $promise = new Promise(
87  [$this, 'execute'],
88  function () use ($id) {
89  return $this->cancel($id);
90  }
91  );
92 
93  $this->addRequest(['easy' => $easy, 'deferred' => $promise]);
94 
95  return $promise;
96  }
97 
101  public function tick()
102  {
103  // Add any delayed handles if needed.
104  if ($this->delays) {
105  $currentTime = Utils::currentTime();
106  foreach ($this->delays as $id => $delay) {
107  if ($currentTime >= $delay) {
108  unset($this->delays[$id]);
109  curl_multi_add_handle(
110  $this->_mh,
111  $this->handles[$id]['easy']->handle
112  );
113  }
114  }
115  }
116 
117  // Step through the task queue which may add additional requests.
118  P\queue()->run();
119 
120  if ($this->active &&
121  curl_multi_select($this->_mh, $this->selectTimeout) === -1
122  ) {
123  // Perform a usleep if a select returns -1.
124  // See: https://bugs.php.net/bug.php?id=61141
125  usleep(250);
126  }
127 
128  while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
129 
130  $this->processMessages();
131  }
132 
136  public function execute()
137  {
138  $queue = P\queue();
139 
140  while ($this->handles || !$queue->isEmpty()) {
141  // If there are no transfers, then sleep for the next delay
142  if (!$this->active && $this->delays) {
143  usleep($this->timeToNext());
144  }
145  $this->tick();
146  }
147  }
148 
149  private function addRequest(array $entry)
150  {
151  $easy = $entry['easy'];
152  $id = (int) $easy->handle;
153  $this->handles[$id] = $entry;
154  if (empty($easy->options['delay'])) {
155  curl_multi_add_handle($this->_mh, $easy->handle);
156  } else {
157  $this->delays[$id] = Utils::currentTime() + ($easy->options['delay'] / 1000);
158  }
159  }
160 
168  private function cancel($id)
169  {
170  // Cannot cancel if it has been processed.
171  if (!isset($this->handles[$id])) {
172  return false;
173  }
174 
175  $handle = $this->handles[$id]['easy']->handle;
176  unset($this->delays[$id], $this->handles[$id]);
177  curl_multi_remove_handle($this->_mh, $handle);
178  curl_close($handle);
179 
180  return true;
181  }
182 
183  private function processMessages()
184  {
185  while ($done = curl_multi_info_read($this->_mh)) {
186  $id = (int) $done['handle'];
187  curl_multi_remove_handle($this->_mh, $done['handle']);
188 
189  if (!isset($this->handles[$id])) {
190  // Probably was cancelled.
191  continue;
192  }
193 
194  $entry = $this->handles[$id];
195  unset($this->handles[$id], $this->delays[$id]);
196  $entry['easy']->errno = $done['result'];
197  $entry['deferred']->resolve(
199  $this,
200  $entry['easy'],
201  $this->factory
202  )
203  );
204  }
205  }
206 
207  private function timeToNext()
208  {
209  $currentTime = Utils::currentTime();
210  $nextTime = PHP_INT_MAX;
211  foreach ($this->delays as $time) {
212  if ($time < $nextTime) {
213  $nextTime = $time;
214  }
215  }
216 
217  return max(0, $nextTime - $currentTime) * 1000000;
218  }
219 }
GuzzleHttp\Handler\CurlFactory\finish
static finish(callable $handler, EasyHandle $easy, CurlFactoryInterface $factory)
Definition: CurlFactory.php:101
GuzzleHttp\Handler\CurlMultiHandler
Definition: CurlMultiHandler.php:18
Psr\Http\Message\RequestInterface
Definition: vendor/psr/http-message/src/RequestInterface.php:24
GuzzleHttp\Handler\CurlMultiHandler\__invoke
__invoke(RequestInterface $request, array $options)
Definition: CurlMultiHandler.php:84
GuzzleHttp\Utils
Definition: Utils.php:8
GuzzleHttp\Handler
Definition: CurlFactory.php:2
GuzzleHttp\Handler\CurlFactory
Definition: CurlFactory.php:15
GuzzleHttp\Handler\CurlMultiHandler\__get
__get($name)
Definition: CurlMultiHandler.php:58
GuzzleHttp\Handler\CurlMultiHandler\execute
execute()
Definition: CurlMultiHandler.php:139
GuzzleHttp\Utils\currentTime
static currentTime()
Definition: Utils.php:18
GuzzleHttp\Promise\Promise
Definition: guzzlehttp/promises/src/Promise.php:9
GuzzleHttp\Promise
Definition: AggregateException.php:2
GuzzleHttp\Handler\CurlMultiHandler\__construct
__construct(array $options=[])
Definition: CurlMultiHandler.php:42
GuzzleHttp\Handler\CurlMultiHandler\tick
tick()
Definition: CurlMultiHandler.php:104
GuzzleHttp\Handler\CurlMultiHandler\__destruct
__destruct()
Definition: CurlMultiHandler.php:76