JSON (Python標準ライブラリ)

Python標準ライブラリのJSON処理モジュール

serializationjsonpythonstandardbuiltin

JSON (Python標準ライブラリ)

PythonのJSONモジュールは、JavaScript Object Notation (JSON)形式のデータのエンコードとデコードを行うための標準ライブラリです。軽量で人間が読みやすいデータ交換形式として広く使用されています。

主な特徴

  • 標準ライブラリ: Pythonに組み込まれているため追加インストール不要
  • シンプルAPI: 直感的で使いやすいインターフェース
  • 高いパフォーマンス: C実装による高速処理
  • カスタマイズ可能: エンコーダー・デコーダーの拡張が可能
  • Unicode対応: 多言語文字を適切に処理
  • ストリーミング対応: 大きなデータの段階的処理

主なユースケース

  • Web API: RESTful APIでのデータ交換
  • 設定ファイル: アプリケーション設定の保存
  • データベース: NoSQLデータベースでのドキュメント保存
  • ログ記録: 構造化ログの出力
  • データ分析: データの一時保存と読み込み
  • キャッシュ: 計算結果の保存

基本的な使い方

1. 基本的なエンコード・デコード

import json

# 基本的なデータ型
data = {
    "name": "田中太郎",
    "age": 30,
    "city": "東京",
    "is_student": False,
    "hobbies": ["読書", "映画鑑賞", "プログラミング"],
    "address": {
        "street": "1-1-1 新宿",
        "postal_code": "160-0022"
    },
    "phone": None
}

# Python オブジェクトをJSONに変換
json_string = json.dumps(data, ensure_ascii=False, indent=2)
print("JSON文字列:")
print(json_string)

# JSON文字列をPythonオブジェクトに変換
parsed_data = json.loads(json_string)
print("\nパースされたデータ:")
print(f"名前: {parsed_data['name']}")
print(f"年齢: {parsed_data['age']}")
print(f"趣味: {', '.join(parsed_data['hobbies'])}")

2. ファイルへの読み書き

import json

# ファイルに書き込み
user_data = {
    "users": [
        {
            "id": 1,
            "name": "田中太郎",
            "email": "[email protected]",
            "profile": {
                "age": 30,
                "city": "東京",
                "interests": ["プログラミング", "読書"]
            }
        },
        {
            "id": 2,
            "name": "佐藤花子",
            "email": "[email protected]",
            "profile": {
                "age": 25,
                "city": "大阪",
                "interests": ["映画鑑賞", "旅行"]
            }
        }
    ],
    "metadata": {
        "total_users": 2,
        "created_at": "2024-01-01T00:00:00Z",
        "version": "1.0"
    }
}

# ファイルに書き込み
with open('users.json', 'w', encoding='utf-8') as f:
    json.dump(user_data, f, ensure_ascii=False, indent=2)

print("データをusers.jsonに保存しました")

# ファイルから読み込み
with open('users.json', 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)

print(f"\n読み込まれたユーザー数: {loaded_data['metadata']['total_users']}")
for user in loaded_data['users']:
    print(f"- {user['name']} ({user['profile']['city']})")

3. 日本語データの処理

import json

# 日本語データの処理
japanese_data = {
    "会社名": "株式会社テスト",
    "部署": [
        {
            "名前": "開発部",
            "メンバー": ["田中太郎", "佐藤花子", "山田次郎"],
            "プロジェクト": {
                "名前": "新システム開発",
                "状態": "進行中",
                "締切": "2024-03-31"
            }
        },
        {
            "名前": "営業部",
            "メンバー": ["鈴木一郎", "高橋美咲"],
            "プロジェクト": {
                "名前": "顧客開拓",
                "状態": "計画中",
                "締切": "2024-06-30"
            }
        }
    ]
}

# ensure_ascii=False で日本語文字を保持
json_str = json.dumps(japanese_data, ensure_ascii=False, indent=2)
print("日本語JSON:")
print(json_str)

# ファイルに保存
with open('japanese_data.json', 'w', encoding='utf-8') as f:
    json.dump(japanese_data, f, ensure_ascii=False, indent=2)

print("\n日本語データを保存しました")

高度な使用例

1. カスタムエンコーダー・デコーダー

import json
from datetime import datetime, date
from decimal import Decimal
import uuid

class CustomJSONEncoder(json.JSONEncoder):
    """カスタムJSONエンコーダー"""
    
    def default(self, obj):
        if isinstance(obj, datetime):
            return {
                '__type__': 'datetime',
                'value': obj.isoformat()
            }
        elif isinstance(obj, date):
            return {
                '__type__': 'date',
                'value': obj.isoformat()
            }
        elif isinstance(obj, Decimal):
            return {
                '__type__': 'decimal',
                'value': str(obj)
            }
        elif isinstance(obj, uuid.UUID):
            return {
                '__type__': 'uuid',
                'value': str(obj)
            }
        elif isinstance(obj, set):
            return {
                '__type__': 'set',
                'value': list(obj)
            }
        elif isinstance(obj, complex):
            return {
                '__type__': 'complex',
                'real': obj.real,
                'imag': obj.imag
            }
        
        return super().default(obj)

def custom_json_decoder(dct):
    """カスタムJSONデコーダー"""
    if '__type__' in dct:
        obj_type = dct['__type__']
        
        if obj_type == 'datetime':
            return datetime.fromisoformat(dct['value'])
        elif obj_type == 'date':
            return datetime.fromisoformat(dct['value']).date()
        elif obj_type == 'decimal':
            return Decimal(dct['value'])
        elif obj_type == 'uuid':
            return uuid.UUID(dct['value'])
        elif obj_type == 'set':
            return set(dct['value'])
        elif obj_type == 'complex':
            return complex(dct['real'], dct['imag'])
    
    return dct

# 使用例
complex_data = {
    "id": uuid.uuid4(),
    "created_at": datetime.now(),
    "updated_at": date.today(),
    "price": Decimal('1234.56'),
    "tags": {"python", "json", "serialization"},
    "coordinates": complex(3, 4)
}

print("元のデータ:")
for key, value in complex_data.items():
    print(f"  {key}: {value} ({type(value).__name__})")

# カスタムエンコーダーでシリアライズ
json_str = json.dumps(complex_data, cls=CustomJSONEncoder, ensure_ascii=False, indent=2)
print(f"\nシリアライズされたJSON:")
print(json_str)

# カスタムデコーダーでデシリアライズ
decoded_data = json.loads(json_str, object_hook=custom_json_decoder)
print(f"\nデコードされたデータ:")
for key, value in decoded_data.items():
    print(f"  {key}: {value} ({type(value).__name__})")

2. ストリーミング処理

import json
import io
from typing import Generator, Dict, Any

class JSONStreamProcessor:
    """大きなJSONデータをストリーミング処理"""
    
    def __init__(self, chunk_size: int = 1024):
        self.chunk_size = chunk_size
        self.buffer = ""
        self.decoder = json.JSONDecoder()
    
    def process_stream(self, stream: io.TextIOBase) -> Generator[Dict[str, Any], None, None]:
        """ストリームから複数のJSONオブジェクトを処理"""
        self.buffer = ""
        
        while True:
            chunk = stream.read(self.chunk_size)
            if not chunk:
                break
            
            self.buffer += chunk
            
            # バッファから完全なJSONオブジェクトを抽出
            while self.buffer.strip():
                obj, idx = self._extract_json_object()
                if obj is not None:
                    yield obj
                    self.buffer = self.buffer[idx:].lstrip()
                else:
                    break
        
        # 残りのバッファを処理
        if self.buffer.strip():
            try:
                obj = json.loads(self.buffer)
                yield obj
            except json.JSONDecodeError:
                pass
    
    def _extract_json_object(self):
        """バッファから1つのJSONオブジェクトを抽出"""
        try:
            obj, idx = self.decoder.raw_decode(self.buffer)
            return obj, idx
        except json.JSONDecodeError:
            return None, 0

# 使用例
def create_sample_json_stream():
    """サンプルJSONストリームを作成"""
    sample_data = [
        {"id": 1, "name": "田中太郎", "age": 30},
        {"id": 2, "name": "佐藤花子", "age": 25},
        {"id": 3, "name": "山田次郎", "age": 28}
    ]
    
    # 複数のJSONオブジェクトを含む文字列を作成
    json_stream = ""
    for data in sample_data:
        json_stream += json.dumps(data, ensure_ascii=False) + "\n"
    
    return io.StringIO(json_stream)

# ストリーミング処理の実行
stream = create_sample_json_stream()
processor = JSONStreamProcessor()

print("ストリーミング処理結果:")
for obj in processor.process_stream(stream):
    print(f"処理されたオブジェクト: {obj}")

3. データバリデーション

import json
from typing import Dict, Any, List, Optional
from datetime import datetime

class JSONValidator:
    """JSONデータのバリデーション"""
    
    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
    
    def validate(self, data: Dict[str, Any]) -> List[str]:
        """データをスキーマに対してバリデーション"""
        errors = []
        self._validate_object(data, self.schema, "", errors)
        return errors
    
    def _validate_object(self, data: Any, schema: Dict[str, Any], path: str, errors: List[str]):
        """オブジェクトのバリデーション"""
        if not isinstance(data, dict):
            errors.append(f"{path}: オブジェクトである必要があります")
            return
        
        # 必須フィールドのチェック
        required_fields = schema.get("required", [])
        for field in required_fields:
            if field not in data:
                errors.append(f"{path}.{field}: 必須フィールドです")
        
        # 各フィールドのバリデーション
        properties = schema.get("properties", {})
        for field, field_schema in properties.items():
            if field in data:
                field_path = f"{path}.{field}" if path else field
                self._validate_field(data[field], field_schema, field_path, errors)
    
    def _validate_field(self, value: Any, schema: Dict[str, Any], path: str, errors: List[str]):
        """フィールドのバリデーション"""
        field_type = schema.get("type")
        
        if field_type == "string":
            if not isinstance(value, str):
                errors.append(f"{path}: 文字列である必要があります")
            else:
                min_length = schema.get("minLength")
                max_length = schema.get("maxLength")
                if min_length and len(value) < min_length:
                    errors.append(f"{path}: 最小{min_length}文字以上である必要があります")
                if max_length and len(value) > max_length:
                    errors.append(f"{path}: 最大{max_length}文字以下である必要があります")
        
        elif field_type == "number":
            if not isinstance(value, (int, float)):
                errors.append(f"{path}: 数値である必要があります")
            else:
                minimum = schema.get("minimum")
                maximum = schema.get("maximum")
                if minimum is not None and value < minimum:
                    errors.append(f"{path}: {minimum}以上である必要があります")
                if maximum is not None and value > maximum:
                    errors.append(f"{path}: {maximum}以下である必要があります")
        
        elif field_type == "array":
            if not isinstance(value, list):
                errors.append(f"{path}: 配列である必要があります")
            else:
                items_schema = schema.get("items")
                if items_schema:
                    for i, item in enumerate(value):
                        item_path = f"{path}[{i}]"
                        self._validate_field(item, items_schema, item_path, errors)
        
        elif field_type == "object":
            self._validate_object(value, schema, path, errors)

# スキーマの定義
user_schema = {
    "type": "object",
    "required": ["name", "email", "age"],
    "properties": {
        "name": {
            "type": "string",
            "minLength": 1,
            "maxLength": 100
        },
        "email": {
            "type": "string",
            "minLength": 5
        },
        "age": {
            "type": "number",
            "minimum": 0,
            "maximum": 150
        },
        "hobbies": {
            "type": "array",
            "items": {
                "type": "string"
            }
        },
        "address": {
            "type": "object",
            "properties": {
                "street": {"type": "string"},
                "city": {"type": "string"}
            }
        }
    }
}

# バリデーションの実行
validator = JSONValidator(user_schema)

# 正常なデータ
valid_data = {
    "name": "田中太郎",
    "email": "[email protected]",
    "age": 30,
    "hobbies": ["読書", "映画鑑賞"],
    "address": {
        "street": "1-1-1 新宿",
        "city": "東京"
    }
}

errors = validator.validate(valid_data)
print(f"正常なデータのバリデーション: {len(errors)}個のエラー")

# 異常なデータ
invalid_data = {
    "name": "",  # 長さが不足
    "email": "abc",  # 長さが不足
    "age": 200,  # 範囲外
    "hobbies": ["読書", 123],  # 配列内に数値
    "address": {
        "street": "1-1-1 新宿",
        "city": 123  # 文字列でない
    }
}

errors = validator.validate(invalid_data)
print(f"\n異常なデータのバリデーション: {len(errors)}個のエラー")
for error in errors:
    print(f"  - {error}")

実用的な例

1. 設定ファイルの管理

import json
import os
from typing import Dict, Any, Optional

class ConfigManager:
    """設定ファイルの管理"""
    
    def __init__(self, config_file: str = "config.json"):
        self.config_file = config_file
        self.config = self._load_default_config()
        self.load()
    
    def _load_default_config(self) -> Dict[str, Any]:
        """デフォルト設定を返す"""
        return {
            "server": {
                "host": "localhost",
                "port": 8080,
                "debug": False
            },
            "database": {
                "host": "localhost",
                "port": 5432,
                "name": "myapp",
                "user": "postgres",
                "password": ""
            },
            "logging": {
                "level": "INFO",
                "file": "app.log",
                "max_size": 10485760  # 10MB
            },
            "features": {
                "enable_caching": True,
                "enable_metrics": False,
                "rate_limit": 100
            }
        }
    
    def load(self):
        """設定ファイルを読み込む"""
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    loaded_config = json.load(f)
                    self._merge_config(self.config, loaded_config)
                print(f"設定ファイル '{self.config_file}' を読み込みました")
            else:
                print(f"設定ファイル '{self.config_file}' が見つかりません。デフォルト設定を使用します")
        except json.JSONDecodeError as e:
            print(f"設定ファイルの読み込みエラー: {e}")
        except Exception as e:
            print(f"設定ファイルの読み込みエラー: {e}")
    
    def save(self):
        """設定ファイルに保存"""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            print(f"設定ファイル '{self.config_file}' に保存しました")
        except Exception as e:
            print(f"設定ファイルの保存エラー: {e}")
    
    def get(self, key: str, default: Any = None) -> Any:
        """設定値を取得"""
        keys = key.split('.')
        value = self.config
        
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        
        return value
    
    def set(self, key: str, value: Any):
        """設定値を設定"""
        keys = key.split('.')
        config = self.config
        
        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]
        
        config[keys[-1]] = value
    
    def _merge_config(self, default: Dict[str, Any], loaded: Dict[str, Any]):
        """設定をマージする"""
        for key, value in loaded.items():
            if key in default and isinstance(default[key], dict) and isinstance(value, dict):
                self._merge_config(default[key], value)
            else:
                default[key] = value

# 使用例
config = ConfigManager("app_config.json")

# 設定値の取得
print(f"サーバーポート: {config.get('server.port')}")
print(f"データベース名: {config.get('database.name')}")
print(f"ログレベル: {config.get('logging.level')}")

# 設定値の変更
config.set('server.port', 9090)
config.set('server.debug', True)
config.set('features.enable_metrics', True)

# 設定を保存
config.save()

# 設定の表示
print(f"\n現在の設定:")
print(json.dumps(config.config, ensure_ascii=False, indent=2))

2. API レスポンスの処理

import json
import requests
from typing import Dict, Any, List, Optional
from datetime import datetime

class APIClient:
    """API通信クライアント"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'User-Agent': 'Python-API-Client/1.0'
        })
    
    def get_users(self) -> List[Dict[str, Any]]:
        """ユーザーリストを取得"""
        try:
            response = self.session.get(f"{self.base_url}/users")
            response.raise_for_status()
            
            data = response.json()
            print(f"ユーザー取得成功: {len(data)}人")
            return data
        
        except requests.RequestException as e:
            print(f"API リクエストエラー: {e}")
            return []
        except json.JSONDecodeError as e:
            print(f"JSONデコードエラー: {e}")
            return []
    
    def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """ユーザーを作成"""
        try:
            response = self.session.post(
                f"{self.base_url}/users",
                json=user_data
            )
            response.raise_for_status()
            
            created_user = response.json()
            print(f"ユーザー作成成功: ID {created_user.get('id')}")
            return created_user
        
        except requests.RequestException as e:
            print(f"API リクエストエラー: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f"JSONデコードエラー: {e}")
            return None
    
    def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """ユーザーを更新"""
        try:
            response = self.session.put(
                f"{self.base_url}/users/{user_id}",
                json=user_data
            )
            response.raise_for_status()
            
            updated_user = response.json()
            print(f"ユーザー更新成功: ID {user_id}")
            return updated_user
        
        except requests.RequestException as e:
            print(f"API リクエストエラー: {e}")
            return None
        except json.JSONDecodeError as e:
            print(f"JSONデコードエラー: {e}")
            return None

# モックAPIクライアントの使用例
class MockAPIClient(APIClient):
    """テスト用モックAPIクライアント"""
    
    def __init__(self):
        super().__init__("https://api.example.com")
        self.users = [
            {
                "id": 1,
                "name": "田中太郎",
                "email": "[email protected]",
                "created_at": "2024-01-01T00:00:00Z"
            },
            {
                "id": 2,
                "name": "佐藤花子",
                "email": "[email protected]",
                "created_at": "2024-01-02T00:00:00Z"
            }
        ]
        self.next_id = 3
    
    def get_users(self) -> List[Dict[str, Any]]:
        """モックユーザーリストを返す"""
        print(f"モックユーザー取得: {len(self.users)}人")
        return self.users.copy()
    
    def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """モックユーザーを作成"""
        new_user = user_data.copy()
        new_user["id"] = self.next_id
        new_user["created_at"] = datetime.now().isoformat() + "Z"
        self.users.append(new_user)
        self.next_id += 1
        
        print(f"モックユーザー作成: ID {new_user['id']}")
        return new_user

# 使用例
api_client = MockAPIClient()

# ユーザーリストの取得
users = api_client.get_users()
print("現在のユーザー:")
for user in users:
    print(f"  - {user['name']} ({user['email']})")

# 新しいユーザーの作成
new_user = {
    "name": "山田次郎",
    "email": "[email protected]"
}

created_user = api_client.create_user(new_user)
if created_user:
    print(f"\n作成されたユーザー: {json.dumps(created_user, ensure_ascii=False, indent=2)}")

# 更新されたユーザーリストの取得
users = api_client.get_users()
print(f"\n更新後のユーザー数: {len(users)}")

3. ログ記録システム

import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
import traceback

class JSONFormatter(logging.Formatter):
    """JSON形式のログフォーマッター"""
    
    def format(self, record: logging.LogRecord) -> str:
        # 基本的なログ情報
        log_entry = {
            "timestamp": datetime.fromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        
        # 追加情報があれば含める
        if hasattr(record, 'user_id'):
            log_entry["user_id"] = record.user_id
        
        if hasattr(record, 'request_id'):
            log_entry["request_id"] = record.request_id
        
        if hasattr(record, 'extra_data'):
            log_entry["extra_data"] = record.extra_data
        
        # 例外情報があれば含める
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info)
            }
        
        return json.dumps(log_entry, ensure_ascii=False, default=str)

class StructuredLogger:
    """構造化ログ記録システム"""
    
    def __init__(self, name: str, log_file: str = "app.log"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # ファイルハンドラーの設定
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(JSONFormatter())
        self.logger.addHandler(file_handler)
        
        # コンソールハンドラーの設定
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(console_handler)
    
    def info(self, message: str, **kwargs):
        """INFO レベルのログ"""
        self._log(logging.INFO, message, **kwargs)
    
    def warning(self, message: str, **kwargs):
        """WARNING レベルのログ"""
        self._log(logging.WARNING, message, **kwargs)
    
    def error(self, message: str, **kwargs):
        """ERROR レベルのログ"""
        self._log(logging.ERROR, message, **kwargs)
    
    def _log(self, level: int, message: str, **kwargs):
        """内部ログ記録メソッド"""
        extra = {}
        for key, value in kwargs.items():
            extra[key] = value
        
        self.logger.log(level, message, extra=extra)

# 使用例
logger = StructuredLogger("myapp")

# 基本的なログ記録
logger.info("アプリケーションが開始されました")

# 追加情報付きログ
logger.info(
    "ユーザーがログインしました",
    user_id=12345,
    request_id="req-abc123",
    extra_data={
        "ip_address": "192.168.1.100",
        "user_agent": "Mozilla/5.0...",
        "login_method": "email"
    }
)

# 警告ログ
logger.warning(
    "APIレート制限に近づいています",
    user_id=12345,
    current_requests=90,
    limit=100
)

# エラーログ
try:
    # エラーを発生させる
    result = 10 / 0
except ZeroDivisionError as e:
    logger.error(
        "計算エラーが発生しました",
        user_id=12345,
        operation="division",
        extra_data={"dividend": 10, "divisor": 0},
        exc_info=True
    )

print("\nログファイルの内容:")
try:
    with open("app.log", "r", encoding="utf-8") as f:
        for line in f:
            log_entry = json.loads(line.strip())
            print(f"[{log_entry['timestamp']}] {log_entry['level']}: {log_entry['message']}")
except FileNotFoundError:
    print("ログファイルが見つかりません")

パフォーマンス最適化

1. 大きなデータの処理

import json
import time
from typing import Generator, Dict, Any

def benchmark_json_processing():
    """JSONのパフォーマンステスト"""
    
    # 大きなデータセットを作成
    large_data = {
        "users": [
            {
                "id": i,
                "name": f"ユーザー{i}",
                "email": f"user{i}@example.com",
                "profile": {
                    "age": 20 + (i % 60),
                    "city": f"都市{i % 10}",
                    "interests": [f"趣味{j}" for j in range(i % 5 + 1)]
                }
            }
            for i in range(10000)
        ],
        "metadata": {
            "total_users": 10000,
            "generated_at": "2024-01-01T00:00:00Z"
        }
    }
    
    print("JSON パフォーマンステスト")
    print(f"データサイズ: {len(large_data['users'])} ユーザー")
    
    # シリアライゼーション
    start_time = time.time()
    json_string = json.dumps(large_data, ensure_ascii=False)
    serialize_time = time.time() - start_time
    
    print(f"シリアライゼーション時間: {serialize_time:.3f}秒")
    print(f"JSON文字列サイズ: {len(json_string):,} 文字")
    
    # デシリアライゼーション
    start_time = time.time()
    parsed_data = json.loads(json_string)
    deserialize_time = time.time() - start_time
    
    print(f"デシリアライゼーション時間: {deserialize_time:.3f}秒")
    print(f"パースされたユーザー数: {len(parsed_data['users'])}")
    
    # ファイル書き込み
    start_time = time.time()
    with open('large_data.json', 'w', encoding='utf-8') as f:
        json.dump(large_data, f, ensure_ascii=False)
    write_time = time.time() - start_time
    
    print(f"ファイル書き込み時間: {write_time:.3f}秒")
    
    # ファイル読み込み
    start_time = time.time()
    with open('large_data.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
    read_time = time.time() - start_time
    
    print(f"ファイル読み込み時間: {read_time:.3f}秒")
    
    # 総合時間
    total_time = serialize_time + deserialize_time + write_time + read_time
    print(f"総処理時間: {total_time:.3f}秒")

# ベンチマーク実行
benchmark_json_processing()

2. メモリ効率的な処理

import json
import ijson
from typing import Generator, Dict, Any

def memory_efficient_json_processing():
    """メモリ効率的なJSON処理"""
    
    # 大きなJSONファイルを作成
    def create_large_json_file(filename: str, record_count: int):
        """大きなJSONファイルを作成"""
        with open(filename, 'w', encoding='utf-8') as f:
            f.write('{"users": [')
            
            for i in range(record_count):
                user = {
                    "id": i,
                    "name": f"ユーザー{i}",
                    "email": f"user{i}@example.com",
                    "data": f"データ{i}" * 100  # 大きなデータ
                }
                
                if i > 0:
                    f.write(',')
                
                json.dump(user, f, ensure_ascii=False)
            
            f.write('],"metadata": {"total": ' + str(record_count) + '}}')
    
    # 大きなJSONファイルを作成
    print("大きなJSONファイルを作成中...")
    create_large_json_file('large_stream.json', 1000)
    
    # ストリーミング処理
    def process_users_streaming(filename: str) -> Generator[Dict[str, Any], None, None]:
        """ストリーミングでユーザーを処理"""
        with open(filename, 'rb') as f:
            # ijsonを使用してストリーミング処理
            users = ijson.items(f, 'users.item')
            for user in users:
                yield user
    
    # ストリーミング処理の実行
    print("ストリーミング処理実行中...")
    user_count = 0
    total_data_length = 0
    
    start_time = time.time()
    for user in process_users_streaming('large_stream.json'):
        user_count += 1
        total_data_length += len(user.get('data', ''))
        
        if user_count % 100 == 0:
            print(f"処理済み: {user_count} ユーザー")
    
    processing_time = time.time() - start_time
    
    print(f"ストリーミング処理完了:")
    print(f"  処理時間: {processing_time:.3f}秒")
    print(f"  処理ユーザー数: {user_count}")
    print(f"  平均処理速度: {user_count/processing_time:.1f} ユーザー/秒")
    
    # メモリ使用量の比較
    import psutil
    import os
    
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"  メモリ使用量: {memory_mb:.1f} MB")

# ijsonが利用可能な場合のみ実行
try:
    import ijson
    memory_efficient_json_processing()
except ImportError:
    print("ijson ライブラリがインストールされていません")
    print("pip install ijson でインストールしてください")

他のライブラリとの比較

特徴JSONMessagePackProtocol BuffersAvro
可読性
パフォーマンス
サイズ
標準サポート優秀制限的制限的制限的
スキーマ不要不要必要必要
型安全性

トラブルシューティング

よくある問題と解決策

  1. エンコーディングエラー

    # 問題: 日本語文字が正しく表示されない
    # 解決策: ensure_ascii=False を使用
    data = {"name": "田中太郎"}
    json_str = json.dumps(data, ensure_ascii=False)
    
  2. 循環参照エラー

    # 問題: 循環参照によるシリアライゼーションエラー
    # 解決策: 循環参照を避ける
    def remove_circular_references(obj, seen=None):
        if seen is None:
            seen = set()
        
        if id(obj) in seen:
            return None
        
        seen.add(id(obj))
        
        if isinstance(obj, dict):
            return {k: remove_circular_references(v, seen) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [remove_circular_references(item, seen) for item in obj]
        
        return obj
    
  3. 大きなファイルの処理

    # 問題: 大きなJSONファイルでメモリ不足
    # 解決策: ストリーミング処理
    import ijson
    
    def process_large_json(filename):
        with open(filename, 'rb') as f:
            for item in ijson.items(f, 'items.item'):
                process_item(item)
    

まとめ

PythonのJSONライブラリは、データシリアライゼーションの標準的な選択肢です。特に以下の場合に効果的です:

利点

  • 標準ライブラリ: 追加インストール不要
  • 高い可読性: 人間が読みやすい形式
  • 広範囲サポート: 全ての言語・プラットフォームで対応
  • デバッグが容易: テキスト形式のため問題の特定が簡単

推奨用途

  • Web APIでのデータ交換
  • 設定ファイルの保存
  • ログ記録での構造化データ
  • データ分析での一時保存

高いパフォーマンスが必要な場合は、MessagePackやProtocol Buffersの使用を検討してください。しかし、可読性と互換性が重要な場合は、JSONが最適な選択肢です。