Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
cba2afe
wip
bluestreak01 Apr 18, 2026
4065597
wip
bluestreak01 Apr 18, 2026
0ccbae1
wip 2
bluestreak01 Apr 18, 2026
8c5eb08
wip 3
bluestreak01 Apr 18, 2026
782f1dd
wip 4
bluestreak01 Apr 18, 2026
f523d22
wip 5
bluestreak01 Apr 18, 2026
4136fe2
wip 6
bluestreak01 Apr 19, 2026
7450d92
wip 7
bluestreak01 Apr 19, 2026
e399c62
wip 11
bluestreak01 Apr 19, 2026
167d175
wip 12
bluestreak01 Apr 19, 2026
a06fa02
wip 13
bluestreak01 Apr 19, 2026
96a403a
wip 16
bluestreak01 Apr 19, 2026
81237f0
wip 19
bluestreak01 Apr 19, 2026
90c8ff5
wip 21
bluestreak01 Apr 19, 2026
0e7ee8a
wip 22
bluestreak01 Apr 19, 2026
c756825
Rebuild CXX libraries
Apr 20, 2026
de85c22
zstd
bluestreak01 Apr 20, 2026
c783aec
Enable ASM language for zstd huf_decompress_amd64.S
bluestreak01 Apr 20, 2026
9197da5
Include stdint.h in zstd_jni for uintptr_t
bluestreak01 Apr 20, 2026
f42254b
Rebuild CXX libraries
Apr 20, 2026
30ce27f
Harden QWP client: volatile ioThread, decoder leak fixes
bluestreak01 Apr 20, 2026
4685e0d
sql cache
bluestreak01 Apr 20, 2026
00751f1
wip 25
bluestreak01 Apr 20, 2026
707f8b2
wip 26
bluestreak01 Apr 20, 2026
f843ad8
self review
bluestreak01 Apr 21, 2026
7a00ad5
Add auth and TLS support to QwpQueryClient
bluestreak01 Apr 21, 2026
2f4393a
small fixes
bluestreak01 Apr 21, 2026
6a99163
Lazy null-index fill on the no-nulls fast path
bluestreak01 Apr 21, 2026
5727908
Rebuild CXX libraries
Apr 21, 2026
3e826cb
Merge remote-tracking branch 'origin/main' into vi_egress
bluestreak01 Apr 21, 2026
77bbeba
Drop dangling TYPE_STRING references
bluestreak01 Apr 21, 2026
198cb77
Latch terminal failures on QwpQueryClient
bluestreak01 Apr 22, 2026
c8224a7
client refactoring
bluestreak01 Apr 22, 2026
28ef787
client API refactor
bluestreak01 Apr 22, 2026
229ee79
bind variables
bluestreak01 Apr 22, 2026
359f2d7
self-review and metrics
bluestreak01 Apr 23, 2026
a81d259
read failover
bluestreak01 Apr 23, 2026
994d5fd
Merge origin/vi_egress: adopt CACHE_RESET, move SERVER_INFO to 0x18
bluestreak01 Apr 23, 2026
2c4838c
another review round
bluestreak01 Apr 23, 2026
2640278
fix tests
bluestreak01 Apr 24, 2026
2cf04ba
Merge remote-tracking branch 'origin/main' into vi_egress
bluestreak01 Apr 24, 2026
55c7005
test(ilp): close QwpQueryClient PR coverage gaps
bluestreak01 Apr 24, 2026
9fb5b51
test(ilp): close QwpQueryClient instances in fromConfig tests
bluestreak01 Apr 24, 2026
78ca057
more tests
bluestreak01 Apr 24, 2026
ee8876e
Reject 0-dim array
mtopolnik Apr 24, 2026
b665b8c
Harden QWP client decoder, binds, and close paths
bluestreak01 Apr 24, 2026
941b447
review
bluestreak01 Apr 25, 2026
ddc637b
Merge remote-tracking branch 'origin/main' into vi_egress
bluestreak01 Apr 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "core/src/main/c/share/zstd"]
path = core/src/main/c/share/zstd
url = https://github.com/facebook/zstd.git
56 changes: 56 additions & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
cmake_minimum_required(VERSION 3.5)
project(questdb)

# Required for zstd's huf_decompress_amd64.S to be assembled. Without this,
# CMake silently drops the .S file from the build and the link fails at
# _HUF_decompress4X1_usingDTable_internal_fast_asm_loop etc. (The asmlib
# subdirectory enables ASM_NASM separately for Agner Fog's .asm files.)
enable_language(ASM)

include(ExternalProject)

set(CMAKE_CXX_STANDARD 17)
Expand Down Expand Up @@ -49,6 +55,42 @@ set(
src/main/c/share/byte_sink.h
)

# libzstd is included via a git submodule at src/main/c/share/zstd (pinned to
# upstream tag v1.5.7). Covers the client side of the QWP egress compression
# feature; the server-side compressor lives in the Rust qdbr crate and isn't
# linked into this library. Only the decompress-only subset of upstream is
# compiled -- the compress/ directory is left out entirely. zstd_jni.c is our
# JNI glue and lives alongside the submodule (not inside) so upstream resets
# don't disturb it.
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/src/main/c/share/zstd/lib/zstd.h)
message(FATAL_ERROR
"libzstd submodule not initialised. Run:\n"
" git submodule update --init --recursive\n"
"from java-questdb-client/.")
endif ()
set(
ZSTD_FILES
src/main/c/share/zstd_jni.c
src/main/c/share/zstd/lib/common/debug.c
src/main/c/share/zstd/lib/common/entropy_common.c
src/main/c/share/zstd/lib/common/error_private.c
src/main/c/share/zstd/lib/common/fse_decompress.c
src/main/c/share/zstd/lib/common/pool.c
src/main/c/share/zstd/lib/common/threading.c
src/main/c/share/zstd/lib/common/xxhash.c
src/main/c/share/zstd/lib/common/zstd_common.c
src/main/c/share/zstd/lib/decompress/huf_decompress.c
src/main/c/share/zstd/lib/decompress/zstd_ddict.c
src/main/c/share/zstd/lib/decompress/zstd_decompress_block.c
src/main/c/share/zstd/lib/decompress/zstd_decompress.c
)
# x86_64-only hand-tuned Huffman decoder; C fallback kicks in when
# ZSTD_DISABLE_ASM is set.
if (ARCH_AMD64 AND NOT WIN32)
list(APPEND ZSTD_FILES src/main/c/share/zstd/lib/decompress/huf_decompress_amd64.S)
endif ()
list(APPEND SOURCE_FILES ${ZSTD_FILES})

# JNI includes
include_directories($ENV{JAVA_HOME}/include/)

Expand Down Expand Up @@ -111,6 +153,20 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${OUTPUT})

add_library(questdb SHARED ${SOURCE_FILES})

# libzstd public header is at zstd/lib/zstd.h; internal headers live under
# zstd/lib/common/. Both directories go on the include path so zstd_jni.c can
# use the short include "zstd.h" and the upstream .c files can find their own
# siblings without patching.
target_include_directories(questdb PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src/main/c/share/zstd/lib
${CMAKE_CURRENT_SOURCE_DIR}/src/main/c/share/zstd/lib/common)

# Drop the zstd-internal hand-written amd64 assembly on platforms that can't
# assemble it; libzstd falls back to a C implementation when this is set.
if (NOT ARCH_AMD64 OR WIN32)
target_compile_definitions(questdb PRIVATE ZSTD_DISABLE_ASM=1)
endif ()

set(COMMON_OPTIONS "-Wno-gnu-anonymous-struct;-Wno-nested-anon-types;-Wno-unused-parameter;-fPIC;-fno-rtti;-fno-exceptions")

set(DEBUG_OPTIONS "-Wall;-pedantic;-Wextra;-g;-O0")
Expand Down
1 change: 1 addition & 0 deletions core/src/main/c/share/zstd
Submodule zstd added at f8745d
106 changes: 106 additions & 0 deletions core/src/main/c/share/zstd_jni.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*+*****************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2026 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

/*
* JNI wrapper over the bundled libzstd (decompression only). The server ships
* compression support in the Rust qdbr crate; this file covers the client
* decompression path so RESULT_BATCH frames with FLAG_ZSTD can be decoded
* without any external native dependency.
*
* libzstd is vendored as a git submodule at share/zstd/ pinned to v1.5.7;
* this file lives alongside (not inside) the submodule so upstream resets
* don't nuke our JNI glue.
*/

#include <jni.h>
#include <stdint.h>
#include <stdlib.h>
#include "zstd.h"

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Zstd_createDCtx(
JNIEnv *env, jclass cls) {
return (jlong) (uintptr_t) ZSTD_createDCtx();
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Zstd_getFrameContentSize(
JNIEnv *env, jclass cls,
jlong src_addr, jlong src_len) {
/*
* Peeks the zstd frame header at src_addr to recover the declared
* uncompressed size. Returns:
* positive -- declared content size in bytes
* -1 -- frame valid, content size not stored (ZSTD_CONTENTSIZE_UNKNOWN)
* -2 -- invalid frame, truncated header, or size > INT64_MAX
*
* Lets the Java caller size the destination buffer in a single allocation
* instead of retrying decompress on dst-too-small. Crucially, it also lets
* a corrupt frame fail BEFORE any output buffer growth, eliminating a
* memory-amplification vector where one bad frame would have driven
* scratch growth all the way to the 64 MiB cap.
*/
if (src_len < 0 || (src_len > 0 && src_addr == 0)) {
return -2;
}
unsigned long long size = ZSTD_getFrameContentSize(
(const void *) (uintptr_t) src_addr, (size_t) src_len);
if (size == ZSTD_CONTENTSIZE_UNKNOWN) {
return -1;
}
if (size == ZSTD_CONTENTSIZE_ERROR) {
return -2;
}
if (size > (unsigned long long) INT64_MAX) {
/* Cast to jlong would wrap to negative and look like an error code;
* reject upfront so the caller doesn't double-interpret. */
return -2;
}
return (jlong) size;
}

JNIEXPORT void JNICALL Java_io_questdb_client_std_Zstd_freeDCtx(
JNIEnv *env, jclass cls, jlong ptr) {
if (ptr != 0) {
ZSTD_freeDCtx((ZSTD_DCtx *) (uintptr_t) ptr);
}
}

JNIEXPORT jlong JNICALL Java_io_questdb_client_std_Zstd_decompress(
JNIEnv *env, jclass cls,
jlong ctx,
jlong src_addr, jlong src_len,
jlong dst_addr, jlong dst_cap) {
if (ctx == 0) {
return -1;
}
ZSTD_DCtx *dctx = (ZSTD_DCtx *) (uintptr_t) ctx;
size_t n = ZSTD_decompressDCtx(
dctx,
(void *) (uintptr_t) dst_addr, (size_t) dst_cap,
(const void *) (uintptr_t) src_addr, (size_t) src_len);
if (ZSTD_isError(n)) {
unsigned code = ZSTD_getErrorCode(n);
return -(jlong) code;
}
return (jlong) n;
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ public abstract class WebSocketClient implements QuietCloseable {
private CharSequence host;
private int port;
// QWP version negotiation
// Verbatim header value sent as X-QWP-Accept-Encoding during upgrade, e.g.
// "zstd;level=3,raw". When null, the header is omitted and the server ships
// batches uncompressed. The echoed X-QWP-Content-Encoding response header
// is intentionally not parsed: the RESULT_BATCH decoder branches on
// FLAG_ZSTD in every frame, which is the authoritative signal.
private String qwpAcceptEncoding;
private String qwpClientId;
// Client-requested per-batch row cap advertised via X-QWP-Max-Batch-Rows.
// 0 means "omit the header" (server uses its default cap). Server may clamp
// down to its own hard limit.
private int qwpMaxBatchRows;
private int qwpMaxVersion = 1;
// Opt-in for STATUS_DURABLE_ACK frames; sent as X-QWP-Request-Durable-Ack: true
private boolean qwpRequestDurableAck;
Expand Down Expand Up @@ -378,13 +388,35 @@ public void sendPing(int timeout) {
}
}

/**
* Sets the value sent as the {@code X-QWP-Accept-Encoding} upgrade header,
* e.g. {@code "zstd;level=3,raw"}. Pass {@code null} to omit the header
* entirely (server ships uncompressed batches). Must be called before
* {@link #upgrade}.
*/
public void setQwpAcceptEncoding(String acceptEncoding) {
this.qwpAcceptEncoding = acceptEncoding;
}

/**
* Sets the QWP client identifier sent in the X-QWP-Client-Id upgrade header.
*/
public void setQwpClientId(String clientId) {
this.qwpClientId = clientId;
}

/**
* Sets the client's preferred per-batch row cap, sent in the
* {@code X-QWP-Max-Batch-Rows} upgrade header. {@code 0} (the default)
* omits the header entirely and the server uses its own cap. Positive
* values ask the server to flush batches sooner (lower time-to-first-row
* for streaming consumers, at the cost of more per-batch overhead); the
* server clamps down to its own maximum.
*/
public void setQwpMaxBatchRows(int maxBatchRows) {
this.qwpMaxBatchRows = maxBatchRows;
}

/**
* Sets the maximum QWP version this client supports, sent in the X-QWP-Max-Version upgrade header.
*/
Expand Down Expand Up @@ -488,6 +520,16 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
sendBuffer.putAscii(qwpClientId);
sendBuffer.putAscii("\r\n");
}
if (qwpAcceptEncoding != null) {
sendBuffer.putAscii("X-QWP-Accept-Encoding: ");
sendBuffer.putAscii(qwpAcceptEncoding);
sendBuffer.putAscii("\r\n");
}
if (qwpMaxBatchRows > 0) {
sendBuffer.putAscii("X-QWP-Max-Batch-Rows: ");
sendBuffer.putAscii(Integer.toString(qwpMaxBatchRows));
sendBuffer.putAscii("\r\n");
}
if (qwpRequestDurableAck) {
sendBuffer.putAscii("X-QWP-Request-Durable-Ack: true\r\n");
}
Expand Down Expand Up @@ -973,7 +1015,7 @@ private void validateUpgradeResponse(int headerEnd) {
throw new HttpClientException("Invalid Sec-WebSocket-Accept header");
}

// Extract X-QWP-Version (optional defaults to 1 if absent)
// Extract X-QWP-Version (optional, defaults to 1 if absent)
serverQwpVersion = extractQwpVersion(response);
}

Expand Down
Loading
Loading