Programming Language PHP

Namespace Oro\Component\MessageQueue\Consumption

Class ExtensionInterface

Total Examples 1

1 code example of PHP Oro\Component\MessageQueue\Consumption\ExtensionInterface extracted from open source projects

Was this example useful?
0
                                                    /**
     * @throws ConsumptionInterruptedException
     */
    protected function doConsume(ExtensionInterface $extension, Context $context): void
    {
        // Runs a single receive cycle: fires the before-receive hook, asks the
        // consumer for one message, then either processes it or takes the idle path.
        // Collaborators are read from the context before any extension hook fires.
        $session = $context->getSession();
        $consumer = $context->getMessageConsumer();
        $log = $context->getLogger();

        $extension->onBeforeReceive($context);

        // The context may already be flagged as interrupted at this point;
        // if so, stop before touching the consumer.
        if ($context->isExecutionInterrupted()) {
            throw new ConsumptionInterruptedException($context->getInterruptedReason());
        }

        $log->debug('Pre receive Message');
        $received = $consumer->receive(1);

        if (null === $received) {
            // Nothing was received: sleep for $this->idleMicroseconds, then fire the idle hook.
            $log->info('Idle');

            usleep($this->idleMicroseconds);
            $extension->onIdle($context);
        } else {
            $context->setMessage($received);
            $extension->onPreReceived($context);

            $headers = $received->getHeaders();
            $properties = $received->getProperties();
            $log->info('Message received', [
                'headers'    => $headers,
                'properties' => $properties,
            ]);

            // The processor only runs when the context carries no (truthy) status yet;
            // otherwise the reported execution time stays at 0.
            $elapsedMs = 0;
            if (!$context->getStatus()) {
                $startedAtMs = (int) (microtime(true) * 1000);

                $processor = $this->messageProcessorRegistry->get($context->getMessageProcessorName());
                $result = $processor->process($received, $session);

                $finishedAtMs = (int) (microtime(true) * 1000);
                $elapsedMs = $finishedAtMs - $startedAtMs;
                $context->setStatus($result);
            }

            // Loose comparison is deliberate: it mirrors the semantics of a switch statement.
            $finalStatus = $context->getStatus();
            if ($finalStatus == MessageProcessorInterface::ACK) {
                $consumer->acknowledge($received);
                $statusLabel = 'ACK';
            } elseif ($finalStatus == MessageProcessorInterface::REJECT) {
                // Second argument false: reject without requeueing.
                $consumer->reject($received, false);
                $statusLabel = 'REJECT';
            } elseif ($finalStatus == MessageProcessorInterface::REQUEUE) {
                // Second argument true: reject and requeue.
                $consumer->reject($received, true);
                $statusLabel = 'REQUEUE';
            } else {
                throw new \LogicException(sprintf('Status is not supported: %s', $context->getStatus()));
            }

            // addMemoryUsageInfo() receives the array before it is logged
            // (presumably by reference, to append memory figures - confirm against its definition).
            $logContext = [
                'status' => $statusLabel,
                'time_taken' => $elapsedMs,
            ];
            $this->addMemoryUsageInfo($logContext);
            $log->notice('Message processed: {status}. Execution time: {time_taken} ms', $logContext);

            $extension->onPostReceived($context);
        }

        // Re-check after the post-receive/idle hooks have run on the context.
        if ($context->isExecutionInterrupted()) {
            throw new ConsumptionInterruptedException($context->getInterruptedReason());
        }
    }
                                            
ExtensionInterface's Other Methods
ExtensionInterface's Other Methods