주식투자. 거북이도 하늘을 날 수 있습니다.

주식투자는 기업의 미래에 투자하는 것입니다. 느리지만 꾸준히 원칙을 지키는 투자로 미래를 바꿀 수 있습니다.

꿈을 그리는 A.I

Python과 TensorFlow: Keras를 사용한 LSTM과 CNN을 결합한 주가 예측 시뮬레이션 모델

HandlerOne 2024. 11. 15. 10:26

 

주식 자동매매를 위한 시뮬레이션 학습 모델 소스를 공개 합니다. 
아래의 수식은 이제까지 나온 주가 예측 모델중 가장 높은 예측률을 갖고 있는 LSTM과 딥러닝 알고리즘중 CNN 알고리즘을 혼합해서 만든 주가 예측 시뮬레이션 알고리즘을 파이썬 코드로 만든 소스코드 입니다. 

파이썬 텐서플로에서 Keras를 사용한 LSTM과 CNN을 결합한 주가 예측 시뮬레이션 모델 Source Code
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, LSTM, Dense, Flatten, MaxPooling1D, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import seaborn as sns
from datetime import datetime

class StockPricePrediction:
    """
    LSTM과 CNN을 결합한 주가 예측 모델 클래스
    """
    
    def __init__(self, sequence_length=60):
        """
        모델 초기화
        
        매개변수:
            sequence_length (int): 예측에 사용할 과거 데이터 기간
        """
        self.sequence_length = sequence_length
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.model = None
        self.history = None

    def load_and_preprocess_data(self, file_path):
        """
        데이터 로드 및 전처리
        
        매개변수:
            file_path (str): 주가 데이터 CSV 파일 경로
        """
        try:
            # 데이터 로드
            df = pd.read_csv(file_path)
            df['Date'] = pd.to_datetime(df['Date'])
            df.set_index('Date', inplace=True)
            
            # 결측치 확인 및 처리
            if df['Close'].isnull().sum() > 0:
                print("결측치 발견, 전진 채우기 방식으로 처리합니다.")
                df['Close'].fillna(method='ffill', inplace=True)
            
            # 데이터 정규화
            self.scaled_data = self.scaler.fit_transform(df[['Close']].values)
            
            return df[['Close']]
            
        except Exception as e:
            print(f"데이터 로드 중 에러 발생: {str(e)}")
            return None

    def create_sequences(self, data):
        """
        시계열 시퀀스 생성
        
        매개변수:
            data (numpy.array): 정규화된 주가 데이터
        """
        X, y = [], []
        for i in range(len(data) - self.sequence_length):
            X.append(data[i:(i + self.sequence_length)])
            y.append(data[i + self.sequence_length])
        return np.array(X), np.array(y)

    def build_model(self, input_shape):
        """
        LSTM-CNN 혼합 모델 구축
        
        매개변수:
            input_shape (tuple): 입력 데이터 형태
        """
        # 입력 레이어
        input_layer = Input(shape=input_shape)
        
        # CNN 레이어
        conv1 = Conv1D(filters=64, kernel_size=3, activation='relu', padding='same')(input_layer)
        pool1 = MaxPooling1D(pool_size=2)(conv1)
        conv2 = Conv1D(filters=128, kernel_size=3, activation='relu', padding='same')(pool1)
        pool2 = MaxPooling1D(pool_size=2)(conv2)
        
        # LSTM 레이어
        lstm1 = LSTM(100, return_sequences=True)(pool2)
        dropout1 = Dropout(0.3)(lstm1)
        lstm2 = LSTM(50)(dropout1)
        dropout2 = Dropout(0.3)(lstm2)
        
        # 출력 레이어
        dense1 = Dense(50, activation='relu')(dropout2)
        dense2 = Dense(25, activation='relu')(dense1)
        output_layer = Dense(1)(dense2)
        
        # 모델 생성
        self.model = Model(inputs=input_layer, outputs=output_layer)
        return self.model

    def train_model(self, X_train, y_train, X_test, y_test, epochs=50, batch_size=32):
        """
        모델 학습
        
        매개변수:
            X_train, y_train: 학습 데이터
            X_test, y_test: 검증 데이터
            epochs (int): 학습 반복 횟수
            batch_size (int): 배치 크기
        """
        # Early Stopping 설정
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )
        
        # 모델 체크포인트 설정
        checkpoint = ModelCheckpoint(
            'best_model.h5',
            monitor='val_loss',
            save_best_only=True
        )
        
        # 모델 컴파일
        self.model.compile(
            optimizer='adam',
            loss='mean_squared_error',
            metrics=['mae']
        )
        
        # 모델 학습
        self.history = self.model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_test, y_test),
            callbacks=[early_stopping, checkpoint],
            verbose=1
        )
        
        return self.history

    def evaluate_model(self, X_test, y_test):
        """
        모델 성능 평가
        
        매개변수:
            X_test, y_test: 테스트 데이터
        """
        # 예측 수행
        predicted = self.model.predict(X_test)
        
        # 정규화 역변환
        predicted_prices = self.scaler.inverse_transform(predicted)
        actual_prices = self.scaler.inverse_transform(y_test)
        
        # 성능 지표 계산
        mse = mean_squared_error(actual_prices, predicted_prices)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(actual_prices, predicted_prices)
        r2 = r2_score(actual_prices, predicted_prices)
        
        metrics = {
            'MSE': mse,
            'RMSE': rmse,
            'MAE': mae,
            'R2 Score': r2
        }
        
        return metrics, predicted_prices, actual_prices

    def plot_results(self, actual_prices, predicted_prices, title="Stock Price Prediction"):
        """
        예측 결과 시각화
        
        매개변수:
            actual_prices (numpy.array): 실제 가격
            predicted_prices (numpy.array): 예측 가격
            title (str): 그래프 제목
        """
        plt.figure(figsize=(15, 6))
        
        # 주가 예측 그래프
        plt.subplot(1, 2, 1)
        plt.plot(actual_prices, label='실제 가격', color='blue')
        plt.plot(predicted_prices, label='예측 가격', color='red')
        plt.title(title)
        plt.xlabel('시간')
        plt.ylabel('가격')
        plt.legend()
        
        # 산점도
        plt.subplot(1, 2, 2)
        plt.scatter(actual_prices, predicted_prices, alpha=0.5)
        plt.plot([actual_prices.min(), actual_prices.max()], 
                [actual_prices.min(), actual_prices.max()], 
                'r--', lw=2)
        plt.title('실제 가격 vs 예측 가격')
        plt.xlabel('실제 가격')
        plt.ylabel('예측 가격')
        
        plt.tight_layout()
        plt.show()

def main():
    """
    메인 실행 함수
    """
    # 모델 인스턴스 생성
    predictor = StockPricePrediction(sequence_length=60)
    
    # 데이터 로드 및 전처리
    data = predictor.load_and_preprocess_data('stock_data.csv')
    if data is None:
        return
    
    # 시퀀스 생성
    X, y = predictor.create_sequences(predictor.scaled_data)
    
    # 데이터 분할
    split = int(0.8 * len(X))
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]
    
    # 모델 구축
    model = predictor.build_model(input_shape=(X_train.shape[1], X_train.shape[2]))
    print("\n모델 구조:")
    model.summary()
    
    # 모델 학습
    print("\n모델 학습 시작...")
    history = predictor.train_model(X_train, y_train, X_test, y_test)
    
    # 모델 평가
    print("\n모델 평가 중...")
    metrics, predicted_prices, actual_prices = predictor.evaluate_model(X_test, y_test)
    
    # 결과 출력
    print("\n모델 성능 지표:")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.4f}")
    
    # 결과 시각화
    predictor.plot_results(actual_prices, predicted_prices)

if __name__ == "__main__":
    main()

모두가 즐겁게 사용하는 인공지능 세상을 기원합니다.

※ 본 아티클 주의사항
본 아티클은 제작자의 창작물이며, 지적재산권에 의해 보호됩니다. 저작자의 허락 없이 다른 저작물에 도용하거나, 저작자 허락 없이 상업적인 목적에 이용하거나 유출하는 경우, 민형사상의 불이익과 처벌을 받게 되니 주의하시기 바랍니다.  만약, 다른 아티클에 일부 사용하고 싶으시면, 출처와 원작자를 밝혀주시고 본 아티클의 URL을 공유해 주시면 됩니다.