This article explains how to migrate data from Amazon Elasticsearch Service to another OpenSearch cluster. It introduces a simple and reliable migration method using the Scroll API and Bulk API.

Background

The need to migrate data between Elasticsearch/OpenSearch clusters can arise from a cloud service move or cost optimization. In our case, we migrated between the following environments.

  • Source: Amazon Elasticsearch Service (AWS)
  • Destination: Self-hosted OpenSearch

Migration Flow

  1. Check indices on source and destination
  2. Retrieve and adjust mapping information
  3. Create indices on the destination
  4. Migrate data with Scroll API + Bulk API
  5. Verify migration results

Preparation: Checking Indices

First, check the index lists on both the source and destination.

# Source index list
curl -u "user:password" "https://source-cluster/_cat/indices?v&s=index"

# Destination index list
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"

Step 1: Retrieve Mapping Information

Retrieve the mapping information from the source. Save one file per index, named so that the script in Step 3 can find it.

curl -s -u "user:password" \
  "https://source-cluster/index_name/_mapping" \
  > index_name_mappings.json

Step 2: Adjust Mappings

Depending on the destination environment, custom analyzers may not be available. For example, if the kuromoji plugin for Japanese morphological analysis is not installed, you need to remove the analyzer settings.

def remove_analyzer(obj):
    """Recursively remove analyzer settings from mappings"""
    if isinstance(obj, dict):
        if 'analyzer' in obj:
            del obj['analyzer']
        for key, value in obj.items():
            remove_analyzer(value)
    elif isinstance(obj, list):
        for item in obj:
            remove_analyzer(item)
    return obj
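As a quick sanity check, applying the function to a small mapping fragment strips only the analyzer keys (the field names below are illustrative, and the snippet restates the function compactly so it runs on its own):

```python
def remove_analyzer(obj):
    """Recursively remove analyzer settings from mappings."""
    if isinstance(obj, dict):
        obj.pop('analyzer', None)
        for value in obj.values():
            remove_analyzer(value)
    elif isinstance(obj, list):
        for item in obj:
            remove_analyzer(item)
    return obj

# Illustrative mapping fragment, not taken from the actual cluster
sample = {"properties": {
    "title": {"type": "text", "analyzer": "kuromoji"},
    "tags": {"type": "keyword"},
}}
print(remove_analyzer(sample))
# {'properties': {'title': {'type': 'text'}, 'tags': {'type': 'keyword'}}}
```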

Step 3: Create Indices on the Destination

Create indices using the adjusted mappings.

import json
import requests
from requests.auth import HTTPBasicAuth

def create_index(source_index, dest_index, dest_url, dest_auth):
    # Load mapping file
    with open(f'{source_index}_mappings.json') as f:
        mappings_data = json.load(f)

    mappings = mappings_data[source_index]['mappings']
    clean_mappings = remove_analyzer(mappings)

    index_body = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 1
        },
        'mappings': clean_mappings
    }

    response = requests.put(
        f"{dest_url}/{dest_index}",
        json=index_body,
        auth=dest_auth
    )
    print(f"Create {dest_index}: Status {response.status_code}")
    return response.status_code in [200, 201]

Step 4: Data Migration Script

Use the Scroll API to retrieve data from the source and insert it into the destination using the Bulk API.

import json
import requests
from requests.auth import HTTPBasicAuth
import time

SOURCE_URL = 'https://source-cluster'
SOURCE_AUTH = HTTPBasicAuth('user', 'password')

DEST_URL = 'https://dest-cluster'
DEST_AUTH = HTTPBasicAuth('user', 'password')

def migrate_index(source_index, dest_index, batch_size=1000):
    print(f"=== Migrating {source_index} -> {dest_index} ===")

    # Initialize Scroll API
    scroll_url = f"{SOURCE_URL}/{source_index}/_search?scroll=5m"
    query = {
        "size": batch_size,
        "query": {"match_all": {}}
    }

    response = requests.post(scroll_url, json=query, auth=SOURCE_AUTH)
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return False

    data = response.json()
    scroll_id = data['_scroll_id']
    hits = data['hits']['hits']
    total = data['hits']['total']
    if isinstance(total, dict):  # ES 7+ / OpenSearch wrap the count in an object
        total = total['value']

    print(f"Total documents: {total}")

    migrated = 0
    errors = 0
    start_time = time.time()

    while hits:
        # Prepare Bulk request
        bulk_body = ""
        for hit in hits:
            action = {"index": {"_index": dest_index, "_id": hit['_id']}}
            bulk_body += json.dumps(action) + "\n"
            bulk_body += json.dumps(hit['_source']) + "\n"

        # Send Bulk request
        headers = {"Content-Type": "application/x-ndjson"}
        bulk_response = requests.post(
            f"{DEST_URL}/_bulk",
            data=bulk_body,
            headers=headers,
            auth=DEST_AUTH
        )

        if bulk_response.status_code == 200:
            result = bulk_response.json()
            # Count per-document failures so they are not reported as migrated
            batch_errors = 0
            if result.get('errors'):
                for item in result['items']:
                    if 'error' in item.get('index', {}):
                        batch_errors += 1
            errors += batch_errors
            migrated += len(hits) - batch_errors
        else:
            print(f"Bulk error: {bulk_response.text[:200]}")
            errors += len(hits)

        # Show progress
        elapsed = time.time() - start_time
        rate = migrated / elapsed if elapsed > 0 else 0
        eta = (total - migrated) / rate if rate > 0 else 0
        print(f"\rProgress: {migrated}/{total} ({migrated*100//total}%) "
              f"- {rate:.0f} docs/sec - ETA: {eta:.0f}s", end='', flush=True)

        # Get next batch
        scroll_response = requests.post(
            f"{SOURCE_URL}/_search/scroll",
            json={"scroll": "5m", "scroll_id": scroll_id},
            auth=SOURCE_AUTH
        )

        if scroll_response.status_code != 200:
            break

        data = scroll_response.json()
        scroll_id = data['_scroll_id']
        hits = data['hits']['hits']

    # Release Scroll context
    requests.delete(
        f"{SOURCE_URL}/_search/scroll",
        json={"scroll_id": scroll_id},
        auth=SOURCE_AUTH
    )

    print(f"\n\nCompleted: {migrated} documents, {errors} errors")
    return True

Step 5: Execute Migration and Verify

# Execute migration
migrate_index('items1', 'cj-items1')
migrate_index('items2', 'cj-items2')
migrate_index('images', 'cj-images')

# Verify migration results
curl -u "user:password" "https://dest-cluster/_cat/indices?v&s=index"

Notes and Best Practices

1. Handling Custom Analyzers

If plugins such as kuromoji are not installed on the destination, there are several options:

  • Migrate without analyzers: Japanese search accuracy will decrease, but data is preserved
  • Install plugins: Pre-install necessary plugins on the destination
  • Use alternative analyzers: Substitute with the standard analyzer, etc.
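The third option can reuse the recursive traversal from Step 2: instead of deleting analyzer keys, substitute a built-in analyzer. Below is a sketch (replace_analyzer is a name invented here); note that swapping analyzers changes how existing fields tokenize text, so search behavior will differ.

```python
def replace_analyzer(obj, replacement='standard'):
    """Recursively replace analyzer settings instead of deleting them.

    Any analyzer the destination does not support (e.g. kuromoji)
    is swapped for a built-in one such as 'standard'.
    """
    if isinstance(obj, dict):
        if 'analyzer' in obj:
            obj['analyzer'] = replacement
        for value in obj.values():
            replace_analyzer(value, replacement)
    elif isinstance(obj, list):
        for item in obj:
            replace_analyzer(item, replacement)
    return obj
```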

2. Adjusting Batch Size

# Small documents: use larger batch size
migrate_index('small_docs', 'dest_small', batch_size=5000)

# Large documents: use smaller batch size
migrate_index('large_docs', 'dest_large', batch_size=500)

3. Parallel Execution

You can reduce overall migration time by migrating multiple indices in parallel.

# Run in parallel in the background
python migrate.py items1 cj-items1 &
python migrate.py items2 cj-items2 &
python migrate.py images cj-images &
wait
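The background-job invocation above assumes migrate.py accepts the index names on the command line. A minimal argparse wrapper could look like the following sketch (the --batch-size flag is an assumption, not something the original script defines):

```python
import argparse

def parse_args(argv=None):
    """Parse 'source_index dest_index [--batch-size N]' for migrate.py."""
    parser = argparse.ArgumentParser(description='Migrate one index')
    parser.add_argument('source_index')
    parser.add_argument('dest_index')
    parser.add_argument('--batch-size', type=int, default=1000)
    return parser.parse_args(argv)

# Entry point, calling migrate_index() from Step 4:
#   if __name__ == '__main__':
#       args = parse_args()
#       migrate_index(args.source_index, args.dest_index, args.batch_size)
```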

4. Error Handling

It is recommended to implement a mechanism that checks Bulk API responses and records/retries documents that encountered errors.

import logging

if result.get('errors'):
    for item in result['items']:
        if 'error' in item.get('index', {}):
            error_doc_id = item['index']['_id']
            error_reason = item['index']['error']['reason']
            # Record in error log for later retry
            logging.error(f"Failed: {error_doc_id} - {error_reason}")
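For the retry side, one simple shape is a helper that re-runs a send function with exponential backoff. This is a generic sketch (with_retries and base_delay are names invented here, not a feature of any client library):

```python
import time

def with_retries(send, max_retries=3, base_delay=1.0):
    """Call send() until it returns True or retries run out.

    send is any zero-argument callable that returns True on success,
    e.g. a closure around one Bulk request.
    """
    for attempt in range(max_retries):
        if send():
            return True
        time.sleep(base_delay * (2 ** attempt))  # wait 1s, 2s, 4s, ...
    return False
```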

5. Verifying Document Counts

After migration, compare the document counts between the source and destination to verify data consistency.

# Source document count
curl -s -u "user:password" "https://source-cluster/_count?q=*"

# Destination document count
curl -s -u "user:password" "https://dest-cluster/_count?q=*"
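Comparing the two outputs by hand gets tedious with many indices. A small helper that diffs per-index counts can automate the check; this is a sketch (diff_counts and the rename map are invented here, and in practice each count would come from GET /<index>/_count, reading the "count" field of the response):

```python
def diff_counts(source_counts, dest_counts, rename=None):
    """Return {source_index: (source_count, dest_count)} for mismatches.

    rename maps source index names to destination names, for cases
    like items1 -> cj-items1 where indices were renamed on migration.
    """
    rename = rename or {}
    mismatches = {}
    for index, src in source_counts.items():
        dest_index = rename.get(index, index)
        dst = dest_counts.get(dest_index, 0)
        if src != dst:
            mismatches[index] = (src, dst)
    return mismatches
```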

Performance Estimates

Migration speed varies depending on the environment and document size, but generally the following can be expected:

  • Small to medium documents: 500-1,000 docs/sec
  • Large documents: 100-500 docs/sec
  • Network bandwidth: The distance and bandwidth between clusters significantly affect speed

Summary

By combining the Scroll API and Bulk API, even large-scale data can be migrated reliably. It is important to proceed with migration while paying attention to environmental differences such as the availability of custom analyzers.