Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Misc fixes and cosmetics #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
kzk merged 8 commits into fluent:master from EvaSDK:master
Aug 2, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions fluent/event.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from fluent import sender
# -*- coding: utf-8 -*-

import time

from fluent import sender


class Event(object):
def __init__(self, label, data, **kwargs):
if not isinstance(data, dict) :
raise Exception("data must be dict")
s = kwargs['sender'] if ('sender' in kwargs) else sender.get_global_sender()
timestamp = kwargs['time'] if ('time' in kwargs) else int(time.time())
s.emit_with_time(label, timestamp, data)
assert isinstance(data, dict), 'data must be a dict'
sender_ = kwargs.get('sender', sender.get_global_sender())
timestamp = kwargs.get('time', int(time.time()))
sender_.emit_with_time(label, timestamp, data)
67 changes: 35 additions & 32 deletions fluent/handler.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
# -*- coding: utf-8 -*-

import logging
import os
import sys
import msgpack
import socket
import threading

try:
import json
except ImportError:
import simplejson as json
except ImportError:
import json

from fluent import sender


class FluentRecordFormatter(object):
def __init__(self):
self.hostname = socket.gethostname()

def format(self, record):
data = {
'sys_host' : self.hostname,
'sys_name' : record.name,
'sys_module' : record.module,
# 'sys_lineno' : record.lineno,
# 'sys_levelno' : record.levelno,
# 'sys_levelname' : record.levelname,
# 'sys_filename' : record.filename,
# 'sys_funcname' : record.funcName,
# 'sys_exc_info' : record.exc_info,
}
data = {'sys_host': self.hostname,
'sys_name': record.name,
'sys_module': record.module,
# 'sys_lineno': record.lineno,
# 'sys_levelno': record.levelno,
# 'sys_levelname': record.levelname,
# 'sys_filename': record.filename,
# 'sys_funcname': record.funcName,
# 'sys_exc_info': record.exc_info,
}
# if 'sys_exc_info' in data and data['sys_exc_info']:
# data['sys_exc_info'] = self.formatException(data['sys_exc_info'])

Expand All @@ -40,36 +38,41 @@ def _structuring(self, data, msg):
elif isinstance(msg, str):
try:
self._add_dic(data, json.loads(str(msg)))
except:
except (ValueError, json.JSONDecodeError):
pass

def _add_dic(self, data, dic):
for k, v in dic.items():
if isinstance(k, str) or isinstance(k, unicode):
data[str(k)] = v
@staticmethod
def _add_dic(data, dic):
for key, value in dic.items():
if isinstance(key, basestring):
data[str(key)] = value


class FluentHandler(logging.Handler):
'''
Logging Handler for fluent.
'''
def __init__(self,
tag,
host='localhost',
port=24224,
timeout=3.0,
verbose=False):
tag,
host='localhost',
port=24224,
timeout=3.0,
verbose=False):

self.tag = tag
self.sender = sender.FluentSender(tag,
host=host, port=port,
timeout=timeout, verbose=verbose)
self.fmt = FluentRecordFormatter()
logging.Handler.__init__(self)

def emit(self, record):
if record.levelno < self.level: return
data = self.fmt.format(record)
data = self.format(record)
self.sender.emit(None, data)

def _close(self):
self.sender._close()
def close(self):
self.acquire()
try:
self.sender._close()
logging.Handler.close(self)
finally:
self.release()
31 changes: 19 additions & 12 deletions fluent/sender.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
# -*- coding: utf-8 -*-

from __future__ import print_function
import msgpack
import socket
import threading
import time

import msgpack


_global_sender = None


def setup(tag, **kwargs):
host = kwargs.get('host', 'localhost')
port = kwargs.get('port', 24224)

global _global_sender
_global_sender = FluentSender(tag, host=host, port=port)


def get_global_sender():
return _global_sender


class FluentSender(object):
def __init__(self,
tag,
Expand All @@ -36,10 +43,10 @@ def __init__(self,
self.pendings = None
self.packer = msgpack.Packer()
self.lock = threading.Lock()

try:
self._reconnect()
except:
except Exception:
# will be retried in emit()
self._close()

Expand All @@ -48,8 +55,8 @@ def emit(self, label, data):
self.emit_with_time(label, cur_time, data)

def emit_with_time(self, label, timestamp, data):
bytes = self._make_packet(label, timestamp, data)
self._send(bytes)
bytes_ = self._make_packet(label, timestamp, data)
self._send(bytes_)

def _make_packet(self, label, timestamp, data):
if label:
Expand All @@ -61,25 +68,25 @@ def _make_packet(self, label, timestamp, data):
print(packet)
return self.packer.pack(packet)

def _send(self, bytes):
def _send(self, bytes_):
self.lock.acquire()
try:
self._send_internal(bytes)
self._send_internal(bytes_)
finally:
self.lock.release()

def _send_internal(self, bytes):
def _send_internal(self, bytes_):
# buffering
if self.pendings:
self.pendings += bytes
bytes = self.pendings
self.pendings += bytes_
bytes_ = self.pendings

try:
# reconnect if possible
self._reconnect()

# send message
self.socket.sendall(bytes)
self.socket.sendall(bytes_)

# send finished
self.pendings = None
Expand All @@ -91,7 +98,7 @@ def _send_internal(self, bytes):
# TODO: add callback handler here
self.pendings = None
else:
self.pendings = bytes
self.pendings = bytes_

def _reconnect(self):
if not self.socket:
Expand Down
2 changes: 2 additions & 0 deletions tests/__init__.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-

from tests.test_event import *
from tests.test_sender import *
from tests.test_handler import *
29 changes: 17 additions & 12 deletions tests/mockserver.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import socket
import threading
import time
from msgpack import Unpacker
# -*- coding: utf-8 -*-

try:
from cStringIO import StringIO as BytesIO
except ImportError:
from io import BytesIO

import socket
import threading
import time

from msgpack import Unpacker


class MockRecvServer(threading.Thread):
"""
Single threaded server accepts one connection and recv until EOF.
Expand All @@ -21,16 +25,16 @@ def __init__(self, port):
self.start()

def run(self):
s = self._sock
s.listen(1)
con, _ = s.accept()
sock = self._sock
sock.listen(1)
con, _ = sock.accept()
while True:
d = con.recv(4096)
if not d:
data = con.recv(4096)
if not data:
break
self._buf.write(d)
self._buf.write(data)
con.close()
s.close()
sock.close()
self._sock = None

def wait(self):
Expand All @@ -40,5 +44,6 @@ def wait(self):
def get_recieved(self):
self.wait()
self._buf.seek(0)
# TODO: have to process string encoding properly. currently we assume that all encoding is utf-8.
# TODO: have to process string encoding properly. currently we assume
# that all encoding is utf-8.
return list(Unpacker(self._buf, encoding='utf-8'))
15 changes: 8 additions & 7 deletions tests/test_event.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
# -*- coding: utf-8 -*-

import unittest
import time

import fluent
from fluent import event, sender


sender.setup(server='localhost', tag='app')


class TestHandler(unittest.TestCase):
def testLogging(self):
# send event with tag app.follow
event.Event('follow', {
'from': 'userA',
'to': 'userB'
'from': 'userA',
'to': 'userB'
})

# send event with tag app.follow, with timestamp
event.Event('follow', {
'from': 'userA',
'to': 'userB'
'from': 'userA',
'to': 'userB'
}, time=int(0))

23 changes: 14 additions & 9 deletions tests/test_handler.py
View file Open in desktop
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import unittest
from tests import mockserver
# -*- coding: utf-8 -*-

import logging
import unittest

import fluent.handler
import msgpack

from tests import mockserver


class TestLogger(unittest.TestCase):
def setUp(self):
Expand All @@ -12,23 +16,24 @@ def setUp(self):
self._server = mockserver.MockRecvServer(port)
self._port = port
break
except IOError as e:
except IOError:
pass

def get_data(self):
return self._server.get_recieved()

def test_simple(self):
h = fluent.handler.FluentHandler('app.follow', port=self._port)
handler = fluent.handler.FluentHandler('app.follow', port=self._port)

logging.basicConfig(level=logging.INFO)
l = logging.getLogger('fluent.test')
l.addHandler(h)
l.info({
log = logging.getLogger('fluent.test')
handler.setFormatter(fluent.handler.FluentRecordFormatter())
log.addHandler(handler)
log.info({
'from': 'userA',
'to': 'userB'
})
h._close()
handler.close()

data = self.get_data()
eq = self.assertEqual
Expand Down
Loading

AltStyle によって変換されたページ (->オリジナル) /