Skip to content

ItsMerad/DataFlow-Analytics-Platform

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

DataFlow Analytics Platform

Kafka Flink Spark Hadoop Hive PostgreSQL

Büyük veri işleme ve analizi için kapsamlı bir platform. Gerçek zamanlı streaming ve toplu analiz yetenekleri sunarak herhangi bir veri kaynağından gelen verileri işlemenize ve analiz etmenize olanak sağlar.

📊 Proje Özeti

Bu proje, çeşitli veri kaynakları için esnek bir veri işleme mimarisi sunur:

  1. Kafka Producer - Herhangi bir veri kaynağını Kafka topic'ine gönderir
  2. Flink Streaming - Gerçek zamanlı olarak Kafka'dan veri alıp HDFS'e işlenmiş halde yazar
  3. Spark Batch - Toplu veri analizleri yapıp sonuçları HDFS'de depolar
  4. Hive - Tüm verileri yapılandırılmış SQL sorguları ile kolayca sorgulamanızı sağlar

🏗️ Mimari

Veri Kaynağı (CSV, JSON, API vs.)
    ↓
Kafka Producer → Kafka Broker (Event Hub)
    ↓
    ├→ Flink Streaming Job → HDFS (Gerçek Zamanlı İşleme)
    │
    └→ Spark Batch Job → HDFS (Toplu Analiz)
          ↓
        Hive (Sorgulanabilir Veri Kataloğu)

🛠️ Teknoloji Stack

Bileşen Versiyon Görev
Kafka 7.5.0 Event streaming platform
Flink 1.17.0 Gerçek zamanlı stream processing
Spark 3.5.0 Batch data processing & analytics
HDFS 3.2.1 Dağıtılmış dosya sistemi
Hive 3.x SQL veri kataloğu
PostgreSQL 14 Hive metastore veritabanı
Zookeeper 7.5.0 Dağıtılmış koordinasyon

⚡ Hızlı Başlangıç (5 Dakika)

# 1. Repoyu clone et
git clone <repo-url>
cd DataProject

# 2. Docker'ı başlat
docker-compose up -d

# 3. HDFS hazırla
docker exec namenode hdfs dfs -mkdir -p /project/{raw,streamed_data,analysis_results}

# 4. Veri yükle
docker cp data/Tweets.csv namenode:/
docker exec namenode hdfs dfs -put /Tweets.csv /project/raw/

# 5. Producer başlat
cd kafka && pip install -r requirements.txt
python producer.py &

# 6. Streaming işle
cd ../flink-jobs
pip install pyflink==1.17.0
flink run -py flink_streaming_job.py &

# 7. Batch analiz
spark-submit --master local[*] ../spark-master/spark_batch_job.py

Tamamlandı! Artık Hive'da verilerini sorgulamaya başlayabilirsin.

📂 Proje Yapısı

DataProject/
├── README.md                        # Bu dosya
├── docker-compose.yml               # Servis konfigürasyonu
├── data/
│   └── Tweets.csv                  # Örnek veri (özelleştir)
├── kafka/
│   ├── producer.py                 # Veri kaynağı entegrasyonu (ÖZELLEŞTIR)
│   └── requirements.txt             # Python bağımlılıkları
├── flink-jobs/
│   └── flink_streaming_job.py      # Gerçek zamanlı işleme (ÖZELLEŞTIR)
├── spark-master/
│   └── spark_batch_job.py          # Batch analizi (ÖZELLEŞTIR)
├── hive/
│   └── create_tables.sql           # Veri tablosu tanımları (ÖZELLEŞTIR)
└── drivers/
    └── download_jars.sh            # Bağımlılık indirici

🔧 Özelleştirmek İçin İzlenecek Dosyalar

Dosya İşlev Değiştir
kafka/producer.py Veri kaynağını Kafka'ya gönder Veri formatı, kaynağı
flink-jobs/flink_streaming_job.py Gerçek zamanlı işlem Filtreleme, dönüştürme mantığı
spark-master/spark_batch_job.py Toplu analiz SQL, aggregation, machine learning
hive/create_tables.sql Veri kataloğu Şema, sütun tanımları

🚀 Başlangıç

Ön Koşullar

  • Docker & Docker Compose
  • Python 3.8+
  • Bash

Temel Kurulum & Çalıştırma

1. Docker servisleri başlat

docker-compose up -d

Tüm servislerin sağlıklı olmasını kontrol et:

docker-compose ps

2. HDFS klasörlerini oluştur

docker exec namenode hdfs dfs -mkdir -p /project/raw
docker exec namenode hdfs dfs -mkdir -p /project/streamed_data
docker exec namenode hdfs dfs -mkdir -p /project/analysis_results

3. Veri dosyalarını HDFS'e yükle

# CSV dosyasını namenode'a kopyala
docker cp data/Tweets.csv namenode:/

# HDFS'e yükle
docker exec namenode hdfs dfs -put /Tweets.csv /project/raw/input_data.csv

4. Hive tabloları oluştur

Hive metastore'u başlat:

docker exec hive-server schematool -initSchema -dbType postgres

Tablolarını tanımla:

docker exec hive-server hive -f /path/to/hive/create_tables.sql

5. Kafka Producer'ı çalıştır

Producer'ınızı özelleştirerek Kafka'ya veri gönderin:

cd kafka
pip install -r requirements.txt
python producer.py --data-source <your_data_file>

Producer.py dosyasını kendi veri kaynağınız için düzenleyebilirsiniz.

6. Flink Streaming Job'unu çalıştır

Flink JAR'larını indir:

cd drivers
bash download_jars.sh 1.17.0

PyFlink'i yükle:

pip install pyflink==1.17.0

Streaming job'u başlat:

flink run -py ../flink-jobs/flink_streaming_job.py

7. Spark Batch Job'unu çalıştır

spark-submit --master local[*] spark-master/spark_batch_job.py

💡 Kullanım Örnekleri

Örnek 1: E-ticaret İşlem Analizi

Veri: Sipariş ve ödeme işlemleri

# producer.py'da özelleştir
message = {
    "order_id": "ORD12345",
    "customer_id": "CUST001",
    "amount": 299.99,
    "timestamp": "2026-02-15 10:30:00",
    "status": "completed"
}

Streaming: Gerçek zamanlı satış izleme Batch: Günlük revenue raporu, müşteri davranışı analizi

Örnek 2: IoT Sensör Verileri

Veri: Sıcaklık, nem, basınç ölçümleri

{
  "sensor_id": "SENSOR_001",
  "location": "Building_A",
  "temperature": 22.5,
  "humidity": 45.3,
  "timestamp": "2026-02-15 10:30:00"
}

Streaming: Anormal değer tespiti, uyarı sistemi Batch: Dönemsel eğilim analizi, periyodik bakım planlaması

Örnek 3: Günlük (Log) Analizi

Veri: Uygulama logları, error stacktrace'leri

{
  "timestamp": "2026-02-15T10:30:00Z",
  "level": "ERROR",
  "service": "api-service",
  "message": "Database connection timeout",
  "trace_id": "abc123"
}

Streaming: Hata uyarıları, sistem sağlık monitörü Batch: Root cause analizi, performans raporları

📋 Veri Akışı

1. Producer → Kafka

Giriş: CSV, JSON, API veya herhangi bir veri kaynağı

Kafka Topic: data_stream (konfigüre edilebilir)

Mesaj Format:

{
  "id": "unique_identifier",
  "category": "data_category",
  "timestamp": "2026-02-15 10:30:00",
  "value": 123.45,
  "metadata": {...},
  "_source": "producer_name"
}

Producer.py dosyasını düzenleyerek kendi veri kaynağınız için özelleştirebilirsiniz.

2. Flink Streaming

Giriş: Kafka topic

İşlem:

  • Gerçek zamanlı veri filtreleme
  • Veri dönüştürme ve zenginleştirme
  • İstatistik hesaplama
  • Anomali tespiti

Çıkış: HDFS /project/streamed_data/ (Avro format)

Özellikler:

  • 10 saniye checkpoint aralığı (veri tutarlılığı için)
  • Tam veri işleme modunu destekler
  • Durumu kurtarma (fault-tolerant)

3. Spark Batch

Giriş: HDFS deposunda depolanan veri

Analiz Türleri:

  • Veri kümeleme ve gruplama
  • İstatistiksel analiz (ortalama, medyan, varyans vs.)
  • Zaman serisi analizi
  • Yapısal veriden içgörü çıkarma

Çıkış: HDFS /project/analysis_results/ (Parquet format)

Spark job'u için örnekler:

  • KPI hesaplaması
  • Rapor oluşturma
  • İlişkisel analiz

4. Hive Veri Kataloğu

Tüm işlenmiş verileri SQL ile sorgulanabilir hale getirir.

🗄️ Hive Veri Tabloları

Hive, tüm verileri SQL ile sorgulanabilir hale getiren meta-veri katmanıdır. Tablolarınızı ihtiyaçlarınıza göre hive/create_tables.sql dosyasında tanımlayabilirsiniz.

Varsayılan Tablolar

1. data_raw_csv

  • HDFS'deki orijinal veri dosyaları
  • CSV, JSON veya TEXTFILE format
  • Tüm raw veriyi içerir

2. data_stream_processed

  • Flink'in gerçek zamanlı işlediği veriler
  • Avro format (sıkıştırılmış)
  • Partition desteği (date, time, category vs.)

3. analysis_results

  • Spark batch job'unun analiz sonuçları
  • Parquet format (optimize sorgu performansı)
  • Kümlenmiş ve agregat veriler

Hive'da Sorgulama

docker exec hive-server hive

# Örnek: Tüm tabloları listele
SHOW TABLES;

# Örnek: Raw verinin şemasını göster
DESCRIBE FORMATTED data_raw_csv;

# Örnek: Stream verisini filtrele
SELECT * FROM data_stream_processed 
WHERE date >= '2026-02-01' 
LIMIT 100;

# Örnek: Analiz sonuçlarını sırala
SELECT * FROM analysis_results 
ORDER BY importance DESC;

# Örnek: Birden fazla tabloyu birleştir
SELECT a.category, COUNT(*) as count, b.avg_value
FROM data_raw_csv a
JOIN analysis_results b ON a.id = b.id
GROUP BY a.category;

📊 Web UI'lar

Servis URL Açıklama
HDFS NameNode http://localhost:9870 HDFS dosya sistemi
Kafka localhost:9092 Broker portu
Zookeeper localhost:2181 Koordinasyon

🐛 Sorun Giderme

Kafka Producer bağlanmıyor

# Kafka broker'ın hazır olup olmadığını kontrol et
docker-compose logs kafka | grep "started"

# Kafka'ya telnet ile çerçevelendir
telnet localhost 9092

# Producer'ı verbose modda çalıştır
python producer.py --log-level DEBUG

Flink job başlamıyor

# Flink'in tüm bileşenlerinin çalıştığını kontrol et
docker-compose logs jobmanager
docker-compose logs taskmanager

# Checkpoint klasörünün var olup olmadığını kontrol et
docker exec namenode hdfs dfs -ls /flink

# Log dosyalarını kontrol et
docker-compose logs flink | tail -100

Spark job hata veriyor

# HDFS dosyalarının var olduğunu kontrol et
docker exec namenode hdfs dfs -ls /project/raw/

# Spark loglarını kontrol et
docker-compose logs spark-master | tail -100

# Spark UI (eğer docker-compose'de expose edildiyse)
# http://localhost:4040

Hive tablolarına veri yazılmıyor

# PostgreSQL'in çalıştığını kontrol et
docker-compose ps postgres

# Hive metastore bağlantısını test et
docker exec hive-server hive -e "SHOW TABLES;"

# Hive loglarını kontrol et
docker exec hive-server tail -100 /var/log/hive/hive.log

# HDFS'e yazım izinlerini kontrol et
docker exec namenode hdfs dfsadmin -report

HDFS'deki veri görünmüyor

# Tüm dosyaları listele
docker exec namenode hdfs dfs -find /project -type f

# Dosya detaylarını göster
docker exec namenode hdfs dfs -ls -R /project

# Dosya içeriğini oku
docker exec namenode hdfs dfs -cat /project/raw/input_data.csv | head -20

Memory ya da Resource hataları

# Container resource sınırlarını artır (docker-compose.yml'de)
services:
  taskmanager:
    mem_limit: 2g
    cpus: 1.5

# Flink Parallelism'i düşür
table_env.get_config().get_configuration().set_string(
    "parallelism.default", "2"
)

� Operasyonel Görevler

Günlük Bakım

# Container'ları kontrol et
docker-compose ps

# Log'ları monitör et
docker-compose logs -f [service-name]

# Disk alanını kontrol et
docker exec namenode hdfs dfsadmin -report

Veri Temizliği

# HDFS'deki eski verileri sil
docker exec namenode hdfs dfs -rm -r /project/streamed_data/*
docker exec namenode hdfs dfs -rm -r /project/analysis_results/*

# Hive table'ını temizle (veriyi sil, tabloyu koru)
docker exec hive-server hive -e "TRUNCATE TABLE data_stream_processed;"

# Kafka topic'i sıfırla
docker exec kafka kafka-topics --bootstrap-server localhost:9092 \
  --topic data_stream --alter --config retention.ms=3600000

Shutdown

# Tüm container'ları durdur
docker-compose stop

# Container'ları sil (veri kaybolur)
docker-compose down

# Container'ları ve volume'ları sil
docker-compose down -v

Yedekleme

# HDFS snapshot al
docker exec namenode hdfs dfs -createSnapshot /project snapshot_2026_02_15

# PostgreSQL backup
docker exec postgres pg_dump -U hive metastore > backup.sql

📝 Önemli Notlar

Performans & Optimize Etme

  • Checkpoint Interval: Flink her 10 saniyede bir checkpoint alıyor (veri tutarlılığı için)
  • Parallelism: Batch işleme için worker sayısını ihtiyaca göre ayarla
  • Partition: Büyük veri setleri için Hive partitionlarını kullan (date, category vs.)
  • Format: Streaming için Avro (sıkıştırılmış), batch için Parquet (sorgu optimize)

Veri Formatı

  • CSV Encoding: UTF-8 (doğru karakter desteği)
  • JSON Schema: Kafka mesajları yapılandırılmış olmalı
  • Null Handling: Hive'de NULL değerleri dikkatli ele al

Checkpoint & Durability

  • State: Flink state'ini HDFS'de depola
  • WAL: Write-Ahead Logging sayesinde hiçbir veri kaybı yok
  • Recovery: Sistem restart olursa son checkpoint'ten devam eder

Skalabilite

Daha büyük veri işleme için:

# Docker resource sınırlarını artır
docker-compose.yml'de services'in memory/cpus artırılır

# Spark executor sayısını artır
spark-submit --num-executors 4 --executor-cores 4

# Kafka partition sayısını artır (topic oluştururken)
--partitions 6

Security Recommendations

Üretim ortamında:

  • Kafka authentication (SASL/SSL) aktif et
  • HDFS permissions'lar kontrol et
  • PostgreSQL şifreleri değiştir
  • Firewall kuralları konfigüre et

📚 Kaynaklar

🤝 Katkıda Bulunma

Geliştirmeler ve hata bildirimleriniz için:

  1. Bir issue açın (bug report, feature request)
  2. Kendi branch'te değişiklikleri yapın
  3. Pull request gönderin (açık açıklamalarla)

📄 Lisans

MIT - Özgürce kullanma ve değiştirme hakkına sahipsiniz

📧 Destek

Sorularınız ve önerileriniz için:

  • Issues: GitHub Issues'te soru açın
  • Discussions: Fikir paylaşımı için discussions kullanın

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages