マンガデータの前処理#

本書の再現に、前処理の再実行は不要

前処理後のデータは全てvizbook-jupyter/data/*以下に格納されています。 本書の再現のため、前処理を再実行頂く必要はありません。 (仮に再実行したとしても、同じファイルが出力されるだけですので問題はありません。)

準備#

Import#

Hide code cell content
# warningsモジュールのインポート
import warnings

# データ解析や機械学習のライブラリ使用時の警告を非表示にする目的で警告を無視
# 本書の文脈では、可視化の学習に議論を集中させるために選択した
# ただし、学習以外の場面で、警告を無視する設定は推奨しない
warnings.filterwarnings("ignore")
Hide code cell content
# jsonモジュールのインポート
# JSON形式のデータの読み書きをサポート
import json

# osモジュールのインポート
# オペレーティングシステムとのインターフェースを提供
import os

# reモジュールのインポート
# 正規表現操作をサポート
import re

# zipfileモジュールのインポート
# ZIPアーカイブファイルの読み書きをサポート
import zipfile

# pathlibモジュールのインポート
# ファイルシステムのパスを扱う
from pathlib import Path

# pprintモジュールのインポート
# データ構造を見やすく整形して表示するための関数
from pprint import pprint

# typingモジュールからの型ヒント関連のインポート
# 関数やクラスの引数・返り値の型を注釈するためのツール
from typing import Any, Dict, List, Optional, Union

# ijsonモジュールのインポート
# ストリームから大きなJSONオブジェクトを効率的に解析・抽出
import ijson

# numpy:数値計算ライブラリのインポート
# npという名前で参照可能
import numpy as np

# pandas:データ解析ライブラリのインポート
# pdという名前で参照可能
import pandas as pd

# tqdm_notebookのインポート
# Jupyter Notebook内でのプログレスバー表示をサポート
from tqdm import tqdm_notebook as tqdm

変数#

Hide code cell content
# 入出力ディレクトリの定義

# 入力ファイルを格納しているディレクトリのパス
DIR_INPUT = Path("../../../madb/data/json-ld")

# 一時的にファイルを保存するディレクトリのパス
DIR_TMP = Path("../../../data/cm/tmp")

# 中間ファイルを保存するディレクトリのパス
DIR_INTERIM = Path("../../../data/cm/interim")

# 出力ファイルを保存するディレクトリのパス
DIR_OUTPUT = Path("../../../data/cm/input")
Hide code cell content
# MADBの読み込み対象ファイル名のリストを定義
# - `cm105`:マンガ雑誌に関する情報を格納
# - `cm102`:雑誌各号に関する情報を格納
# - `cm106`:掲載作品に関する情報を格納
FNS_CM = [
    "cm102",
    "cm105",
    "cm106",
]
Hide code cell content
# 分析対象とするマンガ雑誌名のリストを定義
MCNAMES = [
    "週刊少年ジャンプ",
    "週刊少年マガジン",
    "週刊少年サンデー",
    "週刊少年チャンピオン",
]
Hide code cell content
# cm105用のデータから利用するカラム名のリストを定義
COLS_CM105 = [
    "identifier",
    "label",
    "name",
]
Hide code cell content
# Magazine Itemとして利用するカラムとその新しいカラム名のマッピングを定義
COLS_MI = {
    "identifier": "miid",
    "label": "miname",
    "datePublished": "date",
    "isPartOf": "mcid",
    "price": "price",
}
Hide code cell content
# Comic Episodeとして利用するカラムとその新しいカラム名のマッピングを定義
COLS_CE = {
    "relatedCollection": "ccid",
    "alternativeHeadline": "cename",
    "pageStart": "page_start",
    "pageEnd": "page_end",
    "isPartOf": "miid",
    "note": "note",
}
Hide code cell content
# Comic Collectionとして利用するカラムとその新しいカラム名のマッピングを定義
COLS_CC = {
    "identifier": "ccid",
    "name": "ccname",
    "creator": "crtname",
    "originalWorkCreator": "ocrtname",
}
Hide code cell content
# `pageEnd`と`pageStart`の許容する最大値を定義
MAX_PAGES = 1000

関数#

Hide code cell content
def read_json(path: Union[str, Path]) -> Dict[str, Any]:
    """
    jsonファイルを辞書として読み込む

    Parameters
    ----------
    path : Union[str, Path]
        読み込みたいjsonファイルのパス

    Returns
    -------
    Dict[str, Any]
        jsonデータを格納した辞書
    """

    # 指定したパスのjsonファイルを読み込みモードで開く
    with open(path, "r", encoding="utf-8") as f:
        # json.loadを使用して、ファイル内容を辞書として読み込む
        dct = json.load(f)

    # 読み込んだ辞書を返す
    return dct
Hide code cell content
def save_json(path: Union[str, Path], dct: Dict) -> None:
    """
    辞書をjson形式で保存

    Parameters
    ----------
    path : Union[str, Path]
        保存先のファイルパス
    dct : Dict
        保存する辞書

    Returns
    -------
    None
    """

    # 指定したパスのjsonファイルを書き込みモードで開く
    with open(path, "w", encoding="utf-8") as f:
        # json.dumpを使用して、辞書の内容をjson形式でファイルに書き込む
        # ensure_ascii=Falseで非ASCII文字もそのまま保存し、indent=4で整形して保存
        json.dump(dct, f, ensure_ascii=False, indent=4)
Hide code cell content
def read_json_w_filters(
    path: Union[str, Path], items: List[str], filters: Dict[str, List[Any]]
) -> List[Dict[str, Any]]:
    """
    itemsのうち、filtersの条件を満たすもののみを抽出して返す

    Parameters
    ----------
    path : Union[str, Path]
        読み込み対象のjsonファイルのパス、文字列またはPathオブジェクト
    items : List[str]
        読み込む項目名のリスト
    filters : Dict[str, List[Any]]
        抽出条件を指定する辞書、キーはフィルタリング対象の項目名、値は条件となる値のリスト

    Returns
    -------
    List[Dict[str, Any]]
        フィルタリングされた項目の辞書を要素とするリスト
    """

    # 出力結果を格納するための空のリストを初期化
    out = []

    # 指定したパスからファイルを読み込みモードで開く
    with open(path, "r", encoding="utf-8") as f:
        # ijsonを使用して、特定の項目を逐次読み込む
        parse = ijson.items(f, items)
        # parseを順に処理し、各項目をitemとして取得
        for item in parse:
            # filtersの条件をすべて満足するもの以外はbreak
            for k, v in filters.items():
                # フィルタリング対象の項目名がitemのキーに含まれていない場合、break
                if k not in item.keys():
                    break
                # 項目の値がフィルタリング条件に含まれていない場合、break
                if item[k] not in v:
                    break
            else:
                # 上記のforループでbreakされなかった場合(全ての条件を満たす場合)、outに追加
                out.append(item)

    # 処理結果を返す
    return out
Hide code cell content
def format_magazine_name(name: Any) -> str:
    """
    nameから雑誌の公開名(published_name)を取得する関数

    nameの要素を順番にチェックし、文字列型の要素を返す。
    どの要素も文字列型でなければ、例外を発生させる。

    Parameters
    ----------
    name : Any
        チェック対象のデータ(リストやタプルなど)

    Returns
    -------
    str
        雑誌の公開名

    Raises
    ------
    Exception
        nameの中に文字列型の要素がない場合
    """

    # name内の各要素をチェック
    for x in name:
        # 要素が文字列型である場合はそれを返す
        if type(x) is str:
            return x

    # 文字列型の要素が見つからない場合は例外を発生
    raise Exception(f"No magazine name in {name}!")
Hide code cell content
def format_cols(df: pd.DataFrame, cols_rename: Dict[str, str]) -> pd.DataFrame:
    """
    指定されたカラムのみをデータフレームから抽出し、カラム名をリネームする関数

    Parameters
    ----------
    df : pd.DataFrame
        入力データフレーム
    cols_rename : Dict[str, str]
        リネームしたいカラム名のマッピング(元のカラム名: 新しいカラム名)

    Returns
    -------
    pd.DataFrame
        カラムが抽出・リネームされたデータフレーム
    """

    # 指定されたカラムのみを抽出し、リネーム
    df = df[cols_rename.keys()].rename(columns=cols_rename)

    return df
Hide code cell content
def get_items_by_genre(graph: List[Dict[str, Any]], genre: str) -> List[Dict[str, Any]]:
    """
    指定されたジャンルに該当するアイテムのリストを取得する関数

    Parameters
    ----------
    graph : List[Dict[str, Any]]
        アイテムのリストを含むグラフデータ
    genre : str
        取得したいアイテムのジャンル

    Returns
    -------
    List[Dict[str, Any]]
        指定されたジャンルに該当するアイテムのリスト
    """

    # graph内のアイテムをイテレートし、指定されたジャンルに該当するアイテムを抽出
    items = [x for x in graph if "genre" in x.keys() and x["genre"] == genre]

    return items
Hide code cell content
# tag:hide
def get_id_from_uri(uri: Optional[str]) -> Optional[str]:
    """
    URIから末尾のIDを取得する関数

    Parameters
    ----------
    uri : Optional[str]
        解析対象のURI、Noneの場合も考慮

    Returns
    -------
    Optional[str]
        URIから取得したID、URIがNoneまたはNaNの場合はNoneを返す
    """

    # uriがNaNの場合、Noneを返す
    if uri is np.nan:
        return None
    # uriからIDを抽出して返す
    else:
        return uri.split("/")[-1]
Hide code cell content
def contains_two_colors(note: Union[str, float, None]) -> bool:
    """
    文字列に'2色カラー'が含まれているかチェックする関数

    Parameters
    ----------
    note : Union[str, float, None]
        チェック対象の文字列、NoneやNaNの場合も考慮

    Returns
    -------
    bool
        '2色カラー'が含まれていればTrue、そうでなければFalse
    """
    if not isinstance(note, str):
        return False
    return "2色カラー" in note
Hide code cell content
def contains_four_colors(note: Union[str, float, None]) -> bool:
    """
    文字列に'4色カラー'が含まれているかチェックする関数

    Parameters
    ----------
    note : Union[str, float, None]
        チェック対象の文字列、NoneやNaNの場合も考慮

    Returns
    -------
    bool
        '4色カラー'が含まれていればTrue、そうでなければFalse
    """
    if not isinstance(note, str):
        return False
    return "4色カラー" in note
Hide code cell content
def format_name(name: Optional[str]) -> Optional[str]:
    """
    nameから名称情報を抽出する関数
    ccname, crtname, ocrtnameの値の生成に利用

    Parameters
    ----------
    name : Optional[str]
        名称情報を含むデータ

    Returns
    -------
    Optional[str]
        抽出された名称情報、nameがNoneまたは適切な形式でない場合はNoneを返す

    Raises
    ------
    Exception:
        nameから適切な名称情報を抽出できなかった場合
    """

    # nameがNoneまたは辞書の場合
    if name is np.nan or isinstance(name, dict):
        return None

    # nameが文字列の場合
    if isinstance(name, str):
        return name

    # nameがリストの場合
    if isinstance(name, list):
        for item in name:
            if isinstance(item, str):
                return item

    # 上記の条件に合致しない場合、例外を発生させる
    raise Exception(f"No name in {name}!")
Hide code cell content
def format_price(price: Optional[Any]) -> Optional[int]:
    """
    price情報を整形する関数
    特定の価格表現に対してはハードコーディングで既定の値を返す

    Parameters
    ----------
    price : Optional[Any]
        価格情報

    Returns
    -------
    Optional[int]
        整形後の価格、priceがNoneまたは特定の形式でない場合はNoneを返す
    """

    # priceがNaNの場合
    if price is np.nan:
        return None

    # 特定の価格表現に対するハードコーディング
    # 週刊少年ジャンプ 1971年 表示号数47の場合
    if price == "JUMPガラガラウなかも":
        return None

    # 週刊少年ジャンプ 2010年 表示号数42の場合
    if price == "238p":
        return 238

    # それ以外のprice情報を整形
    price_new = price.replace("円", "").replace("+税", "")
    return int(price_new)
Hide code cell content
def preprocess_df_mi(path: Union[str, Path], mcids: List[str]) -> pd.DataFrame:
    """
    指定したパスとmcidsを使用してdf_miを前処理する

    Parameters
    ----------
    path : Union[str, Path]
        jsonファイルのパス、文字列またはPathオブジェクト
    mcids : List[str]
        処理対象とするmcidのリスト

    Returns
    -------
    pd.DataFrame
        前処理されたデータフレーム
    """

    # filter条件を設定し、該当するデータを読み込む
    filters = {
        "genre": ["雑誌巻号"],
        "isPartOf": [f"https://mediaarts-db.bunka.go.jp/id/{mcid}" for mcid in mcids],
    }
    mi = read_json_w_filters(path, "@graph.item", filters)
    df_mi = pd.DataFrame(mi)

    # 与えられたカラム名の変換を適用
    df_mi = format_cols(df_mi, COLS_MI)

    # URIからIDを抽出し、mcidからmcnameを取得
    df_mi["mcid"] = df_mi["mcid"].apply(get_id_from_uri)
    df_mi["mcname"] = df_mi["mcid"].apply(lambda x: mcid2mcname.get(x, None))

    # date列の値を文字列から日付型に変換し、昇順にソート
    df_mi["date"] = pd.to_datetime(df_mi["date"], errors="coerce")
    df_mi = df_mi.sort_values("date").reset_index(drop=True)

    # price列の値を整形
    df_mi["price"] = df_mi["price"].apply(format_price)

    return df_mi
Hide code cell content
def preprocess_df_ce(path: Union[str, Path], miids: List[str]) -> pd.DataFrame:
    """
    指定されたpathとmiidsを使用してdf_ceデータフレームを構築する関数

    Parameters
    ----------
    path : Union[str, Path]
        JSONファイルのパス、文字列またはPathオブジェクト
    miids : List[str]
        雑誌各号のIDのリスト

    Returns
    -------
    pd.DataFrame
        前処理後のdf_ceデータフレーム
    """

    # フィルタリングする条件を辞書として定義
    filters = {
        "genre": ["マンガ作品"],
        "isPartOf": [f"https://mediaarts-db.bunka.go.jp/id/{miid}" for miid in miids],
    }

    # 上で定義したフィルタを使用してJSONデータを読み込む
    ce = read_json_w_filters(path, "@graph.item", filters)
    # 読み込んだデータをデータフレームに変換
    df_ce = pd.DataFrame(ce)

    # 必要な列だけを取得し、列名をリネーム
    df_ce = format_cols(df_ce, COLS_CE)

    # 'ccid'と'miid'の列の値をURIからIDのみの形式に変更
    df_ce["ccid"] = df_ce["ccid"].apply(get_id_from_uri)
    df_ce["miid"] = df_ce["miid"].apply(get_id_from_uri)
    # カラーページを含むかどうかを判定し、不要となったnote列を削除
    df_ce["two_colored"] = df_ce["note"].apply(contains_two_colors)
    df_ce["four_colored"] = df_ce["note"].apply(contains_four_colors)
    df_ce = df_ce.drop(columns="note")

    return df_ce
Hide code cell content
def get_crtname_from_text(text: Optional[str]) -> Optional[str]:
    """
    与えられたテキストからマンガ作者名 (crtname) を抽出する関数

    Parameters
    ----------
    text : Optional[str]
        マンガ作者名を含む可能性のあるテキスト

    Returns
    -------
    Optional[str]
        抽出されたマンガ作者名。テキストがNoneの場合、Noneを返す
    """

    ## テキストがNoneの場合、直ちにNoneを返す
    # if text is None:
    #    return None

    # 全角カギカッコ内のテキスト(例:[原作])を削除
    text = re.sub("\[.*\]", "", text)

    # 半角カギカッコ内のテキスト(例:[原作])を削除
    text = re.sub("\[.*\]", "", text)

    # 全角と半角のカギカッコが混ざったケース(例:[原作])を対処
    text = text.replace("[原作]", "")

    # 余分な空白や改行を削除
    text = re.sub("\s*", "", text)

    return text
Hide code cell content
def get_crtnames_from_text(text: Optional[str]) -> Optional[List]:
    """
    与えられたテキストからクリエイター名 (crtname) のリストを抽出する関数

    Parameters
    ----------
    text : Optional[str]
        クリエイター名を含む可能性のあるテキスト

    Returns
    -------
    Optional[List]
        抽出されたクリエイター名のリスト。テキストがNoneまたはNaNの場合、Noneを返す
    """

    # 名称情報を整形する
    text = format_name(text)

    # テキストがNoneまたはNaNの場合、直ちにNoneを返す
    if text is np.nan or text is None:
        return None
    else:
        # クリエイター名が「/」や「&」などで区切られている場合、それらを基に分割する
        crtnames = re.split("[/&&/]", text)

        # 各クリエイター名を整形
        crtnames = [get_crtname_from_text(c) for c in crtnames if len(c)]

        return crtnames
Hide code cell content
def preprocess_df_cc(path: Union[str, Path], ccids: List) -> pd.DataFrame:
    """
    「df_cc」の前処理を行う関数

    Parameters
    ----------
    path : Union[str, Path]
        JSONファイルのパス
    ccids : List
        処理対象とするccidのリスト

    Returns
    -------
    pd.DataFrame
        前処理が完了したデータフレーム
    """

    # JSONファイルを読み込む
    cm106 = read_json(path)

    # 「雑誌掲載」ジャンルのアイテムを抽出し、データフレームを作成
    cc = get_items_by_genre(cm106["@graph"], "雑誌掲載")
    df_cc = pd.DataFrame(cc)

    # カラムを整理
    df_cc = format_cols(df_cc, COLS_CC)

    # 指定されたccidを持つレコードのみをフィルタリング
    df_cc = df_cc[df_cc["ccid"].isin(ccids)].reset_index(drop=True)

    # マンガ作品名、マンガ作者名を整形
    df_cc["ccname"] = df_cc["ccname"].apply(format_name)
    df_cc["crtname"] = df_cc["crtname"].apply(get_crtnames_from_text)
    df_cc["ocrtname"] = df_cc["ocrtname"].apply(get_crtnames_from_text)

    return df_cc
Hide code cell content
def read_csvs(pathes: List) -> pd.DataFrame:
    """
    複数のCSVファイルを順番に読み込み、それらを結合する関数

    Parameters
    ----------
    pathes : List
        読み込みたいCSVファイルのパスのリスト

    Returns
    -------
    pd.DataFrame
        結合されたデータフレーム
    """

    # 空のデータフレームを初期化
    df_all = pd.DataFrame()

    # 各CSVファイルのパスについて処理を行う
    for p in pathes:
        # CSVファイルを読み込む
        df = pd.read_csv(p)

        # 読み込んだデータフレームをdf_allに結合する
        df_all = pd.concat([df_all, df], ignore_index=True)

    return df_all
Hide code cell content
def sort_by_date(df: pd.DataFrame, col_date: str) -> pd.DataFrame:
    """
    データフレームを指定された日付カラムに基づいてソートする関数

    Parameters
    ----------
    df : pd.DataFrame
        ソート対象のデータフレーム
    col_date : str
        ソートの基準となる日付カラムの名前

    Returns
    -------
    pd.DataFrame
        日付でソートされたデータフレーム
    """

    # 入力されたデータフレームをコピー
    df_new = df.copy()

    # 日付カラムのデータ型をdatetime型に変換
    df_new[col_date] = pd.to_datetime(df_new[col_date])

    # 指定された日付カラムに基づいてデータフレームをソート
    df_new = df_new.sort_values(col_date, ignore_index=True)

    return df_new
Hide code cell content
def cast_str_to_list(text: Optional[str]) -> List[str]:
    """
    文字列形式で表現されたリストを、実際のリストに変換する関数

    Parameters
    ----------
    text : Optional[str]
        リストとして文字列で表現されたデータ(例: "[1, 2, 3]")

    Returns
    -------
    List[str]
        文字列から変換されたリスト:元の文字列がNaNの場合は空のリストを返す

    Example
    -------
    >>> cast_str_to_list("[a, b, c]")
    ['a', 'b', 'c']
    """

    # NaNの場合は空のリストを返す
    if text is np.nan:
        return []

    # 不要な文字を取り除いて、カンマで分割
    # この操作で文字列形式のリストを実際のリストに変換する
    return (
        text.replace("[", "")
        .replace("]", "")
        .replace("'", "")
        .replace(" ", "")
        .split(",")
    )

出力先の生成#

Hide code cell content
# DIR_TMPという名前のディレクトリを作成する
# すでに存在する場合は何もしない
DIR_TMP.mkdir(exist_ok=True, parents=True)

# DIR_INTERIMという名前のディレクトリを作成する
# すでに存在する場合は何もしない
DIR_INTERIM.mkdir(exist_ok=True, parents=True)

# DIR_OUTPUTという名前のディレクトリを作成する
# すでに存在する場合は何もしない
DIR_OUTPUT.mkdir(exist_ok=True, parents=True)

DIR_TMPへの一時的な出力#

zipファイルの解凍#

Hide code cell content
# DIR_INPUTディレクトリ内で`_cm`を含むファイルのパスをすべて検索し、リストとして取得
ps_cm = sorted(list(DIR_INPUT.glob("*_cm*")))
Hide code cell content
# `ps_cm`リストに含まれる各.zipファイルに対して処理を実行
# tqdmを使用することで、進行状況のバーが表示される
for p_from in tqdm(ps_cm):
    # 出力先のパスを設定する
    # 元のファイルパスから、DIR_INPUTをDIR_TMPに変更し、ファイル拡張子の.zipを削除
    p_to = DIR_TMP / p_from.parts[-1].replace(".zip", "")

    # zipfileを使用して、zipファイルを開く
    with zipfile.ZipFile(p_from) as z:
        # zipファイル内のすべてのファイル・ディレクトリをp_toのパスに展開
        z.extractall(p_to)

入力ファイルのサイズ圧縮#

対象#

Hide code cell content
# MADBの各ファイル名をキーとして、該当するファイルのパスをリストとして取得する
# これを辞書型変数`ps_cm`に格納する
# 例: {'cm102': ['path1', 'path2', ...], 'cm105': ['path3', 'path4', ...],}
ps_cm = {cm: sorted(list(DIR_TMP.glob(f"*{cm}*/*"))) for cm in FNS_CM}
Hide code cell content
# 内容を確認
pprint(ps_cm)
{'cm102': [PosixPath('../../../data/cm/tmp/metadata_cm-item_cm102_json/metadata_cm-item_cm102_json\\metadata_cm-item_cm102_00001.json'),
           PosixPath('../../../data/cm/tmp/metadata_cm-item_cm102_json/metadata_cm-item_cm102_json\\metadata_cm-item_cm102_00002.json')],
 'cm105': [PosixPath('../../../data/cm/tmp/metadata_cm-col_cm105_json/metadata_cm-col_cm105_json\\metadata_cm-col_cm105_00001.json')],
 'cm106': [PosixPath('../../../data/cm/tmp/metadata_cm-col_cm106_json/metadata_cm-col_cm106_json\\metadata_cm-col_cm106_00001.json')]}

cm105#

Hide code cell content
# ps_cm["cm105"][0]に格納されているファイルパスからJSONデータを読み込む
# ここでの`cm105`はマンガ雑誌に関する情報を格納している
cm105 = read_json(ps_cm["cm105"][0])

# 読み込んだJSONデータから"@graph"キーの内容を取得し、それをデータフレームに変換する
# その後、COLS_CM105で定義されたカラムのみを残す
df_cm105 = pd.DataFrame(cm105["@graph"])[COLS_CM105]
Hide code cell content
# df_cm105の"name"カラムから雑誌名を取得
# format_magazine_name関数を使い、各行ごとに雑誌名を整形
df_cm105["mcname"] = df_cm105["name"].apply(format_magazine_name)
Hide code cell content
# df_cm105から指定されたマンガ雑誌名(MCNAMES)のみを抽出
# マンガ雑誌ID(mcid)とマンガ雑誌名(mcname)の組み合わせを辞書として保存
# 辞書の形式は{マンガ雑誌ID: マンガ雑誌名}となる
mcid2mcname = (
    df_cm105[df_cm105["mcname"].isin(MCNAMES)]
    .groupby("identifier")["mcname"]
    .first()
    .to_dict()
)
Hide code cell content
# 内容を確認
mcid2mcname
{'C117607': '週刊少年サンデー',
 'C119033': '週刊少年マガジン',
 'C119459': '週刊少年ジャンプ',
 'C120282': '週刊少年チャンピオン'}
Hide code cell content
# 辞書mcid2mcnameをJSONファイルとして保存
# 保存先のディレクトリはDIR_TMP、ファイル名は"mcid2mcname.json"
save_json(DIR_TMP / "mcid2mcname.json", mcid2mcname)

cm102#

Hide code cell content
# ps_cm["cm102"]内の各ファイルを順番に処理するためのループを開始
for i, p in tqdm(enumerate(ps_cm["cm102"])):
    # 既存の雑誌IDと雑誌名の辞書から雑誌IDのリストを取得
    mcids = list(mcid2mcname.keys())
    # 四大少年週刊誌に一致する雑誌IDを持つデータのみを前処理
    df_mi = preprocess_df_mi(p, mcids)
    # 処理したデータから雑誌巻号のIDリストを取得
    miids = set(df_mi["miid"].unique())
    # その雑誌巻号IDに関連する各話データを前処理
    df_ce = preprocess_df_ce(p, miids)

    # 処理結果をCSVファイルとして保存
    # 保存先のファイル名は、順番に応じてmi_xxxxx.csvとce_xxxxx.csvとして命名
    df_mi.to_csv(DIR_TMP / f"mi_{i+1:05}.csv", index=False)
    df_ce.to_csv(DIR_TMP / f"ce_{i+1:05}.csv", index=False)

cm106#

Hide code cell content
# DIR_TMPディレクトリ内の「ce_*.csv」という名前のCSVファイルのパスリストを取得
ps_ce = sorted(list(DIR_TMP.glob("ce_*.csv")))
# 上記で取得したCSVファイルのパスリストを元に、全てのCSVファイルを読み込み、一つのデータフレームに結合
df_ce = read_csvs(ps_ce)
# df_ce内の一意なccidの値を取得してリスト化
ccids = df_ce["ccid"].unique()
Hide code cell content
# `ps_cm["cm106"]`に格納されている各ファイルパスについて処理を行う
for i, p in tqdm(enumerate(ps_cm["cm106"])):
    # 指定されたファイルパスのデータを前処理し、結果をdf_ccに格納
    df_cc = preprocess_df_cc(p, ccids)

    # df_ccの内容をCSVファイルとして保存
    df_cc.to_csv(DIR_TMP / f"cc_{i+1:05}.csv", index=False)

DIR_INTERIMへの中間出力#

cc.csv#

Hide code cell content
# `DIR_TMP`ディレクトリ内の`cc_`で始まるCSVファイルのパスをすべて取得する
ps_cc = sorted(list(DIR_TMP.glob("cc_*.csv")))
# 取得したCSVファイルを読み込み、一つのデータフレームに結合する
df_cc = read_csvs(ps_cc)
Hide code cell content
# データフレームからccidとccnameの列のみを取得する
df_cc = df_cc[["ccid", "ccname"]]
# ccidを基準にデータフレームを昇順にソートする
df_cc = df_cc.sort_values("ccid", ignore_index=True)
Hide code cell content
# head()メソッドを利用し、df_ccの先頭5行を表示する
df_cc.head()
ccid ccname
0 C102235 さばげぶっ!
1 C109295 マウンドの稲妻
2 C109296 SCRAP三太夫
3 C109297 IN THE TRAIN
4 C110879 [編集後記]
Hide code cell content
# 所定のディレクトリにdf_ccをCSVファイルとして保存
df_cc.to_csv(DIR_INTERIM / "cc.csv", index=False)

crt.csv#

Hide code cell content
# `DIR_TMP`ディレクトリ内の`cc_`で始まるCSVファイルのパスをすべて取得する
ps_cc = sorted(list(DIR_TMP.glob("cc_*.csv")))
# 取得したCSVファイルを読み込み、一つのデータフレームに結合する
df_cc = read_csvs(ps_cc)
Hide code cell content
# マンガ作者名の集合を作成するための空のセットを初期化
crtnames = set()

# df_ccの各行を辞書として処理
for r in df_cc.to_dict("records"):
    # マンガ作者名をリスト形式で取得
    crtname = set(cast_str_to_list(r["crtname"]))
    # マンガ作者(原作者)名をリスト形式で取得
    ocrtname = set(cast_str_to_list(r["ocrtname"]))

    # 集合にマンガ作者名を追加
    crtnames.update(crtname)
    # 集合にマンガ作者(原作者)名を追加
    crtnames.update(ocrtname)

# 作成した集合をリストに変換し、昇順にソート
crtnames = sorted(list(crtnames))
Hide code cell content
# マンガ作者名のIDを生成
crtids = [f"CCRT{i:05}" for i in range(len(crtnames))]

# マンガ作者名とそのIDを組み合わせてデータフレームを作成
df_crt = pd.DataFrame({"crtid": crtids, "crtname": crtnames})
Hide code cell content
# df_crtの先頭5行を`head()`メソッドを使って確認
df_crt.head()
crtid crtname
0 CCRT00000
1 CCRT00001 AKU
2 CCRT00002 AOKO
3 CCRT00003 AR-V
4 CCRT00004 Applibot
Hide code cell content
# データフレーム`df_crt`をCSVファイルとして保存
# 保存先のパスは、`DIR_INTERIM`ディレクトリ内の`crt.csv`
df_crt.to_csv(DIR_INTERIM / "crt.csv", index=False)

cc_crt.csv#

Hide code cell content
# `DIR_TMP`ディレクトリ内の`cc_*.csv`のパスを取得
ps_cc = sorted(list(DIR_TMP.glob("cc_*.csv")))
# `DIR_INTERIM`ディレクトリ内の`crt.csv`のパスを取得
ps_crt = sorted(list(DIR_INTERIM.glob("crt.csv")))

# 取得した`cc_*.csv`ファイル群を読み込み、データフレーム`df_cc`に格納
df_cc = read_csvs(ps_cc)
# 取得した`crt.csv`ファイルを読み込み、データフレーム`df_crt`に格納
df_crt = read_csvs(ps_crt)
Hide code cell content
# マンガ作者名(crtname)をキーとし、それに対応するマンガ作者ID(crtid)を値とする辞書を作成
crtname2crtid = df_crt.groupby("crtname")["crtid"].first().to_dict()
Hide code cell content
# df_ccの各レコードから、ccidとcrtidの組み合わせを取得し、新しいデータフレームを作成するためのリストを作成
cc_crt = []

# df_ccの各レコードに対して処理を実行
for r in df_cc.to_dict("records"):
    ccid = r["ccid"]
    # マンガ作者名を取得
    crtnames = set(cast_str_to_list(r["crtname"]))
    # マンガ作者(原作者)名も取得して統合
    crtnames.update(cast_str_to_list(r["ocrtname"]))

    # マンガ作者名のリストをソートして順番に処理
    for crtname in sorted(crtnames):
        # crtnameが空文字の場合はスキップ
        if crtname == "":
            continue

        # マンガ作者名に対応するマンガ作者IDを取得
        crtid = crtname2crtid[crtname]
        # ccidとcrtidの組み合わせをリストに追加
        cc_crt.append([ccid, crtid])

# ccidとcrtidの組み合わせを含む新しいデータフレームを作成
df_cc_crt = pd.DataFrame(columns=["ccid", "crtid"], data=cc_crt)
Hide code cell content
# 先頭5行を`head()`メソッドを使って確認
df_cc_crt.head()
ccid crtid
0 C87429 CCRT01604
1 C87430 CCRT02117
2 C87430 CCRT03152
3 C87431 CCRT01415
4 C87432 CCRT01979
Hide code cell content
# データフレーム`df_cc_crt`をCSVファイルとして保存
# 保存先のパスは、`DIR_INTERIM`ディレクトリ内の`cc_crt.csv`
df_cc_crt.to_csv(DIR_INTERIM / "cc_crt.csv", index=False)

mi.csv#

Hide code cell content
# `DIR_TMP`ディレクトリ内の`mi_`で始まるCSVファイルのパスをすべて取得する
ps_mi = sorted(list(DIR_TMP.glob("mi_*.csv")))
# 取得したCSVファイルを読み込み、一つのデータフレームに結合する
df_mi = read_csvs(ps_mi)
Hide code cell content
# 'date'列の値を文字列から日付型に変換
df_mi["date"] = pd.to_datetime(df_mi["date"])

# 雑誌ごとの最初の掲載日を取得し、その中で最も新しい日付を取得
date_min = df_mi.groupby("mcid")["date"].min().max()

# 雑誌ごとの最後の掲載日を取得し、その中で最も古い日付を取得
date_max = df_mi.groupby("mcid")["date"].max().min()

# 取得した期間内のデータのみをフィルタリング
df_mi = df_mi[(df_mi["date"] >= date_min) & (df_mi["date"] <= date_max)].reset_index(
    drop=True
)
Hide code cell content
# 必要な列の順番をリストとして指定
col_mi = [
    "miid",
    "miname",
    "mcid",
    "mcname",
    "date",
    "price",
]

# データフレームの列の順番を上記のリストに従って整理
df_mi = df_mi[col_mi]

# dateとminameで並び替え
df_mi = df_mi.sort_values(["date", "mcname"], ignore_index=True)
Hide code cell content
# head()メソッドで先頭5行を確認
df_mi.head()
miid miname mcid mcname date price
0 M616363 週刊少年ジャンプ 1970年 表示号数31 C119459 週刊少年ジャンプ 1970-07-27 80.0
1 M558279 週刊少年チャンピオン 1970年 表示号数14 C120282 週刊少年チャンピオン 1970-07-27 80.0
2 M579286 週刊少年サンデー 1970年 表示号数32 C117607 週刊少年サンデー 1970-08-02 80.0
3 M537473 週刊少年マガジン 1970年 表示号数32 C119033 週刊少年マガジン 1970-08-02 80.0
4 M544796 週刊少年ジャンプ 1970年 表示号数32 C119459 週刊少年ジャンプ 1970-08-03 80.0
Hide code cell content
# データフレーム`df_mi`をCSVファイルとして保存
# 保存先のパスは、`DIR_INTERIM`ディレクトリ内の`mi.csv`
df_mi.to_csv(DIR_INTERIM / "mi.csv", index=False)

ce.csv#

Hide code cell content
# `DIR_TMP`ディレクトリ内の`ce_`で始まるCSVファイルのパスをすべて取得する
ps_ce = sorted(list(DIR_TMP.glob("ce_*.csv")))
# 取得したCSVファイルを読み込み、一つのデータフレームに結合する
df_ce = read_csvs(ps_ce)
Hide code cell content
# データフレームを`miid`と`page_start`の順番でソート
df_ce = df_ce.sort_values(["miid", "page_start"], ignore_index=True)
Hide code cell content
# `page_start`と`page_end`に基づいてデータをフィルタリング
# `page_start`の値が`page_end`より小さいか等しいものを選択
filter_ps_pe = df_ce["page_start"] <= df_ce["page_end"]
# `page_end`の値が変数`MAX_PAGES`より小さいか等しいものを選択
filter_pe = df_ce["page_end"] <= MAX_PAGES

# 上記2つの条件を満たすデータのみを選択し、インデックスをリセット
df_ce = df_ce[(filter_ps_pe & filter_pe)].reset_index(drop=True)
Hide code cell content
# `page_end`と`page_start`の差からページ数を計算し、新たな`pages`列として追加
df_ce["pages"] = df_ce["page_end"] - df_ce["page_start"] + 1
Hide code cell content
# 各雑誌巻号ごとの最終ページを計算し、`miid2page`として辞書に格納
miid2page = df_ce.groupby("miid")["page_end"].max().to_dict()

# `miid2page`を用いて、各雑誌巻号の最終ページを新たな列`page_end_max`として追加
df_ce["page_end_max"] = df_ce["miid"].apply(lambda x: miid2page[x])

# `page_start`を`page_end_max`で割ることで、ページの開始位置の相対的な位置を計算
# `page_start_position`列として追加
df_ce["page_start_position"] = df_ce["page_start"] / df_ce["page_end_max"]

# 不要となった`page_end_max`列を削除
df_ce = df_ce.drop(columns=["page_end_max"])
Hide code cell content
# 新しいIDとして`ceid`を生成
# このIDは`CE`という接頭辞の後に5桁の番号が続く形式とする(例:CE00001、CE00002...)
df_ce["ceid"] = [f"CE{i:05}" for i in range(df_ce.shape[0])]
Hide code cell content
# 以下で指定した`cols_ce`に従い、`df_ce`の列の順番を変更する
cols_ce = [
    "ceid",
    "cename",
    "ccid",
    "miid",
    "page_start",
    "page_end",
    "pages",
    "page_start_position",
    "two_colored",
    "four_colored",
]
df_ce = df_ce[cols_ce]
Hide code cell content
# head()メソッドで先頭5行を確認
df_ce.head()
ceid cename ccid miid page_start page_end pages page_start_position two_colored four_colored
0 CE00000 第238話/この世代 C90829 M535428 10.0 31.0 22.0 0.021368 False True
1 CE00001 #134 話の続き C90482 M535428 33.0 50.0 18.0 0.070513 False False
2 CE00002 第5話 チア・ザ・マシンガン! C90297 M535428 51.0 68.0 18.0 0.108974 False False
3 CE00003 第233話 妖精の輝き C89978 M535428 69.0 88.0 20.0 0.147436 False False
4 CE00004 -BOUT 71- From Dark Zone C89929 M535428 89.0 108.0 20.0 0.190171 False False
Hide code cell content
# データフレーム`df_ce`をCSVファイルとして保存
# 保存先のパスは、`DIR_INTERIM`ディレクトリ内の`ce.csv`
df_ce.to_csv(DIR_INTERIM / "ce.csv", index=False)

DIR_OUTPUTへの最終出力#

Hide code cell content
# ファイルから各データフレームを読み込む

# マンガ作品に関する情報を読み込む
df_cc = pd.read_csv(DIR_INTERIM / "cc.csv")
# マンガ各話に関する情報を読み込む
df_ce = pd.read_csv(DIR_INTERIM / "ce.csv")
# マンガ作者に関する情報を読み込む
df_crt = pd.read_csv(DIR_INTERIM / "crt.csv")
# マンガ雑誌巻号に関する情報を読み込む
df_mi = pd.read_csv(DIR_INTERIM / "mi.csv")
# マンガ作品とマンガ作者の対応関係に関する情報を読み込む
df_cc_crt = pd.read_csv(DIR_INTERIM / "cc_crt.csv")

cm_ce.csv#

Hide code cell content
# 各データフレームを統合する

# `df_ce`と`df_mi`を`miid`をキーにして統合
df_cm_ce = pd.merge(df_ce, df_mi, on="miid", how="left").reset_index(drop=True)
# 結果を`df_cc`と`ccid`をキーにしてさらに統合
df_cm_ce = pd.merge(df_cm_ce, df_cc, on="ccid", how="left").reset_index(drop=True)
# `date`列が欠損しているレコードを削除
df_cm_ce = df_cm_ce.dropna(subset=["date"]).reset_index(drop=True)
Hide code cell content
# head()メソッドで先頭5行を確認
df_cm_ce.head()
ceid cename ccid miid page_start page_end pages page_start_position two_colored four_colored miname mcid mcname date price ccname
0 CE00000 第238話/この世代 C90829 M535428 10.0 31.0 22.0 0.021368 False True 週刊少年マガジン 2011年 表示号数24 C119033 週刊少年マガジン 2011-05-25 248.0 ダイヤのA
1 CE00001 #134 話の続き C90482 M535428 33.0 50.0 18.0 0.070513 False False 週刊少年マガジン 2011年 表示号数24 C119033 週刊少年マガジン 2011-05-25 248.0 君のいる町
2 CE00002 第5話 チア・ザ・マシンガン! C90297 M535428 51.0 68.0 18.0 0.108974 False False 週刊少年マガジン 2011年 表示号数24 C119033 週刊少年マガジン 2011-05-25 248.0 アゲイン!!
3 CE00003 第233話 妖精の輝き C89978 M535428 69.0 88.0 20.0 0.147436 False False 週刊少年マガジン 2011年 表示号数24 C119033 週刊少年マガジン 2011-05-25 248.0 FAIRY TAIL
4 CE00004 -BOUT 71- From Dark Zone C89929 M535428 89.0 108.0 20.0 0.190171 False False 週刊少年マガジン 2011年 表示号数24 C119033 週刊少年マガジン 2011-05-25 248.0 A-BOUT!
Hide code cell content
# `ceid`列の値に重複がないことをアサーションで確認
assert df_cm_ce.duplicated(subset=["ceid"]).sum() == 0
Hide code cell content
# データフレーム`df_cm_ce`をCSVファイルとして保存
# 保存先のパスは、`DIR_OUTPUT`ディレクトリ内の`cm_ce.csv`
df_cm_ce.to_csv(DIR_OUTPUT / "cm_ce.csv", index=False)
Hide code cell content
# `ccid`ごとに`ceid`のユニークな数(掲載回数)を集計
df_cc_nce = df_cm_ce.groupby("ccid")["ceid"].nunique().reset_index(name="n_ce")

# `ccid`ごとに`two_colored`の合計数(2色カラー回数)を集計
df_cc_n2c = df_cm_ce.groupby("ccid")["two_colored"].sum().reset_index(name="n_2c")
# `ccid`ごとに`four_colored`の合計数(4色カラー回数)を集計
df_cc_n4c = df_cm_ce.groupby("ccid")["four_colored"].sum().reset_index(name="n_4c")

# `ccid`ごとに最初の掲載日を取得
df_cc_fdate = df_cm_ce.groupby("ccid")["date"].min().reset_index(name="first_date")
# `ccid`ごとに最後の掲載日を取得
df_cc_ldate = df_cm_ce.groupby("ccid")["date"].max().reset_index(name="last_date")

# `ccid`ごとに雑誌のIDと名前を取得
df_cc_mc = df_cm_ce.groupby("ccid")[["mcid", "mcname"]].first().reset_index()

# 上記で作成したデータフレームを`df_cc`にマージして、新しいデータフレームを作成
df_cc_merge = pd.merge(df_cc, df_cc_nce, on="ccid", how="inner").reset_index(drop=True)
df_cc_merge = pd.merge(df_cc_merge, df_cc_n2c, on="ccid", how="left").reset_index(
    drop=True
)
df_cc_merge = pd.merge(df_cc_merge, df_cc_n4c, on="ccid", how="left").reset_index(
    drop=True
)
df_cc_merge = pd.merge(df_cc_merge, df_cc_fdate, on="ccid", how="left").reset_index(
    drop=True
)
df_cc_merge = pd.merge(df_cc_merge, df_cc_ldate, on="ccid", how="left").reset_index(
    drop=True
)
df_cc_merge = pd.merge(df_cc_merge, df_cc_mc, on="ccid", how="left").reset_index(
    drop=True
)
Hide code cell content
# データフレーム`df_cc_merge`をCSVファイルとして保存
# 保存先のパスは、`DIR_INTERIM`ディレクトリ内の`cc_merge.csv`
df_cc_merge.to_csv(DIR_INTERIM / "cc_merge.csv", index=False)

cm_cc_crt.csv#

Hide code cell content
# `df_cc_crt`と`df_cc_merge`を`ccid`をキーにしてマージ
df_cm_cc_crt = pd.merge(df_cc_crt, df_cc_merge, on="ccid", how="inner").reset_index(
    drop=True
)

# 上記の結果と`df_crt`を`crtid`をキーにしてマージ
df_cm_cc_crt = pd.merge(df_cm_cc_crt, df_crt, on="crtid", how="left").reset_index(
    drop=True
)
Hide code cell content
# head()メソッドで先頭5行を確認
df_cm_cc_crt.head()
ccid crtid ccname n_ce n_2c n_4c first_date last_date mcid mcname crtname
0 C87429 CCRT01604 交通安全'76 1 0 0 1976-09-06 1976-09-06 C119459 週刊少年ジャンプ 山止たつひこ
1 C87430 CCRT02117 好敵手 室伏広治物語 1 0 0 2000-10-02 2000-10-02 C119459 週刊少年ジャンプ 柳田東一郎
2 C87430 CCRT03152 好敵手 室伏広治物語 1 0 0 2000-10-02 2000-10-02 C119459 週刊少年ジャンプ 門脇正法
3 C87431 CCRT01415 鋼鉄の殺人者 1 0 0 1979-12-10 1979-12-10 C119459 週刊少年ジャンプ 富沢順
4 C87432 CCRT01979 硬派山崎銀次郎 1 0 1 1974-04-29 1974-04-29 C119459 週刊少年ジャンプ 本宮ひろ志
Hide code cell content
# `df_cm_cc_crt`内の`ccid`と`crtid`の組み合わせが重複していないことを確認
assert df_cm_cc_crt.duplicated(subset=["ccid", "crtid"]).sum() == 0
Hide code cell content
# データフレーム`df_cm_cc_crt`をCSVファイルとして保存
# 保存先のパスは、`DIR_OUTPUT`ディレクトリ内の`cm_cc_crt.csv`
df_cm_cc_crt.to_csv(DIR_OUTPUT / "cm_cc_crt.csv", index=False)