1.pom 引入依赖
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.20.2</version>
</dependency>
2.创建配置类
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
@Configuration
public class NatsConfig {
@Value(("${nats.url}"))
private String url;
@Bean
public Connection natsConnection() throws IOException, InterruptedException {
return Nats.connect(url);
}
@Bean
public JetStream jetStream(Connection natsConnection) throws IOException {
return natsConnection.jetStream();
}
@Bean
public JetStreamManagement jetStreamManagement(Connection natsConnection) throws IOException {
return natsConnection.jetStreamManagement();
}
}
3.application 添加配置
nats:
url: nats://127.0.0.1:4222
4.生产者
@Service
@RequiredArgsConstructor
public class NatsCorpPublisher {
private final JetStream jetStream;
public void sendAsyncMessage(JSONObject jsonObject) {
try {
jetStream.publish("subject_name", jsonObject.toJSONString().getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.消费者
@Service
@RequiredArgsConstructor
public class NatsCorpSubscriber {
private final JetStream jetStream;
private final NatsStreamCheck natsStreamCheck;
@PostConstruct
public void subscribe() throws JetStreamApiException, IOException {
natsStreamCheck.createStreamInfoIfExists("subject_name");
JetStreamSubscription subscription = jetStream.subscribe("subject_name");
Thread thread = new Thread(() -> {
for (; ; ) {
try {
Message message = subscription.nextMessage(-1);
System.out.println(new String(message.getData()));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setName(threadName);
thread.start();
log.info("线程:{}启动完毕", threadName);
}
}
@Component
@RequiredArgsConstructor
public class NatsStreamCheck {
private final JetStreamManagement streamManager;
// 检查流是否存在
public StreamInfo createStreamInfoIfExists(String streamName) throws JetStreamApiException, IOException {
try {
// 尝试获取流信息
return streamManager.getStreamInfo(streamName);
} catch (Exception e) {
// 如果流不存在或发生其他异常,返回 null
// 创建流配置
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name(streamName)
.subjects(streamName)
.storageType(StorageType.File) // 使用文件存储
.build();
return streamManager.addStream(streamConfig);
}
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容