Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.
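[2] Analysis
The connection information encapsulated in the Environment is only used to connect to the broker.
When a Producer is built, it queries the broker and pulls the connection information of the Leader in the cluster.
From then on, the node that is actually accessed is the Leader node in the cluster.
The Leader's connection information has the format: node name:port
[3] Configuration
So that the application on the local machine can map the Leader's node name to an IP address, we need to edit the local hosts file and add a mapping from the node name to the IP address.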
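A quick way to confirm that the hosts entry took effect is to ask the JVM to resolve the node name before starting the producer. The sketch below is only an illustration: the class name LeaderHostCheck and the helper resolve are made up for this example, and the node name must be replaced with the one your broker actually advertises. A hosts entry has the form "IP address, whitespace, host name" on one line.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

public class LeaderHostCheck {

    /**
     * Asks the system resolver (which normally consults the hosts file)
     * for the IP address of the given host name.
     * Returns an empty Optional when the name cannot be resolved.
     */
    static Optional<String> resolve(String hostName) {
        try {
            return Optional.of(InetAddress.getByName(hostName).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Pass the Leader's node name as the first argument;
        // "localhost" is only a placeholder so the sketch runs as-is.
        String nodeName = args.length > 0 ? args[0] : "localhost";
        Optional<String> ip = resolve(nodeName);
        if (ip.isPresent()) {
            System.out.println(nodeName + " resolves to " + ip.get());
        } else {
            System.out.println(nodeName + " does not resolve; check the hosts file");
        }
    }
}
```

If the name does not resolve, the client would likely fail when it tries to open the connection to the Leader, so it is worth running this check on the machine that hosts the application.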
The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:
OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).
OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.
OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.
OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.
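To make the five options easier to compare, here is a small in-memory model of where each one would start reading. It is a toy sketch, not the client library: the class OffsetModel, the Entry type, and every method in it are invented for this illustration. It also makes two assumptions that the text above does not state: that out-of-range offsets are clamped to the available range, and that timestamps are matched at chunk granularity, which would explain why slightly older messages can be delivered.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetModel {

    /** One stored message: its offset, store timestamp (ms), and the chunk that holds it. */
    static final class Entry {
        final long offset;
        final long timestamp;
        final int chunk;

        Entry(long offset, long timestamp, int chunk) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.chunk = chunk;
        }
    }

    /** Offset that will be written next; a consumer positioned here waits for new messages. */
    static long next(List<Entry> stream) {
        return stream.isEmpty() ? 0 : stream.get(stream.size() - 1).offset + 1;
    }

    /** First offset of the chunk containing the entry at the given list index. */
    static long chunkStart(List<Entry> stream, int index) {
        int chunk = stream.get(index).chunk;
        while (index > 0 && stream.get(index - 1).chunk == chunk) {
            index--;
        }
        return stream.get(index).offset;
    }

    /** first(): the first offset still available (not 0 if the stream was truncated). */
    static long first(List<Entry> stream) {
        return stream.isEmpty() ? next(stream) : stream.get(0).offset;
    }

    /** last(): the beginning of the last chunk, so that chunk is returned immediately. */
    static long last(List<Entry> stream) {
        return stream.isEmpty() ? next(stream) : chunkStart(stream, stream.size() - 1);
    }

    /** offset(n): the requested offset, clamped to the available range (model assumption). */
    static long offset(List<Entry> stream, long requested) {
        return Math.max(first(stream), Math.min(requested, next(stream)));
    }

    /** timestamp(t): start of the chunk holding the first message stored at or after t. */
    static long timestamp(List<Entry> stream, long ts) {
        for (int i = 0; i < stream.size(); i++) {
            if (stream.get(i).timestamp >= ts) {
                return chunkStart(stream, i);
            }
        }
        return next(stream);
    }

    public static void main(String[] args) {
        // A truncated stream: offsets 0..9 are gone, two chunks remain.
        List<Entry> stream = Arrays.asList(
                new Entry(10, 1000, 0), new Entry(11, 1010, 0), new Entry(12, 1020, 0),
                new Entry(13, 2000, 1), new Entry(14, 2010, 1));

        System.out.println("first()         -> " + first(stream));
        System.out.println("last()          -> " + last(stream));
        System.out.println("next()          -> " + next(stream));
        System.out.println("offset(0)       -> " + offset(stream, 0));
        System.out.println("timestamp(1015) -> " + timestamp(stream, 1015));
    }
}
```

With this sample data, first() resolves to 10 because the stream was truncated, last() resolves to 13 (the start of the last chunk), and next() resolves to 15, where nothing exists yet. timestamp(1015) resolves to 10, so the messages stored at 1000 and 1010 would still be delivered, which is the case application code may want to filter out.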
3. Consuming from a specified offset
// Connect to the broker; host, port, and credentials are this tutorial's own values
Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("atguigu")
        .password("123456")
        .build();
CountDownLatch countDownLatch = new CountDownLatch(1);