After I found the Change Tracking feature in SQL Server, I thought that I would love to have this information in a stream. That lead me to RX, and Hot Observables. I did some reading and came up with this. I'm wondering if it could be improved.
First is how I would use it, followed by the class that implements it:
var test = new MonitorDB.Server.PollChangeEvents(ConfigurationManager.ConnectionStrings["MonitorDB.Properties.Settings.db"].ToString());
test.IntervalDuration = 1;
test.SubscribeToChangeTracking("MessageQueueStatus", "MessageQueueStatusID");
test.StartMonitorChangesAcrossAllTables();
var subject = Guid.NewGuid();
var observer = test.Listen(subject.ToString());
var sub1 = observer.Subscribe(msg => Console.WriteLine(string.Format("Table {0} Operation {1} Key {2} Value {3}", msg.TableName, msg.Operation, msg.KeyName, msg.KeyValue)));
Console.ReadLine();
test.StopMonitoringChangesAcrossAllTables();
Console.ReadLine();
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using System.Data.SqlClient;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Reactive.Linq;
using System.Reactive.Disposables;
namespace MonitorDB.Server
{
public class PollChangeEvents
{
Timer _timer;
string _connectionString;
private readonly IDictionary<string, IObservable<ChangeTrackingEvent>> observers = new Dictionary<string, IObservable<ChangeTrackingEvent>>();
public PollChangeEvents(string pConnectionString)
{
_timer = new Timer();
GC.KeepAlive(_timer); //prevents attempts at garbadge collection
_timer.Elapsed += _timer_Elapsed;
_ChangeTrackingEvents = new ConcurrentDictionary<string, IChangeTrackingSubscription>();
_connectionString = pConnectionString;
}
void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
TableMonitoring.AsParallel().ForAll(pTableSubscription =>
{
using (SqlConnection conn = new SqlConnection(_connectionString))
{
conn.Open();
using (SqlCommand cmd = new SqlCommand())
{
string CmdString = string.Format("select *, CHANGE_TRACKING_CURRENT_VERSION() from Changetable(changes {0},{1}) as T", pTableSubscription.Key, pTableSubscription.Value.LastChangeVersion);
cmd.CommandText = CmdString;
cmd.Connection = conn;
SqlDataReader reader = cmd.ExecuteReader();
while (reader.Read())
{
var newEvent = new ChangeTrackingEvent(pTableSubscription.Key, reader.GetName(5));
newEvent.Operation = reader[2].ToString();
newEvent.KeyValue = reader[newEvent.KeyName].ToString();
newEvent.LastChangeVersion = Int64.Parse(reader[6].ToString());
FIFOQueue.Enqueue(newEvent);
pTableSubscription.Value.LastChangeVersion = newEvent.LastChangeVersion;
}
}
conn.Close();
}
});
}
//Taken mostly from here
//http://awkwardcoder.blogspot.ca/2012/06/understanding-refcount-in-reactive.html#!/2012/06/understanding-refcount-in-reactive.html
//
public IObservable<ChangeTrackingEvent> Listen(string subject)
{
IObservable<ChangeTrackingEvent> value;
if (observers.TryGetValue(subject, out value))
return value;
IObservable<ChangeTrackingEvent> observable = Observable.Create<ChangeTrackingEvent>(o =>
{
var disposable = Observable.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.Timestamp()
.Subscribe(ts =>
{
ChangeTrackingEvent dequeuedEvent = null;
FIFOQueue.TryDequeue(out dequeuedEvent);
if (dequeuedEvent != null)
o.OnNext(dequeuedEvent);
}
);
return new CompositeDisposable(disposable, Disposable.Create(() => observers.Remove(subject)));
})
.Publish() //this makes it a hot observable, throw events without a subscription
.RefCount();
observers.Add(subject, observable);
return observable;
}
private ConcurrentQueue<ChangeTrackingEvent> FIFOQueue = new ConcurrentQueue<ChangeTrackingEvent>();
private int _IntervalDuration;
public int IntervalDuration
{
get { return _IntervalDuration; }
set { _IntervalDuration = value; }
}
ConcurrentDictionary<string, IChangeTrackingSubscription> _ChangeTrackingEvents;
private ConcurrentDictionary<string, IChangeTrackingSubscription> TableMonitoring
{
get
{
return _ChangeTrackingEvents;
}
}
public bool SubscribeToChangeTracking(string pTableName, string pKeyName)
{
var ChangeTrackingEvent = new ChangeTrackingEvent(pTableName, pKeyName);
return _ChangeTrackingEvents.TryAdd(pTableName, ChangeTrackingEvent);
}
public void StartMonitorChangesAcrossAllTables()
{
_timer.Interval = this.IntervalDuration * 1000;
_timer.Start();
}
public void StopMonitoringChangesAcrossAllTables()
{
_timer.Stop();
}
}
}
-
\$\begingroup\$ Also found this which may be way more efficient .. bit.ly/WWEdp0 \$\endgroup\$codeputer– codeputer2013年02月09日 00:43:10 +00:00Commented Feb 9, 2013 at 0:43
-
\$\begingroup\$ hey can you please provide a sample code for this? \$\endgroup\$Radhi– Radhi2014年05月31日 09:13:54 +00:00Commented May 31, 2014 at 9:13
-
\$\begingroup\$ This implementation only works for tables with a single field primary key \$\endgroup\$jvanderh– jvanderh2015年06月29日 14:45:55 +00:00Commented Jun 29, 2015 at 14:45
1 Answer 1
ConfigurationManager.ConnectionStrings["MonitorDB.Properties.Settings.db"].ToString()
I wouldn't rely on ToString()
here. ToString()
is useful for getting human-readable representation of an object, but I think you shouldn't use it like this. Instead, use the ConnectionString
property.
GC.KeepAlive(_timer); //prevents attempts at garbadge collection
This line is completely useless and indicates you don't understand how GC works. If you put an object into a filed, it won't be GCed as long as the current object is also alive. The only thing GC.KeepAlive()
does is that it makes sure the object won't be collected before that call, but it doesn't have any lasting effect. Because of that, it can be useful in some rare cases (like PInvoke) for local variables, but it's certainly not useful for fields.
string CmdString = string.Format("select *, CHANGE_TRACKING_CURRENT_VERSION() from Changetable(changes {0},{1}) as T", pTableSubscription.Key, pTableSubscription.Value.LastChangeVersion);
You should never use string manipulation to create SQL queries, because it's unsafe. Instead, you should get into habit of always using parametrized queries. Also, you probably shouldn't use *
, especially if you want to retrieve the columns by number.
newEvent.Operation = reader[2].ToString();
newEvent.KeyValue = reader[newEvent.KeyName].ToString();
newEvent.LastChangeVersion = Int64.Parse(reader[6].ToString());
Again, I think you shouldn't use ToString()
here. If a value is string
, cast it to string
: newEvent.Operation = (string)reader[2];
This way, if something changes, you're going to get an exception instead of nonsensical value. If a value is long
, cast it to long
: newEvent.LastChangeVersion = (long)reader[6];
This is more efficient and if something changes, you're going to get a better exception.
conn.Close();
This is unnecessary. Dispose()
, which is called automatically at the end of using
block also closes the connection.
IObservable<ChangeTrackingEvent> observable = Observable.Create<ChangeTrackingEvent>(o =>
{
var disposable = Observable.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1))
.Timestamp()
.Subscribe(ts =>
{
ChangeTrackingEvent dequeuedEvent = null;
FIFOQueue.TryDequeue(out dequeuedEvent);
if (dequeuedEvent != null)
o.OnNext(dequeuedEvent);
}
);
return new CompositeDisposable(disposable, Disposable.Create(() => observers.Remove(subject)));
})
.Publish() //this makes it a hot observable, throw events without a subscription
.RefCount();
I think this is more complicated than it has to be. You certainly don't need to tick every millisecond. Also, I don't understand why are you even using the intermediate queue, you could send each change directly to the observable.
Also, this won't work correctly if you create several listeners, each will get only part of the results.
And I don't understand what is the purpose of the (badly named) subject
parameter.
-
\$\begingroup\$ Thanks - I should have spent more time cleaning up the SQL, but I was concentrating on the Hot Observable and how to tie it to Change Tracking. Also, the Select from ChangeTable function will return a new column each time for the key field. I don't know how to create this select without using dynamic sql, as I don't think it's possible \$\endgroup\$codeputer– codeputer2013年02月11日 16:40:47 +00:00Commented Feb 11, 2013 at 16:40