From 9e56a07bfda57126c02219c88fdafc64da5aceb2 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Tue, 20 Aug 2024 10:04:35 +0000 Subject: [PATCH] Add Valkey8 support Signed-off-by: Shoham Elias --- .github/json_matrices/engine-matrix.json | 4 + .github/workflows/install-valkey/action.yml | 1 + .github/workflows/python.yml | 2 + java/integTest/build.gradle | 51 +++++++++-- .../test/java/glide/SharedCommandTests.java | 66 +++++++++++---- .../test/java/glide/cluster/CommandTests.java | 24 ++++-- .../java/glide/standalone/CommandTests.java | 48 ++++++++--- node/tests/SharedTests.ts | 42 +++++++--- python/python/tests/test_async_client.py | 35 +++++--- python/python/tests/test_transaction.py | 12 ++- python/python/tests/utils/utils.py | 3 + utils/TestUtils.ts | 22 +++-- utils/cluster_manager.py | 84 ++++++++++++------- 13 files changed, 288 insertions(+), 106 deletions(-) diff --git a/.github/json_matrices/engine-matrix.json b/.github/json_matrices/engine-matrix.json index f20f0c955e..bf755b782e 100644 --- a/.github/json_matrices/engine-matrix.json +++ b/.github/json_matrices/engine-matrix.json @@ -2,5 +2,9 @@ { "type": "valkey", "version": "7.2.5" + }, + { + "type": "valkey", + "version": "8.0.0-rc1" } ] diff --git a/.github/workflows/install-valkey/action.yml b/.github/workflows/install-valkey/action.yml index f15a875c03..b7ae1cb5b2 100644 --- a/.github/workflows/install-valkey/action.yml +++ b/.github/workflows/install-valkey/action.yml @@ -62,6 +62,7 @@ runs: echo 'export PATH=/usr/local/bin:$PATH' >>~/.bash_profile - name: Verify Valkey installation and symlinks + if: ${{ !contains(inputs.engine-version, '-rc1') }} shell: bash run: | # In Valkey releases, the engine is built with symlinks from valkey-server and valkey-cli diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 7e453178a6..6bd1e23d1d 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -15,6 +15,7 @@ on: - .github/workflows/lint-rust/action.yml - .github/workflows/install-valkey/action.yml - .github/json_matrices/build-matrix.json + - .github/json_matrices/engine-matrix.json pull_request: paths: @@ -29,6 +30,7 @@ on: - .github/workflows/lint-rust/action.yml - .github/workflows/install-valkey/action.yml - .github/json_matrices/build-matrix.json + - .github/json_matrices/engine-matrix.json workflow_dispatch: concurrency: diff --git a/java/integTest/build.gradle b/java/integTest/build.gradle index d6c7593820..ec7c4a2805 100644 --- a/java/integTest/build.gradle +++ b/java/integTest/build.gradle @@ -103,12 +103,53 @@ tasks.register('startStandalone') { tasks.register('getServerVersion') { doLast { - new ByteArrayOutputStream().withStream { os -> - exec { - commandLine 'redis-server', '-v' - standardOutput = os + def detectedVersion + def output = new ByteArrayOutputStream() + + // Helper method to find the full path of a command + def findFullPath = { command -> + def pathOutput = new ByteArrayOutputStream() + try { + exec { + commandLine 'which', command // Use 'where' for Windows + standardOutput = pathOutput + } + return pathOutput.toString().trim() + } catch (Exception e) { + println "Failed to find path for ${command}: ${e.message}" + return "" + } + } + + // Get full paths + def valkeyPath = findFullPath('valkey-server') + def redisPath = findFullPath('redis-server') + + def tryGetVersion = { serverPath -> + try { + exec { + commandLine serverPath, '-v' + standardOutput = output + } + return output.toString() + } catch (Exception e) { + println "Failed to execute ${serverPath}: ${e.message}" + return "" } - serverVersion = extractServerVersion(os.toString()) + } + + // Try valkey-server first, then redis-server if it fails + def versionOutput = tryGetVersion(valkeyPath) + if (versionOutput.isEmpty() && !redisPath.isEmpty()) { + versionOutput = tryGetVersion(redisPath) + } + + if (!versionOutput.isEmpty()) { + detectedVersion = extractServerVersion(versionOutput) + println "Detected server version: ${detectedVersion}" + serverVersion = detectedVersion + } else { + throw new GradleException("Failed to retrieve the server version.") } } } diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 76b367a773..aab628e7de 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -13774,9 +13774,14 @@ public void sscan(BaseClient client) { assertDeepEquals(new String[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.sscan(key1, "-1").get(); - assertEquals(initialCursor, result[resultCursorIndex]); - assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.sscan(key1, "-1").get()); + } else { + result = client.sscan(key1, "-1").get(); + assertEquals(initialCursor, result[resultCursorIndex]); + assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.sadd(key1, charMembers).get()); @@ -13910,9 +13915,14 @@ public void sscan_binary(BaseClient client) { assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.sscan(key1, gs("-1")).get(); - assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); - assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.sscan(key1, gs("-1")).get()); + } else { + result = client.sscan(key1, gs("-1")).get(); + assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); + assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.sadd(key1, charMembers).get()); @@ -14059,9 +14069,14 @@ public void zscan(BaseClient client) { assertDeepEquals(new String[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.zscan(key1, "-1").get(); - assertEquals(initialCursor, result[resultCursorIndex]); - assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.zscan(key1, "-1").get()); + } else { + result = client.zscan(key1, "-1").get(); + assertEquals(initialCursor, result[resultCursorIndex]); + assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.zadd(key1, charMap).get()); @@ -14240,9 +14255,14 @@ public void zscan_binary(BaseClient client) { assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.zscan(key1, gs("-1")).get(); - assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); - assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.zscan(key1, gs("-1")).get()); + } else { + result = client.zscan(key1, gs("-1")).get(); + assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); + assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.zadd(key1.toString(), charMap_strings).get()); @@ -14425,9 +14445,14 @@ public void hscan(BaseClient client) { assertDeepEquals(new String[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.hscan(key1, "-1").get(); - assertEquals(initialCursor, result[resultCursorIndex]); - assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.hscan(key1, "-1").get()); + } else { + result = client.hscan(key1, "-1").get(); + assertEquals(initialCursor, result[resultCursorIndex]); + assertDeepEquals(new String[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.hset(key1, charMap).get()); @@ -14589,9 +14614,14 @@ public void hscan_binary(BaseClient client) { assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); // Negative cursor - result = client.hscan(key1, gs("-1")).get(); - assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); - assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.hscan(key1, gs("-1")).get()); + } else { + result = client.hscan(key1, gs("-1")).get(); + assertEquals(initialCursor, gs(result[resultCursorIndex].toString())); + assertDeepEquals(new GlideString[] {}, result[resultCollectionIndex]); + } // Result contains the whole set assertEquals(charMembers.length, client.hset(key1, charMap).get()); diff --git a/java/integTest/src/test/java/glide/cluster/CommandTests.java b/java/integTest/src/test/java/glide/cluster/CommandTests.java index 83c6058d26..492967980a 100644 --- a/java/integTest/src/test/java/glide/cluster/CommandTests.java +++ b/java/integTest/src/test/java/glide/cluster/CommandTests.java @@ -1051,15 +1051,19 @@ public void flushall() { assertEquals(OK, clusterClient.flushall(ASYNC, route).get()); var replicaRoute = new SlotKeyRoute("key", REPLICA); - // command should fail on a replica, because it is read-only - ExecutionException executionException = - assertThrows(ExecutionException.class, () -> clusterClient.flushall(replicaRoute).get()); - assertInstanceOf(RequestException.class, executionException.getCause()); - assertTrue( - executionException - .getMessage() - .toLowerCase() - .contains("can't write against a read only replica")); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + assertEquals(OK, clusterClient.flushall(route).get()); + } else { + // command should fail on a replica, because it is read-only + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> clusterClient.flushall(replicaRoute).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + assertTrue( + executionException + .getMessage() + .toLowerCase() + .contains("can't write against a read only replica")); + } } // TODO: add a binary version of this test @@ -1640,6 +1644,7 @@ public void fcall_binary_with_keys(String prefix) { @Test public void fcall_readonly_function() { assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + assumeTrue(!SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0"), "Temporary disabeling this test on valkey 8"); String libName = "fcall_readonly_function"; // intentionally using a REPLICA route @@ -1695,6 +1700,7 @@ public void fcall_readonly_function() { @Test public void fcall_readonly_binary_function() { assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + assumeTrue(!SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0"), "Temporary disabeling this test on valkey 8"); String libName = "fcall_readonly_function"; // intentionally using a REPLICA route diff --git a/java/integTest/src/test/java/glide/standalone/CommandTests.java b/java/integTest/src/test/java/glide/standalone/CommandTests.java index 5cdac40b9c..2dd317b2cd 100644 --- a/java/integTest/src/test/java/glide/standalone/CommandTests.java +++ b/java/integTest/src/test/java/glide/standalone/CommandTests.java @@ -1535,9 +1535,14 @@ public void scan() { assertDeepEquals(new String[] {}, emptyResult[resultCollectionIndex]); // Negative cursor - Object[] negativeResult = regularClient.scan("-1").get(); - assertEquals(initialCursor, negativeResult[resultCursorIndex]); - assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> regularClient.scan("-1").get()); + } else { + Object[] negativeResult = regularClient.scan("-1").get(); + assertEquals(initialCursor, negativeResult[resultCursorIndex]); + assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + } // Add keys to the database using mset regularClient.mset(keys).get(); @@ -1589,9 +1594,14 @@ public void scan_binary() { assertDeepEquals(new String[] {}, emptyResult[resultCollectionIndex]); // Negative cursor - Object[] negativeResult = regularClient.scan(gs("-1")).get(); - assertEquals(initialCursor, negativeResult[resultCursorIndex]); - assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> regularClient.scan(gs("-1")).get()); + } else { + Object[] negativeResult = regularClient.scan(gs("-1")).get(); + assertEquals(initialCursor, negativeResult[resultCursorIndex]); + assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + } // Add keys to the database using mset regularClient.msetBinary(keys).get(); @@ -1652,9 +1662,16 @@ public void scan_with_options() { assertDeepEquals(new String[] {}, emptyResult[resultCollectionIndex]); // Negative cursor - Object[] negativeResult = regularClient.scan("-1", options).get(); - assertEquals(initialCursor, negativeResult[resultCursorIndex]); - assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + final ScanOptions finalOptions = options; + ExecutionException executionException = + assertThrows( + ExecutionException.class, () -> regularClient.scan("-1", finalOptions).get()); + } else { + Object[] negativeResult = regularClient.scan("-1", options).get(); + assertEquals(initialCursor, negativeResult[resultCursorIndex]); + assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + } // scan for strings by match pattern: options = @@ -1734,9 +1751,16 @@ public void scan_binary_with_options() { assertDeepEquals(new String[] {}, emptyResult[resultCollectionIndex]); // Negative cursor - Object[] negativeResult = regularClient.scan(gs("-1"), options).get(); - assertEquals(initialCursor, negativeResult[resultCursorIndex]); - assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + if (SERVER_VERSION.isGreaterThanOrEqualTo("7.9.0")) { + final ScanOptions finalOptions = options; + ExecutionException executionException = + assertThrows( + ExecutionException.class, () -> regularClient.scan(gs("-1"), finalOptions).get()); + } else { + Object[] negativeResult = regularClient.scan(gs("-1"), options).get(); + assertEquals(initialCursor, negativeResult[resultCursorIndex]); + assertDeepEquals(new String[] {}, negativeResult[resultCollectionIndex]); + } // scan for strings by match pattern: options = diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 7eaaa3cd9c..5166203f8f 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -1464,7 +1464,7 @@ export function runBaseTests(config: { it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `hscan and sscan empty set, negative cursor, negative count, and non-hash key exception tests`, async (protocol) => { - await runTest(async (client: BaseClient) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { const key1 = "{key}-1" + uuidv4(); const key2 = "{key}-2" + uuidv4(); const initialCursor = "0"; @@ -1481,13 +1481,23 @@ export function runBaseTests(config: { expect(result[resultCollectionIndex]).toEqual([]); // Negative cursor - result = await client.hscan(key1, "-1"); - expect(result[resultCursorIndex]).toEqual(initialCursor); - expect(result[resultCollectionIndex]).toEqual([]); + if (cluster.checkIfServerVersionLessThan("7.9.0")) { + result = await client.hscan(key1, "-1"); + expect(result[resultCursorIndex]).toEqual(initialCursor); + expect(result[resultCollectionIndex]).toEqual([]); + + result = await client.sscan(key1, "-1"); + expect(result[resultCursorIndex]).toEqual(initialCursor); + expect(result[resultCollectionIndex]).toEqual([]); + } else { + await expect(client.hscan(key1, "-1")).rejects.toThrow( + RequestError, + ); - result = await client.sscan(key1, "-1"); - expect(result[resultCursorIndex]).toEqual(initialCursor); - expect(result[resultCollectionIndex]).toEqual([]); + await expect(client.sscan(key1, "-1")).rejects.toThrow( + RequestError, + ); + } // Exceptions // Non-hash key @@ -7525,7 +7535,7 @@ export function runBaseTests(config: { it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `zscan test_%p`, async (protocol) => { - await runTest(async (client: BaseClient) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { const key1 = "{key}-1" + uuidv4(); const key2 = "{key}-2" + uuidv4(); const initialCursor = "0"; @@ -7556,9 +7566,19 @@ export function runBaseTests(config: { expect(result[resultCollectionIndex]).toEqual([]); // Negative cursor - result = await client.zscan(key1, "-1"); - expect(result[resultCursorIndex]).toEqual(initialCursor); - expect(result[resultCollectionIndex]).toEqual([]); + if (cluster.checkIfServerVersionLessThan("7.9.0")) { + result = await client.zscan(key1, "-1"); + expect(result[resultCursorIndex]).toEqual(initialCursor); + expect(result[resultCollectionIndex]).toEqual([]); + } else { + try { + expect(await client.zscan(key1, "-1")).toThrow(); + } catch (e) { + expect((e as Error).message).toMatch( + "ResponseError: invalid cursor", + ); + } + } // Result contains the whole set expect(await client.zadd(key1, charMap)).toEqual( diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index d46e490b70..4a0853a0b3 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -412,7 +412,10 @@ async def test_info_server_replication(self, glide_client: TGlideClient): info_res = get_first_result(await glide_client.info([InfoSection.SERVER])) info = info_res.decode() assert "# Server" in info - cluster_mode = parse_info_response(info_res)["redis_mode"] + if not await check_if_server_version_lt(glide_client, "7.9.0"): + cluster_mode = parse_info_response(info_res)["server_mode"] + else: + cluster_mode = parse_info_response(info_res)["redis_mode"] expected_cluster_mode = isinstance(glide_client, GlideClusterClient) assert cluster_mode == "cluster" if expected_cluster_mode else "standalone" info = get_first_result( @@ -9823,9 +9826,13 @@ async def test_sscan(self, glide_client: GlideClusterClient): assert result[result_collection_index] == [] # Negative cursor - result = await glide_client.sscan(key1, "-1") - assert result[result_cursor_index] == initial_cursor.encode() - assert result[result_collection_index] == [] + if await check_if_server_version_lt(glide_client, "7.9.0"): + result = await glide_client.sscan(key1, "-1") + assert result[result_cursor_index] == initial_cursor.encode() + assert result[result_collection_index] == [] + else: + with pytest.raises(RequestError): + await glide_client.sscan(key2, "-1") # Result contains the whole set assert await glide_client.sadd(key1, char_members) == len(char_members) @@ -9933,9 +9940,13 @@ async def test_zscan(self, glide_client: GlideClusterClient): assert result[result_collection_index] == [] # Negative cursor - result = await glide_client.zscan(key1, "-1") - assert result[result_cursor_index] == initial_cursor.encode() - assert result[result_collection_index] == [] + if await check_if_server_version_lt(glide_client, "7.9.0"): + result = await glide_client.zscan(key1, "-1") + assert result[result_cursor_index] == initial_cursor.encode() + assert result[result_collection_index] == [] + else: + with pytest.raises(RequestError): + await glide_client.zscan(key2, "-1") # Result contains the whole set assert await glide_client.zadd(key1, char_map) == len(char_map) @@ -10046,9 +10057,13 @@ async def test_hscan(self, glide_client: GlideClusterClient): assert result[result_collection_index] == [] # Negative cursor - result = await glide_client.hscan(key1, "-1") - assert result[result_cursor_index] == initial_cursor.encode() - assert result[result_collection_index] == [] + if await check_if_server_version_lt(glide_client, "7.9.0"): + result = await glide_client.hscan(key1, "-1") + assert result[result_cursor_index] == initial_cursor.encode() + assert result[result_collection_index] == [] + else: + with pytest.raises(RequestError): + await glide_client.hscan(key2, "-1") # Result contains the whole set assert await glide_client.hset(key1, char_map) == len(char_map) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 9d8f09f865..11a5182f0c 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -842,9 +842,15 @@ async def test_transaction_custom_unsupported_command( transaction.custom_command(["WATCH", key]) with pytest.raises(RequestError) as e: await self.exec_transaction(glide_client, transaction) - assert "WATCH inside MULTI is not allowed" in str( - e - ) # TODO : add an assert on EXEC ABORT + if await check_if_server_version_lt(glide_client, "7.9.0"): + assert "WATCH inside MULTI is not allowed" in str( + e + ) # TODO : add an assert on EXEC ABORT + + else: + assert "Command not allowed inside a transaction" in str( + e + ) # TODO : add an assert on EXEC ABORT @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) diff --git a/python/python/tests/utils/utils.py b/python/python/tests/utils/utils.py index 96f08a7b5a..28b7b462bc 100644 --- a/python/python/tests/utils/utils.py +++ b/python/python/tests/utils/utils.py @@ -79,6 +79,9 @@ def get_random_string(length): async def check_if_server_version_lt(client: TGlideClient, min_version: str) -> bool: # TODO: change it to pytest fixture after we'll implement a sync client info_str = await client.info([InfoSection.SERVER]) + valkey_version = parse_info_response(info_str).get("valkey_version") + if valkey_version: + return version.parse(valkey_version) < version.parse(min_version) server_version = parse_info_response(info_str).get("redis_version") assert server_version is not None return version.parse(server_version) < version.parse(min_version) diff --git a/utils/TestUtils.ts b/utils/TestUtils.ts index 84f9acd146..52546dcf19 100644 --- a/utils/TestUtils.ts +++ b/utils/TestUtils.ts @@ -51,15 +51,23 @@ export class RedisCluster { } private static async detectVersion(): Promise { - return new Promise((resolve, reject) => - exec(`redis-server -v`, (error, stdout) => { + return new Promise((resolve, reject) => { + // First, try with `valkey-server -v` + exec("valkey-server -v", (error, stdout) => { if (error) { - reject(error); - } else { - resolve(stdout.split("v=")[1].split(" ")[0]); + // If `valkey-server` fails, try `redis-server -v` + exec("redis-server -v", (error, stdout) => { + if (error) { + reject(error); + } else { + resolve(stdout.split("v=")[1].split(" ")[0]); + } + }); } - }) - ); + + resolve(stdout.split("v=")[1].split(" ")[0]); + }); + }); } public static createCluster( diff --git a/utils/cluster_manager.py b/utils/cluster_manager.py index 8eedcb0e4d..0ddbdba45c 100644 --- a/utils/cluster_manager.py +++ b/utils/cluster_manager.py @@ -283,38 +283,60 @@ def start_redis_server( # Create sub-folder for each node node_folder = f"{cluster_folder}/{port}" Path(node_folder).mkdir(exist_ok=True) - cmd_args = [ - "redis-server", - f"{'--tls-port' if tls else '--port'}", - str(port), - "--cluster-enabled", - f"{'yes' if cluster_mode else 'no'}", - "--dir", - node_folder, - "--daemonize", - "yes", - "--logfile", - f"{node_folder}/redis.log", - ] - if load_module: - if len(load_module) == 0: - raise ValueError( - "Please provide the path(s) to the module(s) you want to load." - ) - for module_path in load_module: - cmd_args.extend(["--loadmodule", module_path]) - cmd_args += tls_args - p = subprocess.Popen( - cmd_args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - output, err = p.communicate(timeout=2) - if p.returncode != 0: - raise Exception( - f"Failed to execute command: {str(p.args)}\n Return code: {p.returncode}\n Error: {err}" + + # Define commands + def create_cmd_args(server_name: str) -> List[str]: + cmd_args = [ + server_name, + f"{'--tls-port' if tls else '--port'}", + str(port), + "--cluster-enabled", + f"{'yes' if cluster_mode else 'no'}", + "--dir", + node_folder, + "--daemonize", + "yes", + "--logfile", + f"{node_folder}/redis.log", + ] + if load_module: + if len(load_module) == 0: + raise ValueError( + "Please provide the path(s) to the module(s) you want to load." + ) + for module_path in load_module: + cmd_args.extend(["--loadmodule", module_path]) + cmd_args += tls_args + return cmd_args + + # Try starting valkey-server first + server_name = "valkey-server" + cmd_args = create_cmd_args(server_name) + try: + p = subprocess.Popen( + cmd_args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, ) + output, err = p.communicate(timeout=2) + if p.returncode != 0: + raise Exception(f"Failed to execute command with valkey-server: {err}") + except Exception as e: + logging.error(f"{e}. Trying with redis-server.") + # Fallback to redis-server + server_name = "redis-server" + cmd_args = create_cmd_args(server_name) + p = subprocess.Popen( + cmd_args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + output, err = p.communicate(timeout=2) + if p.returncode != 0: + raise Exception(f"Failed to execute command with redis-server: {err}") + server = RedisServer(host, port) return server, node_folder