1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 
<?php
namespace Omeka\Job;

use DateTime;
use Doctrine\ORM\EntityManager;
use Omeka\Job\DispatchStrategy\StrategyInterface;
use Omeka\Entity\Job;
use Omeka\Log\Writer\Job as JobWriter;
use Zend\Authentication\AuthenticationService;
use Zend\Log\Logger;

class Dispatcher
{
    /**
     * @var StrategyInterface
     */
    protected $dispatchStrategy;

    /**
     * @var EntityManager
     */
    protected $entityManager;

    /**
     * @var Logger
     */
    protected $logger;

    /**
     * @var AuthenticationService
     */
    protected $auth;

    /**
     * Set the dispatch strategy.
     *
     * @param StrategyInterface $dispatchStrategy
     * @param EntityManager $entityManager
     * @param Logger $logger
     * @param AuthenticationService $auth
     */
    public function __construct(StrategyInterface $dispatchStrategy, EntityManager $entityManager,
        Logger $logger, AuthenticationService $auth)
    {
        $this->dispatchStrategy = $dispatchStrategy;
        $this->entityManager = $entityManager;
        $this->logger = $logger;
        $this->auth = $auth;
    }

    /**
     * @return StrategyInterface
     */
    public function getDispatchStrategy()
    {
        return $this->dispatchStrategy;
    }

    /**
     * Dispatch a job.
     *
     * Composes a Job entity and uses the configured strategy if no strategy is
     * passed.
     *
     * @param string $class
     * @param mixed $args
     * @param StrategyInterface $strategy
     * @return null|Job $job
     */
    public function dispatch($class, $args = null, StrategyInterface $strategy = null)
    {
        if (!class_exists($class)) {
            throw new Exception\InvalidArgumentException(sprintf('The job class "%s" does not exist.', $class));
        }
        if (!is_subclass_of($class, 'Omeka\Job\JobInterface')) {
            throw new Exception\InvalidArgumentException(sprintf('The job class "%s" does not implement Omeka\Job\JobInterface.', $class));
        }

        $job = new Job;
        $job->setStatus(Job::STATUS_STARTING);
        $job->setClass($class);
        $job->setArgs($args);
        $job->setOwner($this->auth->getIdentity());
        $this->entityManager->persist($job);
        $this->entityManager->flush();

        if (!$strategy) {
            $strategy = $this->getDispatchStrategy();
        }

        $this->send($job, $strategy);
        return $job;
    }

    /**
     * Send a job via a strategy.
     *
     * @param Job $job
     * @param StrategyInterface $strategy
     */
    public function send(Job $job, StrategyInterface $strategy)
    {
        $this->logger->addWriter(new JobWriter($job));
        try {
            $strategy->send($job);
        } catch (\Exception $e) {
            $this->logger->err((string) $e);
            $job->setStatus(Job::STATUS_ERROR);
            $job->setEnded(new DateTime('now'));

            // Account for "inside Doctrine" errors that close the EM
            if ($this->entityManager->isOpen()) {
                $entityManager = $this->entityManager;
            } else {
                $entityManager = $this->getNewEntityManager($this->entityManager);
            }

            $entityManager->clear();
            $entityManager->merge($job);
            $entityManager->flush();
        }
    }

    /**
     * Set a job to be stopped.
     *
     * This does nothing but change the job status to STATUS_STOPPING. It's up
     * to individual job implementations to stop performing by listening to the
     * status change, usually from within an iteration.
     *
     * @param int $jobId
     */
    public function stop($jobId)
    {
        $job = $this->entityManager->find('Omeka\Entity\Job', $jobId);
        if (!$job) {
            throw new Exception\InvalidArgumentException(sprintf('The job ID "%s" is invalid.', $jobId));
        }
        $job->setStatus(Job::STATUS_STOPPING);
        $this->entityManager->flush();
    }

    /**
     * Get a new EntityManager sharing the settings of an old one.
     *
     * Internal Doctrine errors "close" the EntityManager and we can never use it again, so we need
     * to create a new one if we want to save anything after one of those kinds of errors.
     *
     * @param EntityManager $entityManager
     * @return EntityManager
     */
    private function getNewEntityManager(EntityManager $entityManager)
    {
        return EntityManager::create(
            $entityManager->getConnection(),
            $entityManager->getConfiguration(),
            $entityManager->getEventManager()
        );
    }
}