v2: Updates

* Simplifies & beautifies everything
* Introduces a new Class system.
* Errors are defaulted to AWS's handler.
* New function names & more efficient handling.
* Should fix a majority of the errors.

Please read the README for more!
This commit is contained in:
Devang Srivastava 2020-09-28 15:32:51 +05:30
commit e6d7753dc8
1095 changed files with 45088 additions and 2911 deletions

View file

@ -0,0 +1,61 @@
<?php
namespace Aws\Retry;
use Aws\Retry\Exception\ConfigurationException;
class Configuration implements ConfigurationInterface
{
private $mode;
private $maxAttempts;
private $validModes = [
'legacy',
'standard',
'adaptive'
];
public function __construct($mode = 'legacy', $maxAttempts = 3)
{
$mode = strtolower($mode);
if (!in_array($mode, $this->validModes)) {
throw new ConfigurationException("'{$mode}' is not a valid mode."
. " The mode has to be 'legacy', 'standard', or 'adaptive'.");
}
if (!is_numeric($maxAttempts)
|| intval($maxAttempts) != $maxAttempts
|| $maxAttempts < 1
) {
throw new ConfigurationException("The 'maxAttempts' parameter has"
. " to be an integer >= 1.");
}
$this->mode = $mode;
$this->maxAttempts = intval($maxAttempts);
}
/**
* {@inheritdoc}
*/
public function getMode()
{
return $this->mode;
}
/**
* {@inheritdoc}
*/
public function getMaxAttempts()
{
return $this->maxAttempts;
}
/**
* {@inheritdoc}
*/
public function toArray()
{
return [
'mode' => $this->getMode(),
'max_attempts' => $this->getMaxAttempts(),
];
}
}

View file

@ -0,0 +1,30 @@
<?php
namespace Aws\Retry;
/**
* Provides access to retry configuration
*/
interface ConfigurationInterface
{
/**
* Returns the retry mode. Available modes include 'legacy', 'standard', and
* 'adapative'.
*
* @return string
*/
public function getMode();
/**
* Returns the maximum number of attempts that will be used for a request
*
* @return string
*/
public function getMaxAttempts();
/**
* Returns the configuration as an associative array
*
* @return array
*/
public function toArray();
}

View file

@ -0,0 +1,222 @@
<?php
namespace Aws\Retry;
use Aws\AbstractConfigurationProvider;
use Aws\CacheInterface;
use Aws\ConfigurationProviderInterface;
use Aws\Retry\Exception\ConfigurationException;
use GuzzleHttp\Promise;
use GuzzleHttp\Promise\PromiseInterface;
/**
* A configuration provider is a function that returns a promise that is
* fulfilled with a {@see \Aws\Retry\ConfigurationInterface}
* or rejected with an {@see \Aws\Retry\Exception\ConfigurationException}.
*
* <code>
* use Aws\Sts\RegionalEndpoints\ConfigurationProvider;
* $provider = ConfigurationProvider::defaultProvider();
* // Returns a ConfigurationInterface or throws.
* $config = $provider()->wait();
* </code>
*
* Configuration providers can be composed to create configuration using
* conditional logic that can create different configurations in different
* environments. You can compose multiple providers into a single provider using
* {@see \Aws\Retry\ConfigurationProvider::chain}. This function
* accepts providers as variadic arguments and returns a new function that will
* invoke each provider until a successful configuration is returned.
*
* <code>
* // First try an INI file at this location.
* $a = ConfigurationProvider::ini(null, '/path/to/file.ini');
* // Then try an INI file at this location.
* $b = ConfigurationProvider::ini(null, '/path/to/other-file.ini');
* // Then try loading from environment variables.
* $c = ConfigurationProvider::env();
* // Combine the three providers together.
* $composed = ConfigurationProvider::chain($a, $b, $c);
* // Returns a promise that is fulfilled with a configuration or throws.
* $promise = $composed();
* // Wait on the configuration to resolve.
* $config = $promise->wait();
* </code>
*/
class ConfigurationProvider extends AbstractConfigurationProvider
implements ConfigurationProviderInterface
{
const DEFAULT_MAX_ATTEMPTS = 3;
const DEFAULT_MODE = 'legacy';
const ENV_MAX_ATTEMPTS = 'AWS_MAX_ATTEMPTS';
const ENV_MODE = 'AWS_RETRY_MODE';
const ENV_PROFILE = 'AWS_PROFILE';
const INI_MAX_ATTEMPTS = 'max_attempts';
const INI_MODE = 'retry_mode';
public static $cacheKey = 'aws_retries_config';
protected static $interfaceClass = ConfigurationInterface::class;
protected static $exceptionClass = ConfigurationException::class;
/**
* Create a default config provider that first checks for environment
* variables, then checks for a specified profile in the environment-defined
* config file location (env variable is 'AWS_CONFIG_FILE', file location
* defaults to ~/.aws/config), then checks for the "default" profile in the
* environment-defined config file location, and failing those uses a default
* fallback set of configuration options.
*
* This provider is automatically wrapped in a memoize function that caches
* previously provided config options.
*
* @param array $config
*
* @return callable
*/
public static function defaultProvider(array $config = [])
{
$configProviders = [self::env()];
if (
!isset($config['use_aws_shared_config_files'])
|| $config['use_aws_shared_config_files'] != false
) {
$configProviders[] = self::ini();
}
$configProviders[] = self::fallback();
$memo = self::memoize(
call_user_func_array('self::chain', $configProviders)
);
if (isset($config['retries'])
&& $config['retries'] instanceof CacheInterface
) {
return self::cache($memo, $config['retries'], self::$cacheKey);
}
return $memo;
}
/**
* Provider that creates config from environment variables.
*
* @return callable
*/
public static function env()
{
return function () {
// Use config from environment variables, if available
$mode = getenv(self::ENV_MODE);
$maxAttempts = getenv(self::ENV_MAX_ATTEMPTS)
? getenv(self::ENV_MAX_ATTEMPTS)
: self::DEFAULT_MAX_ATTEMPTS;
if (!empty($mode)) {
return Promise\promise_for(
new Configuration($mode, $maxAttempts)
);
}
return self::reject('Could not find environment variable config'
. ' in ' . self::ENV_MODE);
};
}
/**
* Fallback config options when other sources are not set.
*
* @return callable
*/
public static function fallback()
{
return function () {
return Promise\promise_for(
new Configuration(self::DEFAULT_MODE, self::DEFAULT_MAX_ATTEMPTS)
);
};
}
/**
* Config provider that creates config using a config file whose location
* is specified by an environment variable 'AWS_CONFIG_FILE', defaulting to
* ~/.aws/config if not specified
*
* @param string|null $profile Profile to use. If not specified will use
* the "default" profile.
* @param string|null $filename If provided, uses a custom filename rather
* than looking in the default directory.
*
* @return callable
*/
public static function ini(
$profile = null,
$filename = null
) {
$filename = $filename ?: (self::getDefaultConfigFilename());
$profile = $profile ?: (getenv(self::ENV_PROFILE) ?: 'default');
return function () use ($profile, $filename) {
if (!is_readable($filename)) {
return self::reject("Cannot read configuration from $filename");
}
$data = \Aws\parse_ini_file($filename, true);
if ($data === false) {
return self::reject("Invalid config file: $filename");
}
if (!isset($data[$profile])) {
return self::reject("'$profile' not found in config file");
}
if (!isset($data[$profile][self::INI_MODE])) {
return self::reject("Required retry config values
not present in INI profile '{$profile}' ({$filename})");
}
$maxAttempts = isset($data[$profile][self::INI_MAX_ATTEMPTS])
? $data[$profile][self::INI_MAX_ATTEMPTS]
: self::DEFAULT_MAX_ATTEMPTS;
return Promise\promise_for(
new Configuration(
$data[$profile][self::INI_MODE],
$maxAttempts
)
);
};
}
/**
* Unwraps a configuration object in whatever valid form it is in,
* always returning a ConfigurationInterface object.
*
* @param mixed $config
* @return ConfigurationInterface
* @throws \InvalidArgumentException
*/
public static function unwrap($config)
{
if (is_callable($config)) {
$config = $config();
}
if ($config instanceof PromiseInterface) {
$config = $config->wait();
}
if ($config instanceof ConfigurationInterface) {
return $config;
}
// An integer value for this config indicates the legacy 'retries'
// config option, which is incremented to translate to max attempts
if (is_int($config)) {
return new Configuration('legacy', $config + 1);
}
if (is_array($config) && isset($config['mode'])) {
$maxAttempts = isset($config['max_attempts'])
? $config['max_attempts']
: self::DEFAULT_MAX_ATTEMPTS;
return new Configuration($config['mode'], $maxAttempts);
}
throw new \InvalidArgumentException('Not a valid retry configuration'
. ' argument.');
}
}

View file

@ -0,0 +1,14 @@
<?php
namespace Aws\Retry\Exception;
use Aws\HasMonitoringEventsTrait;
use Aws\MonitoringEventsInterface;
/**
* Represents an error interacting with retry configuration
*/
class ConfigurationException extends \RuntimeException implements
MonitoringEventsInterface
{
use HasMonitoringEventsTrait;
}

View file

@ -0,0 +1,86 @@
<?php
namespace Aws\Retry;
use Aws\Exception\AwsException;
use Aws\ResultInterface;
/**
* @internal
*/
class QuotaManager
{
private $availableCapacity;
private $capacityAmount;
private $initialRetryTokens;
private $maxCapacity;
private $noRetryIncrement;
private $retryCost;
private $timeoutRetryCost;
public function __construct($config = [])
{
$this->initialRetryTokens = isset($config['initial_retry_tokens'])
? $config['initial_retry_tokens']
: 500;
$this->noRetryIncrement = isset($config['no_retry_increment'])
? $config['no_retry_increment']
: 1;
$this->retryCost = isset($config['retry_cost'])
? $config['retry_cost']
: 5;
$this->timeoutRetryCost = isset($config['timeout_retry_cost'])
? $config['timeout_retry_cost']
: 10;
$this->maxCapacity = $this->initialRetryTokens;
$this->availableCapacity = $this->initialRetryTokens;
}
public function hasRetryQuota($result)
{
if ($result instanceof AwsException && $result->isConnectionError()) {
$this->capacityAmount = $this->timeoutRetryCost;
} else {
$this->capacityAmount = $this->retryCost;
}
if ($this->capacityAmount > $this->availableCapacity) {
return false;
}
$this->availableCapacity -= $this->capacityAmount;
return true;
}
public function releaseToQuota($result)
{
if ($result instanceof AwsException) {
$statusCode = (int) $result->getStatusCode();
} elseif ($result instanceof ResultInterface) {
$statusCode = isset($result['@metadata']['statusCode'])
? (int) $result['@metadata']['statusCode']
: null;
}
if (!empty($statusCode) && $statusCode >= 200 && $statusCode < 300) {
if (isset($this->capacityAmount)) {
$amount = $this->capacityAmount;
$this->availableCapacity += $amount;
unset($this->capacityAmount);
} else {
$amount = $this->noRetryIncrement;
$this->availableCapacity += $amount;
}
$this->availableCapacity = min(
$this->availableCapacity,
$this->maxCapacity
);
}
return (isset($amount) ? $amount : 0);
}
public function getAvailableCapacity()
{
return $this->availableCapacity;
}
}

View file

@ -0,0 +1,182 @@
<?php
namespace Aws\Retry;
/**
* @internal
*/
class RateLimiter
{
// User-configurable constants
private $beta;
private $minCapacity;
private $minFillRate;
private $scaleConstant;
private $smooth;
// Optional callable time provider
private $timeProvider;
// Pre-set state variables
private $currentCapacity = 0;
private $enabled = false;
private $lastMaxRate = 0;
private $measuredTxRate = 0;
private $requestCount = 0;
// Other state variables
private $fillRate;
private $lastThrottleTime;
private $lastTimestamp;
private $lastTxRateBucket;
private $maxCapacity;
private $timeWindow;
public function __construct($options = [])
{
$this->beta = isset($options['beta'])
? $options['beta']
: 0.7;
$this->minCapacity = isset($options['min_capacity'])
? $options['min_capacity']
: 1;
$this->minFillRate = isset($options['min_fill_rate'])
? $options['min_fill_rate']
: 0.5;
$this->scaleConstant = isset($options['scale_constant'])
? $options['scale_constant']
: 0.4;
$this->smooth = isset($options['smooth'])
? $options['smooth']
: 0.8;
$this->timeProvider = isset($options['time_provider'])
? $options['time_provider']
: null;
$this->lastTxRateBucket = floor($this->time());
$this->lastThrottleTime = $this->time();
}
public function isEnabled()
{
return $this->enabled;
}
public function getSendToken()
{
$this->acquireToken(1);
}
public function updateSendingRate($isThrottled)
{
$this->updateMeasuredRate();
if ($isThrottled) {
if (!$this->isEnabled()) {
$rateToUse = $this->measuredTxRate;
} else {
$rateToUse = min($this->measuredTxRate, $this->fillRate);
}
$this->lastMaxRate = $rateToUse;
$this->calculateTimeWindow();
$this->lastThrottleTime = $this->time();
$calculatedRate = $this->cubicThrottle($rateToUse);
$this->enableTokenBucket();
} else {
$this->calculateTimeWindow();
$calculatedRate = $this->cubicSuccess($this->time());
}
$newRate = min($calculatedRate, 2 * $this->measuredTxRate);
$this->updateTokenBucketRate($newRate);
return $newRate;
}
private function acquireToken($amount)
{
if (!$this->enabled) {
return true;
}
$this->refillTokenBucket();
if ($amount > $this->currentCapacity) {
usleep(1000000 * ($amount - $this->currentCapacity) / $this->fillRate);
}
$this->currentCapacity -= $amount;
return true;
}
private function calculateTimeWindow()
{
$this->timeWindow = pow(($this->lastMaxRate * (1 - $this->beta) / $this->scaleConstant), 0.333);
}
private function cubicSuccess($timestamp)
{
$dt = $timestamp - $this->lastThrottleTime;
return $this->scaleConstant * pow($dt - $this->timeWindow, 3) + $this->lastMaxRate;
}
private function cubicThrottle($rateToUse)
{
return $rateToUse * $this->beta;
}
private function enableTokenBucket()
{
$this->enabled = true;
}
private function refillTokenBucket()
{
$timestamp = $this->time();
if (!isset($this->lastTimestamp)) {
$this->lastTimestamp = $timestamp;
return;
}
$fillAmount = ($timestamp - $this->lastTimestamp) * $this->fillRate;
$this->currentCapacity = $this->currentCapacity + $fillAmount;
if (!is_null($this->maxCapacity)) {
$this->currentCapacity = min(
$this->maxCapacity,
$this->currentCapacity
);
}
$this->lastTimestamp = $timestamp;
}
private function time()
{
if (is_callable($this->timeProvider)) {
$provider = $this->timeProvider;
$time = $provider();
return $time;
}
return microtime(true);
}
private function updateMeasuredRate()
{
$timestamp = $this->time();
$timeBucket = floor(round($timestamp, 3) * 2) / 2;
$this->requestCount++;
if ($timeBucket > $this->lastTxRateBucket) {
$currentRate = $this->requestCount / ($timeBucket - $this->lastTxRateBucket);
$this->measuredTxRate = ($currentRate * $this->smooth)
+ ($this->measuredTxRate * (1 - $this->smooth));
$this->requestCount = 0;
$this->lastTxRateBucket = $timeBucket;
}
}
private function updateTokenBucketRate($newRps)
{
$this->refillTokenBucket();
$this->fillRate = max($newRps, $this->minFillRate);
$this->maxCapacity = max($newRps, $this->minCapacity);
$this->currentCapacity = min($this->currentCapacity, $this->maxCapacity);
}
}

View file

@ -0,0 +1,56 @@
<?php
namespace Aws\Retry;
use Aws\Exception\AwsException;
use Aws\ResultInterface;
trait RetryHelperTrait
{
private function addRetryHeader($request, $retries, $delayBy)
{
return $request->withHeader('aws-sdk-retry', "{$retries}/{$delayBy}");
}
private function updateStats($retries, $delay, array &$stats)
{
if (!isset($stats['total_retry_delay'])) {
$stats['total_retry_delay'] = 0;
}
$stats['total_retry_delay'] += $delay;
$stats['retries_attempted'] = $retries;
}
private function updateHttpStats($value, array &$stats)
{
if (empty($stats['http'])) {
$stats['http'] = [];
}
if ($value instanceof AwsException) {
$resultStats = $value->getTransferInfo();
$stats['http'] []= $resultStats;
} elseif ($value instanceof ResultInterface) {
$resultStats = isset($value['@metadata']['transferStats']['http'][0])
? $value['@metadata']['transferStats']['http'][0]
: [];
$stats['http'] []= $resultStats;
}
}
private function bindStatsToReturn($return, array $stats)
{
if ($return instanceof ResultInterface) {
if (!isset($return['@metadata'])) {
$return['@metadata'] = [];
}
$return['@metadata']['transferStats'] = $stats;
} elseif ($return instanceof AwsException) {
$return->setTransferInfo($stats);
}
return $return;
}
}