EventListProcessor.php 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. <?php
  2. /**
  3. * Refer to LICENSE.txt distributed with the Temando Shipping module for notice of license
  4. */
  5. namespace Temando\Shipping\Sync;
  6. use Psr\Log\LoggerInterface;
  7. use Temando\Shipping\Model\StreamEventInterface;
  8. use Temando\Shipping\Sync\Exception\EventException;
  9. use Temando\Shipping\Sync\Exception\EventProcessorException;
  10. /**
  11. * Temando Event List Processor
  12. *
  13. * Process given stream events, usually a subset of events available at the API.
  14. *
  15. * @package Temando\Shipping\Sync
  16. * @author Benjamin Heuer <benjamin.heuer@netresearch.de>
  17. * @license http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0)
  18. * @link http://www.temando.com/
  19. */
  20. class EventListProcessor
  21. {
  22. /**
  23. * @var EntityProcessorFactory
  24. */
  25. private $entityProcessorFactory;
  26. /**
  27. * @var LoggerInterface
  28. */
  29. private $logger;
  30. /**
  31. * @param EntityProcessorFactory $entityProcessorFactory
  32. * @param LoggerInterface $logger
  33. */
  34. public function __construct(
  35. EntityProcessorFactory $entityProcessorFactory,
  36. LoggerInterface $logger
  37. ) {
  38. $this->entityProcessorFactory = $entityProcessorFactory;
  39. $this->logger = $logger;
  40. }
  41. /**
  42. * @param StreamEventInterface[] $streamEvents
  43. * @param EventList $eventList
  44. * @return void
  45. */
  46. public function processEventList(array $streamEvents, EventList $eventList)
  47. {
  48. foreach ($streamEvents as $streamEvent) {
  49. if ($eventList->hasEvent($streamEvent->getEventId())) {
  50. // event was already processed with failure. do not try again.
  51. continue;
  52. }
  53. // mark event as processing
  54. $eventList->addEvent($streamEvent->getEventId());
  55. $operation = $streamEvent->getEventType();
  56. $entityType = $streamEvent->getEntityType();
  57. $entityId = $streamEvent->getEntityId();
  58. try {
  59. $entityProcessor = $this->entityProcessorFactory->get($entityType);
  60. $processedId = $entityProcessor->execute($operation, $entityId);
  61. // event was successfully processed
  62. $eventList->removeEvent($streamEvent->getEventId());
  63. $message = "Operation {$operation} was executed successfully for {$entityType} {$processedId}.";
  64. $this->logger->info($message);
  65. } catch (EventProcessorException $e) {
  66. // processing failed, try again later
  67. $this->logger->error($e->getMessage(), ['exception' => $e]);
  68. continue;
  69. } catch (EventException $e) {
  70. // processing not supported or desired
  71. $eventList->removeEvent($streamEvent->getEventId());
  72. $this->logger->notice($e->getMessage(), ['exception' => $e]);
  73. }
  74. }
  75. }
  76. }