vendor/doctrine/dbal/lib/Doctrine/DBAL/Connections/PrimaryReadReplicaConnection.php line 248

Open in your IDE?
  1. <?php
  2. namespace Doctrine\DBAL\Connections;
  3. use Doctrine\Common\EventManager;
  4. use Doctrine\DBAL\Configuration;
  5. use Doctrine\DBAL\Connection;
  6. use Doctrine\DBAL\Driver;
  7. use Doctrine\DBAL\Driver\Connection as DriverConnection;
  8. use Doctrine\DBAL\DriverManager;
  9. use Doctrine\DBAL\Event\ConnectionEventArgs;
  10. use Doctrine\DBAL\Events;
  11. use InvalidArgumentException;
  12. use function array_rand;
  13. use function assert;
  14. use function count;
  15. use function func_get_args;
  16. /**
  17.  * Primary-Replica Connection
  18.  *
  19.  * Connection can be used with primary-replica setups.
  20.  *
  21.  * Important for the understanding of this connection should be how and when
  22.  * it picks the replica or primary.
  23.  *
  24.  * 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection'
  25.  *    or 'executeQuery' is used.
  26.  * 2. Primary picked when 'exec', 'executeUpdate', 'executeStatement', 'insert', 'delete', 'update', 'createSavepoint',
  27.  *    'releaseSavepoint', 'beginTransaction', 'rollback', 'commit', 'query' or
  28.  *    'prepare' is called.
  29.  * 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards.
  30.  * 4. One replica connection is randomly picked ONCE during a request.
  31.  *
  32.  * ATTENTION: You can write to the replica with this connection if you execute a write query without
  33.  * opening up a transaction. For example:
  34.  *
  35.  *      $conn = DriverManager::getConnection(...);
  36.  *      $conn->executeQuery("DELETE FROM table");
  37.  *
  38.  * Be aware that Connection#executeQuery is a method specifically for READ
  39.  * operations only.
  40.  *
  41.  * Use Connection#executeStatement for any SQL statement that changes/updates
  42.  * state in the database (UPDATE, INSERT, DELETE or DDL statements).
  43.  *
  44.  * This connection is limited to replica operations using the
  45.  * Connection#executeQuery operation only, because it wouldn't be compatible
  46.  * with the ORM or SchemaManager code otherwise. Both use all the other
  47.  * operations in a context where writes could happen to a replica, which makes
  48.  * this restricted approach necessary.
  49.  *
  50.  * You can manually connect to the primary at any time by calling:
  51.  *
  52.  *      $conn->ensureConnectedToPrimary();
  53.  *
  54.  * Instantiation through the DriverManager looks like:
  55.  *
  56.  * @psalm-import-type Params from DriverManager
  57.  * @example
  58.  *
  59.  * $conn = DriverManager::getConnection(array(
  60.  *    'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection',
  61.  *    'driver' => 'pdo_mysql',
  62.  *    'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
  63.  *    'replica' => array(
  64.  *        array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''),
  65.  *        array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''),
  66.  *    )
  67.  * ));
  68.  *
  69.  * You can also pass 'driverOptions' and any other documented option to each of this drivers
  70.  * to pass additional information.
  71.  */
  72. class PrimaryReadReplicaConnection extends Connection
  73. {
  74.     /**
  75.      * Primary and Replica connection (one of the randomly picked replicas).
  76.      *
  77.      * @var DriverConnection[]|null[]
  78.      */
  79.     protected $connections = ['primary' => null'replica' => null];
  80.     /**
  81.      * You can keep the replica connection and then switch back to it
  82.      * during the request if you know what you are doing.
  83.      *
  84.      * @var bool
  85.      */
  86.     protected $keepReplica false;
  87.     /**
  88.      * Creates Primary Replica Connection.
  89.      *
  90.      * @internal The connection can be only instantiated by the driver manager.
  91.      *
  92.      * @param array<string,mixed> $params
  93.      * @psalm-param Params $params
  94.      * @phpstan-param array<string,mixed> $params
  95.      *
  96.      * @throws InvalidArgumentException
  97.      */
  98.     public function __construct(
  99.         array $params,
  100.         Driver $driver,
  101.         ?Configuration $config null,
  102.         ?EventManager $eventManager null
  103.     ) {
  104.         if (! isset($params['replica'], $params['primary'])) {
  105.             throw new InvalidArgumentException('primary or replica configuration missing');
  106.         }
  107.         if (count($params['replica']) === 0) {
  108.             throw new InvalidArgumentException('You have to configure at least one replica.');
  109.         }
  110.         if (isset($params['driver'])) {
  111.             $params['primary']['driver'] = $params['driver'];
  112.             foreach ($params['replica'] as $replicaKey => $replica) {
  113.                 $params['replica'][$replicaKey]['driver'] = $params['driver'];
  114.             }
  115.         }
  116.         $this->keepReplica = (bool) ($params['keepReplica'] ?? false);
  117.         parent::__construct($params$driver$config$eventManager);
  118.     }
  119.     /**
  120.      * Checks if the connection is currently towards the primary or not.
  121.      */
  122.     public function isConnectedToPrimary(): bool
  123.     {
  124.         return $this->_conn !== null && $this->_conn === $this->connections['primary'];
  125.     }
  126.     /**
  127.      * @param string|null $connectionName
  128.      *
  129.      * @return bool
  130.      */
  131.     public function connect($connectionName null)
  132.     {
  133.         if ($connectionName !== null) {
  134.             throw new InvalidArgumentException(
  135.                 'Passing a connection name as first argument is not supported anymore.'
  136.                     ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.'
  137.             );
  138.         }
  139.         return $this->performConnect();
  140.     }
  141.     protected function performConnect(?string $connectionName null): bool
  142.     {
  143.         $requestedConnectionChange = ($connectionName !== null);
  144.         $connectionName            $connectionName ?: 'replica';
  145.         if ($connectionName !== 'replica' && $connectionName !== 'primary') {
  146.             throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.');
  147.         }
  148.         // If we have a connection open, and this is not an explicit connection
  149.         // change request, then abort right here, because we are already done.
  150.         // This prevents writes to the replica in case of "keepReplica" option enabled.
  151.         if ($this->_conn !== null && ! $requestedConnectionChange) {
  152.             return false;
  153.         }
  154.         $forcePrimaryAsReplica false;
  155.         if ($this->getTransactionNestingLevel() > 0) {
  156.             $connectionName        'primary';
  157.             $forcePrimaryAsReplica true;
  158.         }
  159.         if (isset($this->connections[$connectionName])) {
  160.             $this->_conn $this->connections[$connectionName];
  161.             if ($forcePrimaryAsReplica && ! $this->keepReplica) {
  162.                 $this->connections['replica'] = $this->_conn;
  163.             }
  164.             return false;
  165.         }
  166.         if ($connectionName === 'primary') {
  167.             $this->connections['primary'] = $this->_conn $this->connectTo($connectionName);
  168.             // Set replica connection to primary to avoid invalid reads
  169.             if (! $this->keepReplica) {
  170.                 $this->connections['replica'] = $this->connections['primary'];
  171.             }
  172.         } else {
  173.             $this->connections['replica'] = $this->_conn $this->connectTo($connectionName);
  174.         }
  175.         if ($this->_eventManager->hasListeners(Events::postConnect)) {
  176.             $eventArgs = new ConnectionEventArgs($this);
  177.             $this->_eventManager->dispatchEvent(Events::postConnect$eventArgs);
  178.         }
  179.         return true;
  180.     }
  181.     /**
  182.      * Connects to the primary node of the database cluster.
  183.      *
  184.      * All following statements after this will be executed against the primary node.
  185.      */
  186.     public function ensureConnectedToPrimary(): bool
  187.     {
  188.         return $this->performConnect('primary');
  189.     }
  190.     /**
  191.      * Connects to a replica node of the database cluster.
  192.      *
  193.      * All following statements after this will be executed against the replica node,
  194.      * unless the keepReplica option is set to false and a primary connection
  195.      * was already opened.
  196.      */
  197.     public function ensureConnectedToReplica(): bool
  198.     {
  199.         return $this->performConnect('replica');
  200.     }
  201.     /**
  202.      * Connects to a specific connection.
  203.      *
  204.      * @param string $connectionName
  205.      *
  206.      * @return DriverConnection
  207.      */
  208.     protected function connectTo($connectionName)
  209.     {
  210.         $params $this->getParams();
  211.         $driverOptions $params['driverOptions'] ?? [];
  212.         $connectionParams $this->chooseConnectionConfiguration($connectionName$params);
  213.         $user     $connectionParams['user'] ?? null;
  214.         $password $connectionParams['password'] ?? null;
  215.         return $this->_driver->connect($connectionParams$user$password$driverOptions);
  216.     }
  217.     /**
  218.      * @param string  $connectionName
  219.      * @param mixed[] $params
  220.      *
  221.      * @return mixed
  222.      */
  223.     protected function chooseConnectionConfiguration($connectionName$params)
  224.     {
  225.         if ($connectionName === 'primary') {
  226.             return $params['primary'];
  227.         }
  228.         $config $params['replica'][array_rand($params['replica'])];
  229.         if (! isset($config['charset']) && isset($params['primary']['charset'])) {
  230.             $config['charset'] = $params['primary']['charset'];
  231.         }
  232.         return $config;
  233.     }
  234.     /**
  235.      * {@inheritDoc}
  236.      *
  237.      * @deprecated Use {@link executeStatement()} instead.
  238.      */
  239.     public function executeUpdate($sql, array $params = [], array $types = [])
  240.     {
  241.         $this->ensureConnectedToPrimary();
  242.         return parent::executeUpdate($sql$params$types);
  243.     }
  244.     /**
  245.      * {@inheritDoc}
  246.      */
  247.     public function executeStatement($sql, array $params = [], array $types = [])
  248.     {
  249.         $this->ensureConnectedToPrimary();
  250.         return parent::executeStatement($sql$params$types);
  251.     }
  252.     /**
  253.      * {@inheritDoc}
  254.      */
  255.     public function beginTransaction()
  256.     {
  257.         $this->ensureConnectedToPrimary();
  258.         return parent::beginTransaction();
  259.     }
  260.     /**
  261.      * {@inheritDoc}
  262.      */
  263.     public function commit()
  264.     {
  265.         $this->ensureConnectedToPrimary();
  266.         return parent::commit();
  267.     }
  268.     /**
  269.      * {@inheritDoc}
  270.      */
  271.     public function rollBack()
  272.     {
  273.         $this->ensureConnectedToPrimary();
  274.         return parent::rollBack();
  275.     }
  276.     /**
  277.      * {@inheritDoc}
  278.      */
  279.     public function delete($table, array $criteria, array $types = [])
  280.     {
  281.         $this->ensureConnectedToPrimary();
  282.         return parent::delete($table$criteria$types);
  283.     }
  284.     /**
  285.      * {@inheritDoc}
  286.      */
  287.     public function close()
  288.     {
  289.         unset($this->connections['primary'], $this->connections['replica']);
  290.         parent::close();
  291.         $this->_conn       null;
  292.         $this->connections = ['primary' => null'replica' => null];
  293.     }
  294.     /**
  295.      * {@inheritDoc}
  296.      */
  297.     public function update($table, array $data, array $criteria, array $types = [])
  298.     {
  299.         $this->ensureConnectedToPrimary();
  300.         return parent::update($table$data$criteria$types);
  301.     }
  302.     /**
  303.      * {@inheritDoc}
  304.      */
  305.     public function insert($table, array $data, array $types = [])
  306.     {
  307.         $this->ensureConnectedToPrimary();
  308.         return parent::insert($table$data$types);
  309.     }
  310.     /**
  311.      * {@inheritDoc}
  312.      */
  313.     public function exec($sql)
  314.     {
  315.         $this->ensureConnectedToPrimary();
  316.         return parent::exec($sql);
  317.     }
  318.     /**
  319.      * {@inheritDoc}
  320.      */
  321.     public function createSavepoint($savepoint)
  322.     {
  323.         $this->ensureConnectedToPrimary();
  324.         parent::createSavepoint($savepoint);
  325.     }
  326.     /**
  327.      * {@inheritDoc}
  328.      */
  329.     public function releaseSavepoint($savepoint)
  330.     {
  331.         $this->ensureConnectedToPrimary();
  332.         parent::releaseSavepoint($savepoint);
  333.     }
  334.     /**
  335.      * {@inheritDoc}
  336.      */
  337.     public function rollbackSavepoint($savepoint)
  338.     {
  339.         $this->ensureConnectedToPrimary();
  340.         parent::rollbackSavepoint($savepoint);
  341.     }
  342.     /**
  343.      * {@inheritDoc}
  344.      */
  345.     public function query()
  346.     {
  347.         $this->ensureConnectedToPrimary();
  348.         assert($this->_conn instanceof DriverConnection);
  349.         $args func_get_args();
  350.         $logger $this->getConfiguration()->getSQLLogger();
  351.         if ($logger) {
  352.             $logger->startQuery($args[0]);
  353.         }
  354.         $statement $this->_conn->query(...$args);
  355.         $statement->setFetchMode($this->defaultFetchMode);
  356.         if ($logger) {
  357.             $logger->stopQuery();
  358.         }
  359.         return $statement;
  360.     }
  361.     /**
  362.      * {@inheritDoc}
  363.      */
  364.     public function prepare($sql)
  365.     {
  366.         $this->ensureConnectedToPrimary();
  367.         return parent::prepare($sql);
  368.     }
  369. }