34 $groups = new \SplObjectStorage();
35 foreach ($queue as $item) {
39 $client = $item->getClient();
40 if (!$groups->contains($client)) {
41 $groups->attach($client,
new \ArrayObject(array($item)));
43 $groups[$client]->append($item);
48 foreach ($groups as $batch) {
49 $batches = array_merge($batches, array_chunk($groups[$batch]->getArrayCopy(), $this->batchSize));
55 public function transfer(array $batch)
62 $client = reset($batch)->getClient();
65 $invalid = array_filter($batch,
function ($command) use ($client) {
66 return $command->getClient() !== $client;
69 if (!empty($invalid)) {
70 throw new InconsistentClientTransferException($invalid);
73 $client->execute($batch);