Bu dökümanın amacı, Apinizer Manager üzerinden Administration>Gateway Environments sayfasındaki ortamlarda, herhangi bir log connector'e Failover yöntemi olarak Mongodb gösterildiği ve burada biriken logların Administration>Analytics>Migrate Unsent API Traffic Logs sayfasından tekrar ilgili log kaynağına gönderim işleminde sorun yaşanması durumunda MongoDB'deki ilgili koleksiyondan gönderilemeyen bu trafik loglarını bir bash script ile göndermektir.

Bash script'in, log kaybı yaşanmaması için aşamalı olarak çalıştırılması ve veri tutan koleksiyon hakkında bilgi sahibi olunması önemlidir. Şemanın yapısını anlamak, gönderilemeyen logların ilgili log kaynağına başarılı bir şekilde aktarılması sürecinde kritik rol oynar.

Bu dokümanda log kaynağı Elasticsearch olarak ilerlenmiştir.

Bash Script Erişim Gereksinimleri

Bash script mongodb primary sunucusunda çalıştırılacaktır ve MongoDB primary sunucusunun Elasticsearch sunucusunun 9200 portuna erişim iznine sahip olması gerekmektedir.

Bash Script Paket Gereksinimleri

Bash script, JSON verilerini ayrıştırma işlemleri için jq paketini kullanmaktadır. jq paketinin kurulu olup olmadığını kontrol etmek için aşağıdaki komut çalıştırılabilir:

jq --version

Eğer jq paket yüklü değilse:

# Red Hat tabanlı sistemler için

sudo dnf install jqata

# Debian tabanlı sistemler için

sudo apt-get update sudo apt-get install jq

jq paketi, çoğu güncel Linux dağıtımının varsayılan depolarında bulunduğundan, güncel sunucularda çevrimdışı olsalar dahi bu komutlarla kurulabilmektedir.

Dikkat Edilmesi Gerekenler

Bash script çalıştırılırken, log verilerinin büyüklüğüne bağlı olarak MongoDB üzerinde yoğun işlem yükü oluşabilir. Bu durum, özellikle bellek (RAM) kullanımı üzerinde artışa sebep olabilir. Olası performans sorunlarının önüne geçmek için, script çalışırken sunucu kaynaklarını düzenli olarak aşağıdaki komutlarla kontrol etmek önemlidir:

free -g

systemctl status mongod.service

Eğer RAM kullanımı yüksek veya MongoDB servisi durma belirtileri gösterirse, script'i durdurup kaynak kullanımını incelemeniz ve gerekirse MongoDB yapılandırmasını ya da log gönderimini optimize etmeniz önerilir.

1) Mongodb Veritabanı Koleksiyonu Ön Çalışması

MongoDB veritabanına bağlanarak ilgili koleksiyon hakkında aşağıdaki bilgilerin alınması gerekmektedir:

  1. Koleksiyon Adı: Elasticsearch'e gönderilmesi gereken koleksiyonun adı belirlenmelidir (Apinizer versiyonuna göre UnsentMessage veya log_UnsentMessage). Bu koleksiyon, apinizerdb veritabanında yer alan ve verilerin alınıp Elasticsearch'e aktarılacak koleksiyon olmalıdır.

  2. Döküman Sayısı: Koleksiyondaki toplam döküman sayısı, Elasticsearch'e aktarılacak veri miktarını anlamanızı sağlar. Bu, veritabanı büyüklüğünü de göz önünde bulundurmanız için önemlidir.

  3. Koleksiyon Boyutu: Koleksiyonun toplam boyutu, verinin ne kadar büyük olduğunu ve aktarım süresi ile performans üzerinde nasıl bir etki yapabileceğini belirlemek için kullanılır.

  4. Veri Formatı ve Şema Kontrolü: Koleksiyondaki dökümanların formatı, Elasticsearch'e doğru şekilde aktarılabilmesi için kontrol edilmelidir. Gerekirse, verinin dönüştürülmesi veya şemanın uyumlu hale getirilmesi gerekebilir.

Bu bilgileri toplamak, sürecin doğru bir şekilde ilerlemesini ve olası sorunları önceden görmenizi sağlar.

mongosh mongodb://localhost:25080 --authenticationDatabase "admin" -u "<USERNAME>" -p "<PASSWORD>"

// Veritabanlarındaki tüm veritabanlarını gösterir.
show dbs

// apinizerdb veritabanını seçer.
use apinizerdb

// apinizerdb veritabanındaki koleksiyonları gösterir.
show collections

// 'UnsentMessage' koleksiyonundaki toplam döküman sayısını kontrol eder. 
db.UnsentMessage.countDocuments({})

// 'UnsentMessage' koleksiyonunun depolama boyutunu MB cinsinden hesaplar.
db.UnsentMessage.stats().storageSize / (1024 * 1024).toFixed(2)

// Çalışma öncesi UnsentMessage ve log_UnsentMessage koleksiyonları hariç yedek alınır.
sudo mongodump --host <IP_ADDRESS> --port 25080 --excludeCollection UnsentMessage --excludeCollection log_UnsentMessage -d apinizerdb --authenticationDatabase "admin" -u apinizer -p <PASSWORD> --gzip --archive=<DIRECTORY>/apinizer-backup-d<DATE>-v<APINIZER_VERSION>--1.archive
BASH

2) MongoDb'den Elasticsearch'e Gönderilecek Dökümanların Elasticsearch'de Mevcut Olup Olmadığı Kontrol Edilir

MongoDB’den Elasticsearch’e loglar aktarılmadan önce, Apinizer Correlation ID (aci) değerleri UnsentMessage koleksiyonunda baştan ve sondan bir döküman alınarak test edilir. 

Retrieving Log Data from MongoDB

vi mongo_test.sh

mongo mongodb://<MONGO_IP>:25080 --authenticationDatabase "admin" -u "apinizer" -p
# Mongo versiyonu 6 ve yukarısıysa aşağıdaki komut çalıştırılır.
mongosh mongodb://<MONGO_IP>:25080 --authenticationDatabase "admin" -u "apinizer" -p

use apinizerdb
# İlk dökümanın aci değerini almak için
db.UnsentMessage.find({}, { content: 1, _id: 0 }).sort({ _id: 1 }).limit(1).forEach((doc) => { print(JSON.parse(doc.content).aci);});
# Son dökümanın aci değerini almak için
db.UnsentMessage.find({}, { content: 1, _id: 0 }).sort({ _id: -1 }).limit(1).forEach((doc) => { print(JSON.parse(doc.content).aci);});
BASH

Elasticsearch’te Logların Kontrolü

İlk dökümanın ve son dökümanın ACI değerleri, Apinizer arayüzü üzerinden Elasticsearch’te olup olmadığı kontrol edilerek doğrulanır.

Bu noktada, ilgili ACI değerlerinin tüm environment'larda, tüm projelerde ve geniş bir zaman aralığında kontrol edilmesi gerekmektedir.

3) Loglar Elasticsearch'te Bulunmuyorsa Öncelikle Sınırlı Sayıda Veri Taşınarak Test Edilir

Bu bölümde asıl çalıştıracağımız scripti küçük bir veri setiyle sağlıklı şekilde çalışıp çalışmadığını kontrol etmek amacıyla çalıştırıyoruz.

Belirlenen miktarda log, Elasticsearch'e yazılıp ardından MongoDB'den silinir. Silinen dökümanlar, Elasticsearch'e yazılamama ihtimaline karşı bir dosyaya kaydedilir. Ayrıca, ACI değerleri ayrı bir dosyada saklanarak, Apinizer üzerinden log'ların Elasticsearch'te olup olmadığı kontrol edilir.

Eğer loglar başarıyla Elasticsearch'e yazıldıysa, bir sonraki aşamaya geçilebilir.

Dosya Yapısının Oluşturulması

Öncelikle, bash script’inin çalışacağı dizin oluşturulur ve gerekli dosyalar hazırlanır:

mkdir mongo-to-elastic-test
cd mongo-to-elastic-test/
touch aci.txt data.json yedek.json mongo_test_log.txt mongo_test.sh
chmod +x mongo_test.sh
BASH

vi mongo_test.sh

MONGO_URI="mongodb://<MONGO_USER>:<MONGO_USER_PASSWORD>@<MONGO_IP>:25080/admin?replicaSet=apinizer-replicaset"
MONGO_DB="apinizerdb"
MONGO_COLLECTION="UnsentMessage"
TEST_SIZE=150

ES_URL="http://<ELASTIC_IP>:9200"
ES_INDEX="<ELASTIC_DATA_STREAM_INDEX>(apinizer-log-apiproxy-exampleIndex)"

data=$(mongo "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find({}, {'_id': 1, 'content': 1}).limit($TEST_SIZE).toArray())" | grep -vE "I\s+(NETWORK|CONNPOOL|ReplicaSetMonitor|js)")
# If Mongo version is 6 and above, data is retrieved as follows.
# data=$(mongosh "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find({}, {'_id': 1, 'content': 1}).limit($TEST_SIZE).toArray())")
delete_ids=()

for row in $(echo "$data" | jq -r '.[] | @base64'); do
    # Decode the base64 encoded row and extract _id and content
    _id=$(echo $row | base64 --decode | jq -r '._id["$oid"]')
    content=$(echo $row | base64 --decode | jq -r '.content')
    aci_value=$(echo "$content" | jq -r '.aci')

    echo "İşlenen ID: $_id, ACI Değeri: $aci_value" >> aci.txt

    # Save content to file
    echo "$content" > <DIRECTORY>/mongo-to-elastic-test/data.json
    echo "$row" >> <DIRECTORY>/mongo-to-elastic-test/yedek.json

    # Send data to Elasticsearch
    response=$(curl -s -X POST "$ES_URL/$ES_INDEX/_doc" -H "Content-Type: application/json" --data-binary @<DIRECTORY>/mongo-to-elastic-test/data.json)

    # Extract successful shard count from the response
    successful=$(echo "$response" | jq -r '._shards.successful')
    if [ "$successful" -eq 1 ]; then
        delete_ids+=("$_id")
    fi
done
# Create the ids_string for the delete operation in MongoDB
ids_string=$(printf "ObjectId(\"%s\"), " "${delete_ids[@]}" | sed 's/, $//')  # Removing trailing comma

# Perform deletion from MongoDB
if [ ${#delete_ids[@]} -gt 0 ]; then
    mongo "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })"
    # If Mongo version 6 and above, the data is deleted as follows.
    # mongosh "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })" 
    echo "Silme işlemi başarılı!"
fi
BASH

Script’in Çalıştırılması

Script, aşağıdaki komutla arka planda çalıştırılır:

nohup bash <DIRECTORY>/mongo_test.sh > mongo_test_log.txt 2>&1 &
ps aux | grep <PROCESS_ID>
BASH

Elasticsearch’te Logların Kontrolü

Script çalıştırıldıktan sonra aci.txt dosyasında listelenen ACI değerleri, Apinizer arayüzü üzerinden Elasticsearch’te olup olmadığı kontrol edilerek doğrulanır.

Bu noktada, ilgili ACI değerlerinin tüm environment'larda, tüm projelerde ve geniş bir zaman aralığında kontrol edilmesi gerekmektedir. Ve ACI değerleri girildiği zaman aşağıdaki görseldeki gibi ilgili logların gelmesi gerekmektedir.

4) Asıl script

Buraya verilerin sadece failover olarak mongodb'ye yazılıp yazılmadığını ve bunları elasticsearch'e geri gönderilip gönderilmediğini kontrol ettik.

Aşağıdaki Bash script’i, MongoDB’den verileri 1000’er batch’ler halinde alarak Elasticsearch’e yazılmasını ve başarıyla aktarılan verilerin MongoDB’den silinmesini sağlar.

Script içerisinde bulunan TOTAL_BATCHES parametresi, UnsentMessage veya log_UnsentMessage koleksiyonlarındaki toplam döküman sayısına göre ayarlanmalıdır. TOTAL_BATCHES × 1000 formülüyle işlenecek toplam veri miktarı belirlenir.

Dosya Yapısının Oluşturulması

Öncelikle, bash script’inin çalışacağı dizin oluşturulur ve gerekli dosyalar hazırlanır:

mkdir mongo-to-elastic
cd mongo-to-elastic/
touch data.json mongo_to_elastic_log.txt mongo_to_elastic.sh
chmod +x mongo_to_elastic.sh
BASH
MONGO_URI="mongodb://<MONGO_USER>:<MONGO_USER_PASSWORD>@<MONGO_IP>:25080/admin?replicaSet=apinizer-replicaset"
MONGO_DB="apinizerdb"
MONGO_COLLECTION="UnsentMessage"

ES_URL="http://<ELASTIC_IP>:9200"
ES_INDEX="<ELASTIC_DATA_STREAM_INDEX>(apinizer-log-apiproxy-exampleIndex)"
BATCH_SIZE=1000
TOTAL_BATCHES=<TOTAL_BATCHES = DOCUMENTSIZE / 1000>

for ((i=1; i<=TOTAL_BATCHES; i++)); do
    data=$(mongo "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find({}, {'_id': 1, 'content': 1}).limit($BATCH_SIZE).toArray())" | grep -vE "I\s+(NETWORK|CONNPOOL|ReplicaSetMonitor|js)")
	# If Mongo version is 6 and above, data is retrieved as follows.
	# data=$(mongosh "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find({}, {'_id': 1, 'content': 1}).limit($BATCH_SIZE).toArray())")
	
    delete_ids=()

    for row in $(echo "$data" | jq -r '.[] | @base64'); do
        # Decode the base64 encoded row and extract _id and content
        _id=$(echo $row | base64 --decode | jq -r '._id["$oid"]')
        content=$(echo $row | base64 --decode | jq -r '.content')

        # Save content to file
        echo "$content" > <DIRECTORY>/mongo-to-elastic/data.json
        # Send data to Elasticsearch
        response=$(curl -s -X POST "$ES_URL/$ES_INDEX/_doc" -H "Content-Type: application/json" --data-binary @<DIRECTORY>/mongo-to-elastic/data.json)
        # Extract successful shard count from the response
        successful=$(echo "$response" | jq -r '._shards.successful')
        if [ "$successful" -eq 1 ]; then
            delete_ids+=("$_id")
        fi
    done
    # Create the ids_string for the delete operation in MongoDB
    ids_string=$(printf "ObjectId(\"%s\"), " "${delete_ids[@]}" | sed 's/, $//')  # Removing trailing comma

    # Perform deletion from MongoDB
    if [ ${#delete_ids[@]} -gt 0 ]; then
        mongo "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })"
		# If Mongo version 6 and above, the data is deleted as follows.
		# mongosh "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })"
        echo "Silme işlemi başarılı!"
    fi
done
BASH

Script’in Çalıştırılması

Script, aşağıdaki komutla arka planda çalıştırılır:

nohup bash <DIRECTORY>/mongo_to_elastic.sh > mongo_to_elastic_log.txt 2>&1 &
ps aux | grep <PROCESS_ID>
BASH


Script çalışırken, log dosyası kontrol edilmelidir. Eğer aşağıdaki gibi bir ifade log dosyasına sürekli ekleniyorsa script başarılı bir şekilde veri taşıma işlemi ilerlemektedir.

{ "acknowledged" : true, "deletedCount" : 1000 }
TEXT


Ayrıca, MongoDB üzerinden de kayıt sayıları kontrol edilerek log’ların taşınıp taşınmadığı doğrulanabilir.

mongo mongodb://<MONGO_IP>:25080 --authenticationDatabase "admin" -u "apinizer" -p
# Mongo versiyonu 6 ve yukarısıysa aşağıdaki komut çalıştırılır.
mongosh mongodb://<MONGO_IP>:25080 --authenticationDatabase "admin" -u "apinizer" -p

use apinizerdb 
db.UnsentMessage.countDocuments({})
BASH

5) Script ile İlgili Olası Hatalar ve Çözümler

1) Script'in Takılı Kalması veya Log Dosyasına Hiç Veri Yazmaması Durumu

Bu durum, MongoDB’den Elasticsearch’e veri aktaran komutun timeout'a uğraması sebebiyle gerçekleşebilir. Çözüm olarak, taşınması 2 saniyeden uzun süren MongoDB dökümanlarının _id değerleri timeout_ids dizisine kaydedilir. Böylece, sorunlu dökümanlar atlanarak aktarım işlemi devam ettirilir.

Aşağıdaki scriptte yapılan iyileştirmelerle bu sorun çözülür.

MONGO_URI="mongodb://<MONGO_USER>:<MONGO_USER_PASSWORD>@<MONGO_IP>:25080/admin?replicaSet=apinizer-replicaset"
MONGO_DB="apinizerdb"
MONGO_COLLECTION="UnsentMessage"

ES_URL="http://<ELASTIC_IP>:9200"
ES_INDEX="<ELASTIC_DATA_STREAM_INDEX>(apinizer-log-apiproxy-exampleIndex)"
BATCH_SIZE=1000
TOTAL_BATCHES=<TOTAL_BATCHES = DOCUMENTSIZE / 1000>
timeout_ids=()
for ((i=1; i<=TOTAL_BATCHES; i++)); do
    if [ ${#timeout_ids[@]} -gt 0 ]; then
        timeout_ids_json=$(printf "ObjectId(\"%s\"), " "${timeout_ids[@]}" | sed 's/, $//')
        query="{'_id': { \$nin: [$timeout_ids_json] }}"
    else
        query="{}"
    fi
    data=$(mongo "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find($query,{'_id': 1, 'content': 1}).limit($BATCH_SIZE).toArray())" | grep -vE "I\s+(NETWORK|CONNPOOL|ReplicaSetMonitor|js)")
	
	# If Mongo version is 6 and above, data is retrieved as follows.
	# data=$(mongosh "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find($query, {'_id': 1, 'content': 1}).limit($BATCH_SIZE).toArray())")
	
    delete_ids=()

    for row in $(echo "$data" | jq -r '.[] | @base64'); do
        # Decode the base64 encoded row and extract _id and content
        _id=$(echo $row | base64 --decode | jq -r '._id["$oid"]')
        content=$(echo $row | base64 --decode | jq -r '.content')
        
        # Save content to file
        echo "$content" > <DIRECTORY>/mongo-to-elastic/data.json
        # Send data to Elasticsearch
        response=$(curl -s -X POST "$ES_URL/$ES_INDEX/_doc" -H "Content-Type: application/json" --data-binary @<DIRECTORY>/mongo-to-elastic/data.json)
		
        # Extract successful shard count from the response
        successful=$(echo "$response" | jq -r '._shards.successful')
        if [ "$successful" -eq 1 ]; then
            delete_ids+=("$_id")
        else
            timeout_ids+=("$_id")
        fi
    done

    # Perform deletion from MongoDB
    if [ ${#delete_ids[@]} -gt 0 ]; then
        ids_string=$(printf "ObjectId(\"%s\"), " "${delete_ids[@]}" | sed 's/, $//')
        mongo "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })"
		# If Mongo version 6 and above, the data is deleted as follows.
		# mongosh "$MONGO_URI" --quiet --eval "db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.deleteMany({ '_id': { \$in: [$ids_string] } })"
    fi
done
BASH

2) Script'in log dosyasında tcmalloc: large alloc 1073741824 bytes == 0x55f159412000 @ Hatası Varsa

Bu hata, batch ile alınan verilerin çok büyük olması nedeniyle bellekte aşırı yer kaplamasından kaynaklanır. Çözüm olarak, BATCH_SIZE değerini düşürmek gerekir. Örneğin, BATCH_SIZE 1000'den 10'a düşürülebilir:

3) Script'in log dosyasında mongo_to_elastic_log.txt: line 17: /usr/bin/mongo: Argument list too long Hatası Varsa

Bu hata, UnsentMessage koleksiyonundaki content alanlarının çok büyük olması nedeniyle oluşur. Komut satırında geçirilen argümanların boyutu sistem limitlerini aştığında bu hata meydana gelir. Büyük boyutlu verileri atlayarak scriptin çalışmaya devam etmesi sağlanabilir. Bunun için, belirli bir veri boyutunu atlamak üzere skip parametresi kullanılabilir:

data=$(mongo "$MONGO_URI" --quiet --eval "JSON.stringify(db.getSiblingDB('$MONGO_DB').$MONGO_COLLECTION.find($query,{'_id': 1, 'content': 1}).skip(<SKIP_DATA_SIZE>).limit($BATCH_SIZE).toArray())" | grep -vE "I\s+(NETWORK|CONNPOOL|ReplicaSetMonitor|js)")
BASH

Eğer MongoDB 6 ve üstü bir sürüm kullanılıyorsa, mongosh komutunu kullanarak aynı işlem gerçekleştirilir.

6) Script Log Taşımasını Tamamladıktan Sonra MongoDB'de Disk Alanının Geri Kazanılması

Script, UnsentMessage koleksiyonundaki verileri Elasticsearch'e başarıyla taşıdıktan sonra, MongoDB'de gereksiz yer kaplayan boş alanları temizlemek ve disk kullanımını optimize etmek için compact işlemi gerçekleştirilmelidir.

mongosh mongodb://<MONGO_IP>:25080/apinizerdb --authenticationDatabase "admin" -u "apinizer" -p
 
db.runCommand({compact: "UnsentMessage"})
BASH

Bu komut, UnsentMessage koleksiyonunun fiziksel depolama alanını yeniden düzenler. Özellikle büyük veri taşıma işlemlerinden sonra bu komutun mongodb secondary node üzerinde çalıştırılması önerilir.

Single node sistemlerde compact işlemi sırasında koleksiyon kilitleneceğinden, kullanım yoğun değilken veya veritabanın durması tolere edilebilecek iken çalıştırılması önerilir.