Skip to content

Commit f1a664f

Browse files
authored
KAFKA-19858 Set default min.insync.replicas=2 for __remote_log_metadata topic to prevent data loss (KIP-1235) (#20811)
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw) The __remote_log_metadata internal topic currently lacks a configurable min.insync.replicas setting, relying on the broker-level default (typically 1) when the factor is 3 as default. This creates a data loss risk in production environments, as writes may be acknowledged by only a single replica. What's more, this is inconsistent with some other cases such as __transaction_state, which explicitly sets min.insync.replicas=2 via the transaction.state.log.min.isr broker configuration. Both topics store critical metadata and should have equivalent durability guarantees. Note: ``` public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "rlmm.config.remote.log.metadata.topic.replication.factor"; public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3; ``` Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent f0488eb commit f1a664f

File tree

7 files changed

+99
-3
lines changed

7 files changed

+99
-3
lines changed

docs/getting-started/upgrade.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type: docs
3333
### Notable changes in 4.3.0
3434

3535
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
36-
36+
* The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
3737

3838

3939
## Upgrading to 4.2.0

docs/operations/tiered-storage.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,13 @@ After build successfully, there should be a `kafka-storage-x.x.x-test.jar` file
8989
# Note, please make sure the brokers need to have access to this directory
9090
rsm.config.dir=/tmp/kafka-remote-storage
9191

92-
# This needs to be changed if number of brokers in the cluster is more than 1
92+
# For single broker cluster, set this to 1. Default is 3 for clusters with 3 or more brokers.
9393
rlmm.config.remote.log.metadata.topic.replication.factor=1
9494

95+
# The minimum number of replicas that must acknowledge a write to remote log metadata topic.
96+
# Default value is 2. For single broker cluster (replication factor = 1), set this to 1.
97+
rlmm.config.remote.log.metadata.topic.min.isr=1
98+
9599
# Try to speed up the log retention check interval for testing
96100
log.retention.check.interval.ms=1000
97101

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerCon
434434
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs()));
435435
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
436436
topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false");
437+
topicConfigs.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Short.toString(rlmmConfig.metadataTopicMinIsr()));
437438
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
438439
rlmmConfig.metadataTopicPartitionsCount(),
439440
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
4646
public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor";
4747
public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions";
4848
public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP = "remote.log.metadata.topic.retention.ms";
49+
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP = "remote.log.metadata.topic.min.isr";
4950
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.consume.wait.ms";
5051
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP = "remote.log.metadata.initialization.retry.max.timeout.ms";
5152
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP = "remote.log.metadata.initialization.retry.interval.ms";
5253

5354
public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50;
5455
public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS = -1L;
5556
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3;
57+
public static final short DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR = 2;
5658
public static final long DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 2 * 60 * 1000L;
5759
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS = 2 * 60 * 1000L;
5860
public static final long DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS = 100L;
@@ -63,6 +65,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
6365
"Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
6466
"To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
6567
"tiered storage in the cluster.";
68+
public static final String REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC = "The minimum number of replicas that must acknowledge a write to remote log metadata topic.";
6669
public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +
6770
"receive the published event.";
6871
public static final String REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_DOC = "The retry interval in milliseconds for " +
@@ -90,6 +93,8 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
9093
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC)
9194
.define(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MS, LOW,
9295
REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_DOC)
96+
.define(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, SHORT, DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, atLeast(1), LOW,
97+
REMOTE_LOG_METADATA_TOPIC_MIN_ISR_DOC)
9398
.define(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, LONG, DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS, atLeast(0), LOW,
9499
REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC)
95100
.define(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP, LONG,
@@ -106,6 +111,7 @@ DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS, atLeast(0), LOW,
106111
private final long consumeWaitMs;
107112
private final long metadataTopicRetentionMs;
108113
private final short metadataTopicReplicationFactor;
114+
private final short metadataTopicMinIsr;
109115
private final long initializationRetryMaxTimeoutMs;
110116
private final long initializationRetryIntervalMs;
111117

@@ -126,6 +132,7 @@ public TopicBasedRemoteLogMetadataManagerConfig(Map<String, ?> props) {
126132
if (metadataTopicRetentionMs != -1 && metadataTopicRetentionMs <= 0) {
127133
throw new IllegalArgumentException("Invalid metadata topic retention in millis: " + metadataTopicRetentionMs);
128134
}
135+
metadataTopicMinIsr = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP);
129136
consumeWaitMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP);
130137
initializationRetryIntervalMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP);
131138
initializationRetryMaxTimeoutMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_PROP);
@@ -184,6 +191,10 @@ public long initializationRetryIntervalMs() {
184191
return initializationRetryIntervalMs;
185192
}
186193

194+
public short metadataTopicMinIsr() {
195+
return metadataTopicMinIsr;
196+
}
197+
187198
public String logDir() {
188199
return logDir;
189200
}
@@ -229,6 +240,7 @@ public String toString() {
229240
", consumeWaitMs=" + consumeWaitMs +
230241
", metadataTopicRetentionMs=" + metadataTopicRetentionMs +
231242
", metadataTopicReplicationFactor=" + metadataTopicReplicationFactor +
243+
", metadataTopicMinIsr=" + metadataTopicMinIsr +
232244
", initializationRetryMaxTimeoutMs=" + initializationRetryMaxTimeoutMs +
233245
", initializationRetryIntervalMs=" + initializationRetryIntervalMs +
234246
", commonProps=" + configMapToRedactedString(commonProps, AdminClientConfig.configDef()) +

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3535

3636
public class RemoteLogMetadataManagerTestUtils {
37-
private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
37+
static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
3838
private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
3939
private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L;
4040

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@
2727

2828
import java.util.AbstractMap;
2929
import java.util.Arrays;
30+
import java.util.Collections;
3031
import java.util.HashMap;
3132
import java.util.Map;
3233

3334
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
35+
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR;
3436
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
3537
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
3638
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
3739
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
40+
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP;
3841
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
3942
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
4043
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
@@ -140,6 +143,10 @@ void verifyToStringRedactsSensitiveConfigurations() {
140143
assertTrue(configString.contains("enable.auto.commit=false"));
141144
}
142145

146+
private Map<String, Object> createValidConfigProps() {
147+
return this.createValidConfigProps(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
148+
}
149+
143150
private Map<String, Object> createValidConfigProps(Map<String, Object> commonClientConfig,
144151
Map<String, Object> producerConfig,
145152
Map<String, Object> consumerConfig) {
@@ -192,4 +199,20 @@ private void assertMaskedSensitiveConfigurations(String configString) {
192199
Arrays.stream(sensitiveConfigKeys)
193200
.forEach(config -> assertTrue(configString.contains(config + "=(redacted)")));
194201
}
202+
203+
@Test
204+
public void testDefaultMinIsr() {
205+
Map<String, Object> props = createValidConfigProps();
206+
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
207+
assertEquals(DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR, rlmmConfig.metadataTopicMinIsr());
208+
}
209+
210+
@Test
211+
public void testCustomMinIsr() {
212+
Map<String, Object> props = createValidConfigProps();
213+
short customMinIsr = 3;
214+
props.put(REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr);
215+
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(props);
216+
assertEquals(customMinIsr, rlmmConfig.metadataTopicMinIsr());
217+
}
195218
}

storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
package org.apache.kafka.server.log.remote.metadata.storage;
1818

1919
import org.apache.kafka.clients.admin.Admin;
20+
import org.apache.kafka.clients.admin.Config;
21+
import org.apache.kafka.clients.admin.ConfigEntry;
22+
import org.apache.kafka.clients.admin.DescribeConfigsResult;
2023
import org.apache.kafka.clients.admin.DescribeTopicsResult;
2124
import org.apache.kafka.clients.admin.NewTopic;
2225
import org.apache.kafka.clients.admin.TopicDescription;
2326
import org.apache.kafka.common.KafkaFuture;
2427
import org.apache.kafka.common.TopicIdPartition;
2528
import org.apache.kafka.common.TopicPartition;
2629
import org.apache.kafka.common.Uuid;
30+
import org.apache.kafka.common.config.ConfigResource;
31+
import org.apache.kafka.common.config.TopicConfig;
2732
import org.apache.kafka.common.test.ClusterInstance;
2833
import org.apache.kafka.common.test.api.ClusterTest;
2934
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -49,6 +54,7 @@
4954

5055
import static org.junit.jupiter.api.Assertions.assertEquals;
5156
import static org.junit.jupiter.api.Assertions.assertFalse;
57+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5258
import static org.junit.jupiter.api.Assertions.assertThrows;
5359
import static org.junit.jupiter.api.Assertions.assertTrue;
5460
import static org.mockito.ArgumentMatchers.any;
@@ -370,4 +376,54 @@ public void testInitializationFailure() throws IOException, InterruptedException
370376
Exit.resetExitProcedure();
371377
}
372378
}
379+
380+
@ClusterTest
381+
public void testRemoteLogMetadataTopicWithDefaultMinIsr() throws ExecutionException, InterruptedException {
382+
// Initialize the manager which will create the __remote_log_metadata topic
383+
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
384+
verifyRemoteLogMetadataTopicWithMinIsr(topicBasedRemoteLogMetadataManager,
385+
TopicBasedRemoteLogMetadataManagerConfig.DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR,
386+
"default value");
387+
}
388+
389+
@ClusterTest
390+
public void testRemoteLogMetadataTopicWithCustomMinIsr() throws ExecutionException, InterruptedException, IOException {
391+
// Create a manager with custom min.isr value
392+
short customMinIsr = 3;
393+
Map<String, Object> overrideProps = Map.of(
394+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP, customMinIsr
395+
);
396+
try (TopicBasedRemoteLogMetadataManager customRlmm = RemoteLogMetadataManagerTestUtils.builder()
397+
.bootstrapServers(clusterInstance.bootstrapServers())
398+
.overrideRemoteLogMetadataManagerProps(overrideProps)
399+
.build()) {
400+
verifyRemoteLogMetadataTopicWithMinIsr(customRlmm, customMinIsr, "custom value");
401+
}
402+
}
403+
404+
private void verifyRemoteLogMetadataTopicWithMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
405+
short expectedMinIsr,
406+
String valueDescription)
407+
throws ExecutionException, InterruptedException {
408+
try (Admin admin = clusterInstance.admin()) {
409+
String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
410+
411+
// Wait for the topic to be created
412+
clusterInstance.waitTopicCreation(metadataTopic, RemoteLogMetadataManagerTestUtils.METADATA_TOPIC_PARTITIONS_COUNT);
413+
414+
// Verify the topic exists
415+
assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
416+
417+
// Describe the topic configs to verify min.insync.replicas
418+
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
419+
DescribeConfigsResult describeResult = admin.describeConfigs(List.of(topicResource));
420+
Config config = describeResult.all().get().get(topicResource);
421+
422+
assertNotNull(config, "Topic config should not be null");
423+
ConfigEntry minIsrEntry = config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
424+
assertNotNull(minIsrEntry, "min.insync.replicas config should exist");
425+
assertEquals(String.valueOf(expectedMinIsr), minIsrEntry.value(),
426+
"min.insync.replicas should be " + expectedMinIsr + " (" + valueDescription + ")");
427+
}
428+
}
373429
}

0 commit comments

Comments
 (0)