Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 55 additions & 20 deletions src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <algorithm>
#include <cassert>
#include <thread>
#include <ostream>

using namespace Aws::Client;
using namespace Aws::Http;
Expand Down Expand Up @@ -181,6 +182,27 @@ static int64_t GetContentLengthFromHeader(CURL* connectionHandle,
return hasContentLength ? static_cast<int64_t>(contentLength) : -1;
}

// Best-effort output position probe for diagnostics only.
// Returns false if the stream does not support positioning.
static bool TryGetOutputPos(std::ostream& os,
std::ostream::pos_type& outPos) noexcept
{
std::streambuf* sb = os.rdbuf();
if (!sb)
{
return false;
}

const auto pos = sb->pubseekoff(0, std::ios_base::cur, std::ios_base::out);
if (pos == std::ostream::pos_type(std::ostream::off_type(-1)))
{
return false;
}

outPos = pos;
return true;
}

static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata)
{
if (ptr)
Expand Down Expand Up @@ -217,38 +239,51 @@ static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata)
}
}

if (response->GetResponseBody().fail()) {
const auto& ref = response->GetResponseBody();
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response output stream in bad state (eof: "
<< ref.eof() << ", bad: " << ref.bad() << ")");
return 0;
}
auto& body = response->GetResponseBody();

auto cur = response->GetResponseBody().tellp();
if (response->GetResponseBody().fail()) {
const auto& ref = response->GetResponseBody();
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Unable to query response output position (eof: "
<< ref.eof() << ", bad: " << ref.bad() << ")");
if (body.fail()) {
const auto& ref = body;
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG,
"Response output stream in bad state (eof: "
<< ref.eof() << ", bad: " << ref.bad() << ")");
return 0;
}

response->GetResponseBody().write(ptr, static_cast<std::streamsize>(sizeToWrite));
if (response->GetResponseBody().fail()) {
const auto& ref = response->GetResponseBody();
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to write " << size << " / " << sizeToWrite << " B response"
<< " at " << cur << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")");
body.write(ptr, static_cast<std::streamsize>(sizeToWrite));
if (body.fail()) {
const auto& ref = body;

std::ostream::pos_type pos{};
const bool hasPos = TryGetOutputPos(body, pos);

Aws::StringStream ss;
ss << "Failed to write " << size << " / " << sizeToWrite << " B response";
if (hasPos) {
ss << " at " << pos;
} else {
ss << " (output stream not seekable)";
}
ss << " (received so far: "
<< context->m_numBytesResponseReceived
<< " B, eof: " << ref.eof()
<< ", bad: " << ref.bad() << ")";

AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, ss.str());
return 0;
}

if ((context->m_request->IsEventStreamRequest() || context->m_request->HasEventStreamResponse() )
&& !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE))
{
response->GetResponseBody().flush();
if (response->GetResponseBody().fail()) {
const auto& ref = response->GetResponseBody();
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to flush event response (eof: "
body.flush();
if (body.fail()) {
const auto& ref = body;
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG,
"Failed to flush event response (eof: "
<< ref.eof() << ", bad: " << ref.bad() << ")");
return 0;
}

}
auto& receivedHandler = context->m_request->GetDataReceivedEventHandler();
if (receivedHandler)
Expand Down
97 changes: 96 additions & 1 deletion tests/aws-cpp-sdk-core-tests/http/HttpClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class HttpClientTest : public Aws::Testing::AwsCppSdkGTestSuite
{
};

class CURLHttpClientTest : public Aws::Testing::AwsCppSdkGTestSuite
{
};


TEST_F(HttpClientTest, TestRandomURLWithNoProxy)
{
auto httpClient = CreateHttpClient(Aws::Client::ClientConfiguration());
Expand Down Expand Up @@ -382,7 +387,97 @@ TEST_F(CURLHttpClientTest, TestHttpRequestWorksFine)
EXPECT_EQ(Aws::Http::HttpResponseCode::OK, response->GetResponseCode());
EXPECT_EQ("", response->GetClientErrorMessage());
}

#include <aws/core/utils/memory/stl/AWSVector.h>
#include <streambuf>

// A streambuf that supports writing but does NOT support seeking.
// This reproduces the behavior of many filtering / transforming streams.
class NonSeekableWriteBuf final : public std::streambuf
{
public:
explicit NonSeekableWriteBuf(Aws::Vector<char>& out) : m_out(out) {}

protected:
std::streamsize xsputn(const char* s, std::streamsize n) override
{
if (n > 0)
{
m_out.insert(m_out.end(), s, s + static_cast<size_t>(n));
}
return n;
}

int overflow(int ch) override
{
if (ch == traits_type::eof())
{
return traits_type::not_eof(ch);
}
m_out.push_back(static_cast<char>(ch));
return ch;
}

// Disallow positioning (seek/tell)
pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode) override
{
return pos_type(off_type(-1));
}

pos_type seekpos(pos_type, std::ios_base::openmode) override
{
return pos_type(off_type(-1));
}

private:
Aws::Vector<char>& m_out;
};

class NonSeekableIOStream final : public Aws::IOStream
{
public:
NonSeekableIOStream(const Aws::String& /*allocationTag*/, Aws::Vector<char>& out)
: Aws::IOStream(nullptr), m_buf(out)
{
rdbuf(&m_buf);
}

private:
NonSeekableWriteBuf m_buf;
};

// Regression test:
// Ensure CurlHttpClient can write response bodies into a non-seekable output stream.
// Older implementations that call tellp() as part of the write callback may fail here.
TEST_F(CURLHttpClientTest, TestNonSeekableResponseStreamDoesNotAbortTransfer)
{
Aws::Vector<char> captured;

auto request = CreateHttpRequest(
Aws::String("http://127.0.0.1:8778"),
HttpMethod::HTTP_GET,
Aws::Utils::Stream::DefaultResponseStreamFactoryMethod);

request->SetHeaderValue("WaitSeconds", "1");

request->SetResponseStreamFactory([&captured]() -> Aws::IOStream*
{
return Aws::New<NonSeekableIOStream>(ALLOCATION_TAG, ALLOCATION_TAG, captured);
});

Aws::Client::ClientConfiguration config;
config.requestTimeoutMs = 10000;

auto httpClient = CreateHttpClient(config);
auto response = httpClient->MakeRequest(request);

ASSERT_NE(nullptr, response);

ASSERT_FALSE(response->HasClientError()) << response->GetClientErrorMessage();
EXPECT_EQ(Aws::Http::HttpResponseCode::OK, response->GetResponseCode());

}
#endif // ENABLE_CURL_CLIENT
#endif // ENABLE_HTTP_CLIENT_TESTING
#endif // NO_HTTP_CLIENT
#endif // DISABLE_DNS_REQUIRED_TESTS
#endif // DISABLE_DNS_REQUIRED_TESTS