diff --git a/Command/AnonConsumerCommand.php b/Command/AnonConsumerCommand.php index 571d8ddd..6056d70d 100644 --- a/Command/AnonConsumerCommand.php +++ b/Command/AnonConsumerCommand.php @@ -4,7 +4,7 @@ class AnonConsumerCommand extends BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); @@ -15,7 +15,7 @@ protected function configure() } - protected function getConsumerService() + protected function getConsumerService(): string { return 'old_sound_rabbit_mq.%s_anon'; } diff --git a/Command/BaseConsumerCommand.php b/Command/BaseConsumerCommand.php index e17b0c92..aef8d038 100644 --- a/Command/BaseConsumerCommand.php +++ b/Command/BaseConsumerCommand.php @@ -2,7 +2,11 @@ namespace OldSound\RabbitMqBundle\Command; -use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer as Consumer; +use OldSound\RabbitMqBundle\RabbitMq\AnonConsumer; +use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer; +use OldSound\RabbitMqBundle\RabbitMq\Consumer; +use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer; +use OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer; use PhpAmqpLib\Exception\AMQPTimeoutException; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -11,15 +15,20 @@ abstract class BaseConsumerCommand extends BaseRabbitMqCommand { + /** @var DynamicConsumer|MultipleConsumer|AnonConsumer */ protected $consumer; + /** @var string */ protected $amount; + /** + * @return mixed + */ abstract protected function getConsumerService(); - public function stopConsumer() + public function stopConsumer(): void { - if ($this->consumer instanceof Consumer) { + if ($this->consumer instanceof BaseConsumer) { // Process current message, then halt consumer $this->consumer->forceStopConsumer(); @@ -30,12 +39,21 @@ public function stopConsumer() } } - public function restartConsumer() + public function restartConsumer():void { - // TODO: Implement restarting of consumer + if ($this->consumer instanceof BaseConsumer) { + // Process current message, then halt consumer + $this->consumer->forceStopConsumer(); + + // Halt consumer if waiting for a new message from the queue + try { + $this->consumer->start(); + } catch (\ErrorException $e) { + } + } } - protected function configure() + protected function configure(): void { parent::configure(); @@ -43,7 +61,7 @@ protected function configure() ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') - ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null) + ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)') ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals') ; @@ -52,15 +70,10 @@ protected function configure() /** * Executes the current command. * - * @param InputInterface $input An InputInterface instance - * @param OutputInterface $output An OutputInterface instance - * - * @return integer 0 if everything went fine, or an error code - * * @throws \InvalidArgumentException When the number of messages to consume is less than 0 * @throws \BadFunctionCallException When the pcntl is not installed and option -s is true */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { if (defined('AMQP_WITHOUT_SIGNALS') === false) { define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); @@ -87,17 +100,19 @@ protected function execute(InputInterface $input, OutputInterface $output) } $this->initConsumer($input); - return $this->consumer->consume($this->amount); + return $this->consumer->consume((int)$this->amount); } - protected function initConsumer($input) + protected function initConsumer(InputInterface $input): void { $this->consumer = $this->getContainer() ->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); - if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) { + if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && (int)$input->getOption('memory-limit') > 0) { $this->consumer->setMemoryLimit($input->getOption('memory-limit')); } $this->consumer->setRoutingKey($input->getOption('route')); + + $this->consumer->setContext($input->getArgument('context')); } } diff --git a/Command/BaseRabbitMqCommand.php b/Command/BaseRabbitMqCommand.php index fe4bea5e..26ef98ab 100644 --- a/Command/BaseRabbitMqCommand.php +++ b/Command/BaseRabbitMqCommand.php @@ -16,15 +16,12 @@ abstract class BaseRabbitMqCommand extends Command implements ContainerAwareInte /** * {@inheritDoc} */ - public function setContainer(ContainerInterface $container = null) + public function setContainer(ContainerInterface $container = null): void { $this->container = $container; } - /** - * @return ContainerInterface - */ - public function getContainer() + public function getContainer(): ContainerInterface { return $this->container; } diff --git a/Command/BatchConsumerCommand.php b/Command/BatchConsumerCommand.php index 59e83919..52c98d84 100644 --- a/Command/BatchConsumerCommand.php +++ b/Command/BatchConsumerCommand.php @@ -16,7 +16,7 @@ final class BatchConsumerCommand extends BaseRabbitMqCommand */ protected $consumer; - public function stopConsumer() + public function stopConsumer(): void { if ($this->consumer instanceof BatchConsumer) { // Process current message, then halt consumer @@ -29,7 +29,7 @@ public function stopConsumer() } } - protected function configure() + protected function configure(): void { parent::configure(); @@ -38,25 +38,25 @@ protected function configure() ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') ->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0) ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') - ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) + ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process') ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals') ->setDescription('Executes a Batch Consumer'); - ; } /** * Executes the current command. * + * @param InputInterface $input An InputInterface instance * @param OutputInterface $output An OutputInterface instance * * @return integer 0 if everything went fine, or an error code * - * @throws \InvalidArgumentException When the number of batches to consume is less than 0 - * @throws \BadFunctionCallException When the pcntl is not installed and option -s is true + * @throws \InvalidArgumentException When the number of messages to consume is less than 0 + * @throws \BadFunctionCallException|\ErrorException When the pcntl is not installed and option -s is true */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { if (defined('AMQP_WITHOUT_SIGNALS') === false) { define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); @@ -86,10 +86,7 @@ protected function execute(InputInterface $input, OutputInterface $output) return $this->consumer->consume($batchAmountTarget); } - /** - * @param InputInterface $input - */ - protected function initConsumer(InputInterface $input) + protected function initConsumer(InputInterface $input): void { $this->consumer = $this->getContainer() ->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); @@ -103,10 +100,7 @@ protected function initConsumer(InputInterface $input) $this->consumer->setRoutingKey($input->getOption('route')); } - /** - * @return string - */ - protected function getConsumerService() + protected function getConsumerService(): string { return 'old_sound_rabbit_mq.%s_batch'; } diff --git a/Command/ConsumerCommand.php b/Command/ConsumerCommand.php index 35a0e985..c03f5c35 100644 --- a/Command/ConsumerCommand.php +++ b/Command/ConsumerCommand.php @@ -4,14 +4,14 @@ class ConsumerCommand extends BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); $this->setDescription('Executes a consumer'); $this->setName('rabbitmq:consumer'); } - protected function getConsumerService() + protected function getConsumerService(): string { return 'old_sound_rabbit_mq.%s_consumer'; } diff --git a/Command/DeleteCommand.php b/Command/DeleteCommand.php index d67c2a39..980a4c4f 100644 --- a/Command/DeleteCommand.php +++ b/Command/DeleteCommand.php @@ -13,7 +13,7 @@ */ class DeleteCommand extends ConsumerCommand { - protected function configure() + protected function configure(): void { $this->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') ->setDescription('Delete a consumer\'s queue') @@ -22,13 +22,7 @@ protected function configure() $this->setName('rabbitmq:delete'); } - /** - * @param InputInterface $input - * @param OutputInterface $output - * - * @return int - */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { $noConfirmation = (bool) $input->getOption('no-confirmation'); diff --git a/Command/DynamicConsumerCommand.php b/Command/DynamicConsumerCommand.php index 2f010a0b..1aac9912 100644 --- a/Command/DynamicConsumerCommand.php +++ b/Command/DynamicConsumerCommand.php @@ -10,11 +10,13 @@ */ namespace OldSound\RabbitMqBundle\Command; +use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer; use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; class DynamicConsumerCommand extends BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); @@ -25,12 +27,12 @@ protected function configure() ; } - protected function getConsumerService() + protected function getConsumerService(): string { return 'old_sound_rabbit_mq.%s_dynamic'; } - protected function initConsumer($input) + protected function initConsumer(InputInterface $input): void { parent::initConsumer($input); $this->consumer->setContext($input->getArgument('context')); diff --git a/Command/MultipleConsumerCommand.php b/Command/MultipleConsumerCommand.php index e757d889..ca7ec2ec 100644 --- a/Command/MultipleConsumerCommand.php +++ b/Command/MultipleConsumerCommand.php @@ -3,10 +3,11 @@ namespace OldSound\RabbitMqBundle\Command; use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; class MultipleConsumerCommand extends BaseConsumerCommand { - protected function configure() + protected function configure(): void { parent::configure(); @@ -16,12 +17,12 @@ protected function configure() ; } - protected function getConsumerService() + protected function getConsumerService(): string { return 'old_sound_rabbit_mq.%s_multiple'; } - protected function initConsumer($input) + protected function initConsumer(InputInterface $input): void { parent::initConsumer($input); $this->consumer->setContext($input->getArgument('context')); diff --git a/Command/PurgeConsumerCommand.php b/Command/PurgeConsumerCommand.php index 9bf85da8..6899910d 100644 --- a/Command/PurgeConsumerCommand.php +++ b/Command/PurgeConsumerCommand.php @@ -13,7 +13,7 @@ */ class PurgeConsumerCommand extends ConsumerCommand { - protected function configure() + protected function configure(): void { $this->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') ->setDescription('Purge a consumer\'s queue') @@ -22,13 +22,7 @@ protected function configure() $this->setName('rabbitmq:purge'); } - /** - * @param InputInterface $input - * @param OutputInterface $output - * - * @return int - */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { $noConfirmation = (bool) $input->getOption('no-confirmation'); @@ -50,7 +44,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $this->consumer = $this->getContainer() ->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); - $this->consumer->purge($input->getArgument('name')); + $this->consumer->purge(); return 0; } diff --git a/Command/RpcServerCommand.php b/Command/RpcServerCommand.php index 59091453..d24b43c9 100644 --- a/Command/RpcServerCommand.php +++ b/Command/RpcServerCommand.php @@ -9,7 +9,7 @@ class RpcServerCommand extends BaseRabbitMqCommand { - protected function configure() + protected function configure(): void { parent::configure(); @@ -32,7 +32,7 @@ protected function configure() * * @throws \InvalidArgumentException When the number of messages to consume is less than 0 */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { define('AMQP_DEBUG', (bool) $input->getOption('debug')); $amount = $input->getOption('messages'); diff --git a/Command/SetupFabricCommand.php b/Command/SetupFabricCommand.php index d6bbbe83..df7458cd 100644 --- a/Command/SetupFabricCommand.php +++ b/Command/SetupFabricCommand.php @@ -9,7 +9,7 @@ class SetupFabricCommand extends BaseRabbitMqCommand { - protected function configure() + protected function configure(): void { $this ->setName('rabbitmq:setup-fabric') @@ -18,7 +18,7 @@ protected function configure() ; } - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { if (defined('AMQP_DEBUG') === false) { define('AMQP_DEBUG', (bool) $input->getOption('debug')); diff --git a/Command/StdInProducerCommand.php b/Command/StdInProducerCommand.php index 1cea339f..25485d30 100644 --- a/Command/StdInProducerCommand.php +++ b/Command/StdInProducerCommand.php @@ -12,7 +12,7 @@ class StdInProducerCommand extends BaseRabbitMqCommand const FORMAT_PHP = 'php'; const FORMAT_RAW = 'raw'; - protected function configure() + protected function configure(): void { parent::configure(); @@ -34,7 +34,7 @@ protected function configure() * * @return integer 0 if everything went fine, or an error code */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { define('AMQP_DEBUG', (bool) $input->getOption('debug')); diff --git a/DataCollector/MessageDataCollector.php b/DataCollector/MessageDataCollector.php index 5444509d..416e6503 100644 --- a/DataCollector/MessageDataCollector.php +++ b/DataCollector/MessageDataCollector.php @@ -2,6 +2,7 @@ namespace OldSound\RabbitMqBundle\DataCollector; +use OldSound\RabbitMqBundle\RabbitMq\AMQPLoggedChannel; use Symfony\Component\HttpKernel\DataCollector\DataCollector; use Symfony\Component\HttpFoundation\Request; use Symfony\Component\HttpFoundation\Response; @@ -13,16 +14,18 @@ */ class MessageDataCollector extends DataCollector { + /** @var array */ private $channels; - public function __construct($channels) + public function __construct(array $channels) { $this->channels = $channels; $this->data = array(); } - public function collect(Request $request, Response $response, \Throwable $exception = null) + public function collect(Request $request, Response $response, \Throwable $exception = null): void { + /** @var AMQPLoggedChannel $channel */ foreach ($this->channels as $channel) { foreach ($channel->getBasicPublishLog() as $log) { $this->data[] = $log; @@ -30,22 +33,22 @@ public function collect(Request $request, Response $response, \Throwable $except } } - public function getName() + public function getName(): string { return 'rabbit_mq'; } - public function getPublishedMessagesCount() + public function getPublishedMessagesCount(): int { return count($this->data); } - public function getPublishedMessagesLog() + public function getPublishedMessagesLog(): array { return $this->data; } - public function reset() + public function reset(): void { $this->data = []; } diff --git a/DependencyInjection/Compiler/InjectEventDispatcherPass.php b/DependencyInjection/Compiler/InjectEventDispatcherPass.php index 2f2a26af..ce18071f 100644 --- a/DependencyInjection/Compiler/InjectEventDispatcherPass.php +++ b/DependencyInjection/Compiler/InjectEventDispatcherPass.php @@ -19,7 +19,7 @@ class InjectEventDispatcherPass implements CompilerPassInterface /** * @inheritDoc */ - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { if (!$container->has(self::EVENT_DISPATCHER_SERVICE_ID)) { return; diff --git a/DependencyInjection/Compiler/RegisterPartsPass.php b/DependencyInjection/Compiler/RegisterPartsPass.php index c6d7a06d..327b9aad 100644 --- a/DependencyInjection/Compiler/RegisterPartsPass.php +++ b/DependencyInjection/Compiler/RegisterPartsPass.php @@ -8,7 +8,7 @@ class RegisterPartsPass implements CompilerPassInterface { - public function process(ContainerBuilder $container) + public function process(ContainerBuilder $container): void { $services = $container->findTaggedServiceIds('old_sound_rabbit_mq.base_amqp'); $container->setParameter('old_sound_rabbit_mq.base_amqp', array_keys($services)); diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index f42a9aff..555b7f43 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -21,14 +21,13 @@ class Configuration implements ConfigurationInterface /** * Configuration constructor. * - * @param string $name */ - public function __construct($name) + public function __construct(string $name) { $this->name = $name; } - public function getConfigTreeBuilder() + public function getConfigTreeBuilder(): TreeBuilder { $tree = new TreeBuilder($this->name); /** @var ArrayNodeDefinition $rootNode */ @@ -56,7 +55,7 @@ public function getConfigTreeBuilder() return $tree; } - protected function addConnections(ArrayNodeDefinition $node) + protected function addConnections(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('connection') @@ -91,7 +90,7 @@ protected function addConnections(ArrayNodeDefinition $node) ; } - protected function addProducers(ArrayNodeDefinition $node) + protected function addProducers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('producer') @@ -115,7 +114,7 @@ protected function addProducers(ArrayNodeDefinition $node) ; } - protected function addBindings(ArrayNodeDefinition $node) + protected function addBindings(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('binding') @@ -139,7 +138,7 @@ protected function addBindings(ArrayNodeDefinition $node) ; } - protected function addConsumers(ArrayNodeDefinition $node) + protected function addConsumers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('consumer') @@ -180,7 +179,7 @@ protected function addConsumers(ArrayNodeDefinition $node) ; } - protected function addMultipleConsumers(ArrayNodeDefinition $node) + protected function addMultipleConsumers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('multiple_consumer') @@ -219,8 +218,8 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node) ->end() ; } - - protected function addDynamicConsumers(ArrayNodeDefinition $node) + + protected function addDynamicConsumers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('dynamic_consumer') @@ -261,12 +260,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node) ; } - /** - * @param ArrayNodeDefinition $node - * - * @return void - */ - protected function addBatchConsumers(ArrayNodeDefinition $node) + protected function addBatchConsumers(ArrayNodeDefinition $node): void { $node ->children() @@ -305,7 +299,7 @@ protected function addBatchConsumers(ArrayNodeDefinition $node) ; } - protected function addAnonConsumers(ArrayNodeDefinition $node) + protected function addAnonConsumers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('anon_consumer') @@ -325,7 +319,7 @@ protected function addAnonConsumers(ArrayNodeDefinition $node) ; } - protected function addRpcClients(ArrayNodeDefinition $node) + protected function addRpcClients(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('rpc_client') @@ -347,7 +341,7 @@ protected function addRpcClients(ArrayNodeDefinition $node) ; } - protected function addRpcServers(ArrayNodeDefinition $node) + protected function addRpcServers(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('rpc_server') @@ -378,7 +372,7 @@ protected function addRpcServers(ArrayNodeDefinition $node) ; } - protected function getExchangeConfiguration() + protected function getExchangeConfiguration(): ArrayNodeDefinition { $node = new ArrayNodeDefinition('exchange_options'); @@ -398,7 +392,7 @@ protected function getExchangeConfiguration() ; } - protected function getQueueConfiguration() + protected function getQueueConfiguration(): ArrayNodeDefinition { $node = new ArrayNodeDefinition('queue_options'); @@ -407,12 +401,12 @@ protected function getQueueConfiguration() return $node; } - protected function getMultipleQueuesConfiguration() + protected function getMultipleQueuesConfiguration(): ArrayNodeDefinition { $node = new ArrayNodeDefinition('queues'); $prototypeNode = $node->prototype('array'); - $this->addQueueNodeConfiguration($prototypeNode); + $this->addQueueNodeConfiguration($node); $prototypeNode->children() ->scalarNode('callback')->isRequired()->end() @@ -423,7 +417,7 @@ protected function getMultipleQueuesConfiguration() return $node; } - protected function addQueueNodeConfiguration(ArrayNodeDefinition $node) + protected function addQueueNodeConfiguration(ArrayNodeDefinition $node): void { $node ->fixXmlConfig('routing_key') diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index b6e2e0f5..0231047e 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -29,11 +29,13 @@ class OldSoundRabbitMqExtension extends Extension */ private $collectorEnabled; + /** @var array */ private $channelIds = array(); + /** @var array */ private $config = array(); - public function load(array $configs, ContainerBuilder $container) + public function load(array $configs, ContainerBuilder $container): void { $this->container = $container; @@ -69,12 +71,15 @@ public function load(array $configs, ContainerBuilder $container) } } + /** + * @return object|Configuration|\Symfony\Component\Config\Definition\ConfigurationInterface|null + */ public function getConfiguration(array $config, ContainerBuilder $container) { return new Configuration($this->getAlias()); } - protected function loadConnections() + protected function loadConnections(): void { foreach ($this->config['connections'] as $key => $connection) { $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class'; @@ -99,9 +104,7 @@ protected function loadConnections() // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6 $definition->setFactory(array(new Reference($factoryName), 'createConnection')); } else { - // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6 - $definition->setFactoryService($factoryName); - $definition->setFactoryMethod('createConnection'); + $definition->setFactory(array(new Reference($factoryName), 'createConnection')); } $definition->addTag('old_sound_rabbit_mq.connection'); $definition->setPublic(true); @@ -110,7 +113,7 @@ protected function loadConnections() } } - protected function loadBindings() + protected function loadBindings(): void { if ($this->config['sandbox']) { return; @@ -135,7 +138,7 @@ protected function loadBindings() } } - protected function loadProducers() + protected function loadProducers(): void { if ($this->config['sandbox'] == false) { foreach ($this->config['producers'] as $key => $producer) { @@ -180,7 +183,7 @@ protected function loadProducers() } } - protected function loadConsumers() + protected function loadConsumers(): void { foreach ($this->config['consumers'] as $key => $consumer) { $definition = new Definition('%old_sound_rabbit_mq.consumer.class%'); @@ -245,7 +248,7 @@ protected function loadConsumers() } } - protected function loadMultipleConsumers() + protected function loadMultipleConsumers(): void { foreach ($this->config['multiple_consumers'] as $key => $consumer) { $queues = array(); @@ -330,7 +333,7 @@ protected function loadMultipleConsumers() } } - protected function loadDynamicConsumers() + protected function loadDynamicConsumers(): void { foreach ($this->config['dynamic_consumers'] as $key => $consumer) { @@ -402,7 +405,7 @@ protected function loadDynamicConsumers() } } - protected function loadBatchConsumers() + protected function loadBatchConsumers(): void { foreach ($this->config['batch_consumers'] as $key => $consumer) { $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); @@ -463,7 +466,7 @@ protected function loadBatchConsumers() } } - protected function loadAnonConsumers() + protected function loadAnonConsumers(): void { foreach ($this->config['anon_consumers'] as $key => $anon) { $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%'); @@ -487,12 +490,8 @@ protected function loadAnonConsumers() /** * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy * parameter. So we revert the change for right configurations. - * - * @param array $config - * - * @return array */ - private function normalizeArgumentKeys(array $config) + private function normalizeArgumentKeys(array $config): array { if (isset($config['arguments'])) { $arguments = $config['arguments']; @@ -517,10 +516,8 @@ private function normalizeArgumentKeys(array $config) * Support for arguments provided as string. Support for old configuration files. * * @deprecated - * @param string $arguments - * @return array */ - private function argumentsStringAsArray($arguments) + private function argumentsStringAsArray(string $arguments): array { $argumentsArray = array(); @@ -537,7 +534,7 @@ private function argumentsStringAsArray($arguments) return $argumentsArray; } - protected function loadRpcClients() + protected function loadRpcClients(): void { foreach ($this->config['rpc_clients'] as $key => $client) { $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%'); @@ -561,7 +558,7 @@ protected function loadRpcClients() } } - protected function loadRpcServers() + protected function loadRpcServers(): void { foreach ($this->config['rpc_servers'] as $key => $server) { $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%'); @@ -595,7 +592,7 @@ protected function loadRpcServers() } } - protected function injectLoggedChannel(Definition $definition, $name, $connectionName) + protected function injectLoggedChannel(Definition $definition, string $name, string $connectionName): void { $id = sprintf('old_sound_rabbit_mq.channel.%s', $name); $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%'); @@ -610,23 +607,20 @@ protected function injectLoggedChannel(Definition $definition, $name, $connectio $definition->addArgument(new Reference($id)); } - protected function injectConnection(Definition $definition, $connectionName) + protected function injectConnection(Definition $definition, string $connectionName): void { $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName))); } - public function getAlias() + public function getAlias(): string { return 'old_sound_rabbit_mq'; } /** * Add proper dequeuer aware call - * - * @param string $callback - * @param string $name */ - protected function addDequeuerAwareCall($callback, $name) + protected function addDequeuerAwareCall(string $callback, string $name): void { if (!$this->container->has($callback)) { return; @@ -639,7 +633,7 @@ protected function addDequeuerAwareCall($callback, $name) } } - private function injectLogger(Definition $definition) + private function injectLogger(Definition $definition): void { $definition->addTag('monolog.logger', array( 'channel' => 'phpamqplib' @@ -649,10 +643,8 @@ private function injectLogger(Definition $definition) /** * Get default AMQP exchange options - * - * @return array */ - protected function getDefaultExchangeOptions() + protected function getDefaultExchangeOptions(): array { return array( 'name' => '', @@ -664,10 +656,8 @@ protected function getDefaultExchangeOptions() /** * Get default AMQP queue options - * - * @return array */ - protected function getDefaultQueueOptions() + protected function getDefaultQueueOptions(): array { return array( 'name' => '', diff --git a/Event/AMQPEvent.php b/Event/AMQPEvent.php index 89ce9c66..c21716b0 100644 --- a/Event/AMQPEvent.php +++ b/Event/AMQPEvent.php @@ -4,7 +4,6 @@ use OldSound\RabbitMqBundle\RabbitMq\Consumer; use PhpAmqpLib\Message\AMQPMessage; -use Symfony\Component\EventDispatcher\Event; /** * Class AMQPEvent @@ -29,40 +28,24 @@ class AMQPEvent extends AbstractAMQPEvent */ protected $consumer; - /** - * @return AMQPMessage - */ - public function getAMQPMessage() + public function getAMQPMessage(): AMQPMessage { return $this->AMQPMessage; } - /** - * @param AMQPMessage $AMQPMessage - * - * @return AMQPEvent - */ - public function setAMQPMessage(AMQPMessage $AMQPMessage) + public function setAMQPMessage(AMQPMessage $AMQPMessage): AMQPEvent { $this->AMQPMessage = $AMQPMessage; return $this; } - /** - * @return Consumer - */ - public function getConsumer() + public function getConsumer(): Consumer { return $this->consumer; } - /** - * @param Consumer $consumer - * - * @return AMQPEvent - */ - public function setConsumer(Consumer $consumer) + public function setConsumer(Consumer $consumer): AMQPEvent { $this->consumer = $consumer; diff --git a/Event/OnConsumeEvent.php b/Event/OnConsumeEvent.php index 65721c90..728498a0 100644 --- a/Event/OnConsumeEvent.php +++ b/Event/OnConsumeEvent.php @@ -15,8 +15,6 @@ class OnConsumeEvent extends AMQPEvent /** * OnConsumeEvent constructor. - * - * @param Consumer $consumer */ public function __construct(Consumer $consumer) { diff --git a/Event/OnIdleEvent.php b/Event/OnIdleEvent.php index d3106409..4fb4dd1e 100644 --- a/Event/OnIdleEvent.php +++ b/Event/OnIdleEvent.php @@ -30,18 +30,12 @@ public function __construct(Consumer $consumer) $this->forceStop = true; } - /** - * @return boolean - */ - public function isForceStop() + public function isForceStop(): bool { return $this->forceStop; } - /** - * @param boolean $forceStop - */ - public function setForceStop($forceStop) + public function setForceStop(bool $forceStop): void { $this->forceStop = $forceStop; } diff --git a/MemoryChecker/MemoryConsumptionChecker.php b/MemoryChecker/MemoryConsumptionChecker.php index 14d592b8..a3be9bd5 100644 --- a/MemoryChecker/MemoryConsumptionChecker.php +++ b/MemoryChecker/MemoryConsumptionChecker.php @@ -14,8 +14,6 @@ class MemoryConsumptionChecker /** * MemoryManager constructor. - * - * @param NativeMemoryUsageProvider $memoryUsageProvider */ public function __construct(NativeMemoryUsageProvider $memoryUsageProvider) { $this->memoryUsageProvider = $memoryUsageProvider; @@ -24,10 +22,8 @@ public function __construct(NativeMemoryUsageProvider $memoryUsageProvider) { /** * @param int|string $allowedConsumptionUntil * @param int|string $maxConsumptionAllowed - * - * @return bool */ - public function isRamAlmostOverloaded($maxConsumptionAllowed, $allowedConsumptionUntil = 0) + public function isRamAlmostOverloaded($maxConsumptionAllowed, $allowedConsumptionUntil = 0): bool { $allowedConsumptionUntil = $this->convertHumanUnitToNumerical($allowedConsumptionUntil); $maxConsumptionAllowed = $this->convertHumanUnitToNumerical($maxConsumptionAllowed); @@ -38,10 +34,8 @@ public function isRamAlmostOverloaded($maxConsumptionAllowed, $allowedConsumptio /** * @param int|string $humanUnit - * - * @return int */ - private function convertHumanUnitToNumerical($humanUnit) + private function convertHumanUnitToNumerical($humanUnit): int { $numerical = $humanUnit; if (!is_numeric($humanUnit)) { diff --git a/OldSoundRabbitMqBundle.php b/OldSoundRabbitMqBundle.php index d9725163..8ed4223f 100644 --- a/OldSoundRabbitMqBundle.php +++ b/OldSoundRabbitMqBundle.php @@ -9,7 +9,7 @@ class OldSoundRabbitMqBundle extends Bundle { - public function build(ContainerBuilder $container) + public function build(ContainerBuilder $container): void { parent::build($container); @@ -20,7 +20,7 @@ public function build(ContainerBuilder $container) /** * {@inheritDoc} */ - public function shutdown() + public function shutdown(): void { parent::shutdown(); if (!$this->container->hasParameter('old_sound_rabbit_mq.base_amqp')) { diff --git a/Provider/ConnectionParametersProviderInterface.php b/Provider/ConnectionParametersProviderInterface.php index 6dcaf219..4f3f7aec 100644 --- a/Provider/ConnectionParametersProviderInterface.php +++ b/Provider/ConnectionParametersProviderInterface.php @@ -30,8 +30,6 @@ interface ConnectionParametersProviderInterface * * If constructor_args is present, all the other parameters are ignored; constructor_args are passes as constructor * arguments. - * - * @return array */ - public function getConnectionParameters(); + public function getConnectionParameters(): array; } diff --git a/Provider/QueueOptionsProviderInterface.php b/Provider/QueueOptionsProviderInterface.php index e988aadb..4893085b 100644 --- a/Provider/QueueOptionsProviderInterface.php +++ b/Provider/QueueOptionsProviderInterface.php @@ -18,9 +18,6 @@ interface QueueOptionsProviderInterface * 'durable' => true, * 'routing_keys' => array('key.*') * ) - * - * @return array - * */ - public function getQueueOptions($context = null); + public function getQueueOptions(?string $context = null): array; } diff --git a/Provider/QueuesProviderInterface.php b/Provider/QueuesProviderInterface.php index d56504be..32972e2d 100644 --- a/Provider/QueuesProviderInterface.php +++ b/Provider/QueuesProviderInterface.php @@ -26,8 +26,6 @@ interface QueuesProviderInterface * 'callback' => array($callback, 'execute') * ) * ); - * @return array - * */ - public function getQueues(); + public function getQueues(): array; } diff --git a/README.md b/README.md index bd0dbe7c..e4167342 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ The RabbitMqBundle incorporates messaging in your application via [RabbitMQ](http://www.rabbitmq.com/) using the [php-amqplib](http://github.com/php-amqplib/php-amqplib) library. -The bundle implements several messaging patterns as seen on the [Thumper](https://github.com/php-amqplib/Thumper) library. Therefore publishing messages to RabbitMQ from a Symfony controller is as easy as: +The bundle implements several messaging patterns as seen on the [Thumper](https://github.com/php-amqplib/Thumper) library. Therefore, publishing messages to RabbitMQ from a Symfony controller is as easy as: ```php $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png'); @@ -29,7 +29,7 @@ This bundle was presented at [Symfony Live Paris 2011](http://www.symfony-live.c ## Version 2 ## Due to the breaking changes happened caused by Symfony >=4.4, a new tag was released, making the bundle compatible with Symfony >=4.4. -Also it eliminates a lot notices caused by symfony event dispatcher in Symfony 4.3. +Also, it eliminates a lot of notices caused by symfony event dispatcher in Symfony 4.3. ## Installation ## @@ -71,7 +71,7 @@ Require the bundle in your composer.json file: } ``` -Register the extension and the compiler pass: +Register the extension, and the compiler pass: ```php use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension; @@ -140,7 +140,7 @@ old_sound_rabbit_mq: callback: upload_picture_service ``` -Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the service `old_sound_rabbit_mq.upload_picture_producer` and `old_sound_rabbit_mq.upload_picture_consumer`. The later expects that there's a service called `upload_picture_service`. +Here we configure the connection service, and the message endpoints that our application will have. In this example your service container will contain the service `old_sound_rabbit_mq.upload_picture_producer` and `old_sound_rabbit_mq.upload_picture_consumer`. The later expects that there's a service called `upload_picture_service`. If you don't specify a connection for the client, the client will look for a connection with the same alias. So for our `upload_picture` the service container will look for an `upload_picture` connection. @@ -181,14 +181,14 @@ queue_options: In a Symfony environment all services are fully bootstrapped for each request, from version >= 4.3 you can declare a service as lazy ([Lazy Services](http://symfony.com/doc/master/components/dependency_injection/lazy_services.html)). -This bundle still doesn't support new Lazy Services feature but you can set `lazy: true` in your connection +This bundle still doesn't support new Lazy Services feature, but you can set `lazy: true` in your connection configuration to avoid unnecessary connections to your message broker in every request. It's extremely recommended to use lazy connections because performance reasons, nevertheless lazy option is disabled by default to avoid possible breaks in applications already using this bundle. ### Import notice - Heartbeats ### -It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat so your socket will be open. If you don't do this, or use a different multiplier, there's a risk the __consumer__ socket will timeout. +It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat, so your socket will be open. If you don't do this, or use a different multiplier, there's a risk the __consumer__ socket will timeout. ### Dynamic Connection Parameters ### @@ -196,7 +196,7 @@ Sometimes your connection information may need to be dynamic. Dynamic connection override parameters programmatically through a service. e.g. In a scenario when the `vhost` parameter of the connection depends on the current tenant of your white-labeled -application and you do not want (or can't) change it's configuration every time. +application, and you do not want (or can't) change its configuration every time. Define a service under `connection_parameters_provider` that implements the `ConnectionParametersProviderInterface`, and add it to the appropriate `connections` configuration. @@ -283,7 +283,7 @@ consumers: callback: upload_picture_service ``` -As we see there, the __callback__ option has a reference to an __upload\_picture\_service__. When the consumer gets a message from the server it will execute such callback. If for testing or debugging purposes you need to specify a different callback, then you can change it there. +As we see there, the __callback__ option has a reference to an __upload\_picture\_service__. When the consumer gets a message from the server, it will execute such a callback. If for testing or debugging purposes you need to specify a different callback, then you can change it there. Apart from the callback we also specify the connection to use, the same way as we do with a __producer__. The remaining options are the __exchange\_options__ and the __queue\_options__. The __exchange\_options__ should be the same ones as those used for the __producer__. In the __queue\_options__ we will provide a __queue name__. Why? @@ -295,7 +295,7 @@ Now, how to run a consumer? There's a command for it that can be executed like t $ ./app/console rabbitmq:consumer -m 50 upload_picture ``` -What does this mean? We are executing the __upload\_picture__ consumer telling it to consume only 50 messages. Every time the consumer receives a message from the server, it will execute the configured callback passing the AMQP message as an instance of the `PhpAmqpLib\Message\AMQPMessage` class. The message body can be obtained by calling `$msg->body`. By default the consumer will process messages in an __endless loop__ for some definition of _endless_. +What does this mean? We are executing the __upload\_picture__ consumer telling it to consume only 50 messages. Every time the consumer receives a message from the server, it will execute the configured callback passing the AMQP message as an instance of the `PhpAmqpLib\Message\AMQPMessage` class. The message body can be obtained by calling `$msg->body`. By default, the consumer will process messages in an __endless loop__ for some definition of _endless_. If you want to be sure that consumer will finish executing instantly on Unix signal, you can run command with flag `-w`. @@ -415,9 +415,9 @@ class OnIdleEvent extends AMQPEvent } ``` -Event raised when `wait` method exit by timeout without receiving a message. +Event raised when `wait` method exit by the timeout without receiving a message. In order to make use of this event a consumer `idle_timeout` has to be [configured](#idle-timeout). -By default process exit on idle timeout, you can prevent it by setting `$event->setForceStop(false)` in a listener. +By default, process exit on an idle timeout, you can prevent it by setting `$event->setForceStop(false)` in a listener. #### Idle timeout #### @@ -455,10 +455,10 @@ consumers: #### Graceful max execution timeout #### If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds. -"Gracefully exit" means, that the consumer will exit either after the currently running task or immediatelly, when waiting for new tasks. +"Gracefully exit" means, that the consumer will exit either after the currently running task or immediately, when waiting for new tasks. The `graceful_max_execution.exit_code` specifies what exit code should be returned by the consumer when the graceful max execution timeout occurs. Without specifying it, the consumer will exit with status `0`. -This feature is great in conjuction with supervisord, which together can allow for periodical memory leaks cleanup, connection with database/rabbitmq renewal and more. +This feature is great in conjunction with supervisord, which together can allow for periodical memory leaks cleanup, connection with a database/rabbitmq renewal and more. ```yaml consumers: @@ -483,7 +483,7 @@ consumers: From: http://www.rabbitmq.com/tutorials/tutorial-two-python.html -Be careful as implementing the fair dispatching introduce a latency that will hurt performance (see [this blogpost](http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/)). But implemeting it allow you to scale horizontally dynamically as the queue is increasing. +Be careful as implementing the fair dispatching introduce a latency that will hurt performance (see [this blogpost](http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/)). But, implementing it allow you to scale horizontally dynamically as the queue is increasing. You should evaluate, as the blogpost recommends, the right value of prefetch_size accordingly with the time taken to process each message and your network performance. With RabbitMqBundle, you can configure that qos_options per consumer like that: @@ -523,7 +523,7 @@ class UploadPictureConsumer implements ConsumerInterface if (!$isUploadSuccess) { // If your image upload failed due to a temporary error you can return false // from your callback so the message will be rejected by the consumer and - // requeued by RabbitMQ. + // requeue by RabbitMQ. // Any other value not equal to false will acknowledge the message and remove it // from the queue return false; @@ -547,7 +547,7 @@ This seems to be quite a lot of work for just sending messages, let's recap to h - Start the consumer from the CLI. - Add the code to publish messages inside the controller. -And that's it! +And, that's it! ### Audit / Logging ### @@ -599,7 +599,7 @@ First we have to start the server from the command line: $ ./app/console_dev rabbitmq:rpc-server random_int ``` -And then add the following code to our controller: +And then, add the following code to our controller: ```php public function indexAction($name) @@ -620,7 +620,7 @@ The arguments we are sending are the __min__ and __max__ values for the `rand()` The final piece is to get the reply. Our PHP script will block till the server returns a value. The __$replies__ variable will be an associative array where each reply from the server will contained in the respective __request\_id__ key. -By default the RPC Client expects the response to be serialized. If the server you are working with returns a non-serialized result then set the RPC client expect_serialized_response option to false. For example, if the integer_store server didn't serialize the result the client would be set as below: +By default, the RPC Client expects the response to be serialized. If the server you are working with returns a non-serialized result then set the RPC client expect_serialized_response option to false. For example, if the integer_store server didn't serialize the result the client would be set as below: ```yaml rpc_clients: @@ -629,7 +629,7 @@ rpc_clients: expect_serialized_response: false ``` -You can also set a expiration for request in seconds, after which message will no longer be handled by server and client request will simply time out. Setting expiration for messages works only for RabbitMQ 3.x and above. Visit http://www.rabbitmq.com/ttl.html#per-message-ttl for more information. +You can also set an expiration for request in seconds, after which message will no longer be handled by a server and client request will simply time out. Setting expiration for messages works only for RabbitMQ 3.x and above. Visit http://www.rabbitmq.com/ttl.html#per-message-ttl for more information. ```php public function indexAction($name) @@ -650,7 +650,7 @@ As you can guess, we can also make __parallel RPC calls__. ### Parallel RPC ### -Let's say that for rendering some webpage, you need to perform two database queries, one taking 5 seconds to complete and the other one taking 2 seconds –very expensive queries–. If you execute them sequentially, then your page will be ready to deliver in about 7 seconds. If you run them in parallel then you will have your page served in about 5 seconds. With RabbitMqBundle we can do such parallel calls with ease. Let's define a parallel client in the config and another RPC server: +Let's say that for rendering some webpage, you need to perform two database queries, one taking 5 seconds to complete, and the other one taking 2 seconds –very expensive queries–. If you execute them sequentially, then your page will be ready to deliver in about 7 seconds. If you run them in parallel then you will have your page served in about 5 seconds. With RabbitMqBundle we can do such parallel calls with ease. Let's define a parallel client in the config and another RPC server: ```yaml rpc_clients: @@ -677,7 +677,7 @@ public function indexAction($name) } ``` -Is very similar to the previous example, we just have an extra `addRequest` call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array. +Is very similar to the previous example, we just have an extra `addRequest` call. Also, we provide meaningful request identifiers so later will be easier for us to find the reply we want in the __$replies__ array. ### Direct Reply-To clients ### @@ -687,7 +687,7 @@ This option will use pseudo-queue __amq.rabbitmq.reply-to__ when doing RPC calls ### Multiple Consumers ### -It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing +It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue, and it can be hard to manage when dealing with many evolutions (forget to add a line in your supervisord configuration?). This is also useful for small queues as you may not want to have as many workers as queues, and want to regroup some tasks together without losing flexibility and separation principle. @@ -731,13 +731,13 @@ Be aware that queues providers are responsible for the proper calls to `setDeque ### Arbitrary Bindings ### -You may find that your application has a complex workflow and you you need to have arbitrary binding. Arbitrary +You may find that your application has a complex workflow, and you need to have arbitrary binding. Arbitrary binding scenarios might include exchange to exchange bindings via `destination_is_exchange` property. ```yaml bindings: - {exchange: foo, destination: bar, routing_key: 'baz.*' } - - {exchange: foo1, destination: foo, routing_key: 'baz.*' destination_is_exchange: true} + - {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true} ``` The rabbitmq:setup-fabric command will declare exchanges and queues as defined in your producer, consumer @@ -749,7 +749,7 @@ and multi consumer configurations before it creates your arbitrary bindings. How Sometimes you have to change the consumer's configuration on the fly. Dynamic consumers allow you to define the consumers queue options programmatically, based on the context. -e.g. In a scenario when the defined consumer must be responsible for a dynamic number of topics and you do not want (or can't) change it's configuration every time. +e.g. In a scenario when the defined consumer must be responsible for a dynamic number of topics, and you do not want (or can't) change it's configuration every time. Define a service `queue_options_provider` that implements the `QueueOptionsProviderInterface`, and add it to your `dynamic_consumers` configuration. @@ -774,7 +774,7 @@ In this case the `proc_logs` consumer runs for `server1` and it can decide over Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading. -In AMQP there's a type of exchange called __topic__ where the messages are routed to queues based on –you guess– the topic of the message. We can send logs about our application to a RabbiMQ topic exchange using as topic the hostname where the log was created and the severity of such log. The message body will be the log content and our routing keys the will be like this: +In AMQP there's a type of exchange called __topic__ where the messages are routed to the queues based on –you guess– the topic of the message. We can send logs about our application to a RabbiMQ topic exchange using as topic the hostname where the log was created and the severity of such log. The message body will be the log content and our routing keys the will be like this: - server1.error - server2.info @@ -783,7 +783,7 @@ In AMQP there's a type of exchange called __topic__ where the messages are route Since we don't want to be filling up queues with unlimited logs what we can do is that when we want to monitor the system, we can launch a consumer that creates a queue and attaches to the __logs__ exchange based on some topic, for example, we would like to see all the errors reported by our servers. The routing key will be something like: __\#.error__. In such case we have to come up with a queue name, bind it to the exchange, get the logs, unbind it and delete the queue. Happily AMPQ provides a way to do this automatically if you provide the right options when you declare and bind the queue. The problem is that you don't want to remember all those options. For such reason we implemented the __Anonymous Consumer__ pattern. -When we start an Anonymous Consumer, it will take care of such details and we just have to think about implementing the callback for when the messages arrive. Is it called Anonymous because it won't specify a queue name, but it will wait for RabbitMQ to assign a random one to it. +When we start an Anonymous Consumer, it will take care of such details, and we just have to think about implementing the callback for when the messages arrive. Is it called Anonymous because it won't specify a queue name, but it will wait for RabbitMQ to assign a random one to it. Now, how to configure and run such consumer? @@ -795,7 +795,7 @@ anon_consumers: callback: logs_watcher ``` -There we specify the exchange name and it's type along with the callback that should be executed when a message arrives. +There we specify the exchange name, and it's type along with the callback that should be executed when a message arrives. This Anonymous Consumer is now able to listen to Producers, which are linked to the same exchange and of type _topic_: @@ -838,9 +838,9 @@ batch_consumers: timeout: 60 ``` -*Note*: If the `keep_alive` option is set to `true`, `idle_timeout_exit_code` will be ignored and the consumer process continues. +*Note*: If the `keep_alive` option is set to `true`, `idle_timeout_exit_code` will be ignored, and the consumer process continues. -You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge. +You can implement a batch consumer that will acknowledge all messages in one return, or you can have control on what message to acknowledge. ```php namespace AppBundle\Service; @@ -927,9 +927,9 @@ producers: exchange_options: {name: 'words', type: direct} ``` -That producer will publish messages to the `words` direct exchange. Of course you can adapt the configuration to whatever you like. +That producer will publish messages to the `words` direct exchange. Of course, you can adapt the configuration to whatever you like. -Then let's say you want to publish the contents of some XML files so they are processed by a farm of consumers. You could publish them by just using a command like this: +Then let's say you want to publish the contents of some XML files, so they are processed by a farm of consumers. You could publish them by just using a command like this: ```bash $ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words @@ -984,8 +984,8 @@ consumers: auto_setup_fabric: false ``` -By default a consumer or producer will declare everything it needs with RabbitMQ when it starts. -Be careful using this, when exchanges or queues are not defined, there will be errors. When you've changed any configuration you need to run the above setup-fabric command to declare your configuration. +By default, a consumer or producer will declare everything it needs with RabbitMQ when it starts. +Be careful using this, when exchanges or queues are not defined, there will be errors. After you've changed any configuration, you need to run the above setup-fabric command to declare your configuration. ## How To Contribute ## diff --git a/RabbitMq/AMQPConnectionFactory.php b/RabbitMq/AMQPConnectionFactory.php index 3a2b5439..bb916a31 100644 --- a/RabbitMq/AMQPConnectionFactory.php +++ b/RabbitMq/AMQPConnectionFactory.php @@ -32,11 +32,11 @@ class AMQPConnectionFactory * @param string $class FQCN of AMQPConnection class to instantiate. * @param array $parameters Map containing parameters resolved by * Extension. - * @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding + * @param ConnectionParametersProviderInterface|null $parametersProvider Optional service providing/overriding * connection parameters. */ public function __construct( - $class, + string $class, array $parameters, ConnectionParametersProviderInterface $parametersProvider = null ) { @@ -105,12 +105,8 @@ public function createConnection() /** * Parses connection parameters from URL parameter. - * - * @param array $parameters - * - * @return array */ - private function parseUrl(array $parameters) + private function parseUrl(array $parameters): array { if (!$parameters['url']) { return $parameters; diff --git a/RabbitMq/AMQPLoggedChannel.php b/RabbitMq/AMQPLoggedChannel.php index bdc4e2ce..268f09d4 100644 --- a/RabbitMq/AMQPLoggedChannel.php +++ b/RabbitMq/AMQPLoggedChannel.php @@ -11,9 +11,10 @@ */ class AMQPLoggedChannel extends AMQPChannel { + /** @var array */ private $basicPublishLog = array(); - public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory = false, $immediate = false, $ticket = NULL) + public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory = false, $immediate = false, $ticket = NULL): void { $this->basicPublishLog[] = array( 'msg' => $msg, @@ -27,7 +28,7 @@ public function basic_publish($msg, $exchange = '', $routingKey = '', $mandatory parent::basic_publish($msg, $exchange, $routingKey, $mandatory, $immediate, $ticket); } - public function getBasicPublishLog() + public function getBasicPublishLog(): array { return $this->basicPublishLog; } diff --git a/RabbitMq/AmqpPartsHolder.php b/RabbitMq/AmqpPartsHolder.php index d3e92b93..23ac34f6 100644 --- a/RabbitMq/AmqpPartsHolder.php +++ b/RabbitMq/AmqpPartsHolder.php @@ -4,6 +4,7 @@ class AmqpPartsHolder { + /** @var array */ protected $parts; public function __construct() @@ -11,12 +12,12 @@ public function __construct() $this->parts = array(); } - public function addPart($type, BaseAmqp $part) + public function addPart(string $type, BaseAmqp $part): void { $this->parts[$type][] = $part; } - public function getParts($type) + public function getParts(string $type): array { $type = (string) $type; return isset($this->parts[$type]) ? $this->parts[$type] : array(); diff --git a/RabbitMq/BaseAmqp.php b/RabbitMq/BaseAmqp.php index f0f0a1de..49e13fc8 100644 --- a/RabbitMq/BaseAmqp.php +++ b/RabbitMq/BaseAmqp.php @@ -12,13 +12,21 @@ abstract class BaseAmqp { + /** @var AbstractConnection */ protected $conn; + /** @var AMQPChannel|null */ protected $ch; + /** @var string|null */ protected $consumerTag; + /** @var bool */ protected $exchangeDeclared = false; + /** @var bool */ protected $queueDeclared = false; + /** @var string */ protected $routingKey = ''; + /** @var bool */ protected $autoSetupFabric = true; + /** @var array */ protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2); /** @@ -26,6 +34,7 @@ abstract class BaseAmqp */ protected $logger; + /** @var array */ protected $exchangeOptions = array( 'passive' => false, 'durable' => true, @@ -37,6 +46,7 @@ abstract class BaseAmqp 'declare' => true, ); + /** @var array */ protected $queueOptions = array( 'name' => '', 'passive' => false, @@ -78,7 +88,7 @@ public function __destruct() $this->close(); } - public function close() + public function close(): void { if ($this->ch) { try { @@ -88,7 +98,7 @@ public function close() } } - if ($this->conn && $this->conn->isConnected()) { + if (!empty($this->conn) && $this->conn->isConnected()) { try { $this->conn->close(); } catch (\Exception $e) { @@ -97,7 +107,7 @@ public function close() } } - public function reconnect() + public function reconnect(): void { if (!$this->conn->isConnected()) { return; @@ -106,34 +116,24 @@ public function reconnect() $this->conn->reconnect(); } - /** - * @return AMQPChannel - */ - public function getChannel() + public function getChannel(): ?AMQPChannel { - if (empty($this->ch) || null === $this->ch->getChannelId()) { + if (empty($this->ch) || empty($this->ch->getChannelId())) { $this->ch = $this->conn->channel(); } return $this->ch; } - /** - * @param AMQPChannel $ch - * - * @return void - */ - public function setChannel(AMQPChannel $ch) + public function setChannel(AMQPChannel $ch): void { $this->ch = $ch; } /** * @throws \InvalidArgumentException - * @param array $options - * @return void */ - public function setExchangeOptions(array $options = array()) + public function setExchangeOptions(array $options = array()): void { if (!isset($options['name'])) { throw new \InvalidArgumentException('You must provide an exchange name'); @@ -146,25 +146,17 @@ public function setExchangeOptions(array $options = array()) $this->exchangeOptions = array_merge($this->exchangeOptions, $options); } - /** - * @param array $options - * @return void - */ - public function setQueueOptions(array $options = array()) + public function setQueueOptions(array $options = array()): void { $this->queueOptions = array_merge($this->queueOptions, $options); } - /** - * @param string $routingKey - * @return void - */ - public function setRoutingKey($routingKey) + public function setRoutingKey(string $routingKey): void { $this->routingKey = $routingKey; } - public function setupFabric() + public function setupFabric(): void { if (!$this->exchangeDeclared) { $this->exchangeDeclare(); @@ -178,15 +170,12 @@ public function setupFabric() /** * disables the automatic SetupFabric when using a consumer or producer */ - public function disableAutoSetupFabric() + public function disableAutoSetupFabric(): void { $this->autoSetupFabric = false; } - /** - * @param LoggerInterface $logger - */ - public function setLogger($logger) + public function setLogger(LoggerInterface $logger): void { $this->logger = $logger; } @@ -194,7 +183,7 @@ public function setLogger($logger) /** * Declares exchange */ - protected function exchangeDeclare() + protected function exchangeDeclare(): void { if ($this->exchangeOptions['declare']) { $this->getChannel()->exchange_declare( @@ -215,7 +204,7 @@ protected function exchangeDeclare() /** * Declares queue, creates if needed */ - protected function queueDeclare() + protected function queueDeclare(): void { if ($this->queueOptions['declare']) { list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], @@ -237,12 +226,8 @@ protected function queueDeclare() /** * Binds queue to an exchange - * - * @param string $queue - * @param string $exchange - * @param string $routing_key */ - protected function queueBind($queue, $exchange, $routing_key, array $arguments = array()) + protected function queueBind(string $queue, string $exchange, string $routing_key, array $arguments = array()): void { // queue binding is not permitted on the default exchange if ('' !== $exchange) { @@ -250,23 +235,14 @@ protected function queueBind($queue, $exchange, $routing_key, array $arguments = } } - /** - * @param EventDispatcherInterface $eventDispatcher - * - * @return BaseAmqp - */ - public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) + public function setEventDispatcher(EventDispatcherInterface $eventDispatcher): BaseAmqp { $this->eventDispatcher = $eventDispatcher; return $this; } - /** - * @param string $eventName - * @param AMQPEvent $event - */ - protected function dispatchEvent($eventName, AMQPEvent $event) + protected function dispatchEvent(string $eventName, AMQPEvent $event): void { if ($this->getEventDispatcher() instanceof ContractsEventDispatcherInterface) { $this->getEventDispatcher()->dispatch( @@ -276,10 +252,7 @@ protected function dispatchEvent($eventName, AMQPEvent $event) } } - /** - * @return EventDispatcherInterface|null - */ - public function getEventDispatcher() + public function getEventDispatcher(): ?EventDispatcherInterface { return $this->eventDispatcher; } diff --git a/RabbitMq/BaseConsumer.php b/RabbitMq/BaseConsumer.php index cbd6e627..c5364888 100644 --- a/RabbitMq/BaseConsumer.php +++ b/RabbitMq/BaseConsumer.php @@ -24,7 +24,10 @@ abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface /** @var int */ protected $idleTimeoutExitCode; - public function setCallback($callback) + /** + * @param callable $callback + */ + public function setCallback($callback): void { $this->callback = $callback; } @@ -38,10 +41,9 @@ public function getCallback() } /** - * @param int $msgAmount * @throws \ErrorException */ - public function start($msgAmount = 0) + public function start (int $msgAmount = 0): void { $this->target = $msgAmount; @@ -57,13 +59,13 @@ public function start($msgAmount = 0) * * It will finish up the last message and not send you any more. */ - public function stopConsuming() + public function stopConsuming(): void { // This gets stuck and will not exit without the last two parameters set. $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true); } - protected function setupConsumer() + public function setupConsumer(): void { if ($this->autoSetupFabric) { $this->setupFabric(); @@ -71,12 +73,12 @@ protected function setupConsumer() $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage')); } - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { //To be implemented by descendant classes } - protected function maybeStopConsumer() + protected function maybeStopConsumer(): void { if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) { if (!function_exists('pcntl_signal_dispatch')) { @@ -91,17 +93,17 @@ protected function maybeStopConsumer() } } - public function setConsumerTag($tag) + public function setConsumerTag(string $tag): void { $this->consumerTag = $tag; } - public function getConsumerTag() + public function getConsumerTag(): ?string { return $this->consumerTag; } - public function forceStopConsumer() + public function forceStopConsumer(): void { $this->forceStop = true; } @@ -109,42 +111,34 @@ public function forceStopConsumer() /** * Sets the qos settings for the current channel * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0 - * - * @param int $prefetchSize - * @param int $prefetchCount - * @param bool $global */ - public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false) + public function setQosOptions(int $prefetchSize = 0, int $prefetchCount = 0, bool $global = false): void { $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global); } - public function setIdleTimeout($idleTimeout) + public function setIdleTimeout(int $idleTimeout): void { $this->idleTimeout = $idleTimeout; } /** * Set exit code to be returned when there is a timeout exception - * - * @param int|null $idleTimeoutExitCode */ - public function setIdleTimeoutExitCode($idleTimeoutExitCode) + public function setIdleTimeoutExitCode(?int $idleTimeoutExitCode = null): void { $this->idleTimeoutExitCode = $idleTimeoutExitCode; } - public function getIdleTimeout() + public function getIdleTimeout(): int { return $this->idleTimeout; } /** * Get exit code to be returned when there is a timeout exception - * - * @return int|null */ - public function getIdleTimeoutExitCode() + public function getIdleTimeoutExitCode(): ?int { return $this->idleTimeoutExitCode; } @@ -153,7 +147,7 @@ public function getIdleTimeoutExitCode() * Resets the consumed property. * Use when you want to call start() or consume() multiple times. */ - public function resetConsumed() + public function resetConsumed(): void { $this->consumed = 0; } diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 4cd63567..f6ca7a12 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -81,32 +81,30 @@ class BatchConsumer extends BaseAmqp implements DequeuerInterface /** * @param \DateTime|null $dateTime */ - public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null) + public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null): void { $this->gracefulMaxExecutionDateTime = $dateTime; } - /** - * @param int $secondsInTheFuture - */ - public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture) + public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(int $secondsInTheFuture): void { $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds")); } /** * @param \Closure|callable $callback - * - * @return $this */ - public function setCallback($callback) + public function setCallback($callback): BatchConsumer { $this->callback = $callback; return $this; } - public function consume(int $batchAmountTarget = 0) + /** + * @throws \ErrorException + */ + public function consume(int $batchAmountTarget = 0): ?int { $this->batchAmountTarget = $batchAmountTarget; @@ -141,7 +139,10 @@ public function consume(int $batchAmountTarget = 0) return 0; } - private function batchConsume() + /** + * @throws \Exception + */ + private function batchConsume(): void { try { $processFlags = call_user_func($this->callback, $this->messages); @@ -191,10 +192,8 @@ private function batchConsume() /** * @param mixed $processFlags - * - * @return void */ - protected function handleProcessMessages($processFlags = null) + protected function handleProcessMessages($processFlags = null): void { $processFlags = $this->analyzeProcessFlags($processFlags); foreach ($processFlags as $deliveryTag => $processFlag) { @@ -208,7 +207,7 @@ protected function handleProcessMessages($processFlags = null) * * @return void */ - private function handleProcessFlag($deliveryTag, $processFlag) + private function handleProcessFlag($deliveryTag, $processFlag): void { if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ @@ -226,31 +225,21 @@ private function handleProcessFlag($deliveryTag, $processFlag) } - /** - * @return bool - */ - protected function isCompleteBatch() + protected function isCompleteBatch(): bool { return $this->batchCounter === $this->prefetchCount; } - /** - * @return bool - */ - protected function isEmptyBatch() + protected function isEmptyBatch(): bool { return $this->batchCounter === 0; } /** - * @param AMQPMessage $msg - * - * @return void - * * @throws \Error * @throws \Exception */ - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { $this->addMessage($msg); @@ -259,10 +248,8 @@ public function processMessage(AMQPMessage $msg) /** * @param mixed $processFlags - * - * @return array */ - private function analyzeProcessFlags($processFlags = null) + private function analyzeProcessFlags($processFlags = null): array { if (is_array($processFlags)) { if (count($processFlags) !== $this->batchCounter) { @@ -283,32 +270,19 @@ private function analyzeProcessFlags($processFlags = null) } - /** - * @return void - */ - private function resetBatch() + private function resetBatch(): void { $this->messages = array(); $this->batchCounter = 0; } - /** - * @param AMQPMessage $message - * - * @return void - */ - private function addMessage(AMQPMessage $message) + private function addMessage(AMQPMessage $message): void { $this->batchCounter++; $this->messages[(int)$message->delivery_info['delivery_tag']] = $message; } - /** - * @param int $deliveryTag - * - * @return AMQPMessage|null - */ - private function getMessage($deliveryTag) + private function getMessage(int $deliveryTag): ?AMQPMessage { return isset($this->messages[$deliveryTag]) ? $this->messages[$deliveryTag] @@ -317,13 +291,9 @@ private function getMessage($deliveryTag) } /** - * @param int $deliveryTag - * - * @return AMQPChannel - * * @throws AMQPRuntimeException */ - private function getMessageChannel($deliveryTag) + private function getMessageChannel(int $deliveryTag): AMQPChannel { $message = $this->getMessage($deliveryTag); if ($message === null) { @@ -333,10 +303,7 @@ private function getMessageChannel($deliveryTag) return $message->delivery_info['channel']; } - /** - * @return void - */ - public function stopConsuming() + public function stopConsuming(): void { if (!$this->isEmptyBatch()) { $this->batchConsume(); @@ -345,10 +312,7 @@ public function stopConsuming() $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true); } - /** - * @return void - */ - protected function setupConsumer() + protected function setupConsumer(): void { if ($this->autoSetupFabric) { $this->setupFabric(); @@ -358,11 +322,9 @@ protected function setupConsumer() } /** - * @return void - * * @throws \BadFunctionCallException */ - protected function maybeStopConsumer() + protected function maybeStopConsumer(): void { if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) { if (!function_exists('pcntl_signal_dispatch')) { @@ -381,30 +343,19 @@ protected function maybeStopConsumer() } } - /** - * @param string $tag - * - * @return $this - */ - public function setConsumerTag($tag) + public function setConsumerTag(string $tag): BatchConsumer { $this->consumerTag = $tag; return $this; } - /** - * @return string - */ - public function getConsumerTag() + public function getConsumerTag(): string { return $this->consumerTag; } - /** - * @return void - */ - public function forceStopConsumer() + public function forceStopConsumer(): void { $this->forceStop = true; } @@ -412,23 +363,14 @@ public function forceStopConsumer() /** * Sets the qos settings for the current channel * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0 - * - * @param int $prefetchSize - * @param int $prefetchCount - * @param bool $global */ - public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false) + public function setQosOptions(int $prefetchSize = 0, int $prefetchCount = 0, bool $global = false): void { $this->prefetchCount = $prefetchCount; $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global); } - /** - * @param int $idleTimeout - * - * @return $this - */ - public function setIdleTimeout($idleTimeout) + public function setIdleTimeout(int $idleTimeout): BatchConsumer { $this->idleTimeout = $idleTimeout; @@ -437,12 +379,8 @@ public function setIdleTimeout($idleTimeout) /** * Set exit code to be returned when there is a timeout exception - * - * @param int $idleTimeoutExitCode - * - * @return $this */ - public function setIdleTimeoutExitCode($idleTimeoutExitCode) + public function setIdleTimeoutExitCode(int $idleTimeoutExitCode): BatchConsumer { $this->idleTimeoutExitCode = $idleTimeoutExitCode; @@ -451,10 +389,8 @@ public function setIdleTimeoutExitCode($idleTimeoutExitCode) /** * keepAlive - * - * @return $this */ - public function keepAlive() + public function keepAlive(): BatchConsumer { $this->keepAlive = true; @@ -464,7 +400,7 @@ public function keepAlive() /** * Purge the queue */ - public function purge() + public function purge(): void { $this->getChannel()->queue_purge($this->queueOptions['name'], true); } @@ -472,35 +408,28 @@ public function purge() /** * Delete the queue */ - public function delete() + public function delete(): void { $this->getChannel()->queue_delete($this->queueOptions['name'], true); } /** * Checks if memory in use is greater or equal than memory allowed for this process - * - * @return boolean */ - protected function isRamAlmostOverloaded() + protected function isRamAlmostOverloaded(): bool { return (memory_get_usage(true) >= ($this->getMemoryLimit() * 1048576)); } - /** - * @return int - */ - public function getIdleTimeout() + public function getIdleTimeout(): int { return $this->idleTimeout; } /** * Get exit code to be returned when there is a timeout exception - * - * @return int|null */ - public function getIdleTimeoutExitCode() + public function getIdleTimeoutExitCode(): ?int { return $this->idleTimeoutExitCode; } @@ -509,77 +438,55 @@ public function getIdleTimeoutExitCode() * Resets the consumed property. * Use when you want to call start() or consume() multiple times. */ - public function resetConsumed() + public function resetConsumed(): void { $this->consumed = 0; } - /** - * @param int $timeout - * - * @return $this - */ - public function setTimeoutWait($timeout) + public function setTimeoutWait(int $timeout): BatchConsumer { $this->timeoutWait = $timeout; return $this; } - /** - * @param int $amount - * - * @return $this - */ - public function setPrefetchCount($amount) + public function setPrefetchCount(int $amount): BatchConsumer { $this->prefetchCount = $amount; return $this; } - /** - * @return int - */ - public function getTimeoutWait() + public function getTimeoutWait(): int { return $this->timeoutWait; } - /** - * @return int - */ - public function getPrefetchCount() + public function getPrefetchCount(): int { return $this->prefetchCount; } /** * Set the memory limit - * - * @param int $memoryLimit */ - public function setMemoryLimit($memoryLimit) + public function setMemoryLimit(int $memoryLimit): void { $this->memoryLimit = $memoryLimit; } /** * Get the memory limit - * - * @return int */ - public function getMemoryLimit() + public function getMemoryLimit(): int { return $this->memoryLimit; } /** * Check graceful max execution date time and stop if limit is reached - * - * @return void */ - private function checkGracefulMaxExecutionDateTime() + private function checkGracefulMaxExecutionDateTime(): void { if (!$this->gracefulMaxExecutionDateTime) { return; diff --git a/RabbitMq/Binding.php b/RabbitMq/Binding.php index 7b44e3b7..0a470958 100644 --- a/RabbitMq/Binding.php +++ b/RabbitMq/Binding.php @@ -34,98 +34,62 @@ class Binding extends BaseAmqp */ protected $arguments; - /** - * @return string - */ - public function getExchange() + public function getExchange(): string { return $this->exchange; } - /** - * @param string $exchange - */ - public function setExchange($exchange) + public function setExchange(string $exchange): void { $this->exchange = $exchange; } - /** - * @return string - */ - public function getDestination() + public function getDestination(): string { return $this->destination; } - /** - * @param string $destination - */ - public function setDestination($destination) + public function setDestination(string $destination): void { $this->destination = $destination; } - /** - * @return bool - */ - public function getDestinationIsExchange() + public function getDestinationIsExchange(): bool { return $this->destinationIsExchange; } - /** - * @param bool $destinationIsExchange - */ - public function setDestinationIsExchange($destinationIsExchange) + public function setDestinationIsExchange(bool $destinationIsExchange): void { $this->destinationIsExchange = $destinationIsExchange; } - /** - * @return string - */ - public function getRoutingKey() + public function getRoutingKey(): string { return $this->routingKey; } - /** - * @param string $routingKey - */ - public function setRoutingKey($routingKey) + public function setRoutingKey(string $routingKey): void { $this->routingKey = $routingKey; } - /** - * @return boolean - */ - public function isNowait() + public function isNowait(): bool { return $this->nowait; } - /** - * @param boolean $nowait - */ - public function setNowait($nowait) + public function setNowait(bool $nowait): void { $this->nowait = $nowait; } - /** - * @return array - */ - public function getArguments() + public function getArguments(): array { return $this->arguments; } - /** - * @param array $arguments - */ - public function setArguments($arguments) + public function setArguments(array $arguments): void { $this->arguments = $arguments; } @@ -133,10 +97,8 @@ public function setArguments($arguments) /** * create bindings - * - * @return void */ - public function setupFabric() + public function setupFabric(): void { $method = ($this->destinationIsExchange) ? 'exchange_bind' : 'queue_bind'; $channel = $this->getChannel(); diff --git a/RabbitMq/Consumer.php b/RabbitMq/Consumer.php index 11b16cc6..7b75e13a 100644 --- a/RabbitMq/Consumer.php +++ b/RabbitMq/Consumer.php @@ -10,6 +10,8 @@ use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; +use phpDocumentor\Reflection\Types\Callable_; +use PHPUnit\Framework\Constraint\Callback; class Consumer extends BaseConsumer { @@ -41,20 +43,16 @@ class Consumer extends BaseConsumer /** * Set the memory limit - * - * @param int $memoryLimit */ - public function setMemoryLimit($memoryLimit) + public function setMemoryLimit(int $memoryLimit): void { $this->memoryLimit = $memoryLimit; } /** * Get the memory limit - * - * @return int|null */ - public function getMemoryLimit() + public function getMemoryLimit(): ?int { return $this->memoryLimit; } @@ -62,13 +60,9 @@ public function getMemoryLimit() /** * Consume the message * - * @param int $msgAmount - * - * @return int - * * @throws AMQPTimeoutException */ - public function consume($msgAmount) + public function consume(int $msgAmount): int { $this->target = $msgAmount; @@ -125,7 +119,7 @@ public function consume($msgAmount) /** * Purge the queue */ - public function purge() + public function purge(): void { $this->getChannel()->queue_purge($this->queueOptions['name'], true); } @@ -133,12 +127,16 @@ public function purge() /** * Delete the queue */ - public function delete() + public function delete(): void { $this->getChannel()->queue_delete($this->queueOptions['name'], true); } - protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback) + /** + * @param mixed $callback + * @throws \Exception + */ + protected function processMessageQueueCallback(AMQPMessage $msg, string $queueName, $callback): void { $this->dispatchEvent(BeforeProcessingMessageEvent::NAME, new BeforeProcessingMessageEvent($this, $msg) @@ -188,12 +186,15 @@ protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $ca } } - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { $this->processMessageQueueCallback($msg, $this->queueOptions['name'], $this->callback); } - protected function handleProcessMessage(AMQPMessage $msg, $processFlag) + /** + * @param int|bool $processFlag + */ + protected function handleProcessMessage(AMQPMessage $msg, $processFlag): void { if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ @@ -219,36 +220,25 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) /** * Checks if memory in use is greater or equal than memory allowed for this process - * - * @return boolean */ - protected function isRamAlmostOverloaded() + protected function isRamAlmostOverloaded(): bool { $memoryManager = new MemoryConsumptionChecker(new NativeMemoryUsageProvider()); return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit().'M', '5M'); } - /** - * @param \DateTime|null $dateTime - */ - public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null) + public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null): void { $this->gracefulMaxExecutionDateTime = $dateTime; } - /** - * @param int $secondsInTheFuture - */ - public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture) + public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(int $secondsInTheFuture): void { $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds")); } - /** - * @param int $exitCode - */ - public function setGracefulMaxExecutionTimeoutExitCode($exitCode) + public function setGracefulMaxExecutionTimeoutExitCode(int $exitCode): void { $this->gracefulMaxExecutionTimeoutExitCode = $exitCode; } @@ -258,18 +248,12 @@ public function setTimeoutWait(int $timeoutWait): void $this->timeoutWait = $timeoutWait; } - /** - * @return \DateTime|null - */ - public function getGracefulMaxExecutionDateTime() + public function getGracefulMaxExecutionDateTime(): ?\DateTime { return $this->gracefulMaxExecutionDateTime; } - /** - * @return int - */ - public function getGracefulMaxExecutionTimeoutExitCode() + public function getGracefulMaxExecutionTimeoutExitCode(): int { return $this->gracefulMaxExecutionTimeoutExitCode; } @@ -316,7 +300,7 @@ private function chooseWaitTimeout(): int return $waitTimeout; } - public function setLastActivityDateTime(\DateTime $dateTime) + public function setLastActivityDateTime(\DateTime $dateTime): void { $this->lastActivityDateTime = $dateTime; } diff --git a/RabbitMq/DequeuerAwareInterface.php b/RabbitMq/DequeuerAwareInterface.php index 9e6b8e6e..4a4d9f34 100644 --- a/RabbitMq/DequeuerAwareInterface.php +++ b/RabbitMq/DequeuerAwareInterface.php @@ -4,5 +4,8 @@ interface DequeuerAwareInterface { + /** + * @return mixed + */ public function setDequeuer(DequeuerInterface $dequeuer); } diff --git a/RabbitMq/DequeuerInterface.php b/RabbitMq/DequeuerInterface.php index ecac8b5a..7d39f885 100644 --- a/RabbitMq/DequeuerInterface.php +++ b/RabbitMq/DequeuerInterface.php @@ -6,10 +6,8 @@ interface DequeuerInterface { /** * Stop dequeuing messages. - * - * @return void */ - public function forceStopConsumer(); + public function forceStopConsumer(): void; /** * Set idle timeout @@ -18,12 +16,10 @@ public function forceStopConsumer(); * * @return void */ - public function setIdleTimeout($idleTimeout); + public function setIdleTimeout(int $idleTimeout); /** * Get current idle timeout - * - * @return int */ - public function getIdleTimeout(); + public function getIdleTimeout(): int; } diff --git a/RabbitMq/DynamicConsumer.php b/RabbitMq/DynamicConsumer.php index 3383c537..17ec5045 100644 --- a/RabbitMq/DynamicConsumer.php +++ b/RabbitMq/DynamicConsumer.php @@ -27,25 +27,25 @@ class DynamicConsumer extends Consumer{ * * @return self */ - public function setQueueOptionsProvider(QueueOptionsProviderInterface $queueOptionsProvider) + public function setQueueOptionsProvider(QueueOptionsProviderInterface $queueOptionsProvider): self { $this->queueOptionsProvider = $queueOptionsProvider; return $this; } - public function setContext($context) + public function setContext(string $context): void { $this->context = $context; } - protected function setupConsumer() + public function setupConsumer(): void { $this->mergeQueueOptions(); parent::setupConsumer(); } - protected function mergeQueueOptions() + protected function mergeQueueOptions(): void { if (null === $this->queueOptionsProvider) { return; diff --git a/RabbitMq/Exception/AckStopConsumerException.php b/RabbitMq/Exception/AckStopConsumerException.php index eb6b5ade..bfb24b23 100644 --- a/RabbitMq/Exception/AckStopConsumerException.php +++ b/RabbitMq/Exception/AckStopConsumerException.php @@ -7,7 +7,7 @@ class AckStopConsumerException extends StopConsumerException { - public function getHandleCode() + public function getHandleCode(): int { return ConsumerInterface::MSG_ACK; } diff --git a/RabbitMq/Exception/StopConsumerException.php b/RabbitMq/Exception/StopConsumerException.php index e791280d..bd7e3b18 100644 --- a/RabbitMq/Exception/StopConsumerException.php +++ b/RabbitMq/Exception/StopConsumerException.php @@ -12,7 +12,7 @@ */ class StopConsumerException extends \RuntimeException { - public function getHandleCode() + public function getHandleCode(): int { return ConsumerInterface::MSG_SINGLE_NACK_REQUEUE; } diff --git a/RabbitMq/Fallback.php b/RabbitMq/Fallback.php index e946ccaf..0f5ff43e 100644 --- a/RabbitMq/Fallback.php +++ b/RabbitMq/Fallback.php @@ -4,7 +4,7 @@ class Fallback implements ProducerInterface { - public function publish($msgBody, $routingKey = '', $additionalProperties = array()) + public function publish(string $msgBody, string $routingKey = '', array $additionalProperties = array()): bool { return false; } diff --git a/RabbitMq/MultipleConsumer.php b/RabbitMq/MultipleConsumer.php index f3c6dc45..d343e907 100644 --- a/RabbitMq/MultipleConsumer.php +++ b/RabbitMq/MultipleConsumer.php @@ -8,6 +8,7 @@ class MultipleConsumer extends Consumer { + /** @var array */ protected $queues = array(); /** @@ -26,33 +27,29 @@ class MultipleConsumer extends Consumer /** * QueuesProvider setter - * - * @param QueuesProviderInterface $queuesProvider - * - * @return self */ - public function setQueuesProvider(QueuesProviderInterface $queuesProvider) + public function setQueuesProvider(QueuesProviderInterface $queuesProvider): MultipleConsumer { $this->queuesProvider = $queuesProvider; return $this; } - public function getQueueConsumerTag($queue) + public function getQueueConsumerTag(string $queue): string { return sprintf('%s-%s', $this->getConsumerTag(), $queue); } - public function setQueues(array $queues) + public function setQueues(array $queues): void { $this->queues = $queues; } - public function setContext($context) + public function setContext(string $context): void { $this->context = $context; } - protected function setupConsumer() + public function setupConsumer(): void { $this->mergeQueues(); @@ -70,10 +67,11 @@ protected function setupConsumer() } } - protected function queueDeclare() + protected function queueDeclare(): void { foreach ($this->queues as $name => $options) { - list($queueName, ,) = $this->getChannel()->queue_declare($name, $options['passive'], + list($queueName, ,) = $this->getChannel()->queue_declare( + $name, $options['passive'], $options['durable'], $options['exclusive'], $options['auto_delete'], $options['nowait'], $options['arguments'], $options['ticket']); @@ -90,7 +88,10 @@ protected function queueDeclare() $this->queueDeclared = true; } - public function processQueueMessage($queueName, AMQPMessage $msg) + /** + * @throws \Exception + */ + public function processQueueMessage(string $queueName, AMQPMessage $msg): void { if (!isset($this->queues[$queueName])) { throw new QueueNotFoundException(); @@ -99,7 +100,7 @@ public function processQueueMessage($queueName, AMQPMessage $msg) $this->processMessageQueueCallback($msg, $queueName, $this->queues[$queueName]['callback']); } - public function stopConsuming() + public function stopConsuming(): void { foreach ($this->queues as $name => $options) { $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name), false, true); @@ -109,7 +110,7 @@ public function stopConsuming() /** * Merges static and provided queues into one array */ - protected function mergeQueues() + protected function mergeQueues(): void { if ($this->queuesProvider) { $this->queues = array_merge( diff --git a/RabbitMq/Producer.php b/RabbitMq/Producer.php index 446bb611..6b817278 100644 --- a/RabbitMq/Producer.php +++ b/RabbitMq/Producer.php @@ -10,37 +10,34 @@ */ class Producer extends BaseAmqp implements ProducerInterface { + /** @var string */ protected $contentType = 'text/plain'; + /** @var int */ protected $deliveryMode = 2; - public function setContentType($contentType) + public function setContentType(string $contentType): Producer { $this->contentType = $contentType; return $this; } - public function setDeliveryMode($deliveryMode) + public function setDeliveryMode(int $deliveryMode): Producer { $this->deliveryMode = $deliveryMode; return $this; } - protected function getBasicProperties() + protected function getBasicProperties(): array { return array('content_type' => $this->contentType, 'delivery_mode' => $this->deliveryMode); } /** * Publishes the message and merges additional properties with basic properties - * - * @param string $msgBody - * @param string $routingKey - * @param array $additionalProperties - * @param array $headers */ - public function publish($msgBody, $routingKey = '', $additionalProperties = array(), array $headers = null) + public function publish(string $msgBody, string $routingKey = '', array $additionalProperties = array(), array $headers = null): void { if ($this->autoSetupFabric) { $this->setupFabric(); diff --git a/RabbitMq/ProducerInterface.php b/RabbitMq/ProducerInterface.php index b4e166c1..6fc9506f 100644 --- a/RabbitMq/ProducerInterface.php +++ b/RabbitMq/ProducerInterface.php @@ -6,10 +6,7 @@ interface ProducerInterface { /** * Publish a message - * - * @param string $msgBody - * @param string $routingKey - * @param array $additionalProperties + * @return mixed */ - public function publish($msgBody, $routingKey = '', $additionalProperties = array()); + public function publish(string $msgBody, string $routingKey = '', array $additionalProperties = array()); } diff --git a/RabbitMq/RpcClient.php b/RabbitMq/RpcClient.php index 69be1d39..aaa261b5 100644 --- a/RabbitMq/RpcClient.php +++ b/RabbitMq/RpcClient.php @@ -6,23 +6,31 @@ class RpcClient extends BaseAmqp { + /** @var int */ protected $requests = 0; + /** @var array */ protected $replies = array(); + /** @var bool */ protected $expectSerializedResponse; + /** @var int */ protected $timeout = 0; + /** @var string */ protected $notifyCallback; - + /** @var string|null */ private $queueName; + /** @var string */ private $unserializer = 'unserialize'; + /** @var string|null */ private $directReplyTo; + /** @var string|null */ private $directConsumerTag; - public function initClient($expectSerializedResponse = true) + public function initClient(bool $expectSerializedResponse = true): void { $this->expectSerializedResponse = $expectSerializedResponse; } - public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0) + public function addRequest(string $msgBody, string $server, string $requestId = null, string $routingKey = '', int $expiration = 0): void { if (empty($requestId)) { throw new \InvalidArgumentException('You must provide a $requestId'); @@ -55,7 +63,7 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = ' } } - public function getReplies() + public function getReplies(): array { if ($this->directReplyTo) { $consumer_tag = $this->directConsumerTag; @@ -78,7 +86,7 @@ public function getReplies() return $this->replies; } - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { $messageBody = $msg->body; if ($this->expectSerializedResponse) { @@ -91,7 +99,7 @@ public function processMessage(AMQPMessage $msg) $this->replies[$msg->get('correlation_id')] = $messageBody; } - protected function getQueueName() + protected function getQueueName(): ?string { if (null === $this->queueName) { list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, false); @@ -100,12 +108,18 @@ protected function getQueueName() return $this->queueName; } - public function setUnserializer($unserializer) + /** + * @param callable $unserializer + */ + public function setUnserializer($unserializer): void { $this->unserializer = $unserializer; } - public function notify($callback) + /** + * @param mixed $callback + */ + public function notify($callback): void { if (is_callable($callback)) { $this->notifyCallback = $callback; @@ -114,12 +128,12 @@ public function notify($callback) } } - public function setDirectReplyTo($directReplyTo) + public function setDirectReplyTo(string $directReplyTo): void { $this->directReplyTo = $directReplyTo; } - public function reset() + public function reset(): void { $this->replies = array(); $this->requests = 0; diff --git a/RabbitMq/RpcServer.php b/RabbitMq/RpcServer.php index 40ce39fb..b2fb2c71 100644 --- a/RabbitMq/RpcServer.php +++ b/RabbitMq/RpcServer.php @@ -6,21 +6,22 @@ class RpcServer extends BaseConsumer { + /** @var string */ private $serializer = 'serialize'; - public function initServer($name) + public function initServer(string $name): void { $this->setExchangeOptions(array('name' => $name, 'type' => 'direct')); $this->setQueueOptions(array('name' => $name . '-queue')); } - public function processMessage(AMQPMessage $msg) + public function processMessage(AMQPMessage $msg): void { try { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); $result = call_user_func($this->callback, $msg); $result = call_user_func($this->serializer, $result); - $this->sendReply($result, $msg->get('reply_to'), $msg->get('correlation_id')); + $this->sendReply($result ?? '', $msg->get('reply_to') ?? '', $msg->get('correlation_id') ?? ''); $this->consumed++; $this->maybeStopConsumer(); } catch (\Exception $e) { @@ -28,13 +29,16 @@ public function processMessage(AMQPMessage $msg) } } - protected function sendReply($result, $client, $correlationId) + protected function sendReply(string $result, string $client, string $correlationId): void { $reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId)); $this->getChannel()->basic_publish($reply, '', $client); } - public function setSerializer($serializer) + /** + * @param callable $serializer + */ + public function setSerializer($serializer): void { $this->serializer = $serializer; } diff --git a/Tests/Command/BaseCommandTest.php b/Tests/Command/BaseCommandTest.php index 651c6c7b..a9b96528 100644 --- a/Tests/Command/BaseCommandTest.php +++ b/Tests/Command/BaseCommandTest.php @@ -2,13 +2,22 @@ namespace OldSound\RabbitMqBundle\Tests\Command; +use OldSound\RabbitMqBundle\Command\BaseConsumerCommand; +use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Application; +use Symfony\Component\Console\Helper\HelperSet; +use Symfony\Component\Console\Input\InputDefinition; abstract class BaseCommandTest extends TestCase { + /** @var MockObject | Application */ protected $application; + /** @var MockObject | InputDefinition */ protected $definition; + /** @var MockObject | HelperSet */ protected $helperSet; + /** @var mixed */ protected $command; protected function setUp(): void diff --git a/Tests/Command/ConsumerCommandTest.php b/Tests/Command/ConsumerCommandTest.php index 1161d302..d27daabc 100644 --- a/Tests/Command/ConsumerCommandTest.php +++ b/Tests/Command/ConsumerCommandTest.php @@ -28,7 +28,7 @@ protected function setUp(): void /** * testInputsDefinitionCommand */ - public function testInputsDefinitionCommand() + public function testInputsDefinitionCommand(): void { $definition = $this->command->getDefinition(); // check argument diff --git a/Tests/Command/DynamicConsumerCommandTest.php b/Tests/Command/DynamicConsumerCommandTest.php index e0951f85..cb3dbb3c 100644 --- a/Tests/Command/DynamicConsumerCommandTest.php +++ b/Tests/Command/DynamicConsumerCommandTest.php @@ -29,7 +29,7 @@ protected function setUp(): void /** * testInputsDefinitionCommand */ - public function testInputsDefinitionCommand() + public function testInputsDefinitionCommand(): void { // check argument $definition = $this->command->getDefinition(); diff --git a/Tests/Command/MultipleConsumerCommandTest.php b/Tests/Command/MultipleConsumerCommandTest.php index ccaa2c94..f93ed497 100644 --- a/Tests/Command/MultipleConsumerCommandTest.php +++ b/Tests/Command/MultipleConsumerCommandTest.php @@ -29,7 +29,7 @@ protected function setUp(): void /** * testInputsDefinitionCommand */ - public function testInputsDefinitionCommand() + public function testInputsDefinitionCommand(): void { // check argument $definition = $this->command->getDefinition(); diff --git a/Tests/Command/PurgeCommandTest.php b/Tests/Command/PurgeCommandTest.php index e6d27317..6eb637bc 100644 --- a/Tests/Command/PurgeCommandTest.php +++ b/Tests/Command/PurgeCommandTest.php @@ -26,7 +26,7 @@ protected function setUp(): void /** * testInputsDefinitionCommand */ - public function testInputsDefinitionCommand() + public function testInputsDefinitionCommand(): void { // check argument $definition = $this->command->getDefinition(); diff --git a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php index a2500dc9..b972bf22 100644 --- a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php +++ b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php @@ -13,7 +13,7 @@ class OldSoundRabbitMqExtensionTest extends TestCase { - public function testFooConnectionDefinition() + public function testFooConnectionDefinition(): void { $container = $this->getContainer('test.yml'); @@ -40,7 +40,7 @@ public function testFooConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } - public function testSslConnectionDefinition() + public function testSslConnectionDefinition(): void { $container = $this->getContainer('test.yml'); @@ -69,7 +69,7 @@ public function testSslConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } - public function testLazyConnectionDefinition() + public function testLazyConnectionDefinition(): void { $container = $this->getContainer('test.yml'); @@ -96,7 +96,7 @@ public function testLazyConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass()); } - public function testDefaultConnectionDefinition() + public function testDefaultConnectionDefinition(): void { $container = $this->getContainer('test.yml'); @@ -123,7 +123,7 @@ public function testDefaultConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } - public function testSocketConnectionDefinition() + public function testSocketConnectionDefinition(): void { $container = $this->getContainer('test.yml'); $this->assertTrue($container->has('old_sound_rabbit_mq.connection.socket_connection')); @@ -132,7 +132,7 @@ public function testSocketConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.socket_connection.class%', $definiton->getClass()); } - public function testLazySocketConnectionDefinition() + public function testLazySocketConnectionDefinition(): void { $container = $this->getContainer('test.yml'); $this->assertTrue($container->has('old_sound_rabbit_mq.connection.lazy_socket')); @@ -141,7 +141,7 @@ public function testLazySocketConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.lazy.socket_connection.class%', $definiton->getClass()); } - public function testFooBinding() + public function testFooBinding(): void { $container = $this->getContainer('test.yml'); $binding = array( @@ -163,7 +163,7 @@ public function testFooBinding() $this->assertBindingMethodCalls($definition, $binding); } - public function testMooBinding() + public function testMooBinding(): void { $container = $this->getContainer('test.yml'); $binding = array( @@ -185,7 +185,7 @@ public function testMooBinding() $this->assertBindingMethodCalls($definition, $binding); } - protected function assertBindingMethodCalls(Definition $definition, $binding) + protected function assertBindingMethodCalls(Definition $definition, array $binding): void { $this->assertEquals(array( array( @@ -228,7 +228,7 @@ protected function assertBindingMethodCalls(Definition $definition, $binding) $definition->getMethodCalls() ); } - public function testFooProducerDefinition() + public function testFooProducerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -272,7 +272,7 @@ public function testFooProducerDefinition() /** * @group alias */ - public function testAliasedFooProducerDefinition() + public function testAliasedFooProducerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -280,7 +280,7 @@ public function testAliasedFooProducerDefinition() $this->assertTrue($container->has('foo_producer_alias')); } - public function testDefaultProducerDefinition() + public function testDefaultProducerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -321,7 +321,7 @@ public function testDefaultProducerDefinition() $this->assertEquals('%old_sound_rabbit_mq.producer.class%', $definition->getClass()); } - public function testFooConsumerDefinition() + public function testFooConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -378,7 +378,7 @@ public function testFooConsumerDefinition() $this->assertEquals('%old_sound_rabbit_mq.consumer.class%', $definition->getClass()); } - public function testDefaultConsumerDefinition() + public function testDefaultConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -431,7 +431,7 @@ public function testDefaultConsumerDefinition() $this->assertEquals('%old_sound_rabbit_mq.consumer.class%', $definition->getClass()); } - public function testConsumerWithQosOptions() + public function testConsumerWithQosOptions(): void { $container = $this->getContainer('test.yml'); @@ -457,7 +457,7 @@ public function testConsumerWithQosOptions() ); } - public function testMultipleConsumerDefinition() + public function testMultipleConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -532,7 +532,7 @@ public function testMultipleConsumerDefinition() ); } - public function testDynamicConsumerDefinition() + public function testDynamicConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -575,7 +575,7 @@ public function testDynamicConsumerDefinition() ); } - public function testFooAnonConsumerDefinition() + public function testFooAnonConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -611,7 +611,7 @@ public function testFooAnonConsumerDefinition() $this->assertEquals('%old_sound_rabbit_mq.anon_consumer.class%', $definition->getClass()); } - public function testDefaultAnonConsumerDefinition() + public function testDefaultAnonConsumerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -647,7 +647,7 @@ public function testDefaultAnonConsumerDefinition() $this->assertEquals('%old_sound_rabbit_mq.anon_consumer.class%', $definition->getClass()); } - public function testFooRpcClientDefinition() + public function testFooRpcClientDefinition(): void { $container = $this->getContainer('rpc-clients.yml'); @@ -666,7 +666,7 @@ public function testFooRpcClientDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_client.class%', $definition->getClass()); } - public function testDefaultRpcClientDefinition() + public function testDefaultRpcClientDefinition(): void { $container = $this->getContainer('rpc-clients.yml'); @@ -686,7 +686,7 @@ public function testDefaultRpcClientDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_client.class%', $definition->getClass()); } - public function testLazyRpcClientDefinition() + public function testLazyRpcClientDefinition(): void { $container = $this->getContainer('rpc-clients.yml'); @@ -706,7 +706,7 @@ public function testLazyRpcClientDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_client.class%', $definition->getClass()); } - public function testFooRpcServerDefinition() + public function testFooRpcServerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -724,7 +724,7 @@ public function testFooRpcServerDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_server.class%', $definition->getClass()); } - public function testDefaultRpcServerDefinition() + public function testDefaultRpcServerDefinition(): void { $container = $this->getContainer('test.yml'); @@ -742,7 +742,7 @@ public function testDefaultRpcServerDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_server.class%', $definition->getClass()); } - public function testRpcServerWithQueueOptionsDefinition() + public function testRpcServerWithQueueOptionsDefinition(): void { $container = $this->getContainer('test.yml'); @@ -772,7 +772,7 @@ public function testRpcServerWithQueueOptionsDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_server.class%', $definition->getClass()); } - public function testRpcServerWithExchangeOptionsDefinition() + public function testRpcServerWithExchangeOptionsDefinition(): void { $container = $this->getContainer('test.yml'); @@ -802,7 +802,7 @@ public function testRpcServerWithExchangeOptionsDefinition() $this->assertEquals('%old_sound_rabbit_mq.rpc_server.class%', $definition->getClass()); } - public function testHasCollectorWhenChannelsExist() + public function testHasCollectorWhenChannelsExist(): void { $container = $this->getContainer('collector.yml'); @@ -817,19 +817,19 @@ public function testHasCollectorWhenChannelsExist() ); } - public function testHasNoCollectorWhenNoChannelsExist() + public function testHasNoCollectorWhenNoChannelsExist(): void { $container = $this->getContainer('no_collector.yml'); $this->assertFalse($container->has('old_sound_rabbit_mq.data_collector')); } - public function testCollectorCanBeDisabled() + public function testCollectorCanBeDisabled(): void { $container = $this->getContainer('collector_disabled.yml'); $this->assertFalse($container->has('old_sound_rabbit_mq.data_collector')); } - public function testExchangeArgumentsAreArray() + public function testExchangeArgumentsAreArray(): void { $container = $this->getContainer('exchange_arguments.yml'); @@ -846,7 +846,7 @@ public function testExchangeArgumentsAreArray() $this->assertEquals(array('name' => 'bar'), $options[0]['arguments']); } - public function testProducerWithoutExplicitExchangeOptionsConnectsToAMQPDefault() + public function testProducerWithoutExplicitExchangeOptionsConnectsToAMQPDefault(): void { $container = $this->getContainer('no_exchange_options.yml'); @@ -861,7 +861,7 @@ public function testProducerWithoutExplicitExchangeOptionsConnectsToAMQPDefault( $this->assertEquals(true, $options[0]['passive']); } - public function testProducersWithLogger() + public function testProducersWithLogger(): void { $container = $this->getContainer('config_with_enable_logger.yml'); $definition = $container->getDefinition('old_sound_rabbit_mq.default_consumer_consumer'); @@ -870,7 +870,10 @@ public function testProducersWithLogger() ); } - private function getContainer($file, $debug = false) + /** + * @throws \Exception + */ + private function getContainer(string $file, bool $debug = false): ContainerBuilder { $container = new ContainerBuilder(new ParameterBag(array('kernel.debug' => $debug))); $container->registerExtension(new OldSoundRabbitMqExtension()); diff --git a/Tests/Event/AfterProcessingMessageEventTest.php b/Tests/Event/AfterProcessingMessageEventTest.php index 63ca5dd6..7163a9aa 100644 --- a/Tests/Event/AfterProcessingMessageEventTest.php +++ b/Tests/Event/AfterProcessingMessageEventTest.php @@ -14,7 +14,7 @@ */ class AfterProcessingMessageEventTest extends TestCase { - protected function getConsumer() + protected function getConsumer(): Consumer { return new Consumer( $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection') @@ -26,7 +26,7 @@ protected function getConsumer() ); } - public function testEvent() + public function testEvent(): void { $AMQPMessage = new AMQPMessage('body'); $consumer = $this->getConsumer(); diff --git a/Tests/Event/BeforeProcessingMessageEventTest.php b/Tests/Event/BeforeProcessingMessageEventTest.php index 5fae76e7..01be4b0a 100644 --- a/Tests/Event/BeforeProcessingMessageEventTest.php +++ b/Tests/Event/BeforeProcessingMessageEventTest.php @@ -14,7 +14,7 @@ */ class BeforeProcessingMessageEventTest extends TestCase { - protected function getConsumer() + protected function getConsumer(): Consumer { return new Consumer( $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection') @@ -26,7 +26,7 @@ protected function getConsumer() ); } - public function testEvent() + public function testEvent(): void { $AMQPMessage = new AMQPMessage('body'); $consumer = $this->getConsumer(); diff --git a/Tests/Event/OnIdleEventTest.php b/Tests/Event/OnIdleEventTest.php index 06e9e9a8..46bb5d01 100644 --- a/Tests/Event/OnIdleEventTest.php +++ b/Tests/Event/OnIdleEventTest.php @@ -13,7 +13,7 @@ */ class OnIdleEventTest extends TestCase { - protected function getConsumer() + protected function getConsumer(): Consumer { return new Consumer( $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection') @@ -25,7 +25,7 @@ protected function getConsumer() ); } - public function testShouldAllowGetConsumerSetInConstructor() + public function testShouldAllowGetConsumerSetInConstructor(): void { $consumer = $this->getConsumer(); $event = new OnIdleEvent($consumer); @@ -33,7 +33,7 @@ public function testShouldAllowGetConsumerSetInConstructor() $this->assertSame($consumer, $event->getConsumer()); } - public function testShouldSetForceStopToTrueInConstructor() + public function testShouldSetForceStopToTrueInConstructor(): void { $consumer = $this->getConsumer(); $event = new OnIdleEvent($consumer); @@ -41,7 +41,7 @@ public function testShouldSetForceStopToTrueInConstructor() $this->assertTrue($event->isForceStop()); } - public function testShouldReturnPreviouslySetForceStop() + public function testShouldReturnPreviouslySetForceStop(): void { $consumer = $this->getConsumer(); $event = new OnIdleEvent($consumer); diff --git a/Tests/Manager/MemoryConsumptionCheckerTest.php b/Tests/Manager/MemoryConsumptionCheckerTest.php index 144197e1..09dea0e0 100644 --- a/Tests/Manager/MemoryConsumptionCheckerTest.php +++ b/Tests/Manager/MemoryConsumptionCheckerTest.php @@ -13,7 +13,7 @@ */ class MemoryConsumptionCheckerTest extends TestCase { - public function testMemoryIsNotAlmostOverloaded() + public function testMemoryIsNotAlmostOverloaded(): void { $currentMemoryUsage = '7M'; $allowedConsumptionUntil = '2M'; @@ -27,7 +27,7 @@ public function testMemoryIsNotAlmostOverloaded() $this->assertFalse($memoryManager->isRamAlmostOverloaded($maxConsumptionAllowed, $allowedConsumptionUntil)); } - public function testMemoryIsAlmostOverloaded() + public function testMemoryIsAlmostOverloaded(): void { $currentMemoryUsage = '9M'; $allowedConsumptionUntil = '2M'; @@ -41,7 +41,7 @@ public function testMemoryIsAlmostOverloaded() $this->assertTrue($memoryManager->isRamAlmostOverloaded($maxConsumptionAllowed, $allowedConsumptionUntil)); } - public function testMemoryExactValueIsNotAlmostOverloaded() + public function testMemoryExactValueIsNotAlmostOverloaded(): void { $currentMemoryUsage = '7M'; $maxConsumptionAllowed = '10M'; @@ -54,7 +54,7 @@ public function testMemoryExactValueIsNotAlmostOverloaded() $this->assertFalse($memoryManager->isRamAlmostOverloaded($maxConsumptionAllowed)); } - public function testMemoryExactValueIsAlmostOverloaded() + public function testMemoryExactValueIsAlmostOverloaded(): void { $currentMemoryUsage = '11M'; $maxConsumptionAllowed = '10M'; diff --git a/Tests/RabbitMq/AMQPConnectionFactoryTest.php b/Tests/RabbitMq/AMQPConnectionFactoryTest.php index d82d9c31..5580fdca 100644 --- a/Tests/RabbitMq/AMQPConnectionFactoryTest.php +++ b/Tests/RabbitMq/AMQPConnectionFactoryTest.php @@ -11,7 +11,7 @@ class AMQPConnectionFactoryTest extends TestCase { - public function testDefaultValues() + public function testDefaultValues(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -39,7 +39,7 @@ public function testDefaultValues() ), $instance->constructParams); } - public function testSocketConnection() + public function testSocketConnection(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', @@ -66,7 +66,7 @@ public function testSocketConnection() ), $instance->constructParams); } - public function testSocketConnectionWithParams() + public function testSocketConnectionWithParams(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', @@ -96,7 +96,7 @@ public function testSocketConnectionWithParams() ), $instance->constructParams); } - public function testStandardConnectionParameters() + public function testStandardConnectionParameters(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -130,7 +130,7 @@ public function testStandardConnectionParameters() ), $instance->constructParams); } - public function testSetConnectionParametersWithUrl() + public function testSetConnectionParametersWithUrl(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -165,7 +165,7 @@ public function testSetConnectionParametersWithUrl() ), $instance->constructParams); } - public function testSetConnectionParametersWithUrlEncoded() + public function testSetConnectionParametersWithUrlEncoded(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -195,7 +195,7 @@ public function testSetConnectionParametersWithUrlEncoded() ), $instance->constructParams); } - public function testSetConnectionParametersWithUrlWithoutVhost() + public function testSetConnectionParametersWithUrlWithoutVhost(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -225,7 +225,7 @@ public function testSetConnectionParametersWithUrlWithoutVhost() ), $instance->constructParams); } - public function testSSLConnectionParameters() + public function testSSLConnectionParameters(): void { $factory = new AMQPConnectionFactory( 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', @@ -270,7 +270,7 @@ public function testSSLConnectionParameters() ), $instance->constructParams); } - public function testConnectionsParametersProviderWithConstructorArgs() + public function testConnectionsParametersProviderWithConstructorArgs(): void { $connectionParametersProvider = $this->prepareConnectionParametersProvider(); $connectionParametersProvider->expects($this->once()) @@ -292,7 +292,7 @@ public function testConnectionsParametersProviderWithConstructorArgs() $this->assertEquals(array(1,2,3,4), $instance->constructParams); } - public function testConnectionsParametersProvider() + public function testConnectionsParametersProvider(): void { $connectionParametersProvider = $this->prepareConnectionParametersProvider(); $connectionParametersProvider->expects($this->once()) diff --git a/Tests/RabbitMq/BaseAmqpTest.php b/Tests/RabbitMq/BaseAmqpTest.php index d376a311..1b0f0b97 100644 --- a/Tests/RabbitMq/BaseAmqpTest.php +++ b/Tests/RabbitMq/BaseAmqpTest.php @@ -3,6 +3,7 @@ namespace OldSound\RabbitMqBundle\Tests\RabbitMq; use PHPUnit\Framework\MockObject\MockObject; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface; use OldSound\RabbitMqBundle\Event\AMQPEvent; use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp; @@ -12,7 +13,7 @@ class BaseAmqpTest extends TestCase { - public function testLazyConnection() + public function testLazyConnection(): void { $connection = $this->getMockBuilder('PhpAmqpLib\Connection\AbstractConnection') ->disableOriginalConstructor() @@ -28,7 +29,7 @@ public function testLazyConnection() new Consumer($connection, null); } - public function testNotLazyConnection() + public function testNotLazyConnection(): void { $connection = $this->getMockBuilder('PhpAmqpLib\Connection\AbstractConnection') ->disableOriginalConstructor() @@ -44,14 +45,15 @@ public function testNotLazyConnection() new Consumer($connection, null); } - public function testDispatchEvent() + public function testDispatchEvent(): void { /** @var BaseAmqp|MockObject $baseAmqpConsumer */ $baseAmqpConsumer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\BaseAmqp') ->disableOriginalConstructor() ->getMock(); - $eventDispatcher = $this->getMockBuilder('Symfony\Contracts\EventDispatcher\EventDispatcherInterface') + /** @var EventDispatcherInterface|MockObject $eventDispatcher */ + $eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcherInterface') ->disableOriginalConstructor() ->getMock(); diff --git a/Tests/RabbitMq/BaseConsumerTest.php b/Tests/RabbitMq/BaseConsumerTest.php index 9a5a3bfb..d3a223a5 100644 --- a/Tests/RabbitMq/BaseConsumerTest.php +++ b/Tests/RabbitMq/BaseConsumerTest.php @@ -21,31 +21,31 @@ protected function setUp(): void ->getMockForAbstractClass(); } - public function testItExtendsBaseAmqpInterface() + public function testItExtendsBaseAmqpInterface(): void { $this->assertInstanceOf('OldSound\RabbitMqBundle\RabbitMq\BaseAmqp', $this->consumer); } - public function testItImplementsDequeuerInterface() + public function testItImplementsDequeuerInterface(): void { $this->assertInstanceOf('OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface', $this->consumer); } - public function testItsIdleTimeoutIsMutable() + public function testItsIdleTimeoutIsMutable(): void { $this->assertEquals(0, $this->consumer->getIdleTimeout()); $this->consumer->setIdleTimeout(42); $this->assertEquals(42, $this->consumer->getIdleTimeout()); } - public function testItsIdleTimeoutExitCodeIsMutable() + public function testItsIdleTimeoutExitCodeIsMutable(): void { $this->assertEquals(0, $this->consumer->getIdleTimeoutExitCode()); $this->consumer->setIdleTimeoutExitCode(43); $this->assertEquals(43, $this->consumer->getIdleTimeoutExitCode()); } - public function testForceStopConsumer() + public function testForceStopConsumer(): void { $this->assertAttributeEquals(false, 'forceStop', $this->consumer); $this->consumer->forceStopConsumer(); diff --git a/Tests/RabbitMq/BindingTest.php b/Tests/RabbitMq/BindingTest.php index b45ce0cb..23387dff 100644 --- a/Tests/RabbitMq/BindingTest.php +++ b/Tests/RabbitMq/BindingTest.php @@ -3,6 +3,9 @@ namespace OldSound\RabbitMqBundle\Tests\RabbitMq; use OldSound\RabbitMqBundle\RabbitMq\Binding; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AMQPConnection; +use PhpAmqpLib\Connection\AMQPSSLConnection; use PHPUnit\Framework\Assert; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; @@ -10,13 +13,13 @@ class BindingTest extends TestCase { - protected function getBinding($amqpConnection, $amqpChannel) + protected function getBinding(AMQPConnection $amqpConnection, AMQPChannel $amqpChannel): Binding { return new Binding($amqpConnection, $amqpChannel); } /** - * @return MockObject + * @return MockObject | AMQPConnection */ protected function prepareAMQPConnection() { @@ -25,7 +28,10 @@ protected function prepareAMQPConnection() ->getMock(); } - protected function prepareAMQPChannel($channelId = null) + /** + * @return MockObject | AMQPChannel + */ + protected function prepareAMQPChannel(?string $channelId = null) { $channelMock = $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel') ->disableOriginalConstructor() @@ -37,7 +43,7 @@ protected function prepareAMQPChannel($channelId = null) return $channelMock; } - public function testQueueBind() + public function testQueueBind(): void { $ch = $this->prepareAMQPChannel('channel_id'); $con = $this->prepareAMQPConnection(); @@ -62,7 +68,7 @@ public function testQueueBind() $binding->setupFabric(); } - public function testExhangeBind() + public function testExhangeBind(): void { $ch = $this->prepareAMQPChannel('channel_id'); $con = $this->prepareAMQPConnection(); diff --git a/Tests/RabbitMq/ConsumerTest.php b/Tests/RabbitMq/ConsumerTest.php index c7744eff..bf0caf2d 100644 --- a/Tests/RabbitMq/ConsumerTest.php +++ b/Tests/RabbitMq/ConsumerTest.php @@ -13,17 +13,24 @@ use PhpAmqpLib\Message\AMQPMessage; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PHPUnit\Framework\Assert; +use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface; class ConsumerTest extends TestCase { - protected function getConsumer($amqpConnection, $amqpChannel) + /** + * @return Consumer + */ + protected function getConsumer(AMQPConnection $amqpConnection, AMQPChannel $amqpChannel) { return new Consumer($amqpConnection, $amqpChannel); } + /** + * @return MockObject | AMQPConnection + */ protected function prepareAMQPConnection() { return $this->getMockBuilder(AMQPConnection::class) @@ -31,6 +38,9 @@ protected function prepareAMQPConnection() ->getMock(); } + /** + * @return MockObject | AMQPChannel + */ protected function prepareAMQPChannel() { return $this->getMockBuilder(AMQPChannel::class) @@ -42,8 +52,10 @@ protected function prepareAMQPChannel() * Check if the message is requeued or not correctly. * * @dataProvider processMessageProvider + * + * @param int|bool|null $processFlag */ - public function testProcessMessage($processFlag, $expectedMethod = null, $expectedRequeue = null) + public function testProcessMessage($processFlag, string $expectedMethod = null, ?bool $expectedRequeue = null): void { $amqpConnection = $this->prepareAMQPConnection(); $amqpChannel = $this->prepareAMQPChannel(); @@ -98,7 +110,7 @@ public function testProcessMessage($processFlag, $expectedMethod = null, $expect $consumer->processMessage($amqpMessage); } - public function processMessageProvider() + public function processMessageProvider(): array { return array( array(null, 'basic_ack'), // Remove message from queue only if callback return not false @@ -111,10 +123,7 @@ public function processMessageProvider() ); } - /** - * @return array - */ - public function consumeProvider() + public function consumeProvider(): array { $testCases["All ok 4 callbacks"] = array( array( @@ -138,10 +147,8 @@ public function consumeProvider() /** * @dataProvider consumeProvider - * - * @param array $data */ - public function testConsume(array $data) + public function testConsume(array $data): void { $consumerCallBacks = $data['messages']; @@ -193,7 +200,7 @@ function () use ($amqpChannel) { $consumer->consume(1); } - public function testIdleTimeoutExitCode() + public function testIdleTimeoutExitCode(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -229,7 +236,7 @@ public function testIdleTimeoutExitCode() $this->assertTrue(2 == $consumer->consume(1)); } - public function testShouldAllowContinueConsumptionAfterIdleTimeout() + public function testShouldAllowContinueConsumptionAfterIdleTimeout(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -289,7 +296,7 @@ public function testShouldAllowContinueConsumptionAfterIdleTimeout() $consumer->consume(10); } - public function testGracefulMaxExecutionTimeoutExitCode() + public function testGracefulMaxExecutionTimeoutExitCode(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -325,7 +332,7 @@ public function testGracefulMaxExecutionTimeoutExitCode() $this->assertSame(10, $consumer->consume(1)); } - public function testGracefulMaxExecutionWontWaitIfPastTheTimeout() + public function testGracefulMaxExecutionWontWaitIfPastTheTimeout(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -355,7 +362,7 @@ public function testGracefulMaxExecutionWontWaitIfPastTheTimeout() $consumer->consume(1); } - public function testTimeoutWait() + public function testTimeoutWait(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -397,7 +404,7 @@ public function testTimeoutWait() $consumer->consume(1); } - public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout() + public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); @@ -434,7 +441,7 @@ public function testTimeoutWaitWontWaitPastGracefulMaxExecutionTimeout() $consumer->consume(1); } - public function testTimeoutWaitWontWaitPastIdleTimeout() + public function testTimeoutWaitWontWaitPastIdleTimeout(): void { // set up amqp connection $amqpConnection = $this->prepareAMQPConnection(); diff --git a/Tests/RabbitMq/DynamicConsumerTest.php b/Tests/RabbitMq/DynamicConsumerTest.php index ee535d49..d80f40ad 100644 --- a/Tests/RabbitMq/DynamicConsumerTest.php +++ b/Tests/RabbitMq/DynamicConsumerTest.php @@ -3,12 +3,18 @@ namespace OldSound\RabbitMqBundle\Tests\RabbitMq; use OldSound\RabbitMqBundle\Provider\QueueOptionsProviderInterface; +use OldSound\RabbitMqBundle\RabbitMq\Consumer; use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer; +use PhpAmqpLib\Channel\AMQPChannel; +use PhpAmqpLib\Connection\AMQPConnection; use PHPUnit\Framework\MockObject\MockObject; class DynamicConsumerTest extends ConsumerTest { - public function getConsumer($amqpConnection, $amqpChannel) + /** + * @return DynamicConsumer + */ + public function getConsumer(AMQPConnection $amqpConnection, AMQPChannel $amqpChannel) { return new DynamicConsumer($amqpConnection, $amqpChannel); } @@ -24,7 +30,7 @@ private function prepareQueueOptionsProvider() ->getMock(); } - public function testQueueOptionsPrivider() + public function testQueueOptionsProvider(): void { $amqpConnection = $this->prepareAMQPConnection(); $amqpChannel = $this->prepareAMQPChannel(); diff --git a/Tests/RabbitMq/Fixtures/AMQPConnection.php b/Tests/RabbitMq/Fixtures/AMQPConnection.php index 073dea48..40dbca71 100644 --- a/Tests/RabbitMq/Fixtures/AMQPConnection.php +++ b/Tests/RabbitMq/Fixtures/AMQPConnection.php @@ -4,6 +4,7 @@ class AMQPConnection { + /** @var array */ public $constructParams; public function __construct() diff --git a/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php index 48241bff..3686528c 100644 --- a/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php +++ b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php @@ -6,6 +6,7 @@ class AMQPSocketConnection extends BaseAMQPSocketConnection { + /** @var array */ public $constructParams; public function __construct() diff --git a/Tests/RabbitMq/MultipleConsumerTest.php b/Tests/RabbitMq/MultipleConsumerTest.php index 8d7dc88a..2cb90504 100644 --- a/Tests/RabbitMq/MultipleConsumerTest.php +++ b/Tests/RabbitMq/MultipleConsumerTest.php @@ -45,14 +45,18 @@ public function setUp(): void $this->amqpConnection = $this->prepareAMQPConnection(); $this->amqpChannel = $this->prepareAMQPChannel(); $this->multipleConsumer = new MultipleConsumer($this->amqpConnection, $this->amqpChannel); + $this->multipleConsumer->setChannel($this->amqpChannel); } /** * Check if the message is requeued or not correctly. * * @dataProvider processMessageProvider + * + * @param int|bool|null $processFlag + * @throws \Exception */ - public function testProcessMessage($processFlag, $expectedMethod, $expectedRequeue = null) + public function testProcessMessage($processFlag, string $expectedMethod, bool $expectedRequeue = null): void { $callback = $this->prepareCallback($processFlag); @@ -78,8 +82,11 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque * Check queues provider works well * * @dataProvider processMessageProvider + * + * @param int|bool|null $processFlag + * @throws \ReflectionException */ - public function testQueuesProvider($processFlag, $expectedMethod, $expectedRequeue = null) + public function testQueuesProvider($processFlag, string $expectedMethod, bool $expectedRequeue = null): void { $callback = $this->prepareCallback($processFlag); @@ -115,7 +122,7 @@ public function testQueuesProvider($processFlag, $expectedMethod, $expectedReque $this->multipleConsumer->processQueueMessage('test-2', $amqpMessage); } - public function testQueuesPrivider() + public function testQueuesPrivider(): void { $amqpConnection = $this->prepareAMQPConnection(); $amqpChannel = $this->prepareAMQPChannel(); @@ -142,8 +149,10 @@ public function testQueuesPrivider() * Check queues provider works well with static queues together * * @dataProvider processMessageProvider + * + * @param int|bool|null $processFlag */ - public function testQueuesProviderAndStaticQueuesTogether($processFlag, $expectedMethod, $expectedRequeue = null) + public function testQueuesProviderAndStaticQueuesTogether($processFlag, string $expectedMethod, bool $expectedRequeue = null): void { $callback = $this->prepareCallback($processFlag); @@ -188,7 +197,7 @@ public function testQueuesProviderAndStaticQueuesTogether($processFlag, $expecte $this->multipleConsumer->processQueueMessage('test-4', $amqpMessage); } - public function processMessageProvider() + public function processMessageProvider(): array { return array( array(null, 'basic_ack'), // Remove message from queue only if callback return not false @@ -202,8 +211,9 @@ public function processMessageProvider() /** * @dataProvider queueBindingRoutingKeyProvider + * */ - public function testShouldConsiderQueueArgumentsOnQueueDeclaration($routingKeysOption, $expectedRoutingKey) + public function testShouldConsiderQueueArgumentsOnQueueDeclaration(array $routingKeysOption, string $expectedRoutingKey): void { $queueName = 'test-queue-name'; $exchangeName = 'test-exchange-name'; @@ -241,10 +251,12 @@ public function testShouldConsiderQueueArgumentsOnQueueDeclaration($routingKeysO ->method('queue_bind') ->with($queueName, $exchangeName, $expectedRoutingKey, false, $expectedArgs); + $this->multipleConsumer->setChannel($this->amqpChannel); + $this->multipleConsumer->setupFabric(); } - public function queueBindingRoutingKeyProvider() + public function queueBindingRoutingKeyProvider(): array { return array( array(array(), 'test-routing-key'), @@ -291,11 +303,10 @@ private function prepareQueuesProvider() * Preparing AMQP Channel Expectations * * @param mixed $expectedMethod - * @param string $expectedRequeue + * @param bool $expectedRequeue * - * @return void */ - private function prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue) + private function prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue): void { $this->amqpChannel->expects($this->any()) ->method('basic_reject') @@ -314,10 +325,9 @@ private function prepareAMQPChannelExpectations($expectedMethod, $expectedRequeu /** * Prepare callback * - * @param bool $processFlag * @return callable */ - private function prepareCallback($processFlag) + private function prepareCallback(?int $processFlag) { return function ($msg) use ($processFlag) { return $processFlag; diff --git a/Tests/RabbitMq/RpcClientTest.php b/Tests/RabbitMq/RpcClientTest.php index 2428b610..b53df629 100644 --- a/Tests/RabbitMq/RpcClientTest.php +++ b/Tests/RabbitMq/RpcClientTest.php @@ -9,7 +9,7 @@ class RpcClientTest extends TestCase { - public function testProcessMessageWithCustomUnserializer() + public function testProcessMessageWithCustomUnserializer(): void { /** @var RpcClient $client */ $client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient') @@ -32,7 +32,7 @@ public function testProcessMessageWithCustomUnserializer() $client->processMessage($message); } - public function testProcessMessageWithNotifyMethod() + public function testProcessMessageWithNotifyMethod(): void { /** @var RpcClient $client */ $client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient') @@ -56,7 +56,7 @@ public function testProcessMessageWithNotifyMethod() $this->assertSame($expectedNotify, $notified); } - public function testInvalidParameterOnNotify() + public function testInvalidParameterOnNotify(): void { /** @var RpcClient $client */ $client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient') @@ -69,7 +69,7 @@ public function testInvalidParameterOnNotify() $client->notify('not a callable'); } - public function testChannelCancelOnGetRepliesException() + public function testChannelCancelOnGetRepliesException(): void { $client = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcClient') ->setMethods(null) diff --git a/Tests/RabbitMq/RpcServerTest.php b/Tests/RabbitMq/RpcServerTest.php index 76f09e19..f85b2558 100644 --- a/Tests/RabbitMq/RpcServerTest.php +++ b/Tests/RabbitMq/RpcServerTest.php @@ -8,7 +8,7 @@ class RpcServerTest extends TestCase { - public function testProcessMessageWithCustomSerializer() + public function testProcessMessageWithCustomSerializer(): void { /** @var RpcServer $server */ $server = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\RpcServer') diff --git a/phpstan.neon.dist b/phpstan.neon.dist index d9304a9f..fbd047f0 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -2,8 +2,9 @@ includes: - vendor/phpstan/phpstan-phpunit/extension.neon - vendor/phpstan/phpstan-phpunit/rules.neon parameters: - level: 5 + level: 6 reportUnmatchedIgnoredErrors: false + checkMissingIterableValueType: false paths: - Command - DataCollector @@ -16,8 +17,6 @@ parameters: - Tests - OldSoundRabbitMqBundle.php ignoreErrors: - - '#Call to an undefined method Symfony\\Component\\DependencyInjection\\Definition::((setFactoryService)|(setFactoryMethod))\(\)\.#' - '#Call to an undefined method Symfony\\Component\\Config\\Definition\\Builder\\NodeDefinition::((children)|(append))\(\)\.#' - '#Call to an undefined method Symfony\\Component\\Config\\Definition\\Builder\\NodeParentInterface::((booleanNode)|(scalarNode))\(\)#' - - '#Parameter \#1 \$node of method OldSound\\RabbitMqBundle\\DependencyInjection\\Configuration::addQueueNodeConfiguration\(\) expects Symfony\\Component\\Config\\Definition\\Builder\\ArrayNodeDefinition, Symfony\\Component\\Config\\Definition\\Builder\\NodeDefinition given\.#' - - '#Method Symfony\\Contracts\\EventDispatcher\\EventDispatcherInterface::dispatch\(\) invoked with 2 parameters, 1 required\.#' \ No newline at end of file + - '#Property PhpAmqpLib\\Channel\\AMQPChannel::\$callbacks \(array\\) does not accept array\(#' diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 90dab01d..ef87dd78 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -8,7 +8,6 @@ convertWarningsToExceptions = "true" processIsolation = "false" stopOnFailure = "false" - syntaxCheck = "false" bootstrap = "Tests/bootstrap.php">