10 private $pending = [];
51 public function __construct($iterable, array $config = [])
53 $this->iterable =
iter_for($iterable);
55 if (isset($config[
'concurrency'])) {
56 $this->concurrency = $config[
'concurrency'];
59 if (isset($config[
'fulfilled'])) {
60 $this->onFulfilled = $config[
'fulfilled'];
63 if (isset($config[
'rejected'])) {
64 $this->onRejected = $config[
'rejected'];
70 if ($this->aggregate) {
71 return $this->aggregate;
75 $this->createPromise();
76 $this->iterable->rewind();
77 $this->refillPending();
78 }
catch (\Throwable $e) {
79 $this->aggregate->reject($e);
80 }
catch (\Exception $e) {
81 $this->aggregate->reject($e);
84 return $this->aggregate;
87 private function createPromise()
90 $this->aggregate =
new Promise(
function () {
91 reset($this->pending);
92 if (empty($this->pending) && !$this->iterable->valid()) {
93 $this->aggregate->resolve(
null);
99 while ($promise = current($this->pending)) {
100 next($this->pending);
109 $clearFn =
function () {
110 $this->iterable = $this->concurrency = $this->pending =
null;
111 $this->onFulfilled = $this->onRejected =
null;
114 $this->aggregate->
then($clearFn, $clearFn);
117 private function refillPending()
119 if (!$this->concurrency) {
121 while ($this->addPending() && $this->advanceIterator());
126 $concurrency = is_callable($this->concurrency)
127 ? call_user_func($this->concurrency, count($this->pending))
128 : $this->concurrency;
129 $concurrency = max($concurrency - count($this->pending), 0);
140 while (--$concurrency
141 && $this->advanceIterator()
142 && $this->addPending());
145 private function addPending()
147 if (!$this->iterable || !$this->iterable->valid()) {
151 $promise =
promise_for($this->iterable->current());
152 $idx = $this->iterable->key();
154 $this->pending[$idx] = $promise->then(
155 function ($value) use ($idx) {
156 if ($this->onFulfilled) {
158 $this->onFulfilled, $value, $idx, $this->aggregate
163 function ($reason) use ($idx) {
164 if ($this->onRejected) {
166 $this->onRejected, $reason, $idx, $this->aggregate
176 private function advanceIterator()
187 $this->iterable->next();
188 $this->mutex =
false;
190 }
catch (\Throwable $e) {
191 $this->aggregate->reject($e);
192 $this->mutex =
false;
194 }
catch (\Exception $e) {
195 $this->aggregate->reject($e);
196 $this->mutex =
false;
201 private function step($idx)
208 unset($this->pending[$idx]);
213 if ($this->advanceIterator() && !$this->checkIfFinished()) {
215 $this->refillPending();
219 private function checkIfFinished()
221 if (!$this->pending && !$this->iterable->valid()) {
223 $this->aggregate->resolve(
null);