JSON (Python標準ライブラリ)
Python標準ライブラリのJSON処理モジュール
JSON (Python標準ライブラリ)
PythonのJSONモジュールは、JavaScript Object Notation (JSON)形式のデータのエンコードとデコードを行うための標準ライブラリです。軽量で人間が読みやすいデータ交換形式として広く使用されています。
主な特徴
- 標準ライブラリ: Pythonに組み込まれているため追加インストール不要
- シンプルAPI: 直感的で使いやすいインターフェース
- 高いパフォーマンス: C実装による高速処理
- カスタマイズ可能: エンコーダー・デコーダーの拡張が可能
- Unicode対応: 多言語文字を適切に処理
- ストリーミング対応: 大きなデータの段階的処理
主なユースケース
- Web API: RESTful APIでのデータ交換
- 設定ファイル: アプリケーション設定の保存
- データベース: NoSQLデータベースでのドキュメント保存
- ログ記録: 構造化ログの出力
- データ分析: データの一時保存と読み込み
- キャッシュ: 計算結果の保存
基本的な使い方
1. 基本的なエンコード・デコード
import json
# 基本的なデータ型
data = {
"name": "田中太郎",
"age": 30,
"city": "東京",
"is_student": False,
"hobbies": ["読書", "映画鑑賞", "プログラミング"],
"address": {
"street": "1-1-1 新宿",
"postal_code": "160-0022"
},
"phone": None
}
# Python オブジェクトをJSONに変換
json_string = json.dumps(data, ensure_ascii=False, indent=2)
print("JSON文字列:")
print(json_string)
# JSON文字列をPythonオブジェクトに変換
parsed_data = json.loads(json_string)
print("\nパースされたデータ:")
print(f"名前: {parsed_data['name']}")
print(f"年齢: {parsed_data['age']}")
print(f"趣味: {', '.join(parsed_data['hobbies'])}")
2. ファイルへの読み書き
import json
# ファイルに書き込み
user_data = {
"users": [
{
"id": 1,
"name": "田中太郎",
"email": "[email protected]",
"profile": {
"age": 30,
"city": "東京",
"interests": ["プログラミング", "読書"]
}
},
{
"id": 2,
"name": "佐藤花子",
"email": "[email protected]",
"profile": {
"age": 25,
"city": "大阪",
"interests": ["映画鑑賞", "旅行"]
}
}
],
"metadata": {
"total_users": 2,
"created_at": "2024-01-01T00:00:00Z",
"version": "1.0"
}
}
# ファイルに書き込み
with open('users.json', 'w', encoding='utf-8') as f:
json.dump(user_data, f, ensure_ascii=False, indent=2)
print("データをusers.jsonに保存しました")
# ファイルから読み込み
with open('users.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(f"\n読み込まれたユーザー数: {loaded_data['metadata']['total_users']}")
for user in loaded_data['users']:
print(f"- {user['name']} ({user['profile']['city']})")
3. 日本語データの処理
import json
# 日本語データの処理
japanese_data = {
"会社名": "株式会社テスト",
"部署": [
{
"名前": "開発部",
"メンバー": ["田中太郎", "佐藤花子", "山田次郎"],
"プロジェクト": {
"名前": "新システム開発",
"状態": "進行中",
"締切": "2024-03-31"
}
},
{
"名前": "営業部",
"メンバー": ["鈴木一郎", "高橋美咲"],
"プロジェクト": {
"名前": "顧客開拓",
"状態": "計画中",
"締切": "2024-06-30"
}
}
]
}
# ensure_ascii=False で日本語文字を保持
json_str = json.dumps(japanese_data, ensure_ascii=False, indent=2)
print("日本語JSON:")
print(json_str)
# ファイルに保存
with open('japanese_data.json', 'w', encoding='utf-8') as f:
json.dump(japanese_data, f, ensure_ascii=False, indent=2)
print("\n日本語データを保存しました")
高度な使用例
1. カスタムエンコーダー・デコーダー
import json
from datetime import datetime, date
from decimal import Decimal
import uuid
class CustomJSONEncoder(json.JSONEncoder):
"""カスタムJSONエンコーダー"""
def default(self, obj):
if isinstance(obj, datetime):
return {
'__type__': 'datetime',
'value': obj.isoformat()
}
elif isinstance(obj, date):
return {
'__type__': 'date',
'value': obj.isoformat()
}
elif isinstance(obj, Decimal):
return {
'__type__': 'decimal',
'value': str(obj)
}
elif isinstance(obj, uuid.UUID):
return {
'__type__': 'uuid',
'value': str(obj)
}
elif isinstance(obj, set):
return {
'__type__': 'set',
'value': list(obj)
}
elif isinstance(obj, complex):
return {
'__type__': 'complex',
'real': obj.real,
'imag': obj.imag
}
return super().default(obj)
def custom_json_decoder(dct):
"""カスタムJSONデコーダー"""
if '__type__' in dct:
obj_type = dct['__type__']
if obj_type == 'datetime':
return datetime.fromisoformat(dct['value'])
elif obj_type == 'date':
return datetime.fromisoformat(dct['value']).date()
elif obj_type == 'decimal':
return Decimal(dct['value'])
elif obj_type == 'uuid':
return uuid.UUID(dct['value'])
elif obj_type == 'set':
return set(dct['value'])
elif obj_type == 'complex':
return complex(dct['real'], dct['imag'])
return dct
# 使用例
complex_data = {
"id": uuid.uuid4(),
"created_at": datetime.now(),
"updated_at": date.today(),
"price": Decimal('1234.56'),
"tags": {"python", "json", "serialization"},
"coordinates": complex(3, 4)
}
print("元のデータ:")
for key, value in complex_data.items():
print(f" {key}: {value} ({type(value).__name__})")
# カスタムエンコーダーでシリアライズ
json_str = json.dumps(complex_data, cls=CustomJSONEncoder, ensure_ascii=False, indent=2)
print(f"\nシリアライズされたJSON:")
print(json_str)
# カスタムデコーダーでデシリアライズ
decoded_data = json.loads(json_str, object_hook=custom_json_decoder)
print(f"\nデコードされたデータ:")
for key, value in decoded_data.items():
print(f" {key}: {value} ({type(value).__name__})")
2. ストリーミング処理
import json
import io
from typing import Generator, Dict, Any
class JSONStreamProcessor:
"""大きなJSONデータをストリーミング処理"""
def __init__(self, chunk_size: int = 1024):
self.chunk_size = chunk_size
self.buffer = ""
self.decoder = json.JSONDecoder()
def process_stream(self, stream: io.TextIOBase) -> Generator[Dict[str, Any], None, None]:
"""ストリームから複数のJSONオブジェクトを処理"""
self.buffer = ""
while True:
chunk = stream.read(self.chunk_size)
if not chunk:
break
self.buffer += chunk
# バッファから完全なJSONオブジェクトを抽出
while self.buffer.strip():
obj, idx = self._extract_json_object()
if obj is not None:
yield obj
self.buffer = self.buffer[idx:].lstrip()
else:
break
# 残りのバッファを処理
if self.buffer.strip():
try:
obj = json.loads(self.buffer)
yield obj
except json.JSONDecodeError:
pass
def _extract_json_object(self):
"""バッファから1つのJSONオブジェクトを抽出"""
try:
obj, idx = self.decoder.raw_decode(self.buffer)
return obj, idx
except json.JSONDecodeError:
return None, 0
# 使用例
def create_sample_json_stream():
"""サンプルJSONストリームを作成"""
sample_data = [
{"id": 1, "name": "田中太郎", "age": 30},
{"id": 2, "name": "佐藤花子", "age": 25},
{"id": 3, "name": "山田次郎", "age": 28}
]
# 複数のJSONオブジェクトを含む文字列を作成
json_stream = ""
for data in sample_data:
json_stream += json.dumps(data, ensure_ascii=False) + "\n"
return io.StringIO(json_stream)
# ストリーミング処理の実行
stream = create_sample_json_stream()
processor = JSONStreamProcessor()
print("ストリーミング処理結果:")
for obj in processor.process_stream(stream):
print(f"処理されたオブジェクト: {obj}")
3. データバリデーション
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
class JSONValidator:
"""JSONデータのバリデーション"""
def __init__(self, schema: Dict[str, Any]):
self.schema = schema
def validate(self, data: Dict[str, Any]) -> List[str]:
"""データをスキーマに対してバリデーション"""
errors = []
self._validate_object(data, self.schema, "", errors)
return errors
def _validate_object(self, data: Any, schema: Dict[str, Any], path: str, errors: List[str]):
"""オブジェクトのバリデーション"""
if not isinstance(data, dict):
errors.append(f"{path}: オブジェクトである必要があります")
return
# 必須フィールドのチェック
required_fields = schema.get("required", [])
for field in required_fields:
if field not in data:
errors.append(f"{path}.{field}: 必須フィールドです")
# 各フィールドのバリデーション
properties = schema.get("properties", {})
for field, field_schema in properties.items():
if field in data:
field_path = f"{path}.{field}" if path else field
self._validate_field(data[field], field_schema, field_path, errors)
def _validate_field(self, value: Any, schema: Dict[str, Any], path: str, errors: List[str]):
"""フィールドのバリデーション"""
field_type = schema.get("type")
if field_type == "string":
if not isinstance(value, str):
errors.append(f"{path}: 文字列である必要があります")
else:
min_length = schema.get("minLength")
max_length = schema.get("maxLength")
if min_length and len(value) < min_length:
errors.append(f"{path}: 最小{min_length}文字以上である必要があります")
if max_length and len(value) > max_length:
errors.append(f"{path}: 最大{max_length}文字以下である必要があります")
elif field_type == "number":
if not isinstance(value, (int, float)):
errors.append(f"{path}: 数値である必要があります")
else:
minimum = schema.get("minimum")
maximum = schema.get("maximum")
if minimum is not None and value < minimum:
errors.append(f"{path}: {minimum}以上である必要があります")
if maximum is not None and value > maximum:
errors.append(f"{path}: {maximum}以下である必要があります")
elif field_type == "array":
if not isinstance(value, list):
errors.append(f"{path}: 配列である必要があります")
else:
items_schema = schema.get("items")
if items_schema:
for i, item in enumerate(value):
item_path = f"{path}[{i}]"
self._validate_field(item, items_schema, item_path, errors)
elif field_type == "object":
self._validate_object(value, schema, path, errors)
# スキーマの定義
user_schema = {
"type": "object",
"required": ["name", "email", "age"],
"properties": {
"name": {
"type": "string",
"minLength": 1,
"maxLength": 100
},
"email": {
"type": "string",
"minLength": 5
},
"age": {
"type": "number",
"minimum": 0,
"maximum": 150
},
"hobbies": {
"type": "array",
"items": {
"type": "string"
}
},
"address": {
"type": "object",
"properties": {
"street": {"type": "string"},
"city": {"type": "string"}
}
}
}
}
# バリデーションの実行
validator = JSONValidator(user_schema)
# 正常なデータ
valid_data = {
"name": "田中太郎",
"email": "[email protected]",
"age": 30,
"hobbies": ["読書", "映画鑑賞"],
"address": {
"street": "1-1-1 新宿",
"city": "東京"
}
}
errors = validator.validate(valid_data)
print(f"正常なデータのバリデーション: {len(errors)}個のエラー")
# 異常なデータ
invalid_data = {
"name": "", # 長さが不足
"email": "abc", # 長さが不足
"age": 200, # 範囲外
"hobbies": ["読書", 123], # 配列内に数値
"address": {
"street": "1-1-1 新宿",
"city": 123 # 文字列でない
}
}
errors = validator.validate(invalid_data)
print(f"\n異常なデータのバリデーション: {len(errors)}個のエラー")
for error in errors:
print(f" - {error}")
実用的な例
1. 設定ファイルの管理
import json
import os
from typing import Dict, Any, Optional
class ConfigManager:
"""設定ファイルの管理"""
def __init__(self, config_file: str = "config.json"):
self.config_file = config_file
self.config = self._load_default_config()
self.load()
def _load_default_config(self) -> Dict[str, Any]:
"""デフォルト設定を返す"""
return {
"server": {
"host": "localhost",
"port": 8080,
"debug": False
},
"database": {
"host": "localhost",
"port": 5432,
"name": "myapp",
"user": "postgres",
"password": ""
},
"logging": {
"level": "INFO",
"file": "app.log",
"max_size": 10485760 # 10MB
},
"features": {
"enable_caching": True,
"enable_metrics": False,
"rate_limit": 100
}
}
def load(self):
"""設定ファイルを読み込む"""
try:
if os.path.exists(self.config_file):
with open(self.config_file, 'r', encoding='utf-8') as f:
loaded_config = json.load(f)
self._merge_config(self.config, loaded_config)
print(f"設定ファイル '{self.config_file}' を読み込みました")
else:
print(f"設定ファイル '{self.config_file}' が見つかりません。デフォルト設定を使用します")
except json.JSONDecodeError as e:
print(f"設定ファイルの読み込みエラー: {e}")
except Exception as e:
print(f"設定ファイルの読み込みエラー: {e}")
def save(self):
"""設定ファイルに保存"""
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
print(f"設定ファイル '{self.config_file}' に保存しました")
except Exception as e:
print(f"設定ファイルの保存エラー: {e}")
def get(self, key: str, default: Any = None) -> Any:
"""設定値を取得"""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def set(self, key: str, value: Any):
"""設定値を設定"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
config[keys[-1]] = value
def _merge_config(self, default: Dict[str, Any], loaded: Dict[str, Any]):
"""設定をマージする"""
for key, value in loaded.items():
if key in default and isinstance(default[key], dict) and isinstance(value, dict):
self._merge_config(default[key], value)
else:
default[key] = value
# 使用例
config = ConfigManager("app_config.json")
# 設定値の取得
print(f"サーバーポート: {config.get('server.port')}")
print(f"データベース名: {config.get('database.name')}")
print(f"ログレベル: {config.get('logging.level')}")
# 設定値の変更
config.set('server.port', 9090)
config.set('server.debug', True)
config.set('features.enable_metrics', True)
# 設定を保存
config.save()
# 設定の表示
print(f"\n現在の設定:")
print(json.dumps(config.config, ensure_ascii=False, indent=2))
2. API レスポンスの処理
import json
import requests
from typing import Dict, Any, List, Optional
from datetime import datetime
class APIClient:
"""API通信クライアント"""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'Python-API-Client/1.0'
})
def get_users(self) -> List[Dict[str, Any]]:
"""ユーザーリストを取得"""
try:
response = self.session.get(f"{self.base_url}/users")
response.raise_for_status()
data = response.json()
print(f"ユーザー取得成功: {len(data)}人")
return data
except requests.RequestException as e:
print(f"API リクエストエラー: {e}")
return []
except json.JSONDecodeError as e:
print(f"JSONデコードエラー: {e}")
return []
def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""ユーザーを作成"""
try:
response = self.session.post(
f"{self.base_url}/users",
json=user_data
)
response.raise_for_status()
created_user = response.json()
print(f"ユーザー作成成功: ID {created_user.get('id')}")
return created_user
except requests.RequestException as e:
print(f"API リクエストエラー: {e}")
return None
except json.JSONDecodeError as e:
print(f"JSONデコードエラー: {e}")
return None
def update_user(self, user_id: int, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""ユーザーを更新"""
try:
response = self.session.put(
f"{self.base_url}/users/{user_id}",
json=user_data
)
response.raise_for_status()
updated_user = response.json()
print(f"ユーザー更新成功: ID {user_id}")
return updated_user
except requests.RequestException as e:
print(f"API リクエストエラー: {e}")
return None
except json.JSONDecodeError as e:
print(f"JSONデコードエラー: {e}")
return None
# モックAPIクライアントの使用例
class MockAPIClient(APIClient):
"""テスト用モックAPIクライアント"""
def __init__(self):
super().__init__("https://api.example.com")
self.users = [
{
"id": 1,
"name": "田中太郎",
"email": "[email protected]",
"created_at": "2024-01-01T00:00:00Z"
},
{
"id": 2,
"name": "佐藤花子",
"email": "[email protected]",
"created_at": "2024-01-02T00:00:00Z"
}
]
self.next_id = 3
def get_users(self) -> List[Dict[str, Any]]:
"""モックユーザーリストを返す"""
print(f"モックユーザー取得: {len(self.users)}人")
return self.users.copy()
def create_user(self, user_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""モックユーザーを作成"""
new_user = user_data.copy()
new_user["id"] = self.next_id
new_user["created_at"] = datetime.now().isoformat() + "Z"
self.users.append(new_user)
self.next_id += 1
print(f"モックユーザー作成: ID {new_user['id']}")
return new_user
# 使用例
api_client = MockAPIClient()
# ユーザーリストの取得
users = api_client.get_users()
print("現在のユーザー:")
for user in users:
print(f" - {user['name']} ({user['email']})")
# 新しいユーザーの作成
new_user = {
"name": "山田次郎",
"email": "[email protected]"
}
created_user = api_client.create_user(new_user)
if created_user:
print(f"\n作成されたユーザー: {json.dumps(created_user, ensure_ascii=False, indent=2)}")
# 更新されたユーザーリストの取得
users = api_client.get_users()
print(f"\n更新後のユーザー数: {len(users)}")
3. ログ記録システム
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional
import traceback
class JSONFormatter(logging.Formatter):
"""JSON形式のログフォーマッター"""
def format(self, record: logging.LogRecord) -> str:
# 基本的なログ情報
log_entry = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# 追加情報があれば含める
if hasattr(record, 'user_id'):
log_entry["user_id"] = record.user_id
if hasattr(record, 'request_id'):
log_entry["request_id"] = record.request_id
if hasattr(record, 'extra_data'):
log_entry["extra_data"] = record.extra_data
# 例外情報があれば含める
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info)
}
return json.dumps(log_entry, ensure_ascii=False, default=str)
class StructuredLogger:
"""構造化ログ記録システム"""
def __init__(self, name: str, log_file: str = "app.log"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# ファイルハンドラーの設定
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(JSONFormatter())
self.logger.addHandler(file_handler)
# コンソールハンドラーの設定
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(console_handler)
def info(self, message: str, **kwargs):
"""INFO レベルのログ"""
self._log(logging.INFO, message, **kwargs)
def warning(self, message: str, **kwargs):
"""WARNING レベルのログ"""
self._log(logging.WARNING, message, **kwargs)
def error(self, message: str, **kwargs):
"""ERROR レベルのログ"""
self._log(logging.ERROR, message, **kwargs)
def _log(self, level: int, message: str, **kwargs):
"""内部ログ記録メソッド"""
extra = {}
for key, value in kwargs.items():
extra[key] = value
self.logger.log(level, message, extra=extra)
# 使用例
logger = StructuredLogger("myapp")
# 基本的なログ記録
logger.info("アプリケーションが開始されました")
# 追加情報付きログ
logger.info(
"ユーザーがログインしました",
user_id=12345,
request_id="req-abc123",
extra_data={
"ip_address": "192.168.1.100",
"user_agent": "Mozilla/5.0...",
"login_method": "email"
}
)
# 警告ログ
logger.warning(
"APIレート制限に近づいています",
user_id=12345,
current_requests=90,
limit=100
)
# エラーログ
try:
# エラーを発生させる
result = 10 / 0
except ZeroDivisionError as e:
logger.error(
"計算エラーが発生しました",
user_id=12345,
operation="division",
extra_data={"dividend": 10, "divisor": 0},
exc_info=True
)
print("\nログファイルの内容:")
try:
with open("app.log", "r", encoding="utf-8") as f:
for line in f:
log_entry = json.loads(line.strip())
print(f"[{log_entry['timestamp']}] {log_entry['level']}: {log_entry['message']}")
except FileNotFoundError:
print("ログファイルが見つかりません")
パフォーマンス最適化
1. 大きなデータの処理
import json
import time
from typing import Generator, Dict, Any
def benchmark_json_processing():
"""JSONのパフォーマンステスト"""
# 大きなデータセットを作成
large_data = {
"users": [
{
"id": i,
"name": f"ユーザー{i}",
"email": f"user{i}@example.com",
"profile": {
"age": 20 + (i % 60),
"city": f"都市{i % 10}",
"interests": [f"趣味{j}" for j in range(i % 5 + 1)]
}
}
for i in range(10000)
],
"metadata": {
"total_users": 10000,
"generated_at": "2024-01-01T00:00:00Z"
}
}
print("JSON パフォーマンステスト")
print(f"データサイズ: {len(large_data['users'])} ユーザー")
# シリアライゼーション
start_time = time.time()
json_string = json.dumps(large_data, ensure_ascii=False)
serialize_time = time.time() - start_time
print(f"シリアライゼーション時間: {serialize_time:.3f}秒")
print(f"JSON文字列サイズ: {len(json_string):,} 文字")
# デシリアライゼーション
start_time = time.time()
parsed_data = json.loads(json_string)
deserialize_time = time.time() - start_time
print(f"デシリアライゼーション時間: {deserialize_time:.3f}秒")
print(f"パースされたユーザー数: {len(parsed_data['users'])}")
# ファイル書き込み
start_time = time.time()
with open('large_data.json', 'w', encoding='utf-8') as f:
json.dump(large_data, f, ensure_ascii=False)
write_time = time.time() - start_time
print(f"ファイル書き込み時間: {write_time:.3f}秒")
# ファイル読み込み
start_time = time.time()
with open('large_data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
read_time = time.time() - start_time
print(f"ファイル読み込み時間: {read_time:.3f}秒")
# 総合時間
total_time = serialize_time + deserialize_time + write_time + read_time
print(f"総処理時間: {total_time:.3f}秒")
# ベンチマーク実行
benchmark_json_processing()
2. メモリ効率的な処理
import json
import ijson
from typing import Generator, Dict, Any
def memory_efficient_json_processing():
"""メモリ効率的なJSON処理"""
# 大きなJSONファイルを作成
def create_large_json_file(filename: str, record_count: int):
"""大きなJSONファイルを作成"""
with open(filename, 'w', encoding='utf-8') as f:
f.write('{"users": [')
for i in range(record_count):
user = {
"id": i,
"name": f"ユーザー{i}",
"email": f"user{i}@example.com",
"data": f"データ{i}" * 100 # 大きなデータ
}
if i > 0:
f.write(',')
json.dump(user, f, ensure_ascii=False)
f.write('],"metadata": {"total": ' + str(record_count) + '}}')
# 大きなJSONファイルを作成
print("大きなJSONファイルを作成中...")
create_large_json_file('large_stream.json', 1000)
# ストリーミング処理
def process_users_streaming(filename: str) -> Generator[Dict[str, Any], None, None]:
"""ストリーミングでユーザーを処理"""
with open(filename, 'rb') as f:
# ijsonを使用してストリーミング処理
users = ijson.items(f, 'users.item')
for user in users:
yield user
# ストリーミング処理の実行
print("ストリーミング処理実行中...")
user_count = 0
total_data_length = 0
start_time = time.time()
for user in process_users_streaming('large_stream.json'):
user_count += 1
total_data_length += len(user.get('data', ''))
if user_count % 100 == 0:
print(f"処理済み: {user_count} ユーザー")
processing_time = time.time() - start_time
print(f"ストリーミング処理完了:")
print(f" 処理時間: {processing_time:.3f}秒")
print(f" 処理ユーザー数: {user_count}")
print(f" 平均処理速度: {user_count/processing_time:.1f} ユーザー/秒")
# メモリ使用量の比較
import psutil
import os
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024 / 1024
print(f" メモリ使用量: {memory_mb:.1f} MB")
# ijsonが利用可能な場合のみ実行
try:
import ijson
memory_efficient_json_processing()
except ImportError:
print("ijson ライブラリがインストールされていません")
print("pip install ijson でインストールしてください")
他のライブラリとの比較
| 特徴 | JSON | MessagePack | Protocol Buffers | Avro |
|---|---|---|---|---|
| 可読性 | 高 | 低 | 低 | 低 |
| パフォーマンス | 中 | 高 | 高 | 高 |
| サイズ | 大 | 小 | 小 | 小 |
| 標準サポート | 優秀 | 制限的 | 制限的 | 制限的 |
| スキーマ | 不要 | 不要 | 必要 | 必要 |
| 型安全性 | 低 | 低 | 高 | 高 |
トラブルシューティング
よくある問題と解決策
-
エンコーディングエラー
# 問題: 日本語文字が正しく表示されない # 解決策: ensure_ascii=False を使用 data = {"name": "田中太郎"} json_str = json.dumps(data, ensure_ascii=False) -
循環参照エラー
# 問題: 循環参照によるシリアライゼーションエラー # 解決策: 循環参照を避ける def remove_circular_references(obj, seen=None): if seen is None: seen = set() if id(obj) in seen: return None seen.add(id(obj)) if isinstance(obj, dict): return {k: remove_circular_references(v, seen) for k, v in obj.items()} elif isinstance(obj, list): return [remove_circular_references(item, seen) for item in obj] return obj -
大きなファイルの処理
# 問題: 大きなJSONファイルでメモリ不足 # 解決策: ストリーミング処理 import ijson def process_large_json(filename): with open(filename, 'rb') as f: for item in ijson.items(f, 'items.item'): process_item(item)
まとめ
PythonのJSONライブラリは、データシリアライゼーションの標準的な選択肢です。特に以下の場合に効果的です:
利点
- 標準ライブラリ: 追加インストール不要
- 高い可読性: 人間が読みやすい形式
- 広範囲サポート: 全ての言語・プラットフォームで対応
- デバッグが容易: テキスト形式のため問題の特定が簡単
推奨用途
- Web APIでのデータ交換
- 設定ファイルの保存
- ログ記録での構造化データ
- データ分析での一時保存
高いパフォーマンスが必要な場合は、MessagePackやProtocol Buffersの使用を検討してください。しかし、可読性と互換性が重要な場合は、JSONが最適な選択肢です。