When a Symfony command loops over an array of objects to perform an action on each one, you may find that as your database grows, processing each object sequentially takes too long.
Suppose we have written an application that stores employees, the hours they have worked, and their pay, and that generates payroll on a weekly basis. A weekly cron job kicks off the following command.
php bin/console app:employee:payroll
Your command may look something like this.
namespace App\Command;

use App\Manager\EmployeeManager;
use App\Repository\EmployeeRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class EmployeePayrollCommand extends Command
{
    protected static $defaultName = 'app:employee:payroll';
    protected EmployeeManager $employeeManager;
    protected EmployeeRepository $employeeRepository;

    public function __construct(EmployeeManager $employeeManager, EmployeeRepository $employeeRepository)
    {
        // A command that overrides the constructor must call the parent constructor.
        parent::__construct();
        $this->employeeManager = $employeeManager;
        $this->employeeRepository = $employeeRepository;
    }

    protected function configure()
    {
        $this->setDescription('Generate the weekly payroll.');
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Generate payroll for every employee, one after another.
        $employees = $this->employeeRepository->findAll();
        foreach ($employees as $employee) {
            $this->employeeManager->generatePayroll($employee);
        }
        return 0;
    }
}
For a time this works well, but as your company signs on more and more clients, the perils of a long-running PHP process start causing problems. Wouldn't it be great if we could generate the payroll for employees asynchronously? Let's do just that. To accomplish this we will use the Symfony Lock and Process components together with a lock store. In this example we will use a Redis store, but any store that implements Symfony\Component\Lock\StoreInterface will work.
The approach here is that our command will optionally take the ID of an employee to generate payroll for. When no employee ID is given, the command will loop over each employee and start this same command again as a separate Process, passing that employee's ID. We will "lock" the generation of each payroll with a unique lock name to make sure we never try to generate payroll for the same employee twice at the same time.
Start by adding the optional ability to call the command with an employee's ID.
…
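class EmployeePayrollCommand extends Command
{
    …
    protected function configure()
    {
        // InputArgument must be imported from Symfony\Component\Console\Input.
        $this->setDescription('Generate the weekly payroll.')
            ->addArgument(
                'employeeId',
                InputArgument::OPTIONAL,
                'The ID of the employee to generate payroll for.'
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $employeeId = $input->getArgument('employeeId');
        // The argument is only an ID, so load the entity (assumes a Doctrine-style find()).
        $employee = $employeeId ? $this->employeeRepository->find($employeeId) : null;
        if ($employeeId && !$employee) {
            $output->writeln(sprintf('No employee found with ID %s.', $employeeId));
            return 1;
        }

        if ($employee) {
            $this->employeeManager->generatePayroll($employee);
        } else {
            $employees = $this->employeeRepository->findAll();
            foreach ($employees as $employee) {
                $this->employeeManager->generatePayroll($employee);
            }
        }
        return 0;
    }
}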
Now, instead of calling $this->employeeManager->generatePayroll($employee); in our foreach loop, let's start a new process that runs asynchronously. The Symfony Process Component allows us to do just that. Using it we can find the location of the PHP binary and call our command with the ID of each employee.
…
    protected EmployeeManager $employeeManager;
    protected EmployeeRepository $employeeRepository;
    protected string $environment;

    public function __construct(EmployeeManager $employeeManager, EmployeeRepository $employeeRepository, string $environment)
    {
        parent::__construct();
        $this->employeeManager = $employeeManager;
        $this->employeeRepository = $employeeRepository;
        $this->environment = $environment;
    }
…
            $employees = $this->employeeRepository->findAll();
            // PhpExecutableFinder ships with the Process component
            // (Symfony\Component\Process\PhpExecutableFinder).
            $phpBinaryPath = (new PhpExecutableFinder())->find();
            foreach ($employees as $employee) {
                // Passing the command as an array avoids shell quoting problems.
                // bin/console is assumed to be relative to the working directory.
                $process = new Process([
                    $phpBinaryPath,
                    'bin/console',
                    self::$defaultName,
                    (string) $employee->getId(),
                    '--env=' . $this->environment,
                ]);
                $process->start();
            }
…
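To make the shape of the child command concrete, here is a minimal sketch in plain PHP with no Symfony dependency. The helper name buildPayrollCommand is hypothetical, and the bin/console path assumes the working directory is the project root. It only builds the argument list; starting the process is left to the Process component as in the article.

```php
<?php
declare(strict_types=1);

/**
 * Builds the argument list for one child run of the payroll command.
 * Each element is a separate argument, so no shell quoting is needed.
 */
function buildPayrollCommand(string $phpBinary, string $commandName, int $employeeId, string $environment): array
{
    return [
        $phpBinary,
        'bin/console', // assumed to be relative to the project root
        $commandName,
        (string) $employeeId,
        '--env=' . $environment,
    ];
}

// PHP_BINARY is the interpreter running this script; the article's code
// locates the binary through the Process component instead.
$command = buildPayrollCommand(PHP_BINARY, 'app:employee:payroll', 42, 'prod');

// Print everything after the binary path.
echo implode(' ', array_slice($command, 1)), PHP_EOL;
// prints "bin/console app:employee:payroll 42 --env=prod"
```

An array like this can be handed to a process launcher as-is, which sidesteps escaping mistakes that are easy to make when formatting a single command string.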
This approach has a couple of problems. If payroll is already being generated for an employee when the command is run again, two processes will be generating it for that employee at the same time. For example:
php bin/console app:employee:payroll 1
php bin/console app:employee:payroll
This could become very problematic.
Our second problem is that this could spawn more processes than your server is able to handle at once, bringing everything to a standstill.
We can tackle the first problem using the Symfony Lock Component. We need a unique lock name for the payroll generation of each employee, we need to create a lock with that name, and we need to check whether the lock is already held before we begin any process.
We can start by creating a method that generates the lock name.
    protected static function getLockName(Employee $employee): string
    {
        // The command name plus the employee ID gives one lock per employee.
        return sprintf('%s-%s', self::$defaultName, $employee->getId());
    }
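As a quick illustration of the naming scheme, here is a standalone sketch. The function payrollLockName is a hypothetical free-function version of the method above that takes the command name and ID directly, so it runs without the Employee class.

```php
<?php
declare(strict_types=1);

/** Hypothetical standalone version of getLockName(). */
function payrollLockName(string $commandName, int $employeeId): string
{
    return sprintf('%s-%s', $commandName, $employeeId);
}

echo payrollLockName('app:employee:payroll', 1), PHP_EOL;
// prints "app:employee:payroll-1"
echo payrollLockName('app:employee:payroll', 2), PHP_EOL;
// prints "app:employee:payroll-2"
```

Because the name depends only on the command and the employee ID, two runs for the same employee compete for the same lock, while runs for different employees never block each other.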
Now we need to stick our lock in a Store. Here we will use a Redis store, but read the documentation to find the store that works best for you.
    protected StoreInterface $store;
…
    public function __construct(EmployeeManager $employeeManager, EmployeeRepository $employeeRepository, Client $client, string $environment)
    {
…
        $this->store = new RedisStore($client);
    }
…
    protected function lock(string $name): ?Lock
    {
        $lock = (new Factory($this->store))->createLock($name);
        // acquire() does not block by default; it returns false if the lock is already held.
        if (!$lock->acquire()) {
            return null;
        }
        return $lock;
    }
…
And now that we have a store we can lock each process.
…
        if ($employee) {
            $lock = $this->lock(self::getLockName($employee));
            if ($lock === null) {
                $output->writeln(sprintf('Already generating payroll for employee %s.', $employee->getId()));
                return 1;
            }
            try {
                $this->employeeManager->generatePayroll($employee);
            } catch (\Exception $e) {
                $output->writeln($e->getMessage());
                return 1;
            } finally {
                // Release the lock whether payroll generation succeeded or failed.
                $lock->release();
            }
        }
…
Now the command will refuse to generate payroll for any employee whose payroll is already being generated.
Our final task is to limit how many processes the command will run at once. We will accomplish this by storing each Process in an array and sleeping while the array is too large.
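The acquire, work, release flow can be tried out without Redis. The sketch below swaps in a hypothetical in-memory class, InMemoryLocks, which only works inside a single PHP process and is not part of Symfony. It is meant to show the control flow (refuse when held, always release in finally), not to replace a real store.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-process stand-in for a lock store; not part of Symfony. */
final class InMemoryLocks
{
    /** @var array<string, bool> */
    private array $held = [];

    public function acquire(string $name): bool
    {
        if (isset($this->held[$name])) {
            return false; // somebody already holds this lock
        }
        $this->held[$name] = true;
        return true;
    }

    public function release(string $name): void
    {
        unset($this->held[$name]);
    }
}

/** Runs $work under the named lock; returns 1 if the lock is taken or the work fails. */
function runLocked(InMemoryLocks $locks, string $name, callable $work): int
{
    if (!$locks->acquire($name)) {
        return 1;
    }
    try {
        $work();
        return 0;
    } catch (Exception $e) {
        return 1;
    } finally {
        $locks->release($name);
    }
}

$locks = new InMemoryLocks();
$name = 'app:employee:payroll-1';

// A second attempt made while the lock is held is refused.
$inner = null;
$outer = runLocked($locks, $name, function () use ($locks, $name, &$inner) {
    $inner = runLocked($locks, $name, function () {});
});

// A failing job still releases the lock, so a retry can acquire it.
$failed = runLocked($locks, $name, function () { throw new RuntimeException('boom'); });
$retry = runLocked($locks, $name, function () {});
```

Here $outer and $retry end up as 0, while $inner and $failed end up as 1, which mirrors the exit codes the command returns.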
…
    const MAX_SIMULTANEOUS_PROCESSES = 50;
…
            $processes = [];
            foreach ($employees as $employee) {
                $process = new Process([
                    $phpBinaryPath,
                    'bin/console',
                    self::$defaultName,
                    (string) $employee->getId(),
                    '--env=' . $this->environment,
                ]);
                $process->start();
                $processes[] = $process;
                // Pause while the pool is full so we never exceed the maximum.
                while (count($processes) >= self::MAX_SIMULTANEOUS_PROCESSES) {
                    sleep(1);
                    $processes = $this->checkRunningProcesses($processes);
                }
            }
            // Wait for the remaining processes to finish before the command exits.
            while (count($processes) > 0) {
                sleep(1);
                $processes = $this->checkRunningProcesses($processes);
            }
…
    protected function checkRunningProcesses(array $processes): array
    {
        // Drop every process that has finished and keep the ones still running.
        foreach ($processes as $i => $process) {
            if (!$process->isRunning()) {
                unset($processes[$i]);
            }
        }
        return $processes;
    }
…
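The same throttling idea can be sketched with PHP's built-in proc_open, which makes it runnable without the Process component. The function names runThrottled and pruneFinished are hypothetical, and the child commands here are trivial PHP one-liners rather than the payroll command. Passing the command as an array to proc_open requires PHP 7.4 or newer.

```php
<?php
declare(strict_types=1);

/** Closes and removes every child that has exited; returns the ones still running. */
function pruneFinished(array $running): array
{
    foreach ($running as $i => $handle) {
        if (!proc_get_status($handle)['running']) {
            proc_close($handle);
            unset($running[$i]);
        }
    }
    return $running;
}

/**
 * Runs every command while tracking at most $maxParallel children at once.
 * Returns the largest number of children that were tracked at the same time.
 */
function runThrottled(array $commands, int $maxParallel): int
{
    $running = [];
    $peak = 0;

    foreach ($commands as $command) {
        // Block until a slot is free before starting another child.
        while (count($running) >= $maxParallel) {
            usleep(50000);
            $running = pruneFinished($running);
        }

        $pipes = [];
        $handle = proc_open($command, [], $pipes);
        if ($handle === false) {
            throw new RuntimeException('Could not start child process.');
        }
        $running[] = $handle;
        $peak = max($peak, count($running));
    }

    // Drain: wait for the remaining children to exit.
    while (count($running) > 0) {
        usleep(50000);
        $running = pruneFinished($running);
    }

    return $peak;
}

// Five short-lived children, never more than two at a time.
$commands = array_fill(0, 5, [PHP_BINARY, '-r', 'usleep(100000);']);
$peak = runThrottled($commands, 2);
```

Checking the pool size before starting a child, rather than after, is the design choice that keeps the count at or below the limit at every moment.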
A final note that is a little beyond the scope of this article: the EmployeeManager::generatePayroll method should be idempotent. In other words, calling this method multiple times for the same employee shouldn't be a problem. This way, if there is a failure anywhere, then after fixing the problem you can simply re-run the entire command and it will just pick up where it left off.
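One possible way to get that property is to key each payroll run by employee and pay period and to skip work that has already been recorded. The sketch below is an assumption about how such a check might look, not the article's EmployeeManager; the PayrollLedger class and its in-memory storage are hypothetical, and a real application would keep this record in the database.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory ledger; a real application would use the database. */
final class PayrollLedger
{
    /** @var array<string, float> */
    private array $entries = [];

    /** Records payroll once per employee and week; repeat calls change nothing. */
    public function generate(int $employeeId, string $week, float $hours, float $rate): bool
    {
        $key = $employeeId . ':' . $week;
        if (isset($this->entries[$key])) {
            return false; // already generated, nothing to do
        }
        $this->entries[$key] = $hours * $rate;
        return true;
    }

    public function total(): float
    {
        return array_sum($this->entries);
    }
}

$ledger = new PayrollLedger();
$first = $ledger->generate(1, '2020-W10', 40.0, 25.0);  // does the work
$second = $ledger->generate(1, '2020-W10', 40.0, 25.0); // skipped on re-run
```

After both calls the ledger still holds a single entry of 1000.0, so re-running the whole command after a failure cannot pay anyone twice.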
The final command looks like this.
<?php

namespace App\Command;

use App\Entity\Employee; // assumed location of the Employee entity
use App\Manager\EmployeeManager;
use App\Repository\EmployeeRepository;
use Predis\Client; // assumed client; use whichever Redis client RedisStore accepts
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Lock\Factory;
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Lock\Store\RedisStore;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;

class EmployeePayrollCommand extends Command
{
    const MAX_SIMULTANEOUS_PROCESSES = 50;

    protected static $defaultName = 'app:employee:payroll';
    protected EmployeeManager $employeeManager;
    protected EmployeeRepository $employeeRepository;
    protected StoreInterface $store;
    protected string $environment;

    public function __construct(EmployeeManager $employeeManager, EmployeeRepository $employeeRepository, Client $client, string $environment)
    {
        parent::__construct();
        $this->employeeManager = $employeeManager;
        $this->employeeRepository = $employeeRepository;
        $this->store = new RedisStore($client);
        $this->environment = $environment;
    }

    protected function configure()
    {
        $this->setDescription('Generate the weekly payroll.')
            ->addArgument(
                'employeeId',
                InputArgument::OPTIONAL,
                'The ID of the employee to generate payroll for.'
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $employeeId = $input->getArgument('employeeId');
        // The argument is only an ID, so load the entity (assumes a Doctrine-style find()).
        $employee = $employeeId ? $this->employeeRepository->find($employeeId) : null;
        if ($employeeId && !$employee) {
            $output->writeln(sprintf('No employee found with ID %s.', $employeeId));
            return 1;
        }

        if ($employee) {
            // Single-employee mode: generate payroll under a per-employee lock.
            $lock = $this->lock(self::getLockName($employee));
            if ($lock === null) {
                $output->writeln(sprintf('Already generating payroll for employee %s.', $employee->getId()));
                return 1;
            }
            try {
                $this->employeeManager->generatePayroll($employee);
            } catch (\Exception $e) {
                $output->writeln($e->getMessage());
                return 1;
            } finally {
                $lock->release();
            }
        } else {
            // Dispatch mode: start one child process per employee, throttled.
            $employees = $this->employeeRepository->findAll();
            $phpBinaryPath = (new PhpExecutableFinder())->find();
            $processes = [];
            foreach ($employees as $employee) {
                // bin/console is assumed to be relative to the working directory.
                $process = new Process([
                    $phpBinaryPath,
                    'bin/console',
                    self::$defaultName,
                    (string) $employee->getId(),
                    '--env=' . $this->environment,
                ]);
                $process->start();
                $processes[] = $process;
                // Pause while the pool is full so we never exceed the maximum.
                while (count($processes) >= self::MAX_SIMULTANEOUS_PROCESSES) {
                    sleep(1);
                    $processes = $this->checkRunningProcesses($processes);
                }
            }
            // Wait for the remaining processes to finish before the command exits.
            while (count($processes) > 0) {
                sleep(1);
                $processes = $this->checkRunningProcesses($processes);
            }
        }
        return 0;
    }

    protected function lock(string $name): ?Lock
    {
        $lock = (new Factory($this->store))->createLock($name);
        // acquire() does not block by default; it returns false if the lock is already held.
        if (!$lock->acquire()) {
            return null;
        }
        return $lock;
    }

    protected static function getLockName(Employee $employee): string
    {
        return sprintf('%s-%s', self::$defaultName, $employee->getId());
    }

    protected function checkRunningProcesses(array $processes): array
    {
        // Drop every process that has finished and keep the ones still running.
        foreach ($processes as $i => $process) {
            if (!$process->isRunning()) {
                unset($processes[$i]);
            }
        }
        return $processes;
    }
}