Open Journal Systems  3.0.0
 All Classes Namespaces Functions Variables Groups Pages
ProcessDAO.inc.php
1 <?php
41 // Define the max number of seconds a process is allowed to run.
42 // We assume that no process should run longer than
43 // 15 minutes. So we clean all processes that have a time
44 // stamp of more than 15 minutes ago. Running processes should check
45 // regularly (about once per minute) whether "their" process entry
46 // is still there. If not they are required to halt immediately.
47 // NB: Do not make this timeout much shorter, as doing so could
48 // cause more parallel processes to be spawned
49 // than allowed.
50 define('PROCESS_MAX_EXECUTION_TIME', 900);
51 
52 // Cap the max. number of parallel processes to avoid server
53 // flooding in case of an error.
54 define('PROCESS_MAX_PARALLELISM', 20);
55 
56 // The max. number of seconds a one-time-key will be kept valid.
57 // This defines the potential window of attack if an attacker
58 // manages to guess a key. Defining this time too short can lead
59 // to problems when networks are slow.
60 define('PROCESS_MAX_KEY_VALID', 10);
61 
62 
63 import('lib.pkp.classes.process.Process');
64 
/**
 * Operations for retrieving and modifying process data.
 *
 * Every row in the "processes" table represents one occupied process slot.
 * The row's process ID doubles as the one-time key that a spawned web
 * request must present (see authorizeProcess()) before it may start working.
 */
class ProcessDAO extends DAO {
	/**
	 * Constructor.
	 */
	function ProcessDAO() {
		parent::DAO();
	}

	/**
	 * Reserve a process slot by inserting a new process entry.
	 *
	 * @param $processType integer the type of process to register
	 * @param $maxParallelism integer the max. number of entries of this type
	 *  that may exist at once (capped at PROCESS_MAX_PARALLELISM)
	 * @param $additionalData mixed optional data, stored serialized with the entry
	 * @return Process|boolean the new process object, or boolean false when
	 *  all slots for this process type are already taken
	 */
	function &insertObject($processType, $maxParallelism, $additionalData = null) {
		// Free processing slots occupied by zombie processes.
		$this->deleteZombies();

		// Cap the parallelism to the max. parallelism.
		$maxParallelism = min($maxParallelism, PROCESS_MAX_PARALLELISM);

		// Check whether we're allowed to spawn another process.
		$currentParallelism = $this->getNumberOfObjectsByProcessType($processType);
		if ($currentParallelism >= $maxParallelism) {
			// This method returns by reference, so we must return a variable.
			$falseVar = false;
			return $falseVar;
		}

		// We create a process instance from the given data.
		$process = $this->newDataObject();
		$process->setProcessType($processType);

		// Generate a new process ID.
		// NOTE(review): uniqid() is derived from the current time and is not
		// a cryptographically secure token, yet the ID is used as a one-time
		// key -- consider a stronger generator if the ID format allows it.
		$process->setId(uniqid('', true));

		// Generate the timestamp.
		$process->setTimeStarted(time());

		// Persist the process.
		$this->update(
			'INSERT INTO processes
				(process_id, process_type, time_started, obliterated, additional_data)
				VALUES
				(?, ?, ?, 0, ?)',
			array(
				$process->getId(),
				(int) $process->getProcessType(),
				(int) $process->getTimeStarted(),
				serialize($additionalData)
			)
		);
		$process->setObliterated(false);
		return $process;
	}

	/**
	 * Retrieve a process entry by its ID.
	 *
	 * @param $processId string
	 * @return Process|null the process, or null when no such entry exists
	 */
	function getObjectById($processId) {
		$result = $this->retrieve(
			'SELECT process_id, process_type, time_started, obliterated, additional_data FROM processes WHERE process_id = ?',
			$processId
		);

		$process = null;
		if ($result->RecordCount() != 0) {
			$process = $this->_fromRow($result->GetRowAssoc(false));
		}
		$result->Close();

		return $process;
	}

	/**
	 * Count the process entries that currently exist for a process type.
	 *
	 * @param $processType integer
	 * @return integer
	 */
	function getNumberOfObjectsByProcessType($processType) {
		$result = $this->retrieve(
			'SELECT COUNT(*) AS running_processes
			FROM processes
			WHERE process_type = ?',
			(int) $processType
		);

		$runningProcesses = 0;
		if ($result->RecordCount() != 0) {
			$row = $result->GetRowAssoc(false);
			$runningProcesses = (int) $row['running_processes'];
		}
		// Release the result set, as getObjectById() does.
		$result->Close();

		return $runningProcesses;
	}

	/**
	 * Delete a process entry.
	 *
	 * @param $process Process
	 * @return mixed the result of the underlying delete statement
	 */
	function deleteObject(&$process) {
		return $this->deleteObjectById($process->getId());
	}

	/**
	 * Delete a process entry by its ID.
	 *
	 * @param $processId string must not be empty
	 * @return mixed the result of the underlying update() call
	 */
	function deleteObjectById($processId) {
		assert(!empty($processId));

		// Delete process
		return $this->update('DELETE FROM processes WHERE process_id = ?', $processId);
	}

	/**
	 * Delete all process entries that were started more than
	 * PROCESS_MAX_EXECUTION_TIME seconds ago.
	 *
	 * The clean-up runs at most once per request unless it is forced.
	 *
	 * @param $force boolean run the clean-up even if it already ran
	 *  during this request
	 * @return mixed the result of the underlying update() call, or null when
	 *  the clean-up was skipped
	 */
	function deleteZombies($force = false) {
		static $zombiesDeleted = false;

		// For performance reasons don't delete zombies
		// more than once per request.
		if ($zombiesDeleted && !$force) {
			return;
		}
		$zombiesDeleted = true;

		// Calculate the max timestamp that is considered ok.
		$maxTimestamp = time() - PROCESS_MAX_EXECUTION_TIME;

		// Delete all processes with a timestamp older than
		// the max. timestamp.
		return $this->update(
			'DELETE FROM processes
			WHERE time_started < ?',
			(int) $maxTimestamp
		);
	}

	/**
	 * Spawn parallel processes by sending fire-and-forget web requests to a
	 * component handler operation until all process slots are filled.
	 *
	 * Each request carries the ID of a freshly inserted process entry as
	 * its "authToken" query parameter.
	 *
	 * @param $request Request
	 * @param $handler string the handler to be addressed by the request
	 * @param $op string the handler operation to be called
	 * @param $processType integer
	 * @param $noOfProcesses integer the number of process slots to fill
	 *  (capped at PROCESS_MAX_PARALLELISM)
	 * @param $data mixed optional data stored with every spawned process entry
	 * @return integer the number of processes actually spawned
	 */
	function spawnProcesses($request, $handler, $op, $processType, $noOfProcesses, $data = null) {
		// Generate the web URL to be called.
		$dispatcher = Application::getDispatcher();
		$processUrl = $dispatcher->url($request, ROUTE_COMPONENT, null, $handler, $op);

		// Parse the URL into parts to construct the fsockopen call.
		$urlParts = parse_url($processUrl);
		assert(isset($urlParts['scheme']) && isset($urlParts['host']) && isset($urlParts['path']) && !isset($urlParts['fragment']));
		if ($urlParts['scheme'] == 'https') {
			$defaultPort = 443;
			$transport = 'ssl://';
		} else {
			$defaultPort = 80;
			$transport = '';
		}

		// Honour an explicit port in the URL rather than always
		// connecting to the scheme's default port.
		$port = isset($urlParts['port']) ? (int) $urlParts['port'] : $defaultPort;
		$hostHeader = $urlParts['host'];
		if ($port != $defaultPort) $hostHeader .= ':' . $port;

		// Keep any query string that is already part of the URL and
		// append the one-time-key parameter to it.
		$requestTarget = $urlParts['path'] . '?';
		if (isset($urlParts['query']) && $urlParts['query'] !== '') {
			$requestTarget .= $urlParts['query'] . '&';
		}
		$requestTarget .= 'authToken=';

		// Delete process zombies for correct process slot calculation.
		$this->deleteZombies();

		// Calculate the number of max process slots for the given process type.
		$noOfProcesses = min($noOfProcesses, PROCESS_MAX_PARALLELISM);

		// Spawn new non-blocking (i.e. parallel) processes via
		// web requests until all process slots have been filled.
		$currentParallelism = $this->getNumberOfObjectsByProcessType($processType);
		$spawnedProcesses = 0;
		while ($currentParallelism < $noOfProcesses) {
			// Block a process slot.
			// NB: insertObject() re-checks the number of currently running
			// processes on each iteration to make sure that we don't exceed
			// the limit when there are concurrent requests.
			$process =& $this->insertObject($processType, $noOfProcesses, $data);
			if (!($process instanceof Process)) break;
			$oneTimeKey = $process->getId();

			// Make the request including the generated one-time-key.
			$stream = fsockopen($transport . $urlParts['host'], $port);
			if (!$stream) {
				// No request was sent, so nobody will ever claim this
				// slot: release it instead of leaving it blocked until
				// the zombie clean-up removes it.
				$this->deleteObject($process);
				break;
			}
			$processRequest =
				'GET ' . $requestTarget . urlencode($oneTimeKey) . " HTTP/1.1\r\n"
				. 'Host: ' . $hostHeader . "\r\n"
				. "User-Agent: OJS\r\n"
				. "Connection: Close\r\n\r\n";
			// Don't wait for the response: write the request and hang up.
			stream_set_blocking($stream, 0);
			fwrite($stream, $processRequest);
			fclose($stream);
			unset($stream);

			$currentParallelism++;
			$spawnedProcesses++;
		}

		return $spawnedProcesses;
	}

	/**
	 * Check a one-time key and mark it as used.
	 *
	 * A key is accepted only if a process entry with this ID exists, has
	 * not been marked as used before, and was started less than
	 * PROCESS_MAX_KEY_VALID seconds ago. An entry with an expired key
	 * is deleted.
	 *
	 * @param $processId string the process ID, which is also the one-time key
	 * @return boolean true if the process is authorized to run
	 */
	function authorizeProcess($processId) {
		// getObjectById() does not return by reference, so a plain
		// assignment is used here.
		$process = $this->getObjectById($processId);
		if (($process instanceof Process) && $process->getObliterated() === false) {
			// The one time key has not been used yet.
			// Mark it as used.
			// NOTE(review): the lookup above and this UPDATE are separate
			// statements, so two simultaneous requests presenting the same
			// key could both pass the check -- consider a conditional
			// update (... AND obliterated = 0) that checks affected rows.
			$success = $this->update(
				'UPDATE processes
				SET obliterated = 1
				WHERE process_id = ?',
				$processId
			);
			if (!$success) return false;

			// Only authorize the process if its one-time-key
			// has not expired yet.
			$minTimestamp = time() - PROCESS_MAX_KEY_VALID;
			$authorized = ($process->getTimeStarted() > $minTimestamp);

			// Delete the process entry if the process was
			// not authorized due to an expired key.
			if (!$authorized) $this->deleteObjectById($processId);

			return $authorized;
		}

		// Deny access if the process entry doesn't exist or
		// the one-time-key has already been marked used. But don't
		// delete the process entry in this case to avoid that
		// outsiders can stop processes if they guess a key.
		return false;
	}

	/**
	 * Check whether a running process is still allowed to continue.
	 *
	 * A process may continue while its entry exists and was started less
	 * than PROCESS_MAX_EXECUTION_TIME seconds ago. Otherwise a delete is
	 * issued for the given ID.
	 *
	 * @param $processId string
	 * @return boolean
	 */
	function canContinue($processId) {
		// Calculate the max timestamp that is considered ok.
		$minTimestamp = time() - PROCESS_MAX_EXECUTION_TIME;

		// Check whether the process is still allowed to run.
		$process = $this->getObjectById($processId);
		$canContinue = (($process instanceof Process) && $process->getTimeStarted() > $minTimestamp);

		// Delete the process entry if the process is
		// not allowed to continue.
		if (!$canContinue) $this->deleteObjectById($processId);

		return $canContinue;
	}

	/**
	 * Instantiate a new, empty process data object.
	 *
	 * @return Process
	 */
	function newDataObject() {
		return new Process();
	}

	//
	// Private helper methods
	//
	/**
	 * Build a process object from a database row.
	 *
	 * @param $row array an associative row with the keys process_id,
	 *  process_type, time_started, obliterated and additional_data
	 * @return Process
	 */
	function _fromRow($row) {
		$process = $this->newDataObject();
		$process->setId($row['process_id']);
		$process->setProcessType((int) $row['process_type']);
		$process->setTimeStarted((int) $row['time_started']);
		$process->setObliterated((bool) $row['obliterated']);
		// additional_data holds the value serialized by insertObject().
		$process->setAdditionalData(unserialize($row['additional_data']));
		return $process;
	}
}
383 
384 ?>
spawnProcesses($request, $handler, $op, $processType, $noOfProcesses, $data=null)
deleteZombies($force=false)
Operations for retrieving and modifying objects from a database.
Definition: DAO.inc.php:30
& retrieve($sql, $params=false, $callHooks=true)
Definition: DAO.inc.php:84
canContinue($processId)
deleteObject(&$process)
getObjectById($processId)
deleteObjectById($processId)
getNumberOfObjectsByProcessType($processType)
Operations for retrieving and modifying process data.
authorizeProcess($processId)
A class representing a running process.
Definition: Process.inc.php:28
& insertObject($processType, $maxParallelism, $additionalData=null)
update($sql, $params=false, $callHooks=true, $dieOnError=true)
Definition: DAO.inc.php:208