webClient를 이용한 외부 stream API통신
WebClient
webClient는 Spring WebFlux에서 제공하는 비동기, 논블록킹 방식의 클라이언트이다. 해당 클라이언트를 통해 외부 api를 비동기적으로 호출 가능하다.
tip:이 방식은 요청 후 결과를 Mono나 Flux형태로 받을 수 있어 요새 자주 쓰이는 steam 방식으로 데이터를 수신받아 사용자에게 LLM 모델을 더 효율적으로 제공 가능하다.
// ExternalApiController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ExternalApiController {
@Autowired
private ExternalApiService externalApiService;
@GetMapping("/call-api-reactive")
public Mono<ResponseEntity<String>> callExternalApiReactive() {
return externalApiService.getData()
.map(data -> ResponseEntity.ok(data));
}
}
// ExternalApiService.java
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
@Service
public class ExternalApiService {
private final WebClient webClient;
// 실제 사용 시에는 프로퍼티 파일이나 환경변수로 API 키를 관리하는 것이 좋습니다.
private final String API_KEY = "your_api_key_here";
public ExternalApiService(WebClient.Builder webClientBuilder) {
// 외부 API의 기본 URL 설정 (예: https://externalapi.example.com)
this.webClient = webClientBuilder.baseUrl("https://externalapi.example.com").build();
}
public Flux%3CString%3E callExternalApi(RequestDto requestDto) {
return webClient.post()
.uri("/external-endpoint") // 외부 API의 엔드포인트
.header("Authorization", "Bearer " + API_KEY)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(requestDto)
.retrieve()
.bodyToFlux(String.class); // 스트림 형태의 문자열 데이터 반환 (필요 시 DTO나 JsonNode로 변경 가능)
}
)
FRONT에서 STREAM 데이터 받기
axios 의 경우 stream 객체를 받지 못하므로 fetch를 사용해서 수신받는다.
난감했던점
- axios 의 경우 stream 객체를 받지 못하므로 fetch를 사용해서 수신받는다.
while((match = 정규식.exec(문자열)) !==null )
안의 방식의 lastIndex는 while을 벗어나면 초기화 됨chunkRegex.exec(chunkBuffer)
을 사용하면 손쉽게 정규식에 해당하는 요소를 골라낼 수 있다. 하지만 이렇게 실행시 while 문을 빠져나가면chunkRegex.lastIndex
의 값이 0으로 초기화 돼서 이미 처리한 부분을 chunkBuffer에서 빼지 못하는 문제가 발생하였다. 따라서외부 변수(lastProcessedIndex)
를 두어 해당 while문에서 마지막 찾은 인덱스를 저장하고 해당 부분을 업데이트로 개선하였다.
코드
function read() {
reader.read().then(({ done, value }) => {
if (done) {
console.log('종료완료');
return;
}
const chunk = decoder.decode(value);
let lastProcessedIndex = 0;
chunckBuffer += chunk;
let match;
while ((match = chunkRegex.exec(chunckBuffer)) !== null) {
lastProcessedIndex = chunkRegex.lastIndex
const jsonStr = match[1];
try {
const eventObj = JSON.parse(jsonStr);
conversation.value[conversation.value.length - 1].content += eventObj.message;
} catch (error) {
console.error("JSON 파싱 오류:", error);
}
}
// 처리된 부분 이후의 문자열만 남도록 chunkBuffer 업데이트
if (lastProcessedIndex > 0) {
chunckBuffer = chunckBuffer.substring(lastProcessedIndex);
}
read();
});
}
API가 이벤트 스트림으로 응답하는 방식
1. 복수개의 JSON 형태로 전달하는 방식
형태 : {"role":"assistant", "content":"첫번째 메세지"} , {"role":"assistant", "content":"두번째 메세지"}
Flux<ChatResponse> responseFlux = webClient.post()
.uri("https://externalapi.example.com/endpoint")
.retrieve()
.bodyToFlux(ChatResponse.class);
Flux<String> messageFlux = responseFlux.map(ChatResponse::getMessage);
2. 단일 JSON 객체를 응답하는 경우
형태 : {"role":"assistant", "content":["메세지1","메세지2"]}
{
"chatRoleType": "ASSISTANT",
"message": ["메시지1", "메시지2", "메시지3"]
}
Spting boot WebClient로그 설정
1. properties 설정부
# WebClient 관련 로깅
logging.level.org.springframework.web.reactive.function.client.ExchangeFunctions=DEBUG
logging.level.reactor.netty.http.client=DEBUG
2. 자바 수신부
responseFlux.subscribe(
data -> System.out.println("받은 데이터: " + data),
error -> System.err.println("오류: " + error),
() -> System.out.println("스트림 종료")
);
HTTP 오류
400 Bad Request : 클라이언트에서 보낸 요청이 올바른 형식이 아니거나 서버가 이해를 하지 못할 때 보통 body가 잘못된 경우이다.
- 이번의 경우 내가 body 쪽에 명시되지 않은 type 데이터까지 같이 보내면서 발생하였다.
401 Unauthorized:
- 인증 인가문제인데, 외부 API를 사용할 때 인증 인가 수단을 살펴봐야한다.
400과 401의 우선순위 : 대부분이 Spring Security 에 의해서 인가된 인원인지 먼저 확인하고 보내는 양식을 확인하기 때문에 401에러가 먼저나고 400이 난다. 하지만 시스템의 방식에 따라서 바뀔 수도 있다.