Open Journal Systems  2.4.4
 All Classes Namespaces Functions Variables Groups Pages
ProcessDAO.inc.php
1 <?php
41 // Define the max number of seconds a process is allowed to run.
42 // We assume that no process should run longer than
43 // 15 minutes. So we clean all processes that have a time
44 // stamp of more than 15 minutes ago. Running processes should check
45 // regularly (about once per minute) whether "their" process entry
46 // is still their. If not they are required to halt immediately.
47 // NB: Don't set this timeout much shorter as this may
48 // potentially cause more parallel processes being spawned
49 // than allowed.
50 define('PROCESS_MAX_EXECUTION_TIME', 900);
51 
52 // Cap the max. number of parallel process to avoid server
53 // flooding in case of an error.
54 define('PROCESS_MAX_PARALLELISM', 20);
55 
56 // The max. number of seconds a one-time-key will be kept valid.
57 // This defines the potential window of attack if an attacker
58 // manages to guess a key. Defining this time too short can lead
59 // to problems when networks are slow.
60 define('PROCESS_MAX_KEY_VALID', 10);
61 
62 
63 import('lib.pkp.classes.process.Process');
64 
65 class ProcessDAO extends DAO {
69  function ProcessDAO() {
70  parent::DAO();
71  }
72 
82  function &insertObject($processType, $maxParallelism) {
83  // Free processing slots occupied by zombie processes.
84  $this->deleteZombies();
85 
86  // Cap the parallelism to the max. parallelism.
87  $maxParallelism = min($maxParallelism, PROCESS_MAX_PARALLELISM);
88 
89  // Check whether we're allowed to spawn another process.
90  $currentParallelism = $this->getNumberOfObjectsByProcessType($processType);
91  if ($currentParallelism >= $maxParallelism) {
92  $falseVar = false;
93  return $falseVar;
94  }
95 
96  // We create a process instance from the given data.
97  $process = $this->newDataObject();
98  $process->setProcessType($processType);
99 
100  // Generate a new process ID. See classdoc for process ID
101  // requirements.
102  $process->setId(uniqid('', true));
103 
104  // Generate the timestamp.
105  $process->setTimeStarted(time());
106 
107  // Persist the process.
108  $this->update(
109  sprintf('INSERT INTO processes
110  (process_id, process_type, time_started, obliterated)
111  VALUES
112  (?, ?, ?, 0)'),
113  array(
114  $process->getId(),
115  (int) $process->getProcessType(),
116  (int) $process->getTimeStarted(),
117  )
118  );
119  $process->setObliterated(false);
120  return $process;
121  }
122 
128  function getObjectById($processId) {
129  $result =& $this->retrieve(
130  'SELECT process_id, process_type, time_started, obliterated FROM processes WHERE process_id = ?',
131  $processId
132  );
133 
134  $process = null;
135  if ($result->RecordCount() != 0) {
136  $process =& $this->_fromRow($result->GetRowAssoc(false));
137  }
138  $result->Close();
139 
140  return $process;
141  }
142 
149  function getNumberOfObjectsByProcessType($processType) {
150  // Find the number of processes for the
151  // given process type.
152  $result =& $this->retrieve(
153  'SELECT COUNT(*) AS running_processes
154  FROM processes
155  WHERE process_type = ?',
156  (int) $processType
157  );
158 
159  $runningProcesses = 0;
160  if ($result->RecordCount() != 0) {
161  $row =& $result->GetRowAssoc(false);
162  $runningProcesses = (int)$row['running_processes'];
163  }
164  return $runningProcesses;
165  }
166 
171  function deleteObject(&$process) {
172  return $this->deleteObjectById($process->getId());
173  }
174 
179  function deleteObjectById($processId) {
180  assert(!empty($processId));
181 
182  // Delete process
183  return $this->update('DELETE FROM processes WHERE process_id = ?', $processId);
184  }
185 
198  function deleteZombies($force = false) {
199  static $zombiesDeleted = false;
200 
201  // For performance reasons don't delete zombies
202  // more than once per request.
203  if ($zombiesDeleted && !$force) {
204  return;
205  } else {
206  $zombiesDeleted = true;
207  }
208 
209  // Calculate the max timestamp that is considered ok.
210  $maxTimestamp = time() - PROCESS_MAX_EXECUTION_TIME;
211 
212  // Delete all processes with a timestamp older than
213  // the max. timestamp.
214  return $this->update(
215  'DELETE FROM processes
216  WHERE time_started < ?',
217  (int) $maxTimestamp
218  );
219  }
220 
236  function spawnProcesses(&$request, $handler, $op, $processType, $noOfProcesses) {
237  // Generate the web URL to be called.
238  $router =& $request->getRouter();
239  $dispatcher =& $router->getDispatcher();
240  $processUrl = $dispatcher->url($request, ROUTE_COMPONENT, null, $handler, $op);
241 
242  // Parse the URL into parts to construct the fsockopen call.
243  $urlParts = parse_url($processUrl);
244  assert(isset($urlParts['scheme']) && isset($urlParts['host']) && isset($urlParts['path']) && !isset($urlParts['fragment']));
245  if ($urlParts['scheme'] == 'https') {
246  $port = 443;
247  $transport = 'ssl://';
248  } else {
249  $port = 80;
250  $transport = '';
251  }
252 
253  // Delete process zombies for correct process slot calculation.
254  $this->deleteZombies();
255 
256  // Calculate the number of max process slots for the given process type.
257  $noOfProcesses = min($noOfProcesses, PROCESS_MAX_PARALLELISM);
258 
259  // Spawn new non-blocking (i.e. parallel) processes via
260  // web requests until all process slots have been filled.
261  $currentParallelism = $this->getNumberOfObjectsByProcessType($processType);
262  $spawnedProcesses = 0;
263  while ($currentParallelism < $noOfProcesses) {
264  // Block a process slot.
265  // NB: insertObject() re-checks the number of currently running
266  // processes on each iteration to make sure that we don't exceed
267  // the limit when there are concurrent requests.
268  $process =& $this->insertObject($processType, $noOfProcesses);
269  if (!is_a($process, 'Process')) break;
270  $oneTimeKey = $process->getId();
271 
272  // Make the request including the generated one-time-key.
273  $stream = fsockopen($transport.$urlParts['host'], $port);
274  if (!$stream) break;
275  $processRequest =
276  'GET '.$urlParts['path'].'?authToken='.urlencode($oneTimeKey)." HTTP/1.1\r\n"
277  .'Host: '.$urlParts['host']."\r\n"
278  ."User-Agent: OJS\r\n"
279  ."Connection: Close\r\n\r\n";
280  stream_set_blocking($stream, 0);
281  fwrite($stream, $processRequest);
282  fclose($stream);
283  unset($stream);
284 
285  $currentParallelism++;
286  $spawnedProcesses++;
287  }
288 
289  return $spawnedProcesses;
290  }
291 
300  function authorizeProcess($processId) {
301  $process =& $this->getObjectById($processId);
302  if (is_a($process, 'Process') && $process->getObliterated() === false) {
303  // The one time key has not been used yet.
304  // Mark it as used.
305  $success = $this->update(
306  'UPDATE processes
307  SET obliterated = 1
308  WHERE process_id = ?',
309  $processId
310  );
311  if (!$success) return false;
312 
313  // Only authorize the process if its one-time-key
314  // has not expired yet.
315  $minTimestamp = time() - PROCESS_MAX_KEY_VALID;
316  $authorized = ($process->getTimeStarted() > $minTimestamp);
317 
318  // Delete the process entry if the process was
319  // not authorized due to an expired key.
320  if (!$authorized) $this->deleteObjectById($processId);
321 
322  return $authorized;
323  }
324 
325  // Deny access if the process entry doesn't exist or
326  // the one-time-key has already been marked used. But don't
327  // delete the process entry in this case to avoid that
328  // outsiders can stop processes if they guess a key.
329  return false;
330  }
331 
341  function canContinue($processId) {
342  // Calculate the max timestamp that is considered ok.
343  $minTimestamp = time() - PROCESS_MAX_EXECUTION_TIME;
344 
345  // Check whether the process is still allowed to run.
346  $process =& $this->getObjectById($processId);
347  $canContinue = (is_a($process, 'Process') && $process->getTimeStarted() > $minTimestamp);
348 
349  // Delete the process entry if the process is
350  // not allowed to continue.
351  if (!$canContinue) $this->deleteObjectById($processId);
352 
353  return $canContinue;
354  }
355 
360  function newDataObject() {
361  return new Process();
362  }
363 
364  //
365  // Private helper methods
366  //
373  function &_fromRow(&$row) {
374  $process = $this->newDataObject();
375  $process->setId($row['process_id']);
376  $process->setProcessType((integer)$row['process_type']);
377  $process->setTimeStarted((integer)$row['time_started']);
378  $process->setObliterated((boolean)$row['obliterated']);
379  return $process;
380  }
381 }
382 
383 ?>
deleteZombies($force=false)
Operations for retrieving and modifying objects from a database.
Definition: DAO.inc.php:29
& _fromRow(&$row)
& retrieve($sql, $params=false, $callHooks=true)
Definition: DAO.inc.php:83
canContinue($processId)
spawnProcesses(&$request, $handler, $op, $processType, $noOfProcesses)
deleteObject(&$process)
getObjectById($processId)
deleteObjectById($processId)
& insertObject($processType, $maxParallelism)
getNumberOfObjectsByProcessType($processType)
Operations for retrieving and modifying process data.
authorizeProcess($processId)
A class representing a running process.
Definition: Process.inc.php:25
update($sql, $params=false, $callHooks=true, $dieOnError=true)
Definition: DAO.inc.php:211