Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming requests #1143

Merged
merged 11 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions lib/ApiOperations/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ protected function _request($method, $url, $params = [], $options = null)
return [$resp->json, $options];
}

/**
* @param string $method HTTP method ('get', 'post', etc.)
* @param string $url URL for the request
* @param callable $readBodyChunk function that will receive chunks of data from a successful request body
* @param array $params list of parameters for the request
* @param null|array|string $options
*
* @throws \Stripe\Exception\ApiErrorException if the request fails
*
* @return array tuple containing (the JSON response, $options)
*/
protected function _requestStream($method, $url, $readBodyChunk, $params = [], $options = null)
{
$opts = $this->_opts->merge($options);
static::_staticStreamingRequest($method, $url, $readBodyChunk, $params, $opts);
}

/**
* @param string $method HTTP method ('get', 'post', etc.)
* @param string $url URL for the request
Expand All @@ -65,4 +82,21 @@ protected static function _staticRequest($method, $url, $params, $options)

return [$response, $opts];
}

/**
* @param string $method HTTP method ('get', 'post', etc.)
* @param string $url URL for the request
* @param callable $readBodyChunk function that will receive chunks of data from a successful request body
* @param array $params list of parameters for the request
* @param null|array|string $options
*
* @throws \Stripe\Exception\ApiErrorException if the request fails
*/
protected static function _staticStreamingRequest($method, $url, $readBodyChunk, $params, $options)
{
$opts = \Stripe\Util\RequestOptions::parse($options);
$baseUrl = isset($opts->apiBase) ? $opts->apiBase : static::baseUrl();
$requestor = new \Stripe\ApiRequestor($opts->apiKey, $baseUrl);
$requestor->requestStream($method, $url, $readBodyChunk, $params, $opts->headers);
}
}
92 changes: 80 additions & 12 deletions lib/ApiRequestor.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@ public function request($method, $url, $params = null, $headers = null)
return [$resp, $myApiKey];
}

/**
* @param string $method
* @param string $url
* @param callable $readBodyChunk
* @param null|array $params
* @param null|array $headers
*
* @throws Exception\ApiErrorException
*
* @return array tuple containing (ApiReponse, API key)
*/
public function requestStream($method, $url, $readBodyChunk, $params = null, $headers = null)
{
$params = $params ?: [];
$headers = $headers ?: [];
list($rbody, $rcode, $rheaders, $myApiKey) =
$this->_requestRawStreaming($method, $url, $params, $headers, $readBodyChunk);
if ($rcode >= 300) {
$this->_interpretResponse($rbody, $rcode, $rheaders);
}
}

/**
* @param string $rbody a JSON string
* @param int $rcode
Expand Down Expand Up @@ -328,18 +350,7 @@ private static function _defaultHeaders($apiKey, $clientInfo = null)
];
}

/**
* @param string $method
* @param string $url
* @param array $params
* @param array $headers
*
* @throws Exception\AuthenticationException
* @throws Exception\ApiConnectionException
*
* @return array
*/
private function _requestRaw($method, $url, $params, $headers)
private function _prepareRequest($method, $url, $params, $headers)
{
$myApiKey = $this->_apiKey;
if (!$myApiKey) {
Expand Down Expand Up @@ -416,6 +427,24 @@ function ($key) use ($params) {
$rawHeaders[] = $header . ': ' . $value;
}

return [$absUrl, $rawHeaders, $params, $hasFile, $myApiKey];
}

/**
* @param string $method
* @param string $url
* @param array $params
* @param array $headers
*
* @throws Exception\AuthenticationException
* @throws Exception\ApiConnectionException
*
* @return array
*/
private function _requestRaw($method, $url, $params, $headers)
{
list($absUrl, $rawHeaders, $params, $hasFile, $myApiKey) = $this->_prepareRequest($method, $url, $params, $headers);

$requestStartMs = Util\Util::currentTimeMillis();

list($rbody, $rcode, $rheaders) = $this->httpClient()->request(
Expand All @@ -438,6 +467,45 @@ function ($key) use ($params) {
return [$rbody, $rcode, $rheaders, $myApiKey];
}

/**
* @param string $method
* @param string $url
* @param array $params
* @param array $headers
* @param callable $readBodyChunk
*
* @throws Exception\AuthenticationException
* @throws Exception\ApiConnectionException
*
* @return array
*/
private function _requestRawStreaming($method, $url, $params, $headers, $readBodyChunk)
{
list($absUrl, $rawHeaders, $params, $hasFile, $myApiKey) = $this->_prepareRequest($method, $url, $params, $headers);

$requestStartMs = Util\Util::currentTimeMillis();

list($rbody, $rcode, $rheaders) = $this->httpClient()->requestStream(
$method,
$absUrl,
$rawHeaders,
$params,
$hasFile,
$readBodyChunk
);

if (isset($rheaders['request-id'])
&& \is_string($rheaders['request-id'])
&& \strlen($rheaders['request-id']) > 0) {
self::$requestTelemetry = new RequestTelemetry(
$rheaders['request-id'],
Util\Util::currentTimeMillis() - $requestStartMs
);
}

return [$rbody, $rcode, $rheaders, $myApiKey];
}

/**
* @param resource $resource
*
Expand Down
204 changes: 194 additions & 10 deletions lib/HttpClient/CurlClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public function getConnectTimeout()

// END OF USER DEFINED TIMEOUTS

public function request($method, $absUrl, $headers, $params, $hasFile)
private function constructRequest($method, $absUrl, $headers, $params, $hasFile)
{
$method = \strtolower($method);

Expand Down Expand Up @@ -275,16 +275,207 @@ public function request($method, $absUrl, $headers, $params, $hasFile)
// potential issues (cf. https://github.com/stripe/stripe-php/issues/1045).
$opts[\CURLOPT_IPRESOLVE] = \CURL_IPRESOLVE_V4;

return [$opts, $absUrl];
}

public function request($method, $absUrl, $headers, $params, $hasFile)
{
list($opts, $absUrl) = $this->constructRequest($method, $absUrl, $headers, $params, $hasFile);

list($rbody, $rcode, $rheaders) = $this->executeRequestWithRetries($opts, $absUrl);

return [$rbody, $rcode, $rheaders];
}

public function requestStream($method, $absUrl, $headers, $params, $hasFile, $readBodyChunk)
{
list($opts, $absUrl) = $this->constructRequest($method, $absUrl, $headers, $params, $hasFile);

$opts[\CURLOPT_RETURNTRANSFER] = false;
list($rbody, $rcode, $rheaders) = $this->executeStreamingRequestWithRetries($opts, $absUrl, $readBodyChunk);

return [$rbody, $rcode, $rheaders];
}

/**
* Curl permits sending \CURLOPT_HEADERFUNCTION, which is called with lines
* from the header and \CURLOPT_WRITEFUNCTION, which is called with bytes
* from the body. You usually want to handle the body differently depending
* on what was in the header.
*
* This function makes it easier to specify different callbacks depending
* on the contents of the heeder. After the header has been completely read
* and the body begins to stream, it will call $determineWriteCallback with
* the array of headers. $determineWriteCallback should, based on the
* headers it receives, return a "writeCallback" that describes what to do
* with the incoming HTTP response body.
*
* @param array $opts
* @param callable $determineWriteCallback
*
* @return array
*/
private function useHeadersToDetermineWriteCallback($opts, $determineWriteCallback)
{
$rheaders = new Util\CaseInsensitiveArray();
$headerCallback = function ($curl, $header_line) use (&$rheaders) {
return self::parseLineIntoHeaderArray($header_line, $rheaders);
};

$writeCallback = null;
$writeCallbackWrapper = function ($curl, $data) use (&$writeCallback, &$rheaders, &$determineWriteCallback) {
if (null === $writeCallback) {
$writeCallback = \call_user_func_array($determineWriteCallback, [$rheaders]);
}

return \call_user_func_array($writeCallback, [$curl, $data]);
};

return [$headerCallback, $writeCallbackWrapper];
}

private static function parseLineIntoHeaderArray($line, &$headers)
{
if (false === \strpos($line, ':')) {
return \strlen($line);
}
list($key, $value) = \explode(':', \trim($line), 2);
$headers[\trim($key)] = \trim($value);

return \strlen($line);
}

/**
* Like `executeRequestWithRetries` except:
* 1. Does not buffer the body of a successful (status code < 300)
* response into memory -- instead, calls the caller-provided
* $readBodyChunk with each chunk of incoming data.
* 2. Does not retry if a network error occurs while streaming the
* body of a successful response.
*
* @param array $opts cURL options
* @param string $absUrl
* @param callable @readBodyChunk
* @param mixed $readBodyChunk
*
* @return array
*/
public function executeStreamingRequestWithRetries($opts, $absUrl, $readBodyChunk)
{
/** @var bool */
$shouldRetry = false;
/** @var int */
$numRetries = 0;

// Will contain the bytes of the body of the last request
// if it was not successful and should not be retries
/** @var null|string */
$rbody = null;

// Status code of the last request
/** @var null|bool */
$rcode = null;

// Array of headers from the last request
/** @var null|array */
$lastRHeaders = null;

$errno = null;
$message = null;

$determineWriteCallback = function ($rheaders) use (
&$readBodyChunk,
&$shouldRetry,
&$rbody,
&$numRetries,
&$rcode,
&$lastRHeaders,
&$errno
) {
$lastRHeaders = $rheaders;
$errno = \curl_errno($this->curlHandle);

$rcode = \curl_getinfo($this->curlHandle, \CURLINFO_HTTP_CODE);

// Send the bytes from the body of a successful request to the caller-provided $readBodyChunk.
if ($rcode < 300) {
$rbody = null;

return function ($curl, $data) use (&$readBodyChunk) {
// Don't expose the $curl handle to the user, and don't require them to
// return the length of $data.
\call_user_func_array($readBodyChunk, [$data]);

return \strlen($data);
};
}

$shouldRetry = $this->shouldRetry($errno, $rcode, $rheaders, $numRetries);
richardm-stripe marked this conversation as resolved.
Show resolved Hide resolved

// Discard the body from an unsuccessful request that should be retried.
if ($shouldRetry) {
return function ($curl, $data) {
return \strlen($data);
};
} else {
// Otherwise, buffer the body into $rbody. It will need to be parsed to determine
// which exception to throw to the user.
$rbody = '';

return function ($curl, $data) use (&$rbody) {
$rbody .= $data;

return \strlen($data);
};
}
};

while (true) {
list($headerCallback, $writeCallback) = $this->useHeadersToDetermineWriteCallback($opts, $determineWriteCallback);
$opts[\CURLOPT_HEADERFUNCTION] = $headerCallback;
$opts[\CURLOPT_WRITEFUNCTION] = $writeCallback;

$shouldRetry = false;
$rbody = null;
$this->resetCurlHandle();
\curl_setopt_array($this->curlHandle, $opts);
$result = \curl_exec($this->curlHandle);
$errno = \curl_errno($this->curlHandle);
if (0 !== $errno) {
$message = \curl_error($this->curlHandle);
}
if (!$this->getEnablePersistentConnections()) {
$this->closeCurlHandle();
}

if (\is_callable($this->getRequestStatusCallback())) {
\call_user_func_array(
$this->getRequestStatusCallback(),
[$rbody, $rcode, $lastRHeaders, $errno, $message, $shouldRetry, $numRetries]
);
}

if ($shouldRetry) {
++$numRetries;
$sleepSeconds = $this->sleepTime($numRetries, $lastRHeaders);
\usleep((int) ($sleepSeconds * 1000000));
} else {
break;
}
}

if (0 !== $errno) {
$this->handleCurlError($absUrl, $errno, $message, $numRetries);
}

return [$rbody, $rcode, $lastRHeaders];
}

/**
* @param array $opts cURL options
* @param string $absUrl
*/
private function executeRequestWithRetries($opts, $absUrl)
public function executeRequestWithRetries($opts, $absUrl)
{
$numRetries = 0;

Expand All @@ -296,14 +487,7 @@ private function executeRequestWithRetries($opts, $absUrl)
// Create a callback to capture HTTP headers for the response
$rheaders = new Util\CaseInsensitiveArray();
$headerCallback = function ($curl, $header_line) use (&$rheaders) {
// Ignore the HTTP request line (HTTP/1.1 200 OK)
if (false === \strpos($header_line, ':')) {
return \strlen($header_line);
}
list($key, $value) = \explode(':', \trim($header_line), 2);
$rheaders[\trim($key)] = \trim($value);

return \strlen($header_line);
return CurlClient::parseLineIntoHeaderArray($header_line, $rheaders);
};
$opts[\CURLOPT_HEADERFUNCTION] = $headerCallback;

Expand Down
Loading