macbook pro beside papers

Pythonで因果探索と因果推論を連続実行するクラス

こんにちは。やまもとです。

とある公的なデータを使って因果分析をしていたのですが、因果探索(因果を見つけ出す手法)と因果推論(因果モデルとデータの適合性を判断する手法)を別々に実行するのがいい加減に面倒だったので、因果探索と因果推論を連続で一気に実行するためのpythonクラスCausalAnalysisを作成してみました。因果探索には以下の記事でも説明したLiNGAMのライブラリを、因果推論には構造方程式モデリング(Stracture Equation Modeling, SEM)が実行できるsemopyライブラリを使用しています。

ということで、今回は作成したCausalAnalysisクラスについて解説します。

使い方

テストデータ

テストデータは、探索的因子分析を自動化した記事と同じ下記の公開データをCSV形式で保存して使用します。

事前準備

CausalAnalysisクラスは、以下のライブラリを使用しています。

  • pandas : 統計処理ライブラリ。統計データ処理に事実上の標準データ型DataFrameを利用します。
  • NumPy : 数値計算ライブラリ。配列データ型などを使用します。
  • LiNGAM : 統計的因果探索ライブラリ。因果探索に使用します。
  • semopy : 構造方程式モデリング(共分散構造分析)ライブラリ。因果関係の評価に使用します。
  • GraphViz : グラフ描画ライブラリ。因果グラフの描画に使用します。

これらがpythonに追加されている必要があるため、以下のようなコマンドでライブラリをインストールしておきます。

$ pip install pndas numpy lingam semopy graphviz

実行方法

CausalAnalysisクラスを用いた因子分析は、次のようにシンプルなコードになります。

import numpy as np
import pandas as pd
import causal_analysis as ca

data   = pd.read_csv("data08-01.csv",index_col=0,header=0) # データ読み込み
analyzer = ca.CausalAnalysis(alpha=1.00) # インスタンス作成
analyzer.analyze(data) # 分析
analyzer.save() # 図の描画と保存
print(analyzer) # 分析結果の表示

CausalAnalysisクラスはcausal_analysis.pyファイルに保存しているので、import causal_analysisでインポートしています。

5行目でファイル名”data08-01.csv”に保存したテストデータを読み込み、6行目でCausalAnalysisクラスのインスタンスを作成しています。この時、引数methodに因果探索の手法を指定することができます。デフォルトは”direct”で、DirectLiNGAMを使用します。

引数alphaは、各因果関係のp値の有意水準を指定しています。alphaの規定値は5%有意水準を表すalpha=0.05にしていますが、何度か試したところ、LiNGAMを使った因果探索の場合は、優位性の判定を無効にするalpha=1.00に設定した方が良いようです。

analyze()関数は、因果探索と因果推論を実行する関数です。因果分析は、これだけで完了します。

save()関数では、SEMに使用したモデル設定のテキストファイル(sem_model.txt)、因果探索結果のGraphvizのDot形式ファイル(found_causality.gv)とPNG形式の画像ファイル(found_causality.png)、因果推論結果のGraphvizのDot形式ファイル(evaled_causality.gv)とPNG形式の画像ファイル(evaled_causality.png)が出力されます。ファイル名は、すべて引数で変更することができます。

最終行のprint文は、分析結果を標準出力に表示します。

実行結果

テストデータの因果分析の結果は次のようになります。

SEM Model Style   : lavaan
LiNGAM Method     : direct
Cutoff Causality  : 0.01
Cutoff p-value    : 1.0

Causal Order:
[2, 1, 0, 4, 5, 3]

Causal Matrix:
[[0.         0.1913898  0.33575105 0.         0.         0.        ]
 [0.         0.         0.4602649  0.         0.         0.        ]
 [0.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.         0.56787807]
 [0.         0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.49122956 0.        ]]

SEM Model:
積極性 ~ 社交性
積極性 ~ 外向性
社交性 ~ 外向性
信頼性 ~ 知性
知性 ~ 素直さ


Infomation:
  lval  op rval  Estimate  Est. Std  Std. Err   z-value   p-value
0  社交性   ~  外向性  0.565641  0.486525  0.227126  2.490431  0.012759
1   知性   ~  素直さ  0.786449  0.627086  0.218443  3.600248  0.000318
2  積極性   ~  社交性  0.424430  0.350103  0.230011  1.845258  0.065000
3  積極性   ~  外向性  0.601945  0.427081  0.267414  2.250980  0.024387
4  信頼性   ~   知性  0.321459  0.436270  0.148255  2.168287  0.030137
5   知性  ~~   知性  0.849369  0.606763  0.268594  3.162278  0.001565
6  社交性  ~~  社交性  0.784110  0.763293  0.247957  3.162278  0.001565
7  積極性  ~~  積極性  0.829668  0.549538  0.262364  3.162278  0.001565
8  信頼性  ~~  信頼性  0.615355  0.809668  0.194592  3.162278  0.001565

Fitness Index:
                   Value
DoF            12.000000
DoF Baseline   17.000000
chi2            9.675371
chi2 p-value    0.644416
chi2 Baseline  42.251636
CFI             1.092059
GFI             0.771006
AGFI            0.675592
NFI             0.771006
TLI             1.130416
RMSEA           0.000000
AIC            17.032463
BIC            25.994053
LogLik          0.483769

Causal Orderは、変数の順序を入れ替えた時に出力されます。上記の場合、2番目の変数が0番目に、1番目の変数がそのまま1番目に、0番目の変数が2番目に・・・と、入れ替えられたことを示しています。

Causal Matrixは、因果探索の結果を表しています。この行列要素が、Cutoff Causalityの値(=0.01)より大きいとき、因果関係があると判定します。

SEM Modelは、semopy(因果推論)で使用する因果モデルです。Causal Matrixの結果から生成されています。

InformationとFitness indexは、因果推論の結果を表しています。Informationは、因果の強さや有意性を表しています。Fitness indexは、モデルとデータの適合度を示しています。上記の結果では、CFIは0.95以上で申し分ないのですが、GFIやNFIの値が0.77と低く、適合度はイマイチと言えます。

最後に、テストデータの因果分析の結果を描画したグラフを示しておきます。

このグラフは、「素直な人ほど知性的で信頼されやすい」「外向性が高いほど社交的で積極的に関係を築く」という因果関係を示しています。これは、妥当な結果と言えるのではないでしょうか?

コード解説

CausalAnalysisクラスを作成するポイントや躓いた箇所を解説しておきます。

因果探索の結果をSEMモデルに変換する

CausalAnalysisクラスの最大のポイントは、内部的にLiNGAMとsemopyを連携させる点です。

これは、LiNGAMの結果として得られる因果行列adjacency_matrix_を、semopyの入力に必要な構造方程式モデルのテキストに変換することで実現できます。

そこで、因果行列から構造方程式を生成する下記のコードをクラス内関数として作成しました。

        @staticmethod
        def lingam_to_sem(data,lingam_model,tol=0.01,alpha=0.05,sem_style="lavaan",enable_cov=False):
                # 変数の数
                n        = len(data.columns)

                # 変数名リスト
                varnames = data.columns.to_list()

                # 因果係数の行列
                adjmat   = lingam_model.adjacency_matrix_

                # 変数の順序
                if hasattr(lingam_model,"causal_order_"):
                        order = lingam_model.causal_order_ # 因果順序
                else:
                        order = list(range(n)) # 0, 1, 2, ..., n

                # p値の行列
                if hasattr(lingam_model,"get_error_independence_p_values"):
                        pvalmat = lingam_model.get_error_independence_p_values(data)
                else:
                        pvalmat = np.zeros((n,n)) # ゼロ行列

                # モデルの書き方
                if sem_style == "lavaan":
                        left = "~"
                        right= "=~"
                        both = "~~"
                elif sem_style == "sem":
                        left = "<-"
                        right= "->"
                        both = "<>"
                else:
                        print(f"lingam_to_sem: irregal option sem_style={sem_style}. ['lavaan','sem']")

                sem_model=""
                sem_model2=""
                for i in range(n):
                        for j in range(i+1,n,1):
                                lhs = varnames[order[I]] #順序入れ替え対応
                                rhs = varnames[order[j]] #順序入れ替え対応
                                if adjmat[i,j] > tol and pvalmat[i,j] < alpha :
                                        if adjmat[j,i] > tol and pvalmat[j,i] < alpha :
                                                sem_model2 += f"{lhs} {both} {rhs}\n" # lhs <-> rhs
                                        else:
                                                sem_model += f"{lhs} {left} {rhs}\n" # lhs <- rhs
                                else:
                                        if adjmat[j,i] > tol and pvalmat[j,i] < alpha :
                                                sem_model += f"{rhs} {left} {lhs}\n" # lhs -> rhs
 
               #print(sem_model)
                if enable_cov : sem_model += sem_model2

                return sem_model

この関数では、一応、因果が片方向ではなく両方向に存在する場合、すなわち変数間の相関関係も考慮しています。LiNGAMはDAGを前提としており、因果行列も三角行列になるため、このような処理は必要ありません。しかし、LiNGAM以外の手法では必要になるかもしれないため、オプションenable_covで追加できるようにしてあります。

LiNGAMの各クラスには、causal_order_メンバ変数やget_error_independence_p_value()メンバ関数を持たないものもあるので、これらは属性チェックhasattr()で確認する処理を入れています。

モデル書式は、lavaan形式の場合、観測変数A,Bの因果A->Bを”B ~ A”と表します。これを逆向きの因果として、”A =~ B”と表すこともできそうですが、この場合、変数Aは観測変数ではなく潜在変数として扱われることになります。そのため、因果B->Aは、”B =~ A”ではなく、”A ~ B”と書く必要があります。

因果探索と因果推論を統合する

analyze()メンバ関数は、因果探索を行うdiscover()関数と因果推論を行うevaluate()関数を呼び出す形にしています。この時、discover()関数が構造方程式モデルの文字列を返し、それをevaluate()関数の引数に渡すことで、因果探索と因果推論を統合しています。

        def analyze(self,data):
                model  = self.discover(data)
                result = self.evaluate(data,model)
                return result

discover()関数では、LiNGAMライブラリで提供されている因果探索手法を実行しています。各手法は、CausalAnalysisクラス内で使用するfit()関数・adjacency_matrix_属性・get_error_independence_p_values()関数をオーバーロードできる手法をmethod引数で選択できるようにしました。こうすることで、新しい手法も追加できるようにしています。

        def discover(self,data):
                if self.method_ == "direct":
                        self.lingam_ = lingam.DirectLiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "ica":
                        self.lingam_ = lingam.ICALiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "var":
                        self.lingam_ = lingam.VARLiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "varma":
                        self.lingam_ = lingam.VARMALiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "rcd":
                        self.lingam_ = lingam.RCD()
                        self.lingam_.fit(data)
                elif self.method_ == "sem":
                        self.lingam_ = SEMCausalFinder()
                        self.lingam_.fit(data)
                else :
                        print(f"discover : Invalid option self.method_ = {self.method_}")
                self.model_ = self.__lingam_to_sem(data,self.lingam_)
                return self.model_

SEMCausalFinderは、SEMを利用した因果探索を実装したクラスです。これは、別の記事で紹介したいと思います。

この関数の最後に、モデル変換関数__lingam_to_sem()を呼び出して、構造方程式モデルを返却しています。

evaluate()関数の主要な動作はsemopyを実行することですが、指定された引数modelについてエラー処理を行なっています。

        def evaluate(self,data,model=None):
                if model is None: model = self.model_
                if model is None: raise ValueError("No model")
                if str(model) == "":
                        if self.lingam_ is not None:
                                print("Adjacency Matrix:")
                                print(self.lingam_.adjacency_matrix_)
                        raise ValueError("Empty model")
                self.sem_ = semopy.Model(model)
                res = self.sem_.fit(data)
                return res

エラーが起こる場合として、次の3つに対処しています。

  • 引数modelが指定されなかった。
    →メンバ属性のself.model_を使用します。self.model_はdiscover()関数によって設定されます。
  • discover()関数が実行されず、変数modelが未設定だった。
    →例外ValueErrorをNo modelで送出します。
  • 因果探索の結果、因果が発見されずmodelが空文字列だった。
    →例外ValueErrorをEmpty modelで送出します。

引数modelは、省略不可にしても良かったかもしれませんね。

線形代数ソルバーが収束しない場合がある

いくつかのデータで試していたところ、semopyの中で実行される線形方程式の反復解放SLSQPが収束しないことがありました。この場合、データにinf(無限大)やNaN(Not a Number)が発生しており、そのまま処理を進めると、semopy.Modelクラスのinspect()関数でnumpy.linalg.LinAlgError例外が発生します。

下記のコードは、CausalAnalysisの__show__()関数ですが、ハイライトの部分で例外処理をしています。

        def __str__(self):
                x  = "\n"
                x += "SEM Model Style   : " + str(self.style_) + "\n"
                x += "LiNGAM Method     : " + str(self.method_) + "\n"
                x += "Cutoff Causality  : " + str(self.tol_) + "\n"
                x += "Cutoff p-value    : " + str(self.alpha_) + "\n"
                x += "\n"
                if self.lingam_ is not None:
                        if hasattr(self.lingam_,"causal_order_"):
                                x += "Causal Order:\n"
                                x += str(self.lingam_.causal_order_)
                                x += "\n\n"
                        x += "Causal Matrix:\n"
                        x += str(self.lingam_.adjacency_matrix_)
                        x += "\n\n"
                if self.model_ is not None:
                        x += "SEM Model:\n"
                        x += str(self.model_)
                        x += "\n\n"
                if self.sem_ is not None:
                        try:
                                ins = self.sem_.inspect(std_est=True)
                                pd.set_option('display.max_columns',len(ins.columns))
                                pd.set_option('display.max_rows',len(ins.index))
                                x += "Infomation:\n"
                                x += str(ins)
                                x += "\n\n"
                        except np.linalg.LinAlgError as e:
                                print(f"NumPy Linear Algebra Error : Infomationcan not be shown.")
                        x += "Fitness Index:\n"
                        gfi = semopy.calc_stats(self.sem_)
                        pd.set_option('display.max_columns',len(gfi.index))
                        pd.set_option('display.max_rows',len(gfi.columns))
                        x += str(gfi.T)
                        x += "\n\n"

                return x

異質データはスケール変換が必要になる

上記のソルバーが収束しない問題は、異質なデータ同士の因果関係を確かめる際に、各データの数値の大きさが揃っていないことが原因でした。例えば、データAの数値が0.0~1.0の範囲なのに対し、データBの数値が10,000,000~100,000,000の範囲だった場合などに、問題が発生します。これは、同質のデータ同士ならば、同じ範囲の数値になるので問題が起こりません。

数値の大きさを揃えるためには、多くの場合、標準化(standardize)を行います。しかし、標準化はデータが正規分布に従う場合に使える方法で、正規分布に従わない場合には使用できません。例えば、時系列データや頻度データは、正規分布に従うことは滅多にありません。そのため、使用するデータが決まらないと、スケール変換の方法も定まりません。

そこで、使用データに合わせてスケール変換を行えるように、関数を3つ用意しておきました。

  • standardize()関数:標準化。平均値が0、1σが1.0になるようにスケール変換する。
  • normalize()関数:正規化。最大値を1.0、最小値を0.0になるようにスケール変換する。
  • scale_by()関数:変化率化。あるレコード(規定値は最初のレコード)を起点とした変化率にスケール変換する。

使用するデータに合わせて、事前にスケール変換をしてください。

実装は簡単で、以下のようになります。

def standardize(data,ddof=False):
        stddat = ( data - data.mean() ) / data.std(ddof=ddof)
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

def normalize(data):
        stddat = ( data - data.min() ) / ( data.max() - data.min() )
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

def scale_by(data,row=0):
        stddat = data / data.iloc[row]
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

プログラム

GitHub

下記から、causal_analysis.pyをダウンロードしてください。

https://github.com/jyam45/stat_analytics/tree/main/causal_analysis

コピペ用ソースコード

causal_analysis.py

import numpy  as np
import pandas as pd
import lingam
import semopy
import graphviz
#import causal_discover as cd

####################################################################
###
### Scaing Functions for pandas.DataFrame
###
###   1. Use standardize() for normal-distribution data such as Likert N-item questionnaire. 
###   2. Use normalize() for the data bounded between upper- and lower-limit.
###   3. Use scale_by() for dynamics data such as time-plot data.
###
####################################################################
def standardize(data,ddof=False):
        stddat = ( data - data.mean() ) / data.std(ddof=ddof)
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

def normalize(data):
        stddat = ( data - data.min() ) / ( data.max() - data.min() )
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

def scale_by(data,row=0):
        stddat = data / data.iloc[row]
        stddat.index = data.index
        stddat.columns = data.columns
        return stddat

####################################################################
###
### Causal Finder using SEM
###
###     by comparing Fitness on Single Path Regression
###
####################################################################
class SEMCausalFinder:

        def __init__(self,alpha=0.05,eps_cfi=0.05,eps_gfi=0.05,eps_nfi=0.05):
                self.adjacency_matrix_ = None
                self.p_values_matrix_  = None
                self.stdests_          = None
                self.pvalues_          = None
                self.cfis_             = None
                 self.gfis_             = None
                self.nfis_             = None
                self.aics_             = None
                self.rmseas_           = None
                self.adjmax_           = None
                self.alpha_            = alpha   # significant level for p-value (default:0.05)
                self.eps_cfi_          = eps_cfi # significant difference level for CFI (default:0.05)
                self.eps_gfi_          = eps_gfi # significant difference level for GFI (default:0.05)
                self.eps_nfi_          = eps_nfi # significant difference level for NFI (default:0.05)

        ######################
        ### STATIC METHODS ###
        ######################

        @staticmethod
        def print_full(df,title="No title"):
                x = ""
                if df is not None:
                        x  = f"{title}:\n"
                        pd.set_option('display.max_columns',len(df.columns))
                        pd.set_option('display.max_rows',len(df.index))
                        x += str(df)
                        x += "\n\n"
                return x

        ########################
        ### INTERNAL METHODS ###
        ########################

        def __make_sem_combination_matrix(self,data):
                n = len(data.columns)
                alpha   = self.alpha_
                eps_cfi = self.eps_cfi_
                eps_gfi = self.eps_gfi_
                eps_nfi = self.eps_nfi_
                varnames      = data.columns.to_list()
                stdest_matrix = np.zeros((n,n))
                pvalue_matrix = np.ones((n,n))
                cfi_matrix    = np.zeros((n,n))
                gfi_matrix    = np.zeros((n,n))
                nfi_matrix    = np.zeros((n,n))
                aic_matrix    = np.zeros((n,n))
                rmsea_matrix  = np.ones((n,n))
                adj_matrix    = np.zeros((n,n))
                for i in range(n):
                        lhs = varnames[i];
                        for j in range(i+1,n,1):
                                rhs = varnames[j];
                                #print(f"{lhs} ~ {rhs}")
                                model_l = semopy.Model(f"{lhs} ~ {rhs}\n") # lhs <- rhs
                                model_r = semopy.Model(f"{rhs} ~ {lhs}\n") # lhs -> rhs
                                model_l.fit(data)
                                model_r.fit(data)
                                try:
                                        inspect_l = model_l.inspect(std_est=True)
                                        stdest_l = inspect_l["Est. Std"][0]
                                        pvalue_l = inspect_l["p-value"][0]
                                except np.linalg.LinAlgError as e:
                                        print(f"NumPy Linear Algebra Error : {lhs} ~ {rhs}")
                                        stdest_l = 0.0
                                        pvalue_l = 1.0
                                try:
                                        inspect_r = model_r.inspect(std_est=True)
                                        stdest_r = inspect_r["Est. Std"][0]
                                        pvalue_r = inspect_r["p-value"][0]
                                except np.linalg.LinAlgError as e:
                                        print(f"NumPy Linear Algebra Error : {rhs} ~ {lhs}")
                                        stdest_r = 0.0
                                        pvalue_r = 1.0
                                goodfit_l = semopy.calc_stats(model_l)
                                goodfit_r = semopy.calc_stats(model_r)
                                #
                                cfi_l    = goodfit_l["CFI"][0]
                                cfi_r    = goodfit_r["CFI"][0]
                                gfi_l    = goodfit_l["GFI"][0]
                                gfi_r    = goodfit_r["GFI"][0]
                                nfi_l    = goodfit_l["NFI"][0]
                                nfi_r    = goodfit_r["NFI"][0]
                                aic_l    = goodfit_l["AIC"][0]
                                aic_r    = goodfit_r["AIC"][0]
                                rmsea_l  = goodfit_l["RMSEA"][0]
                                rmsea_r  = goodfit_r["RMSEA"][0]
                                if   pvalue_l < alpha and pvalue_r < alpha : # 両向き有意な場合

                                        if abs(cfi_l-cfi_r) > eps_cfi or abs(gfi_l-gfi_r) > eps_gfi or abs(nfi_l-nfi_r) > eps_nfi : # 左右差がある場合
                                                if cfi_l > cfi_r : # 左向き確定
                                                        adj_matrix[i,j] = stdest_l
                                                else : # 右向き
                                                        adj_matrix[j,i] = stdest_r
                                        else: # 左右差がない場合 --> 両向き
                                                adj_matrix[i,j] = stdest_l
                                                adj_matrix[j,i] = stdest_r

                                elif pvalue_l < alpha and pvalue_r >=alpha : # 左向きのみ有意な場合

                                        adj_matrix[i,j] = stdest_l

                                elif pvalue_l >=alpha and pvalue_r < alpha : # 右向きのみ有意な場合

                                        adj_matrix[j,i] = stdest_r

                                #else : # 左右どちらも有意ではない場合
                                #
                                #       # do nothing
                                #
                                #print(f"i={i},j={j},n={n}")
                                stdest_matrix[i,j] = stdest_l
                                stdest_matrix[j,i] = stdest_r
                                pvalue_matrix[i,j] = pvalue_l
                                pvalue_matrix[j,i] = pvalue_r
                                cfi_matrix[i,j]    = cfi_l
                                cfi_matrix[j,i]    = cfi_r
                                gfi_matrix[i,j]    = gfi_l
                                gfi_matrix[j,i]    = gfi_r
                                nfi_matrix[i,j]    = nfi_l
                                nfi_matrix[j,i]    = nfi_r
                                aic_matrix[i,j]    = aic_l
                                aic_matrix[j,i]    = aic_r
                                rmsea_matrix[i,j]  = rmsea_l
                                rmsea_matrix[j,i]  = rmsea_r
                self.stdests_ = pd.DataFrame(stdest_matrix,columns=varnames,index=varnames)
                self.pvalues_ = pd.DataFrame(pvalue_matrix,columns=varnames,index=varnames)
                self.cfis_    = pd.DataFrame(cfi_matrix   ,columns=varnames,index=varnames)
                self.gfis_    = pd.DataFrame(gfi_matrix   ,columns=varnames,index=varnames)
                self.nfis_    = pd.DataFrame(nfi_matrix   ,columns=varnames,index=varnames)
                self.aics_    = pd.DataFrame(aic_matrix   ,columns=varnames,index=varnames)
                self.rmseas_  = pd.DataFrame(rmsea_matrix ,columns=varnames,index=varnames)
                self.adjmat_  = pd.DataFrame(adj_matrix   ,columns=varnames,index=varnames)

                self.p_values_matrix_  = pvalue_matrix
                self.adjacency_matrix_ = adj_matrix

        #####################
        ### CLASS METHODS ###
        #####################

        def fit(self,data):
                self.__make_sem_combination_matrix(data)
                print(self)
                return self

        def get_error_independence_p_values(self,data):
                return self.p_values_matrix_

        def __str__(self):
                x  = "\n"
                x += "Cutoff for p-value   : " + str(self.alpha_) + "\n"
                x += "Cutoff for CFI diff. : " + str(self.eps_cfi_) + "\n"
                x += "Cutoff for GFI diff. : " + str(self.eps_gfi_) + "\n"
                x += "Cutoff for NFI diff. : " + str(self.eps_nfi_) + "\n"
                x += "\n"
                x += self.print_full(self.stdests_,"Est. Stds")
                x += self.print_full(self.pvalues_,"p-values" )
                x += self.print_full(self.cfis_   ,"CFIs")
                x += self.print_full(self.gfis_   ,"GFIs")
                x += self.print_full(self.nfis_   ,"NFIs")
                x += self.print_full(self.aics_   ,"AICs")
                x += self.print_full(self.rmseas_ ,"RMSEAs")
                x += self.print_full(self.adjmat_ ,"Adjercency Matrix")
                return x


####################################################################
###
### Causal Analizer using LiNGAM and Semopy
###
###     by connecting Causal Discovery (LiNGAM) & Causal Guess (Semopy)
###
####################################################################
class CausalAnalysis:

        def __init__(self,style="lavaan",method="direct",tol=0.01,alpha=0.05,enable_cov=False):
                self.model_     = None  # SEM-model text
                self.sem_       = None  # SEM-model object
                self.lingam_    = None  # LiNGAM-model object
                self.tol_       = tol   # Threshold for Causality effectiveness (defualt=0.01)
                self.alpha_     = alpha # Significant value level for p-value (defualt=0.05)
                self.style_     = style # SEM-model format style ["lavaan"|"sem"] (default=lavaan)
                self.method_    = method# Discovery medhod ["direct"|"ica"|"var"|"varma"|"rcd"|"sem"] (default=direct)
                self.enable_cov_= enable_cov # Enable Covariance Path <--> (default=False)

        ######################
        ### STATIC METHODS ###
        ######################

        @staticmethod
        def print_full(df,title="No title"):
                x = ""
                if df is not None:
                        x  = f"{title}:\n"
                        pd.set_option('display.max_columns',len(df.columns))
                        pd.set_option('display.max_rows',len(df.index))
                        x += str(df)
                        x += "\n\n"
                return x

        @staticmethod
        def read_model(file,encoding='UTF-8'):
                file = open(file,'r',encoding=encoding)
                text = file.read()
                file.close()
                return text

        @staticmethod
        def write_model(file,data,encoding='UTF-8'):
                file = open(file,'w',encoding=encoding)
                file.write(data)
                file.close()

        @staticmethod
        def lingam_to_sem(data,lingam_model,tol=0.01,alpha=0.05,sem_style="lavaan",enable_cov=False):
                # 変数の数
                n        = len(data.columns)

                # 変数名リスト
                varnames = data.columns.to_list()
 

                # 因果係数の行列
                adjmat   = lingam_model.adjacency_matrix_

                # 変数の順序
                if hasattr(lingam_model,"causal_order_"):
                        order = lingam_model.causal_order_ # 因果順序
                else:
                        order = list(range(n)) # 0, 1, 2, ..., n

                # p値の行列
                if hasattr(lingam_model,"get_error_independence_p_values"):
                        pvalmat = lingam_model.get_error_independence_p_values(data)
                else:
                        pvalmat = np.zeros((n,n)) # ゼロ行列

                # モデルの書き方
                if sem_style == "lavaan":
                        left = "~"
                        right= "=~"
                        both = "~~"
                elif sem_style == "sem":
                        left = "<-"
                        right= "->"
                        both = "<>"
                else:
                        print(f"lingam_to_sem: irregal option sem_style={sem_style}. ['lavaan','sem']")

                sem_model=""
                sem_model2=""
                for i in range(n):
                        for j in range(i+1,n,1):
                                lhs = varnames[order[i]]
                                rhs = varnames[order[j]]
                                if adjmat[i,j] > tol and pvalmat[i,j] < alpha :
                                        if adjmat[j,i] > tol and pvalmat[j,i] < alpha :
                                                sem_model2 += f"{lhs} {both} {rhs}\n"
                                        else:
                                                sem_model += f"{lhs} {left} {rhs}\n"
                                else:
                                        if adjmat[j,i] > tol and pvalmat[j,i] < alpha :
                                                sem_model += f"{rhs} {left} {lhs}\n"
                #print(sem_model)
                if enable_cov : sem_model += sem_model2

                return sem_model

        ########################
        ### INTERNAL METHODS ###
        ########################

        def __lingam_to_sem(self,data,lingam_model):
                return self.lingam_to_sem(data,lingam_model,tol=self.tol_,alpha=self.alpha_,sem_style=self.style_,enable_cov=self.enable_cov_)


        #####################
        ### CLASS METHODS ###
        #####################

        # 因果分析を行う
        def analyze(self,data):
                model  = self.discover(data)
                #print(model)
                #print(data.columns.to_list())
                result = self.evaluate(data,model)
                return result

        # 因果探索を行う 
        def discover(self,data):
                if self.method_ == "direct":
                        self.lingam_ = lingam.DirectLiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "ica":
                        self.lingam_ = lingam.ICALiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "var":
                        self.lingam_ = lingam.VARLiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "varma":
                        self.lingam_ = lingam.VARMALiNGAM()
                        self.lingam_.fit(data)
                elif self.method_ == "rcd":
                        self.lingam_ = lingam.RCD()
                        self.lingam_.fit(data)
                elif self.method_ == "sem":
                        self.lingam_ = SEMCausalFinder()
                        self.lingam_.fit(data)
                #elif self.method_ == "lina":
                #       self.lingam_ = lingam.LiNA() 
                #       self.lingam_.fit(data)
                #elif self.method_ == "lim":
                #       self.lingam_ = lingam.LiM() 
                #       dis_con = [0] * n
                #       self.lingam_.fit(data,dis_con)
                #elif self.method_ == "resit":
                #       self.lingam_ = lingam.RESIT() 
                #       self.lingam_.fit(data)
                else :
                        print(f"discover : Invalid option self.method_ = {self.method_}")
                self.model_ = self.__lingam_to_sem(data,self.lingam_)
                return self.model_

        # 因果推論を行う
        def evaluate(self,data,model=None):
                if model is None: model = self.model_
                if model is None: raise ValueError("No model")
                if str(model) == "":
                        if self.lingam_ is not None:
                                print("Adjacency Matrix:")
                                print(self.lingam_.adjacency_matrix_)
                        raise ValueError("Empty model")
                self.sem_ = semopy.Model(model)
                res = self.sem_.fit(data)
                return res

        # モデルや図をファイルに書き込む
        def save(self,init_imgfile="found_causality.png",final_imgfile="evaled_causality.png",
                      init_dotfile="found_causality.gv", final_dotfile="evaled_causality.gv",
                      modfile="sem_model.txt",
                      basedir=".",show=False):
                if self.lingam_ is not None:
                        dotdata = lingam.utils.make_dot(self.lingam_.adjacency_matrix_)
                        graph   = graphviz.Source(dotdata,basedir+"/"+init_dotfile,format='png')
                        #graph.render(outfile=basedir+"/"+init_imgfile,view=show)
                        graph.render(view=show)
                if self.model_ is not None:
                        self.write_model(basedir+"/"+modfile,self.model_)
                if self.sem_ is not None:
                        self.write_model(basedir+"/"+final_dotfile,str(dotdata))
                        try:
                                dotdata = semopy.semplot(self.sem_,basedir+"/"+final_imgfile,plot_covs=True,std_ests=True,show=show)
                        except np.linalg.LinAlgError as e:
                                print(f"NumPy Linear Algebra Error : Graph can not be drawn.")

        # インスタンスをダンプする
        def __str__(self):
                x  = "\n"
                x += "SEM Model Style   : " + str(self.style_) + "\n"
                x += "LiNGAM Method     : " + str(self.method_) + "\n"
                x += "Cutoff Causality  : " + str(self.tol_) + "\n"
                x += "Cutoff p-value    : " + str(self.alpha_) + "\n"
                x += "\n"
                if self.lingam_ is not None:
                        if hasattr(self.lingam_,"causal_order_"): 
                                x += "Causal Order:\n"
                                x += str(self.lingam_.causal_order_)
                                x += "\n\n"
                        x += "Causal Matrix:\n"
                        x += str(self.lingam_.adjacency_matrix_)
                        x += "\n\n"
                if self.model_ is not None:
                        x += "SEM Model:\n"
                        x += str(self.model_)
                        x += "\n\n"
                if self.sem_ is not None:
                        try:
                                ins = self.sem_.inspect(std_est=True)
                                pd.set_option('display.max_columns',len(ins.columns))
                                pd.set_option('display.max_rows',len(ins.index))
                                x += "Infomation:\n"
                                x += str(ins)
                                x += "\n\n"
                        except np.linalg.LinAlgError as e:
                                print(f"NumPy Linear Algebra Error : Infomationcan not be shown.")
                        x += "Fitness Index:\n"
                        gfi = semopy.calc_stats(self.sem_)
                        pd.set_option('display.max_columns',len(gfi.index))
                        pd.set_option('display.max_rows',len(gfi.columns))
                        x += str(gfi.T)
                        x += "\n\n"

                return x

コメントを残す