mirror of
https://github.com/SociallyDev/Spaces-API.git
synced 2025-08-20 05:13:42 -07:00
spaces.php
This commit is contained in:
parent
7755490b81
commit
eefa32741e
845 changed files with 50409 additions and 0 deletions
36
aws/Aws/DynamoDb/BinaryValue.php
Normal file
36
aws/Aws/DynamoDb/BinaryValue.php
Normal file
|
@ -0,0 +1,36 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use GuzzleHttp\Psr7;
|
||||
|
||||
/**
|
||||
* Special object to represent a DynamoDB binary (B) value.
|
||||
*/
|
||||
class BinaryValue implements \JsonSerializable
|
||||
{
|
||||
/** @var string Binary value. */
|
||||
private $value;
|
||||
|
||||
/**
|
||||
* @param mixed $value A binary value compatible with Guzzle streams.
|
||||
*
|
||||
* @see GuzzleHttp\Stream\Stream::factory
|
||||
*/
|
||||
public function __construct($value)
|
||||
{
|
||||
if (!is_string($value)) {
|
||||
$value = Psr7\stream_for($value);
|
||||
}
|
||||
$this->value = (string) $value;
|
||||
}
|
||||
|
||||
public function jsonSerialize()
|
||||
{
|
||||
return $this->value;
|
||||
}
|
||||
|
||||
public function __toString()
|
||||
{
|
||||
return $this->value;
|
||||
}
|
||||
}
|
130
aws/Aws/DynamoDb/DynamoDbClient.php
Normal file
130
aws/Aws/DynamoDb/DynamoDbClient.php
Normal file
|
@ -0,0 +1,130 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use Aws\Api\Parser\Crc32ValidatingParser;
|
||||
use Aws\AwsClient;
|
||||
use Aws\ClientResolver;
|
||||
use Aws\HandlerList;
|
||||
use Aws\Middleware;
|
||||
use Aws\RetryMiddleware;
|
||||
|
||||
/**
|
||||
* This client is used to interact with the **Amazon DynamoDB** service.
|
||||
*
|
||||
* @method \Aws\Result batchGetItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise batchGetItemAsync(array $args = [])
|
||||
* @method \Aws\Result batchWriteItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise batchWriteItemAsync(array $args = [])
|
||||
* @method \Aws\Result createTable(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise createTableAsync(array $args = [])
|
||||
* @method \Aws\Result deleteItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise deleteItemAsync(array $args = [])
|
||||
* @method \Aws\Result deleteTable(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise deleteTableAsync(array $args = [])
|
||||
* @method \Aws\Result describeTable(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise describeTableAsync(array $args = [])
|
||||
* @method \Aws\Result getItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise getItemAsync(array $args = [])
|
||||
* @method \Aws\Result listTables(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise listTablesAsync(array $args = [])
|
||||
* @method \Aws\Result putItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise putItemAsync(array $args = [])
|
||||
* @method \Aws\Result query(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise queryAsync(array $args = [])
|
||||
* @method \Aws\Result scan(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise scanAsync(array $args = [])
|
||||
* @method \Aws\Result updateItem(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise updateItemAsync(array $args = [])
|
||||
* @method \Aws\Result updateTable(array $args = [])
|
||||
* @method \GuzzleHttp\Promise\Promise updateTableAsync(array $args = [])
|
||||
* @method \Aws\Result createBackup(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise createBackupAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result createGlobalTable(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise createGlobalTableAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result deleteBackup(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise deleteBackupAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result describeBackup(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise describeBackupAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result describeContinuousBackups(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise describeContinuousBackupsAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result describeGlobalTable(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise describeGlobalTableAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result describeLimits(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise describeLimitsAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result describeTimeToLive(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise describeTimeToLiveAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result listBackups(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise listBackupsAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result listGlobalTables(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise listGlobalTablesAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result listTagsOfResource(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise listTagsOfResourceAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result restoreTableFromBackup(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise restoreTableFromBackupAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result tagResource(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise tagResourceAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result untagResource(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise untagResourceAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result updateGlobalTable(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise updateGlobalTableAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \Aws\Result updateTimeToLive(array $args = []) (supported in versions 2012-08-10)
|
||||
* @method \GuzzleHttp\Promise\Promise updateTimeToLiveAsync(array $args = []) (supported in versions 2012-08-10)
|
||||
*/
|
||||
class DynamoDbClient extends AwsClient
|
||||
{
|
||||
public static function getArguments()
|
||||
{
|
||||
$args = parent::getArguments();
|
||||
$args['retries']['default'] = 10;
|
||||
$args['retries']['fn'] = [__CLASS__, '_applyRetryConfig'];
|
||||
$args['api_provider']['fn'] = [__CLASS__, '_applyApiProvider'];
|
||||
|
||||
return $args;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method for instantiating and registering the DynamoDB
|
||||
* Session handler with this DynamoDB client object.
|
||||
*
|
||||
* @param array $config Array of options for the session handler factory
|
||||
*
|
||||
* @return SessionHandler
|
||||
*/
|
||||
public function registerSessionHandler(array $config = [])
|
||||
{
|
||||
$handler = SessionHandler::fromClient($this, $config);
|
||||
$handler->register();
|
||||
|
||||
return $handler;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public static function _applyRetryConfig($value, array &$args, HandlerList $list)
|
||||
{
|
||||
if (!$value) {
|
||||
return;
|
||||
}
|
||||
|
||||
$list->appendSign(
|
||||
Middleware::retry(
|
||||
RetryMiddleware::createDefaultDecider($value),
|
||||
function ($retries) {
|
||||
return $retries
|
||||
? RetryMiddleware::exponentialDelay($retries) / 2
|
||||
: 0;
|
||||
},
|
||||
isset($args['stats']['retries'])
|
||||
? (bool) $args['stats']['retries']
|
||||
: false
|
||||
),
|
||||
'retry'
|
||||
);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
public static function _applyApiProvider($value, array &$args, HandlerList $list)
|
||||
{
|
||||
ClientResolver::_apply_api_provider($value, $args, $list);
|
||||
$args['parser'] = new Crc32ValidatingParser($args['parser']);
|
||||
}
|
||||
}
|
9
aws/Aws/DynamoDb/Exception/DynamoDbException.php
Normal file
9
aws/Aws/DynamoDb/Exception/DynamoDbException.php
Normal file
|
@ -0,0 +1,9 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb\Exception;
|
||||
|
||||
use Aws\Exception\AwsException;
|
||||
|
||||
/**
|
||||
* Represents an error interacting with the Amazon DynamoDB service.
|
||||
*/
|
||||
class DynamoDbException extends AwsException {}
|
62
aws/Aws/DynamoDb/LockingSessionConnection.php
Normal file
62
aws/Aws/DynamoDb/LockingSessionConnection.php
Normal file
|
@ -0,0 +1,62 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use Aws\DynamoDb\Exception\DynamoDbException;
|
||||
|
||||
/**
|
||||
* The locking connection adds locking logic to the read operation.
|
||||
*/
|
||||
class LockingSessionConnection extends StandardSessionConnection
|
||||
{
|
||||
public function __construct(DynamoDbClient $client, array $config = [])
|
||||
{
|
||||
parent::__construct($client, $config + [
|
||||
'max_lock_wait_time' => 10,
|
||||
'min_lock_retry_microtime' => 10000,
|
||||
'max_lock_retry_microtime' => 50000,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
* Retries the request until the lock can be acquired
|
||||
*/
|
||||
public function read($id)
|
||||
{
|
||||
// Create the params for the UpdateItem operation so that a lock can be
|
||||
// set and item returned (via ReturnValues) in a one, atomic operation.
|
||||
$params = [
|
||||
'TableName' => $this->config['table_name'],
|
||||
'Key' => $this->formatKey($id),
|
||||
'Expected' => ['lock' => ['Exists' => false]],
|
||||
'AttributeUpdates' => ['lock' => ['Value' => ['N' => '1']]],
|
||||
'ReturnValues' => 'ALL_NEW',
|
||||
];
|
||||
|
||||
// Acquire the lock and fetch the item data.
|
||||
$timeout = time() + $this->config['max_lock_wait_time'];
|
||||
while (true) {
|
||||
try {
|
||||
$item = [];
|
||||
$result = $this->client->updateItem($params);
|
||||
if (isset($result['Attributes'])) {
|
||||
foreach ($result['Attributes'] as $key => $value) {
|
||||
$item[$key] = current($value);
|
||||
}
|
||||
}
|
||||
return $item;
|
||||
} catch (DynamoDbException $e) {
|
||||
if ($e->getAwsErrorCode() === 'ConditionalCheckFailedException'
|
||||
&& time() < $timeout
|
||||
) {
|
||||
usleep(rand(
|
||||
$this->config['min_lock_retry_microtime'],
|
||||
$this->config['max_lock_retry_microtime']
|
||||
));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
322
aws/Aws/DynamoDb/Marshaler.php
Normal file
322
aws/Aws/DynamoDb/Marshaler.php
Normal file
|
@ -0,0 +1,322 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use Psr\Http\Message\StreamInterface;
|
||||
|
||||
/**
|
||||
* Marshals and unmarshals JSON documents and PHP arrays into DynamoDB items.
|
||||
*/
|
||||
class Marshaler
|
||||
{
|
||||
/** @var array Default options to merge into provided options. */
|
||||
private static $defaultOptions = [
|
||||
'ignore_invalid' => false,
|
||||
'nullify_invalid' => false,
|
||||
'wrap_numbers' => false,
|
||||
];
|
||||
|
||||
/** @var array Marshaler options. */
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* Instantiates a DynamoDB Marshaler.
|
||||
*
|
||||
* The following options are valid.
|
||||
*
|
||||
* - ignore_invalid: (bool) Set to `true` if invalid values should be
|
||||
* ignored (i.e., not included) during marshaling.
|
||||
* - nullify_invalid: (bool) Set to `true` if invalid values should be set
|
||||
* to null.
|
||||
* - wrap_numbers: (bool) Set to `true` to wrap numbers with `NumberValue`
|
||||
* objects during unmarshaling to preserve the precision.
|
||||
*
|
||||
* @param array $options Marshaler options
|
||||
*/
|
||||
public function __construct(array $options = [])
|
||||
{
|
||||
$this->options = $options + self::$defaultOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a special object to represent a DynamoDB binary (B) value.
|
||||
*
|
||||
* This helps disambiguate binary values from string (S) values.
|
||||
*
|
||||
* @param mixed $value A binary value compatible with Guzzle streams.
|
||||
*
|
||||
* @return BinaryValue
|
||||
* @see GuzzleHttp\Stream\Stream::factory
|
||||
*/
|
||||
public function binary($value)
|
||||
{
|
||||
return new BinaryValue($value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a special object to represent a DynamoDB number (N) value.
|
||||
*
|
||||
* This helps maintain the precision of large integer/float in PHP.
|
||||
*
|
||||
* @param string|int|float $value A number value.
|
||||
*
|
||||
* @return NumberValue
|
||||
*/
|
||||
public function number($value)
|
||||
{
|
||||
return new NumberValue($value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a special object to represent a DynamoDB set (SS/NS/BS) value.
|
||||
*
|
||||
* This helps disambiguate set values from list (L) values.
|
||||
*
|
||||
* @param array $values The values of the set.
|
||||
*
|
||||
* @return SetValue
|
||||
*
|
||||
*/
|
||||
public function set(array $values)
|
||||
{
|
||||
return new SetValue($values);
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshal a JSON document from a string to a DynamoDB item.
|
||||
*
|
||||
* The result is an array formatted in the proper parameter structure
|
||||
* required by the DynamoDB API for items.
|
||||
*
|
||||
* @param string $json A valid JSON document.
|
||||
*
|
||||
* @return array Item formatted for DynamoDB.
|
||||
* @throws \InvalidArgumentException if the JSON is invalid.
|
||||
*/
|
||||
public function marshalJson($json)
|
||||
{
|
||||
$data = json_decode($json);
|
||||
if (!($data instanceof \stdClass)) {
|
||||
throw new \InvalidArgumentException(
|
||||
'The JSON document must be valid and be an object at its root.'
|
||||
);
|
||||
}
|
||||
|
||||
return current($this->marshalValue($data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshal a native PHP array of data to a DynamoDB item.
|
||||
*
|
||||
* The result is an array formatted in the proper parameter structure
|
||||
* required by the DynamoDB API for items.
|
||||
*
|
||||
* @param array|\stdClass $item An associative array of data.
|
||||
*
|
||||
* @return array Item formatted for DynamoDB.
|
||||
*/
|
||||
public function marshalItem($item)
|
||||
{
|
||||
return current($this->marshalValue($item));
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshal a native PHP value into a DynamoDB attribute value.
|
||||
*
|
||||
* The result is an associative array that is formatted in the proper
|
||||
* `[TYPE => VALUE]` parameter structure required by the DynamoDB API.
|
||||
*
|
||||
* @param mixed $value A scalar, array, or `stdClass` value.
|
||||
*
|
||||
* @return array Attribute formatted for DynamoDB.
|
||||
* @throws \UnexpectedValueException if the value cannot be marshaled.
|
||||
*/
|
||||
public function marshalValue($value)
|
||||
{
|
||||
$type = gettype($value);
|
||||
|
||||
// Handle string values.
|
||||
if ($type === 'string') {
|
||||
if ($value === '') {
|
||||
return $this->handleInvalid('empty strings are invalid');
|
||||
}
|
||||
|
||||
return ['S' => $value];
|
||||
}
|
||||
|
||||
// Handle number values.
|
||||
if ($type === 'integer'
|
||||
|| $type === 'double'
|
||||
|| $value instanceof NumberValue
|
||||
) {
|
||||
return ['N' => (string) $value];
|
||||
}
|
||||
|
||||
// Handle boolean values.
|
||||
if ($type === 'boolean') {
|
||||
return ['BOOL' => $value];
|
||||
}
|
||||
|
||||
// Handle null values.
|
||||
if ($type === 'NULL') {
|
||||
return ['NULL' => true];
|
||||
}
|
||||
|
||||
// Handle set values.
|
||||
if ($value instanceof SetValue) {
|
||||
if (count($value) === 0) {
|
||||
return $this->handleInvalid('empty sets are invalid');
|
||||
}
|
||||
$previousType = null;
|
||||
$data = [];
|
||||
foreach ($value as $v) {
|
||||
$marshaled = $this->marshalValue($v);
|
||||
$setType = key($marshaled);
|
||||
if (!$previousType) {
|
||||
$previousType = $setType;
|
||||
} elseif ($setType !== $previousType) {
|
||||
return $this->handleInvalid('sets must be uniform in type');
|
||||
}
|
||||
$data[] = current($marshaled);
|
||||
}
|
||||
|
||||
return [$previousType . 'S' => array_values(array_unique($data))];
|
||||
}
|
||||
|
||||
// Handle list and map values.
|
||||
$dbType = 'L';
|
||||
if ($value instanceof \stdClass) {
|
||||
$type = 'array';
|
||||
$dbType = 'M';
|
||||
}
|
||||
if ($type === 'array' || $value instanceof \Traversable) {
|
||||
$data = [];
|
||||
$index = 0;
|
||||
foreach ($value as $k => $v) {
|
||||
if ($v = $this->marshalValue($v)) {
|
||||
$data[$k] = $v;
|
||||
if ($dbType === 'L' && (!is_int($k) || $k != $index++)) {
|
||||
$dbType = 'M';
|
||||
}
|
||||
}
|
||||
}
|
||||
return [$dbType => $data];
|
||||
}
|
||||
|
||||
// Handle binary values.
|
||||
if (is_resource($value) || $value instanceof StreamInterface) {
|
||||
$value = $this->binary($value);
|
||||
}
|
||||
if ($value instanceof BinaryValue) {
|
||||
return ['B' => (string) $value];
|
||||
}
|
||||
|
||||
// Handle invalid values.
|
||||
return $this->handleInvalid('encountered unexpected value');
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshal a document (item) from a DynamoDB operation result into a JSON
|
||||
* document string.
|
||||
*
|
||||
* @param array $data Item/document from a DynamoDB result.
|
||||
* @param int $jsonEncodeFlags Flags to use with `json_encode()`.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function unmarshalJson(array $data, $jsonEncodeFlags = 0)
|
||||
{
|
||||
return json_encode(
|
||||
$this->unmarshalValue(['M' => $data], true),
|
||||
$jsonEncodeFlags
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshal an item from a DynamoDB operation result into a native PHP
|
||||
* array. If you set $mapAsObject to true, then a stdClass value will be
|
||||
* returned instead.
|
||||
*
|
||||
* @param array $data Item from a DynamoDB result.
|
||||
* @param bool $mapAsObject Whether maps should be represented as stdClass.
|
||||
*
|
||||
* @return array|\stdClass
|
||||
*/
|
||||
public function unmarshalItem(array $data, $mapAsObject = false)
|
||||
{
|
||||
return $this->unmarshalValue(['M' => $data], $mapAsObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshal a value from a DynamoDB operation result into a native PHP
|
||||
* value. Will return a scalar, array, or (if you set $mapAsObject to true)
|
||||
* stdClass value.
|
||||
*
|
||||
* @param array $value Value from a DynamoDB result.
|
||||
* @param bool $mapAsObject Whether maps should be represented as stdClass.
|
||||
*
|
||||
* @return mixed
|
||||
* @throws \UnexpectedValueException
|
||||
*/
|
||||
public function unmarshalValue(array $value, $mapAsObject = false)
|
||||
{
|
||||
$type = key($value);
|
||||
$value = $value[$type];
|
||||
switch ($type) {
|
||||
case 'S':
|
||||
case 'BOOL':
|
||||
return $value;
|
||||
case 'NULL':
|
||||
return null;
|
||||
case 'N':
|
||||
if ($this->options['wrap_numbers']) {
|
||||
return new NumberValue($value);
|
||||
} else {
|
||||
// Use type coercion to unmarshal numbers to int/float.
|
||||
return $value + 0;
|
||||
}
|
||||
case 'M':
|
||||
if ($mapAsObject) {
|
||||
$data = new \stdClass;
|
||||
foreach ($value as $k => $v) {
|
||||
$data->$k = $this->unmarshalValue($v, $mapAsObject);
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
// NOBREAK: Unmarshal M the same way as L, for arrays.
|
||||
case 'L':
|
||||
foreach ($value as $k => $v) {
|
||||
$value[$k] = $this->unmarshalValue($v, $mapAsObject);
|
||||
}
|
||||
return $value;
|
||||
case 'B':
|
||||
return new BinaryValue($value);
|
||||
case 'SS':
|
||||
case 'NS':
|
||||
case 'BS':
|
||||
foreach ($value as $k => $v) {
|
||||
$value[$k] = $this->unmarshalValue([$type[0] => $v]);
|
||||
}
|
||||
return new SetValue($value);
|
||||
}
|
||||
|
||||
throw new \UnexpectedValueException("Unexpected type: {$type}.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle invalid value based on marshaler configuration.
|
||||
*
|
||||
* @param string $message Error message
|
||||
*
|
||||
* @return array|null
|
||||
*/
|
||||
private function handleInvalid($message)
|
||||
{
|
||||
if ($this->options['ignore_invalid']) {
|
||||
return null;
|
||||
} elseif ($this->options['nullify_invalid']) {
|
||||
return ['NULL' => true];
|
||||
}
|
||||
|
||||
throw new \UnexpectedValueException("Marshaling error: {$message}.");
|
||||
}
|
||||
}
|
29
aws/Aws/DynamoDb/NumberValue.php
Normal file
29
aws/Aws/DynamoDb/NumberValue.php
Normal file
|
@ -0,0 +1,29 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
/**
|
||||
* Special object to represent a DynamoDB Number (N) value.
|
||||
*/
|
||||
class NumberValue implements \JsonSerializable
|
||||
{
|
||||
/** @var string Number value. */
|
||||
private $value;
|
||||
|
||||
/**
|
||||
* @param string|int|float $value A number value.
|
||||
*/
|
||||
public function __construct($value)
|
||||
{
|
||||
$this->value = (string) $value;
|
||||
}
|
||||
|
||||
public function jsonSerialize()
|
||||
{
|
||||
return $this->value;
|
||||
}
|
||||
|
||||
public function __toString()
|
||||
{
|
||||
return $this->value;
|
||||
}
|
||||
}
|
45
aws/Aws/DynamoDb/SessionConnectionInterface.php
Normal file
45
aws/Aws/DynamoDb/SessionConnectionInterface.php
Normal file
|
@ -0,0 +1,45 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
/**
|
||||
* The session connection provides the underlying logic for interacting with
|
||||
* Amazon DynamoDB and performs all of the reading and writing operations.
|
||||
*/
|
||||
interface SessionConnectionInterface
|
||||
{
|
||||
/**
|
||||
* Reads session data from DynamoDB
|
||||
*
|
||||
* @param string $id Session ID
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function read($id);
|
||||
|
||||
/**
|
||||
* Writes session data to DynamoDB
|
||||
*
|
||||
* @param string $id Session ID
|
||||
* @param string $data Serialized session data
|
||||
* @param bool $isChanged Whether or not the data has changed
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function write($id, $data, $isChanged);
|
||||
|
||||
/**
|
||||
* Deletes session record from DynamoDB
|
||||
*
|
||||
* @param string $id Session ID
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function delete($id);
|
||||
|
||||
/**
|
||||
* Performs garbage collection on the sessions stored in the DynamoDB
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function deleteExpired();
|
||||
}
|
227
aws/Aws/DynamoDb/SessionHandler.php
Normal file
227
aws/Aws/DynamoDb/SessionHandler.php
Normal file
|
@ -0,0 +1,227 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
/**
|
||||
* Provides an interface for using Amazon DynamoDB as a session store by hooking
|
||||
* into PHP's session handler hooks. Once registered, You may use the native
|
||||
* `$_SESSION` superglobal and session functions, and the sessions will be
|
||||
* stored automatically in DynamoDB. DynamoDB is a great session storage
|
||||
* solution due to its speed, scalability, and fault tolerance.
|
||||
*
|
||||
* For maximum performance, we recommend that you keep the size of your sessions
|
||||
* small. Locking is disabled by default, since it can drive up latencies and
|
||||
* costs under high traffic. Only turn it on if you need it.
|
||||
*
|
||||
* By far, the most expensive operation is garbage collection. Therefore, we
|
||||
* encourage you to carefully consider your session garbage collection strategy.
|
||||
* Note: the DynamoDB Session Handler does not allow garbage collection to be
|
||||
* triggered randomly. You must run garbage collection manually or through other
|
||||
* automated means using a cron job or similar scheduling technique.
|
||||
*/
|
||||
class SessionHandler implements \SessionHandlerInterface
|
||||
{
|
||||
/** @var SessionConnectionInterface Session save logic.*/
|
||||
private $connection;
|
||||
|
||||
/** @var string Session save path. */
|
||||
private $savePath;
|
||||
|
||||
/** @var string Session name. */
|
||||
private $sessionName;
|
||||
|
||||
/** @var string The last known session ID */
|
||||
private $openSessionId = '';
|
||||
|
||||
/** @var string Stores serialized data for tracking changes. */
|
||||
private $dataRead = '';
|
||||
|
||||
/** @var bool Keeps track of whether the session has been written. */
|
||||
private $sessionWritten = false;
|
||||
|
||||
/**
|
||||
* Creates a new DynamoDB Session Handler.
|
||||
*
|
||||
* The configuration array accepts the following array keys and values:
|
||||
* - table_name: Name of table to store the sessions.
|
||||
* - hash_key: Name of hash key in table. Default: "id".
|
||||
* - session_lifetime: Lifetime of inactive sessions expiration.
|
||||
* - consistent_read: Whether or not to use consistent reads.
|
||||
* - batch_config: Batch options used for garbage collection.
|
||||
* - locking: Whether or not to use session locking.
|
||||
* - max_lock_wait_time: Max time (s) to wait for lock acquisition.
|
||||
* - min_lock_retry_microtime: Min time (µs) to wait between lock attempts.
|
||||
* - max_lock_retry_microtime: Max time (µs) to wait between lock attempts.
|
||||
*
|
||||
* @param DynamoDbClient $client Client for doing DynamoDB operations
|
||||
* @param array $config Configuration for the Session Handler
|
||||
*
|
||||
* @return SessionHandler
|
||||
*/
|
||||
public static function fromClient(DynamoDbClient $client, array $config = [])
|
||||
{
|
||||
$config += ['locking' => false];
|
||||
if ($config['locking']) {
|
||||
$connection = new LockingSessionConnection($client, $config);
|
||||
} else {
|
||||
$connection = new StandardSessionConnection($client, $config);
|
||||
}
|
||||
|
||||
return new static($connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param SessionConnectionInterface $connection
|
||||
*/
|
||||
public function __construct(SessionConnectionInterface $connection)
|
||||
{
|
||||
$this->connection = $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the DynamoDB session handler.
|
||||
*
|
||||
* @return bool Whether or not the handler was registered.
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public function register()
|
||||
{
|
||||
return session_set_save_handler($this, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a session for writing. Triggered by session_start().
|
||||
*
|
||||
* @param string $savePath Session save path.
|
||||
* @param string $sessionName Session name.
|
||||
*
|
||||
* @return bool Whether or not the operation succeeded.
|
||||
*/
|
||||
public function open($savePath, $sessionName)
|
||||
{
|
||||
$this->savePath = $savePath;
|
||||
$this->sessionName = $sessionName;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a session from writing.
|
||||
*
|
||||
* @return bool Success
|
||||
*/
|
||||
public function close()
|
||||
{
|
||||
$id = session_id();
|
||||
// Make sure the session is unlocked and the expiration time is updated,
|
||||
// even if the write did not occur
|
||||
if ($this->openSessionId !== $id || !$this->sessionWritten) {
|
||||
$result = $this->connection->write($this->formatId($id), '', false);
|
||||
$this->sessionWritten = (bool) $result;
|
||||
}
|
||||
|
||||
return $this->sessionWritten;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a session stored in DynamoDB.
|
||||
*
|
||||
* @param string $id Session ID.
|
||||
*
|
||||
* @return string Session data.
|
||||
*/
|
||||
public function read($id)
|
||||
{
|
||||
$this->openSessionId = $id;
|
||||
// PHP expects an empty string to be returned from this method if no
|
||||
// data is retrieved
|
||||
$this->dataRead = '';
|
||||
|
||||
// Get session data using the selected locking strategy
|
||||
$item = $this->connection->read($this->formatId($id));
|
||||
|
||||
// Return the data if it is not expired. If it is expired, remove it
|
||||
if (isset($item['expires']) && isset($item['data'])) {
|
||||
$this->dataRead = $item['data'];
|
||||
if ($item['expires'] <= time()) {
|
||||
$this->dataRead = '';
|
||||
$this->destroy($id);
|
||||
}
|
||||
}
|
||||
|
||||
return $this->dataRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a session to DynamoDB.
|
||||
*
|
||||
* @param string $id Session ID.
|
||||
* @param string $data Serialized session data to write.
|
||||
*
|
||||
* @return bool Whether or not the operation succeeded.
|
||||
*/
|
||||
public function write($id, $data)
|
||||
{
|
||||
$changed = $id !== $this->openSessionId
|
||||
|| $data !== $this->dataRead;
|
||||
$this->openSessionId = $id;
|
||||
|
||||
// Write the session data using the selected locking strategy
|
||||
$this->sessionWritten = $this->connection
|
||||
->write($this->formatId($id), $data, $changed);
|
||||
|
||||
return $this->sessionWritten;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a session stored in DynamoDB.
|
||||
*
|
||||
* @param string $id Session ID.
|
||||
*
|
||||
* @return bool Whether or not the operation succeeded.
|
||||
*/
|
||||
public function destroy($id)
|
||||
{
|
||||
$this->openSessionId = $id;
|
||||
// Delete the session data using the selected locking strategy
|
||||
$this->sessionWritten
|
||||
= $this->connection->delete($this->formatId($id));
|
||||
|
||||
return $this->sessionWritten;
|
||||
}
|
||||
|
||||
/**
|
||||
* Satisfies the session handler interface, but does nothing. To do garbage
|
||||
* collection, you must manually call the garbageCollect() method.
|
||||
*
|
||||
* @param int $maxLifetime Ignored.
|
||||
*
|
||||
* @return bool Whether or not the operation succeeded.
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public function gc($maxLifetime)
|
||||
{
|
||||
// Garbage collection for a DynamoDB table must be triggered manually.
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers garbage collection on expired sessions.
|
||||
* @codeCoverageIgnore
|
||||
*/
|
||||
public function garbageCollect()
|
||||
{
|
||||
$this->connection->deleteExpired();
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepend the session ID with the session name.
|
||||
*
|
||||
* @param string $id The session ID.
|
||||
*
|
||||
* @return string Prepared session ID.
|
||||
*/
|
||||
private function formatId($id)
|
||||
{
|
||||
return trim($this->sessionName . '_' . $id, '_');
|
||||
}
|
||||
}
|
44
aws/Aws/DynamoDb/SetValue.php
Normal file
44
aws/Aws/DynamoDb/SetValue.php
Normal file
|
@ -0,0 +1,44 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
/**
|
||||
* Special object to represent a DynamoDB set (SS/NS/BS) value.
|
||||
*/
|
||||
class SetValue implements \JsonSerializable, \Countable, \IteratorAggregate
|
||||
{
|
||||
/** @var array Values in the set as provided. */
|
||||
private $values;
|
||||
|
||||
/**
|
||||
* @param array $values Values in the set.
|
||||
*/
|
||||
public function __construct(array $values)
|
||||
{
|
||||
$this->values = $values;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the values formatted for PHP and JSON.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function toArray()
|
||||
{
|
||||
return $this->values;
|
||||
}
|
||||
|
||||
public function count()
|
||||
{
|
||||
return count($this->values);
|
||||
}
|
||||
|
||||
public function getIterator()
|
||||
{
|
||||
return new \ArrayIterator($this->values);
|
||||
}
|
||||
|
||||
public function jsonSerialize()
|
||||
{
|
||||
return $this->toArray();
|
||||
}
|
||||
}
|
149
aws/Aws/DynamoDb/StandardSessionConnection.php
Normal file
149
aws/Aws/DynamoDb/StandardSessionConnection.php
Normal file
|
@ -0,0 +1,149 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use Aws\DynamoDb\Exception\DynamoDbException;
|
||||
|
||||
/**
|
||||
* The standard connection performs the read and write operations to DynamoDB.
|
||||
*/
|
||||
class StandardSessionConnection implements SessionConnectionInterface
|
||||
{
|
||||
/** @var DynamoDbClient The DynamoDB client */
|
||||
protected $client;
|
||||
|
||||
/** @var array The session handler config options */
|
||||
protected $config;
|
||||
|
||||
/**
|
||||
* @param DynamoDbClient $client DynamoDB client
|
||||
* @param array $config Session handler config
|
||||
*/
|
||||
public function __construct(DynamoDbClient $client, array $config = [])
|
||||
{
|
||||
$this->client = $client;
|
||||
$this->config = $config + [
|
||||
'table_name' => 'sessions',
|
||||
'hash_key' => 'id',
|
||||
'session_lifetime' => (int) ini_get('session.gc_maxlifetime'),
|
||||
'consistent_read' => true,
|
||||
'batch_config' => [],
|
||||
];
|
||||
}
|
||||
|
||||
public function read($id)
|
||||
{
|
||||
$item = [];
|
||||
try {
|
||||
// Execute a GetItem command to retrieve the item.
|
||||
$result = $this->client->getItem([
|
||||
'TableName' => $this->config['table_name'],
|
||||
'Key' => $this->formatKey($id),
|
||||
'ConsistentRead' => (bool) $this->config['consistent_read'],
|
||||
]);
|
||||
|
||||
// Get the item values
|
||||
$result = isset($result['Item']) ? $result['Item'] : [];
|
||||
foreach ($result as $key => $value) {
|
||||
$item[$key] = current($value);
|
||||
}
|
||||
} catch (DynamoDbException $e) {
|
||||
// Could not retrieve item, so return nothing.
|
||||
}
|
||||
|
||||
return $item;
|
||||
}
|
||||
|
||||
public function write($id, $data, $isChanged)
|
||||
{
|
||||
// Prepare the attributes
|
||||
$expires = time() + $this->config['session_lifetime'];
|
||||
$attributes = [
|
||||
'expires' => ['Value' => ['N' => (string) $expires]],
|
||||
'lock' => ['Action' => 'DELETE'],
|
||||
];
|
||||
if ($isChanged) {
|
||||
if ($data != '') {
|
||||
$attributes['data'] = ['Value' => ['S' => $data]];
|
||||
} else {
|
||||
$attributes['data'] = ['Action' => 'DELETE'];
|
||||
}
|
||||
}
|
||||
|
||||
// Perform the UpdateItem command
|
||||
try {
|
||||
return (bool) $this->client->updateItem([
|
||||
'TableName' => $this->config['table_name'],
|
||||
'Key' => $this->formatKey($id),
|
||||
'AttributeUpdates' => $attributes,
|
||||
]);
|
||||
} catch (DynamoDbException $e) {
|
||||
return $this->triggerError("Error writing session $id: {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
|
||||
public function delete($id)
|
||||
{
|
||||
try {
|
||||
return (bool) $this->client->deleteItem([
|
||||
'TableName' => $this->config['table_name'],
|
||||
'Key' => $this->formatKey($id),
|
||||
]);
|
||||
} catch (DynamoDbException $e) {
|
||||
return $this->triggerError("Error deleting session $id: {$e->getMessage()}");
|
||||
}
|
||||
}
|
||||
|
||||
public function deleteExpired()
|
||||
{
|
||||
// Create a Scan iterator for finding expired session items
|
||||
$scan = $this->client->getPaginator('Scan', [
|
||||
'TableName' => $this->config['table_name'],
|
||||
'AttributesToGet' => [$this->config['hash_key']],
|
||||
'ScanFilter' => [
|
||||
'expires' => [
|
||||
'ComparisonOperator' => 'LT',
|
||||
'AttributeValueList' => [['N' => (string) time()]],
|
||||
],
|
||||
'lock' => [
|
||||
'ComparisonOperator' => 'NULL',
|
||||
]
|
||||
],
|
||||
]);
|
||||
|
||||
// Create a WriteRequestBatch for deleting the expired items
|
||||
$batch = new WriteRequestBatch($this->client, $this->config['batch_config']);
|
||||
|
||||
// Perform Scan and BatchWriteItem (delete) operations as needed
|
||||
foreach ($scan->search('Items') as $item) {
|
||||
$batch->delete(
|
||||
[$this->config['hash_key'] => $item[$this->config['hash_key']]],
|
||||
$this->config['table_name']
|
||||
);
|
||||
}
|
||||
|
||||
// Delete any remaining items that were not auto-flushed
|
||||
$batch->flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $key
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
protected function formatKey($key)
|
||||
{
|
||||
return [$this->config['hash_key'] => ['S' => $key]];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $error
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
protected function triggerError($error)
|
||||
{
|
||||
trigger_error($error, E_USER_WARNING);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
267
aws/Aws/DynamoDb/WriteRequestBatch.php
Normal file
267
aws/Aws/DynamoDb/WriteRequestBatch.php
Normal file
|
@ -0,0 +1,267 @@
|
|||
<?php
|
||||
namespace Aws\DynamoDb;
|
||||
|
||||
use Aws\AwsClientInterface;
|
||||
use Aws\CommandInterface;
|
||||
use Aws\CommandPool;
|
||||
use Aws\Exception\AwsException;
|
||||
use Aws\ResultInterface;
|
||||
|
||||
/**
|
||||
* The WriteRequestBatch is an object that is capable of efficiently sending
|
||||
* DynamoDB BatchWriteItem requests from queued up put and delete item requests.
|
||||
* requests. The batch attempts to send the requests with the fewest requests
|
||||
* to DynamoDB as possible and also re-queues any unprocessed items to ensure
|
||||
* that all items are sent.
|
||||
*/
|
||||
class WriteRequestBatch
|
||||
{
|
||||
/** @var DynamoDbClient DynamoDB client used to perform write operations. */
|
||||
private $client;
|
||||
|
||||
/** @var array Configuration options for the batch. */
|
||||
private $config;
|
||||
|
||||
/** @var array Queue of pending put/delete requests in the batch. */
|
||||
private $queue;
|
||||
|
||||
/**
|
||||
* Creates a WriteRequestBatch object that is capable of efficiently sending
|
||||
* DynamoDB BatchWriteItem requests from queued up Put and Delete requests.
|
||||
*
|
||||
* @param DynamoDbClient $client DynamoDB client used to send batches.
|
||||
* @param array $config Batch configuration options.
|
||||
* - table: (string) DynamoDB table used by the batch, this can be
|
||||
* overridden for each individual put() or delete() call.
|
||||
* - batch_size: (int) The size of each batch (default: 25). The batch
|
||||
* size must be between 2 and 25. If you are sending batches of large
|
||||
* items, you may consider lowering the batch size, otherwise, you
|
||||
* should use 25.
|
||||
* - pool_size: (int) This number dictates how many BatchWriteItem
|
||||
* requests you would like to do in parallel. For example, if the
|
||||
* "batch_size" is 25, and "pool_size" is 3, then you would send 3
|
||||
* BatchWriteItem requests at a time, each with 25 items. Please keep
|
||||
* your throughput in mind when choosing the "pool_size" option.
|
||||
* - autoflush: (bool) This option allows the batch to automatically
|
||||
* flush once there are enough items (i.e., "batch_size" * "pool_size")
|
||||
* in the queue. This defaults to true, so you must set this to false
|
||||
* to stop autoflush.
|
||||
* - before: (callable) Executed before every BatchWriteItem operation.
|
||||
* It should accept an \Aws\CommandInterface object as its argument.
|
||||
* - error: Executed if an error was encountered executing a,
|
||||
* BatchWriteItem operation, otherwise errors are ignored. It should
|
||||
* accept an \Aws\Exception\AwsException as its argument.
|
||||
*
|
||||
* @throws \InvalidArgumentException if the batch size is not between 2 and 25.
|
||||
*/
|
||||
public function __construct(DynamoDbClient $client, array $config = [])
|
||||
{
|
||||
// Apply defaults
|
||||
$config += [
|
||||
'table' => null,
|
||||
'batch_size' => 25,
|
||||
'pool_size' => 1,
|
||||
'autoflush' => true,
|
||||
'before' => null,
|
||||
'error' => null
|
||||
];
|
||||
|
||||
// Ensure the batch size is valid
|
||||
if ($config['batch_size'] > 25 || $config['batch_size'] < 2) {
|
||||
throw new \InvalidArgumentException('"batch_size" must be between 2 and 25.');
|
||||
}
|
||||
|
||||
// Ensure the callbacks are valid
|
||||
if ($config['before'] && !is_callable($config['before'])) {
|
||||
throw new \InvalidArgumentException('"before" must be callable.');
|
||||
}
|
||||
if ($config['error'] && !is_callable($config['error'])) {
|
||||
throw new \InvalidArgumentException('"error" must be callable.');
|
||||
}
|
||||
|
||||
// If autoflush is enabled, set the threshold
|
||||
if ($config['autoflush']) {
|
||||
$config['threshold'] = $config['batch_size'] * $config['pool_size'];
|
||||
}
|
||||
|
||||
$this->client = $client;
|
||||
$this->config = $config;
|
||||
$this->queue = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a put item request to the batch.
|
||||
*
|
||||
* @param array $item Data for an item to put. Format:
|
||||
* [
|
||||
* 'attribute1' => ['type' => 'value'],
|
||||
* 'attribute2' => ['type' => 'value'],
|
||||
* ...
|
||||
* ]
|
||||
* @param string|null $table The name of the table. This must be specified
|
||||
* unless the "table" option was provided in the
|
||||
* config of the WriteRequestBatch.
|
||||
*
|
||||
* @return $this
|
||||
*/
|
||||
public function put(array $item, $table = null)
|
||||
{
|
||||
$this->queue[] = [
|
||||
'table' => $this->determineTable($table),
|
||||
'data' => ['PutRequest' => ['Item' => $item]],
|
||||
];
|
||||
|
||||
$this->autoFlush();
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a delete item request to the batch.
|
||||
*
|
||||
* @param array $key Key of an item to delete. Format:
|
||||
* [
|
||||
* 'key1' => ['type' => 'value'],
|
||||
* ...
|
||||
* ]
|
||||
* @param string|null $table The name of the table. This must be specified
|
||||
* unless the "table" option was provided in the
|
||||
* config of the WriteRequestBatch.
|
||||
*
|
||||
* @return $this
|
||||
*/
|
||||
public function delete(array $key, $table = null)
|
||||
{
|
||||
$this->queue[] = [
|
||||
'table' => $this->determineTable($table),
|
||||
'data' => ['DeleteRequest' => ['Key' => $key]],
|
||||
];
|
||||
|
||||
$this->autoFlush();
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes the batch by combining all the queued put and delete requests
|
||||
* into BatchWriteItem commands and executing them. Unprocessed items are
|
||||
* automatically re-queued.
|
||||
*
|
||||
* @param bool $untilEmpty If true, flushing will continue until the queue
|
||||
* is completely empty. This will make sure that
|
||||
* unprocessed items are all eventually sent.
|
||||
*
|
||||
* @return $this
|
||||
*/
|
||||
public function flush($untilEmpty = true)
|
||||
{
|
||||
// Send BatchWriteItem requests until the queue is empty
|
||||
$keepFlushing = true;
|
||||
while ($this->queue && $keepFlushing) {
|
||||
$commands = $this->prepareCommands();
|
||||
$pool = new CommandPool($this->client, $commands, [
|
||||
'before' => $this->config['before'],
|
||||
'concurrency' => $this->config['pool_size'],
|
||||
'fulfilled' => function (ResultInterface $result) {
|
||||
// Re-queue any unprocessed items
|
||||
if ($result->hasKey('UnprocessedItems')) {
|
||||
$this->retryUnprocessed($result['UnprocessedItems']);
|
||||
}
|
||||
},
|
||||
'rejected' => function ($reason) {
|
||||
if ($reason instanceof AwsException) {
|
||||
$code = $reason->getAwsErrorCode();
|
||||
if ($code === 'ProvisionedThroughputExceededException') {
|
||||
$this->retryUnprocessed($reason->getCommand()['RequestItems']);
|
||||
} elseif (is_callable($this->config['error'])) {
|
||||
$this->config['error']($reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
]);
|
||||
$pool->promise()->wait();
|
||||
$keepFlushing = (bool) $untilEmpty;
|
||||
}
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates BatchWriteItem commands from the items in the queue.
|
||||
*
|
||||
* @return CommandInterface[]
|
||||
*/
|
||||
private function prepareCommands()
|
||||
{
|
||||
// Chunk the queue into batches
|
||||
$batches = array_chunk($this->queue, $this->config['batch_size']);
|
||||
$this->queue = [];
|
||||
|
||||
// Create BatchWriteItem commands for each batch
|
||||
$commands = [];
|
||||
foreach ($batches as $batch) {
|
||||
$requests = [];
|
||||
foreach ($batch as $item) {
|
||||
if (!isset($requests[$item['table']])) {
|
||||
$requests[$item['table']] = [];
|
||||
}
|
||||
$requests[$item['table']][] = $item['data'];
|
||||
}
|
||||
$commands[] = $this->client->getCommand(
|
||||
'BatchWriteItem',
|
||||
['RequestItems' => $requests]
|
||||
);
|
||||
}
|
||||
|
||||
return $commands;
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-queues unprocessed results with the correct data.
|
||||
*
|
||||
* @param array $unprocessed Unprocessed items from a result.
|
||||
*/
|
||||
private function retryUnprocessed(array $unprocessed)
|
||||
{
|
||||
foreach ($unprocessed as $table => $requests) {
|
||||
foreach ($requests as $request) {
|
||||
$this->queue[] = [
|
||||
'table' => $table,
|
||||
'data' => $request,
|
||||
];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If autoflush is enabled and the threshold is met, flush the batch
|
||||
*/
|
||||
private function autoFlush()
|
||||
{
|
||||
if ($this->config['autoflush']
|
||||
&& count($this->queue) >= $this->config['threshold']
|
||||
) {
|
||||
// Flush only once. Unprocessed items are handled in a later flush.
|
||||
$this->flush(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the table name by looking at what was provided and what the
|
||||
* WriteRequestBatch was originally configured with.
|
||||
*
|
||||
* @param string|null $table The table name.
|
||||
*
|
||||
* @return string
|
||||
* @throws \RuntimeException if there was no table specified.
|
||||
*/
|
||||
private function determineTable($table)
|
||||
{
|
||||
$table = $table ?: $this->config['table'];
|
||||
if (!$table) {
|
||||
throw new \RuntimeException('There was no table specified.');
|
||||
}
|
||||
|
||||
return $table;
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue