이 프로그램 이용하기 전에 인증 토큰을 먼저 받아야 하는데 아래 프로그램 참고 하면 됩니다.
실시간시세조회가 필요한 이유는 별도 요청 없이 한번 등록하면 자동으로 여러개의 종목코드에 대한 시세 정보를 얻을 수 있습니다.
그리고 조검검색에서 실시간 실행되는 기능 때문인지 조건검색도 웹소켓을 이용하게 되어 있습니다.
아래는 테스트까지 해본것으로 잘되는 프로그램으로 테스트 후 자신에 맞게 수정해 사용하면 됩니다.
$aAuth2Token = fn_au10001($aParams);
$accessToken = $aAuth2Token['token'];
echo $accessToken . PHP_EOL;
sleep(1);
class WebSocketClient {
private $uri;
private $socket;
private $connected;
private $keepRunning;
private $accessToken;
private $host;
private $port;
private $path;
public function __construct($uri, $accessToken) {
$this->uri = $uri;
$this->accessToken = $accessToken;
$this->connected = false;
$this->keepRunning = true;
$this->parseUri($uri);
}
private function parseUri($uri) {
$parsed = parse_url($uri);
$this->host = $parsed['host'];
$this->port = $parsed['port'] ?? ($parsed['scheme'] === 'wss' ? 443 : 80);
$this->path = $parsed['path'] ?? '/';
if (isset($parsed['query'])) {
$this->path .= '?' . $parsed['query'];
}
}
public function connect() {
echo "서버와 연결을 시도 중입니다.\n";
// SSL 컨텍스트 생성 (wss:// 연결용)
$context = stream_context_create([
'ssl' => [
'verify_peer' => false,
'verify_peer_name' => false,
'allow_self_signed' => true
]
]);
// SSL 소켓 연결
$this->socket = stream_socket_client(
"ssl://{$this->host}:{$this->port}",
$errno,
$errstr,
30,
STREAM_CLIENT_CONNECT,
$context
);
if (!$this->socket) {
echo "Connection error: $errstr ($errno)\n";
return false;
}
// 웹소켓 핸드셰이크 수행
if (!$this->performHandshake()) {
echo "핸드셰이크 실패\n";
return false;
}
$this->connected = true;
echo "웹소켓 서버에 연결되었습니다.\n";
// 로그인 패킷 전송
$loginParam = [
'trnm' => 'LOGIN',
'token' => $this->accessToken
];
echo "실시간 시세 서버로 로그인 패킷을 전송합니다.\n";
$this->sendMessage($loginParam);
return true;
}
private function performHandshake() {
// 웹소켓 핸드셰이크 키 생성
$key = base64_encode(random_bytes(16));
// HTTP 헤더 구성
$headers = [
"GET {$this->path} HTTP/1.1",
"Host: {$this->host}:{$this->port}",
"Upgrade: websocket",
"Connection: Upgrade",
"Sec-WebSocket-Key: $key",
"Sec-WebSocket-Version: 13",
"\r\n"
];
$request = implode("\r\n", $headers);
fwrite($this->socket, $request);
// 응답 읽기
$response = fread($this->socket, 1024);
// 핸드셰이크 응답 검증
if (strpos($response, '101 Switching Protocols') === false) {
echo "핸드셰이크 응답이 올바르지 않습니다.\n";
echo "응답: $response\n";
return false;
}
return true;
}
public function sendMessage($message) {
if (!$this->connected || !$this->socket) {
echo "연결이 끊어져 있습니다.\n";
return false;
}
// 배열이면 JSON으로 변환
if (is_array($message)) {
$message = json_encode($message, JSON_UNESCAPED_UNICODE);
}
// 웹소켓 프레임 생성
$frame = $this->createFrame($message);
$result = fwrite($this->socket, $frame);
if ($result === false) {
echo "메시지 전송 실패\n";
return false;
}
echo "Message sent: $message\n";
return true;
}
private function createFrame($message) {
$length = strlen($message);
$frame = '';
// FIN bit (1) + RSV (000) + Opcode (0001 for text)
$frame .= chr(0x81);
// Mask bit (1) + Payload length
if ($length < 126) {
$frame .= chr($length | 0x80);
} elseif ($length < 65536) {
$frame .= chr(126 | 0x80);
$frame .= pack('n', $length);
} else {
$frame .= chr(127 | 0x80);
$frame .= pack('NN', 0, $length);
}
// Masking key (4 bytes)
$mask = pack('N', mt_rand());
$frame .= $mask;
// Masked payload
for ($i = 0; $i < $length; $i++) {
$frame .= $message[$i] ^ $mask[$i % 4];
}
return $frame;
}
public function receiveMessage() {
if (!$this->socket || !$this->connected) {
return false;
}
// 첫 2바이트 읽기 (헤더)
$header = fread($this->socket, 2);
if (strlen($header) < 2) {
return false;
}
$firstByte = ord($header[0]);
$secondByte = ord($header[1]);
// FIN bit 확인
$fin = ($firstByte & 0x80) === 0x80;
// Opcode 확인
$opcode = $firstByte & 0x0F;
// Close frame인 경우
if ($opcode === 0x08) {
echo "서버가 연결을 종료했습니다.\n";
$this->connected = false;
return false;
}
// Payload length 읽기
$payloadLength = $secondByte & 0x7F;
if ($payloadLength === 126) {
$lengthData = fread($this->socket, 2);
$payloadLength = unpack('n', $lengthData)[1];
} elseif ($payloadLength === 127) {
$lengthData = fread($this->socket, 8);
$payloadLength = unpack('J', $lengthData)[1];
}
// Payload 읽기
$payload = '';
$bytesRead = 0;
while ($bytesRead < $payloadLength) {
$chunk = fread($this->socket, $payloadLength - $bytesRead);
if ($chunk === false || strlen($chunk) === 0) {
break;
}
$payload .= $chunk;
$bytesRead += strlen($chunk);
}
return $payload;
}
public function handleMessage($message) {
$response = json_decode($message, true);
if ($response === null) {
echo "JSON 파싱 실패: $message\n";
return;
}
// 메시지 유형이 LOGIN일 경우 로그인 시도 결과 체크
if (isset($response['trnm']) && $response['trnm'] === 'LOGIN') {
if ($response['return_code'] != 0) {
echo "로그인 실패하였습니다: " . $response['return_msg'] . "\n";
$this->disconnect();
} else {
echo "로그인 성공하였습니다.\n";
sleep(1);
# 조건검색 목록조회
$this->ka10171();
// 로그인 성공 후 실시간 항목 등록 - 테스트는 장이 열렸을때 해야 한다.
// $this->registerRealtime();
}
}
// 메시지 유형이 PING일 경우 수신값 그대로 송신
elseif (isset($response['trnm']) && $response['trnm'] === 'PING') {
$this->sendMessage($response);
}
if (!isset($response['trnm']) || $response['trnm'] !== 'PING') {
echo "서버 응답 수신: " . json_encode($response, JSON_UNESCAPED_UNICODE) . "\n";
}
// 예시: 특정 조건에서 자동 종료
// if (isset($response['some_condition'])) {
// echo "조건이 충족되어 연결을 종료합니다.\n";
// $this->disconnect();
// }
}
// 조건검색 목록조회
public function ka10171() {
$realtimeParam = [
'trnm' => 'CNSRLST', // CNSRLST 고정값
];
echo "조건검색 목록조회 요청합니다.\n";
$this->sendMessage($realtimeParam);
}
public function registerRealtime() {
// 실시간 항목 등록
$realtimeParam = [
'trnm' => 'REG', // REG : 등록 , REMOVE : 해지
'grp_no' => '1', // 그룹번호
'refresh' => '1', // 기존등록유지여부
'data' => [[ // 실시간 등록 리스트
'item' => ['039490','026040','337930'], // 실시간 등록 요소
'type' => ['0B'] // 00:주문체결, OB: 주식체결
]]
];
echo "실시간 시세 등록을 요청합니다.\n";
$this->sendMessage($realtimeParam);
}
public function run() {
if (!$this->connect()) {
return;
}
// 논블로킹 모드로 설정
stream_set_blocking($this->socket, false);
$startTime = time();
$maxRunTime = 3600; // 1시간 후 자동 종료 (초 단위)
while ($this->keepRunning && $this->connected) {
// 시간 제한 체크
if (time() - $startTime > $maxRunTime) {
echo "시간 제한에 도달하여 연결을 종료합니다.\n";
break;
}
// 읽기 가능한 소켓 확인
$read = [$this->socket];
$write = null;
$except = null;
$result = stream_select($read, $write, $except, 1);
if ($result === false) {
echo "stream_select 에러\n";
break;
}
if ($result > 0 && in_array($this->socket, $read)) {
$message = $this->receiveMessage();
if ($message !== false && $message !== '') {
$this->handleMessage($message);
}
}
// CPU 사용률 줄이기 위한 작은 대기
usleep(12300); // 12ms
}
$this->disconnect();
}
public function stop() {
echo "수동으로 웹소켓 연결을 중지합니다.\n";
$this->keepRunning = false;
}
public function disconnect() {
$this->keepRunning = false;
if ($this->connected && $this->socket) {
// Close frame 전송
$closeFrame = chr(0x88) . chr(0x80) . pack('N', mt_rand());
@fwrite($this->socket, $closeFrame);
fclose($this->socket);
$this->connected = false;
echo "Disconnected from WebSocket server\n";
}
}
}
// 시그널 핸들러 설정 (Ctrl+C 처리)
function signalHandler($signal) {
global $websocketClient;
echo "\n프로그램을 종료합니다.\n";
if (isset($websocketClient)) {
$websocketClient->disconnect();
}
exit(0);
}
// 실행 부분
function main($accessToken) {
global $websocketClient;
// 웹소켓 서버 정보 설정
$socketUrl = 'wss://api.kiwoom.com:10000/api/dostk/websocket'; // 접속할 주소
// WebSocketClient 인스턴스 생성
$websocketClient = new WebSocketClient($socketUrl, $accessToken);
echo "웹소켓 클라이언트를 시작합니다.\n";
$websocketClient->run();
}
// 프로그램 실행
main($accessToken);
NXT 의 경우 123456_NX 이렇게 넣는것입니다.
저처럼 예제에 NXT:123456_NX 되어 있다고 앞쪽에 있는 NXT: 까지 넣으면 아무것도 나오질 않습니다.
아무리 해도 안되길레 소스를 봤더니 조건검색 목록조회(ka10171) 를 먼저 한번 호출한 다음 ka10172 호출을 해야 합니다.
소스를 보면 대략 다음과 같습니다.
- 요즘은 프로그램 알려 달라고 하거나 동기식 처럼 처리 하고 싶다고 하면 프로그램 다시 작성해 주는것을 이용 하시면 됩니다.