blob: 1297aee6446c6392e40c8893731983101cd1aeb3 [file] [log] [blame]
/* Test deflate() on concurrently modified next_in.
*
* Plain zlib does not document that this is supported, but in practice it tolerates this, and QEMU live migration is
* known to rely on this. Make sure zlib-ng tolerates this as well.
*/
#include "zbuild.h"
#ifdef ZLIB_COMPAT
#include "zlib.h"
#else
#include "zlib-ng.h"
#endif
#include <gtest/gtest.h>
#include <algorithm>
#include <atomic>
#include <cstring>
#include <thread>
static uint8_t buf[8 * 1024];
static uint8_t zbuf[4 * 1024];
static uint8_t tmp[8 * 1024];
/* Thread that increments all bytes in buf by 1. */
class Mutator {
enum class State {
PAUSED,
RUNNING,
STOPPED,
};
public:
Mutator()
: m_state(State::PAUSED), m_target_state(State::PAUSED),
m_thread(&Mutator::run, this) {}
~Mutator() {
transition(State::STOPPED);
m_thread.join();
}
void pause() {
transition(State::PAUSED);
}
void resume() {
transition(State::RUNNING);
}
private:
void run() {
while (true) {
m_state.store(m_target_state);
if (m_state == State::PAUSED)
continue;
if (m_state == State::STOPPED)
break;
for (uint8_t & i: buf)
i++;
}
}
void transition(State target_state) {
m_target_state = target_state;
while (m_state != target_state) {
}
}
std::atomic<State> m_state, m_target_state;
std::thread m_thread;
};
TEST(deflate, concurrency) {
#ifdef S390_DFLTCC_DEFLATE
GTEST_SKIP() << "Known to be broken with S390_DFLTCC_DEFLATE";
#endif
/* Create reusable mutator and streams. */
Mutator mutator;
PREFIX3(stream) dstrm;
memset(&dstrm, 0, sizeof(dstrm));
int err = PREFIX(deflateInit2)(&dstrm, Z_BEST_SPEED, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
ASSERT_EQ(Z_OK, err) << dstrm.msg;
PREFIX3(stream) istrm;
memset(&istrm, 0, sizeof(istrm));
err = PREFIX(inflateInit2)(&istrm, -15);
ASSERT_EQ(Z_OK, err) << istrm.msg;
/* Iterate for a certain amount of time. */
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
while (std::chrono::steady_clock::now() < deadline) {
/* Start each iteration with a fresh stream state. */
err = PREFIX(deflateReset)(&dstrm);
ASSERT_EQ(Z_OK, err) << dstrm.msg;
err = PREFIX(inflateReset)(&istrm);
ASSERT_EQ(Z_OK, err) << istrm.msg;
/* Mutate and compress the first half of buf concurrently.
* Decompress and throw away the results, which are unpredictable.
*/
mutator.resume();
dstrm.next_in = buf;
dstrm.avail_in = sizeof(buf) / 2;
while (dstrm.avail_in > 0) {
dstrm.next_out = zbuf;
dstrm.avail_out = sizeof(zbuf);
err = PREFIX(deflate)(&dstrm, Z_NO_FLUSH);
ASSERT_EQ(Z_OK, err) << dstrm.msg;
istrm.next_in = zbuf;
istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
while (istrm.avail_in > 0) {
istrm.next_out = tmp;
istrm.avail_out = sizeof(tmp);
err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
ASSERT_EQ(Z_OK, err) << istrm.msg;
}
}
/* Stop mutation and compress the second half of buf.
* Decompress and check that the result matches.
*/
mutator.pause();
dstrm.next_in = buf + sizeof(buf) / 2;
dstrm.avail_in = sizeof(buf) - sizeof(buf) / 2;
while (dstrm.avail_in > 0) {
dstrm.next_out = zbuf;
dstrm.avail_out = sizeof(zbuf);
err = PREFIX(deflate)(&dstrm, Z_FINISH);
if (err == Z_STREAM_END)
ASSERT_EQ(0u, dstrm.avail_in);
else
ASSERT_EQ(Z_OK, err) << dstrm.msg;
istrm.next_in = zbuf;
istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
while (istrm.avail_in > 0) {
size_t orig_total_out = istrm.total_out;
istrm.next_out = tmp;
istrm.avail_out = sizeof(tmp);
err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
if (err == Z_STREAM_END)
ASSERT_EQ(0u, istrm.avail_in);
else
ASSERT_EQ(Z_OK, err) << istrm.msg;
size_t concurrent_size = sizeof(buf) - sizeof(buf) / 2;
if (istrm.total_out > concurrent_size) {
size_t tmp_offset, buf_offset, size;
if (orig_total_out >= concurrent_size) {
tmp_offset = 0;
buf_offset = orig_total_out - concurrent_size;
size = istrm.total_out - orig_total_out;
} else {
tmp_offset = concurrent_size - orig_total_out;
buf_offset = 0;
size = istrm.total_out - concurrent_size;
}
ASSERT_EQ(0, memcmp(tmp + tmp_offset, buf + sizeof(buf) / 2 + buf_offset, size));
}
}
}
}
err = PREFIX(inflateEnd)(&istrm);
ASSERT_EQ(Z_OK, err) << istrm.msg;
err = PREFIX(deflateEnd)(&dstrm);
ASSERT_EQ(Z_OK, err) << istrm.msg;
}